~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication/replication.cc

  • Committer: Monty Taylor
  • Date: 2008-11-16 23:47:43 UTC
  • mto: (584.1.10 devel)
  • mto: This revision was merged to the branch mainline in revision 589.
  • Revision ID: monty@inaugust.com-20081116234743-c38gmv0pa2kdefaj
BrokeĀ outĀ cached_item.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2006 MySQL AB & Sasha
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <drizzled/server_includes.h>
 
17
 
 
18
#include <drizzled/replication/mi.h>
 
19
#include <drizzled/replication/replication.h>
 
20
#include <drizzled/log_event.h>
 
21
#include <drizzled/replication/filter.h>
 
22
#include <drizzled/error.h>
 
23
#include <drizzled/gettext.h>
 
24
#include <drizzled/data_home.h>
 
25
 
 
26
int max_binlog_dump_events = 0; // unlimited
 
27
 
 
28
/*
 
29
    fake_rotate_event() builds a fake (=which does not exist physically in any
 
30
    binlog) Rotate event, which contains the name of the binlog we are going to
 
31
    send to the slave (because the slave may not know it if it just asked for
 
32
    MASTER_LOG_FILE='', MASTER_LOG_POS=4).
 
33
    < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
 
34
    After this version we always call it, so that a 3.23.58 slave can rely on
 
35
    it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
 
36
    zeros in the good positions which, by chance, make it possible for the 3.23
 
37
    slave to detect that this event is unexpected) (this is luck which happens
 
38
    because the master and slave disagree on the size of the header of
 
39
    Log_event).
 
40
 
 
41
    Relying on the event length of the Rotate event instead of these
 
42
    well-placed zeros was not possible as Rotate events have a variable-length
 
43
    part.
 
44
*/
 
45
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
 
46
                             uint64_t position, const char** errmsg)
 
47
{
 
48
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
 
49
  /*
 
50
    'when' (the timestamp) is set to 0 so that slave could distinguish between
 
51
    real and fake Rotate events (if necessary)
 
52
  */
 
53
  memset(header, 0, 4);
 
54
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
 
55
 
 
56
  char* p = log_file_name+dirname_length(log_file_name);
 
57
  uint32_t ident_len = (uint32_t) strlen(p);
 
58
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
 
59
  int4store(header + SERVER_ID_OFFSET, server_id);
 
60
  int4store(header + EVENT_LEN_OFFSET, event_len);
 
61
  int2store(header + FLAGS_OFFSET, 0);
 
62
 
 
63
  // TODO: check what problems this may cause and fix them
 
64
  int4store(header + LOG_POS_OFFSET, 0);
 
65
 
 
66
  packet->append(header, sizeof(header));
 
67
  int8store(buf+R_POS_OFFSET,position);
 
68
  packet->append(buf, ROTATE_HEADER_LEN);
 
69
  packet->append(p,ident_len);
 
70
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
71
  {
 
72
    *errmsg = "failed on my_net_write()";
 
73
    return(-1);
 
74
  }
 
75
  return(0);
 
76
}
 
77
 
 
78
static int send_file(Session *session)
 
79
{
 
80
  NET* net = &session->net;
 
81
  int fd = -1, error = 1;
 
82
  size_t bytes;
 
83
  char fname[FN_REFLEN+1];
 
84
  const char *errmsg = 0;
 
85
  int old_timeout;
 
86
  unsigned long packet_len;
 
87
  unsigned char buf[IO_SIZE];                           // It's safe to alloc this
 
88
 
 
89
  /*
 
90
    The client might be slow loading the data, give him wait_timeout to do
 
91
    the job
 
92
  */
 
93
  old_timeout= net->read_timeout;
 
94
  my_net_set_read_timeout(net, session->variables.net_wait_timeout);
 
95
 
 
96
  /*
 
97
    We need net_flush here because the client will not know it needs to send
 
98
    us the file name until it has processed the load event entry
 
99
  */
 
100
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
 
101
  {
 
102
    errmsg = _("Failed in send_file() while reading file name");
 
103
    goto err;
 
104
  }
 
105
 
 
106
  // terminate with \0 for fn_format
 
107
  *((char*)net->read_pos +  packet_len) = 0;
 
108
  fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
 
109
  // this is needed to make replicate-ignore-db
 
110
  if (!strcmp(fname,"/dev/null"))
 
111
    goto end;
 
112
 
 
113
  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
 
114
  {
 
115
    errmsg = _("Failed in send_file() on open of file");
 
116
    goto err;
 
117
  }
 
118
 
 
119
  while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0)
 
120
  {
 
121
    if (my_net_write(net, buf, bytes))
 
122
    {
 
123
      errmsg = _("Failed in send_file() while writing data to client");
 
124
      goto err;
 
125
    }
 
126
  }
 
127
 
 
128
 end:
 
129
  if (my_net_write(net, (unsigned char*) "", 0) || net_flush(net) ||
 
130
      (my_net_read(net) == packet_error))
 
131
  {
 
132
    errmsg = _("Failed in send_file() while negotiating file transfer close");
 
133
    goto err;
 
134
  }
 
135
  error = 0;
 
136
 
 
137
 err:
 
138
  my_net_set_read_timeout(net, old_timeout);
 
139
  if (fd >= 0)
 
140
    (void) my_close(fd, MYF(0));
 
141
  if (errmsg)
 
142
  {
 
143
    sql_print_error("%s",errmsg);
 
144
  }
 
145
  return(error);
 
146
}
 
147
 
 
148
 
 
149
/*
 
150
  Adjust the position pointer in the binary log file for all running slaves
 
151
 
 
152
  SYNOPSIS
 
153
    adjust_linfo_offsets()
 
154
    purge_offset        Number of bytes removed from start of log index file
 
155
 
 
156
  NOTES
 
157
    - This is called when doing a PURGE when we delete lines from the
 
158
      index log file
 
159
 
 
160
  REQUIREMENTS
 
161
    - Before calling this function, we have to ensure that no threads are
 
162
      using any binary log file before purge_offset.a
 
163
 
 
164
  TODO
 
165
    - Inform the slave threads that they should sync the position
 
166
      in the binary log file with flush_relay_log_info.
 
167
      Now they sync is done for next read.
 
168
*/
 
169
 
 
170
void adjust_linfo_offsets(my_off_t purge_offset)
 
171
{
 
172
  Session *tmp;
 
173
 
 
174
  pthread_mutex_lock(&LOCK_thread_count);
 
175
  I_List_iterator<Session> it(threads);
 
176
 
 
177
  while ((tmp=it++))
 
178
  {
 
179
    LOG_INFO* linfo;
 
180
    if ((linfo = tmp->current_linfo))
 
181
    {
 
182
      pthread_mutex_lock(&linfo->lock);
 
183
      /*
 
184
        Index file offset can be less that purge offset only if
 
185
        we just started reading the index file. In that case
 
186
        we have nothing to adjust
 
187
      */
 
188
      if (linfo->index_file_offset < purge_offset)
 
189
        linfo->fatal = (linfo->index_file_offset != 0);
 
190
      else
 
191
        linfo->index_file_offset -= purge_offset;
 
192
      pthread_mutex_unlock(&linfo->lock);
 
193
    }
 
194
  }
 
195
  pthread_mutex_unlock(&LOCK_thread_count);
 
196
}
 
197
 
 
198
 
 
199
bool log_in_use(const char* log_name)
 
200
{
 
201
  int log_name_len = strlen(log_name) + 1;
 
202
  Session *tmp;
 
203
  bool result = 0;
 
204
 
 
205
  pthread_mutex_lock(&LOCK_thread_count);
 
206
  I_List_iterator<Session> it(threads);
 
207
 
 
208
  while ((tmp=it++))
 
209
  {
 
210
    LOG_INFO* linfo;
 
211
    if ((linfo = tmp->current_linfo))
 
212
    {
 
213
      pthread_mutex_lock(&linfo->lock);
 
214
      result = !memcmp(log_name, linfo->log_file_name, log_name_len);
 
215
      pthread_mutex_unlock(&linfo->lock);
 
216
      if (result)
 
217
        break;
 
218
    }
 
219
  }
 
220
 
 
221
  pthread_mutex_unlock(&LOCK_thread_count);
 
222
  return result;
 
223
}
 
224
 
 
225
bool purge_error_message(Session* session, int res)
 
226
{
 
227
  uint32_t errmsg= 0;
 
228
 
 
229
  switch (res)  {
 
230
  case 0: break;
 
231
  case LOG_INFO_EOF:    errmsg= ER_UNKNOWN_TARGET_BINLOG; break;
 
232
  case LOG_INFO_IO:     errmsg= ER_IO_ERR_LOG_INDEX_READ; break;
 
233
  case LOG_INFO_INVALID:errmsg= ER_BINLOG_PURGE_PROHIBITED; break;
 
234
  case LOG_INFO_SEEK:   errmsg= ER_FSEEK_FAIL; break;
 
235
  case LOG_INFO_MEM:    errmsg= ER_OUT_OF_RESOURCES; break;
 
236
  case LOG_INFO_FATAL:  errmsg= ER_BINLOG_PURGE_FATAL_ERR; break;
 
237
  case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break;
 
238
  case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break;
 
239
  default:              errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break;
 
240
  }
 
241
 
 
242
  if (errmsg)
 
243
  {
 
244
    my_message(errmsg, ER(errmsg), MYF(0));
 
245
    return true;
 
246
  }
 
247
  my_ok(session);
 
248
  return false;
 
249
}
 
250
 
 
251
 
 
252
bool purge_master_logs(Session* session, const char* to_log)
 
253
{
 
254
  char search_file_name[FN_REFLEN];
 
255
  if (!mysql_bin_log.is_open())
 
256
  {
 
257
    my_ok(session);
 
258
    return false;
 
259
  }
 
260
 
 
261
  mysql_bin_log.make_log_name(search_file_name, to_log);
 
262
  return purge_error_message(session,
 
263
                             mysql_bin_log.purge_logs(search_file_name, 0, 1,
 
264
                                                      1, NULL));
 
265
}
 
266
 
 
267
 
 
268
bool purge_master_logs_before_date(Session* session, time_t purge_time)
 
269
{
 
270
  if (!mysql_bin_log.is_open())
 
271
  {
 
272
    my_ok(session);
 
273
    return 0;
 
274
  }
 
275
  return purge_error_message(session,
 
276
                             mysql_bin_log.purge_logs_before_date(purge_time));
 
277
}
 
278
 
 
279
int test_for_non_eof_log_read_errors(int error, const char **errmsg)
 
280
{
 
281
  if (error == LOG_READ_EOF)
 
282
    return 0;
 
283
  my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
284
  switch (error) {
 
285
  case LOG_READ_BOGUS:
 
286
    *errmsg = "bogus data in log event";
 
287
    break;
 
288
  case LOG_READ_TOO_LARGE:
 
289
    *errmsg = "log event entry exceeded max_allowed_packet; \
 
290
Increase max_allowed_packet on master";
 
291
    break;
 
292
  case LOG_READ_IO:
 
293
    *errmsg = "I/O error reading log event";
 
294
    break;
 
295
  case LOG_READ_MEM:
 
296
    *errmsg = "memory allocation failed reading log event";
 
297
    break;
 
298
  case LOG_READ_TRUNC:
 
299
    *errmsg = "binlog truncated in the middle of event";
 
300
    break;
 
301
  default:
 
302
    *errmsg = "unknown error reading log event on the master";
 
303
    break;
 
304
  }
 
305
  return error;
 
306
}
 
307
 
 
308
 
 
309
/**
 
310
  An auxiliary function for calling in mysql_binlog_send
 
311
  to initialize the heartbeat timeout in waiting for a binlogged event.
 
312
 
 
313
  @param[in]    session  Session to access a user variable
 
314
 
 
315
  @return        heartbeat period an uint64_t of nanoseconds
 
316
                 or zero if heartbeat was not demanded by slave
 
317
*/ 
 
318
static uint64_t get_heartbeat_period(Session * session)
 
319
{
 
320
  bool null_value;
 
321
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
 
322
  user_var_entry *entry= 
 
323
    (user_var_entry*) hash_search(&session->user_vars, (unsigned char*) name.str,
 
324
                                  name.length);
 
325
  return entry? entry->val_int(&null_value) : 0;
 
326
}
 
327
 
 
328
/*
 
329
  Function prepares and sends repliation heartbeat event.
 
330
 
 
331
  @param net                net object of Session
 
332
  @param packet             buffer to store the heartbeat instance
 
333
  @param event_coordinates  binlog file name and position of the last
 
334
                            real event master sent from binlog
 
335
 
 
336
  @note 
 
337
    Among three essential pieces of heartbeat data Log_event::when
 
338
    is computed locally.
 
339
    The  error to send is serious and should force terminating
 
340
    the dump thread.
 
341
*/
 
342
static int send_heartbeat_event(NET* net, String* packet,
 
343
                                const struct event_coordinates *coord)
 
344
{
 
345
  char header[LOG_EVENT_HEADER_LEN];
 
346
  /*
 
347
    'when' (the timestamp) is set to 0 so that slave could distinguish between
 
348
    real and fake Rotate events (if necessary)
 
349
  */
 
350
  memset(header, 0, 4);  // when
 
351
 
 
352
  header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
 
353
 
 
354
  char* p= coord->file_name + dirname_length(coord->file_name);
 
355
 
 
356
  uint32_t ident_len = strlen(p);
 
357
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN;
 
358
  int4store(header + SERVER_ID_OFFSET, server_id);
 
359
  int4store(header + EVENT_LEN_OFFSET, event_len);
 
360
  int2store(header + FLAGS_OFFSET, 0);
 
361
 
 
362
  int4store(header + LOG_POS_OFFSET, coord->pos);  // log_pos
 
363
 
 
364
  packet->append(header, sizeof(header));
 
365
  packet->append(p, ident_len);             // log_file_name
 
366
 
 
367
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) ||
 
368
      net_flush(net))
 
369
  {
 
370
    return(-1);
 
371
  }
 
372
  packet->set("\0", 1, &my_charset_bin);
 
373
  return(0);
 
374
}
 
375
 
 
376
/*
 
377
  TODO: Clean up loop to only have one call to send_file()
 
378
*/
 
379
 
 
380
void mysql_binlog_send(Session* session, char* log_ident, my_off_t pos,
 
381
                       uint16_t flags)
 
382
{
 
383
  LOG_INFO linfo;
 
384
  char *log_file_name = linfo.log_file_name;
 
385
  char search_file_name[FN_REFLEN], *name;
 
386
  IO_CACHE log;
 
387
  File file = -1;
 
388
  String* packet = &session->packet;
 
389
  int error;
 
390
  const char *errmsg = "Unknown error";
 
391
  NET* net = &session->net;
 
392
  pthread_mutex_t *log_lock;
 
393
  bool binlog_can_be_corrupted= false;
 
394
 
 
395
  memset(&log, 0, sizeof(log));
 
396
  /* 
 
397
     heartbeat_period from @master_heartbeat_period user variable
 
398
  */
 
399
  uint64_t heartbeat_period= get_heartbeat_period(session);
 
400
  struct timespec heartbeat_buf;
 
401
  struct event_coordinates coord_buf;
 
402
  struct timespec *heartbeat_ts= NULL;
 
403
  struct event_coordinates *coord= NULL;
 
404
  if (heartbeat_period != 0L)
 
405
  {
 
406
    heartbeat_ts= &heartbeat_buf;
 
407
    set_timespec_nsec(*heartbeat_ts, 0);
 
408
    coord= &coord_buf;
 
409
    coord->file_name= log_file_name; // initialization basing on what slave remembers
 
410
    coord->pos= pos;
 
411
  }
 
412
 
 
413
  if (!mysql_bin_log.is_open())
 
414
  {
 
415
    errmsg = "Binary log is not open";
 
416
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
417
    goto err;
 
418
  }
 
419
  if (!server_id_supplied)
 
420
  {
 
421
    errmsg = "Misconfigured master - server id was not set";
 
422
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
423
    goto err;
 
424
  }
 
425
 
 
426
  name=search_file_name;
 
427
  if (log_ident[0])
 
428
    mysql_bin_log.make_log_name(search_file_name, log_ident);
 
429
  else
 
430
    name=0;                                     // Find first log
 
431
 
 
432
  linfo.index_file_offset = 0;
 
433
 
 
434
  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
 
435
  {
 
436
    errmsg = "Could not find first log file name in binary log index file";
 
437
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
438
    goto err;
 
439
  }
 
440
 
 
441
  pthread_mutex_lock(&LOCK_thread_count);
 
442
  session->current_linfo = &linfo;
 
443
  pthread_mutex_unlock(&LOCK_thread_count);
 
444
 
 
445
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
 
446
  {
 
447
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
448
    goto err;
 
449
  }
 
450
  if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
 
451
  {
 
452
    errmsg= "Client requested master to start replication from \
 
453
impossible position";
 
454
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
455
    goto err;
 
456
  }
 
457
 
 
458
  /*
 
459
    We need to start a packet with something other than 255
 
460
    to distinguish it from error
 
461
  */
 
462
  packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
 
463
 
 
464
  /*
 
465
    Tell the client about the log name with a fake Rotate event;
 
466
    this is needed even if we also send a Format_description_log_event
 
467
    just after, because that event does not contain the binlog's name.
 
468
    Note that as this Rotate event is sent before
 
469
    Format_description_log_event, the slave cannot have any info to
 
470
    understand this event's format, so the header len of
 
471
    Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
 
472
    than other events except FORMAT_DESCRIPTION_EVENT).
 
473
    Before 4.0.14 we called fake_rotate_event below only if (pos ==
 
474
    BIN_LOG_HEADER_SIZE), because if this is false then the slave
 
475
    already knows the binlog's name.
 
476
    Since, we always call fake_rotate_event; if the slave already knew
 
477
    the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
 
478
    useless but does not harm much. It is nice for 3.23 (>=.58) slaves
 
479
    which test Rotate events to see if the master is 4.0 (then they
 
480
    choose to stop because they can't replicate 4.0); by always calling
 
481
    fake_rotate_event we are sure that 3.23.58 and newer will detect the
 
482
    problem as soon as replication starts (BUG#198).
 
483
    Always calling fake_rotate_event makes sending of normal
 
484
    (=from-binlog) Rotate events a priori unneeded, but it is not so
 
485
    simple: the 2 Rotate events are not equivalent, the normal one is
 
486
    before the Stop event, the fake one is after. If we don't send the
 
487
    normal one, then the Stop event will be interpreted (by existing 4.0
 
488
    slaves) as "the master stopped", which is wrong. So for safety,
 
489
    given that we want minimum modification of 4.0, we send the normal
 
490
    and fake Rotates.
 
491
  */
 
492
  if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
 
493
  {
 
494
    /*
 
495
       This error code is not perfect, as fake_rotate_event() does not
 
496
       read anything from the binlog; if it fails it's because of an
 
497
       error in my_net_write(), fortunately it will say so in errmsg.
 
498
    */
 
499
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
500
    goto err;
 
501
  }
 
502
  packet->set("\0", 1, &my_charset_bin);
 
503
  /*
 
504
    Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
 
505
    this larger than the corresponding packet (query) sent 
 
506
    from client to master.
 
507
  */
 
508
  session->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
 
509
 
 
510
  /*
 
511
    We can set log_lock now, it does not move (it's a member of
 
512
    mysql_bin_log, and it's already inited, and it will be destroyed
 
513
    only at shutdown).
 
514
  */
 
515
  log_lock = mysql_bin_log.get_log_lock();
 
516
  if (pos > BIN_LOG_HEADER_SIZE)
 
517
  {
 
518
     /*
 
519
       Try to find a Format_description_log_event at the beginning of
 
520
       the binlog
 
521
     */
 
522
     if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
 
523
     {
 
524
       /*
 
525
         The packet has offsets equal to the normal offsets in a binlog
 
526
         event +1 (the first character is \0).
 
527
       */
 
528
       if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
 
529
       {
 
530
         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
 
531
                                       LOG_EVENT_BINLOG_IN_USE_F);
 
532
         (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
 
533
         /*
 
534
           mark that this event with "log_pos=0", so the slave
 
535
           should not increment master's binlog position
 
536
           (rli->group_master_log_pos)
 
537
         */
 
538
         int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
 
539
         /*
 
540
           if reconnect master sends FD event with `created' as 0
 
541
           to avoid destroying temp tables.
 
542
          */
 
543
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
 
544
                   ST_CREATED_OFFSET+1, (uint32_t) 0);
 
545
         /* send it */
 
546
         if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
547
         {
 
548
           errmsg = "Failed on my_net_write()";
 
549
           my_errno= ER_UNKNOWN_ERROR;
 
550
           goto err;
 
551
         }
 
552
 
 
553
         /*
 
554
           No need to save this event. We are only doing simple reads
 
555
           (no real parsing of the events) so we don't need it. And so
 
556
           we don't need the artificial Format_description_log_event of
 
557
           3.23&4.x.
 
558
         */
 
559
       }
 
560
     }
 
561
     else
 
562
     {
 
563
       if (test_for_non_eof_log_read_errors(error, &errmsg))
 
564
         goto err;
 
565
       /*
 
566
         It's EOF, nothing to do, go on reading next events, the
 
567
         Format_description_log_event will be found naturally if it is written.
 
568
       */
 
569
     }
 
570
     /* reset the packet as we wrote to it in any case */
 
571
     packet->set("\0", 1, &my_charset_bin);
 
572
  } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
 
573
  else
 
574
  {
 
575
    /* The Format_description_log_event event will be found naturally. */
 
576
  }
 
577
 
 
578
  /* seek to the requested position, to start the requested dump */
 
579
  my_b_seek(&log, pos);                 // Seek will done on next read
 
580
 
 
581
  while (!net->error && net->vio != 0 && !session->killed)
 
582
  {
 
583
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
 
584
    {
 
585
      /*
 
586
        log's filename does not change while it's active
 
587
      */
 
588
      if (coord)
 
589
        coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
 
590
 
 
591
      if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
 
592
      {
 
593
        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
 
594
                                      LOG_EVENT_BINLOG_IN_USE_F);
 
595
        (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
 
596
      }
 
597
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
 
598
        binlog_can_be_corrupted= false;
 
599
 
 
600
      if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
601
      {
 
602
        errmsg = "Failed on my_net_write()";
 
603
        my_errno= ER_UNKNOWN_ERROR;
 
604
        goto err;
 
605
      }
 
606
 
 
607
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
 
608
      {
 
609
        if (send_file(session))
 
610
        {
 
611
          errmsg = "failed in send_file()";
 
612
          my_errno= ER_UNKNOWN_ERROR;
 
613
          goto err;
 
614
        }
 
615
      }
 
616
      packet->set("\0", 1, &my_charset_bin);
 
617
    }
 
618
 
 
619
    /*
 
620
      here we were reading binlog that was not closed properly (as a result
 
621
      of a crash ?). treat any corruption as EOF
 
622
    */
 
623
    if (binlog_can_be_corrupted && error != LOG_READ_MEM)
 
624
      error=LOG_READ_EOF;
 
625
    /*
 
626
      TODO: now that we are logging the offset, check to make sure
 
627
      the recorded offset and the actual match.
 
628
      Guilhem 2003-06: this is not true if this master is a slave
 
629
      <4.0.15 running with --log-slave-updates, because then log_pos may
 
630
      be the offset in the-master-of-this-master's binlog.
 
631
    */
 
632
    if (test_for_non_eof_log_read_errors(error, &errmsg))
 
633
      goto err;
 
634
 
 
635
    if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
 
636
        mysql_bin_log.is_active(log_file_name))
 
637
    {
 
638
      /*
 
639
        Block until there is more data in the log
 
640
      */
 
641
      if (net_flush(net))
 
642
      {
 
643
        errmsg = "failed on net_flush()";
 
644
        my_errno= ER_UNKNOWN_ERROR;
 
645
        goto err;
 
646
      }
 
647
 
 
648
      /*
 
649
        We may have missed the update broadcast from the log
 
650
        that has just happened, let's try to catch it if it did.
 
651
        If we did not miss anything, we just wait for other threads
 
652
        to signal us.
 
653
      */
 
654
      {
 
655
        log.error=0;
 
656
        bool read_packet = 0, fatal_error = 0;
 
657
 
 
658
        /*
 
659
          No one will update the log while we are reading
 
660
          now, but we'll be quick and just read one record
 
661
 
 
662
          TODO:
 
663
          Add an counter that is incremented for each time we update the
 
664
          binary log.  We can avoid the following read if the counter
 
665
          has not been updated since last read.
 
666
        */
 
667
 
 
668
        pthread_mutex_lock(log_lock);
 
669
        switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
 
670
        case 0:
 
671
          /* we read successfully, so we'll need to send it to the slave */
 
672
          pthread_mutex_unlock(log_lock);
 
673
          read_packet = 1;
 
674
          if (coord)
 
675
            coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
 
676
          break;
 
677
 
 
678
        case LOG_READ_EOF:
 
679
        {
 
680
          int ret;
 
681
          if (session->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
 
682
          {
 
683
            pthread_mutex_unlock(log_lock);
 
684
            goto end;
 
685
          }
 
686
 
 
687
          do 
 
688
          {
 
689
            if (coord)
 
690
            {
 
691
              assert(heartbeat_ts && heartbeat_period != 0L);
 
692
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
 
693
            }
 
694
            ret= mysql_bin_log.wait_for_update_bin_log(session, heartbeat_ts);
 
695
            assert(ret == 0 || (heartbeat_period != 0L && coord != NULL));
 
696
            if (ret == ETIMEDOUT || ret == ETIME)
 
697
            {
 
698
              if (send_heartbeat_event(net, packet, coord))
 
699
              {
 
700
                errmsg = "Failed on my_net_write()";
 
701
                my_errno= ER_UNKNOWN_ERROR;
 
702
                pthread_mutex_unlock(log_lock);
 
703
                goto err;
 
704
              }
 
705
            }
 
706
            else
 
707
            {
 
708
              assert(ret == 0);
 
709
            }
 
710
          } while (ret != 0 && coord != NULL && !session->killed);
 
711
          pthread_mutex_unlock(log_lock);
 
712
        }    
 
713
        break;
 
714
            
 
715
        default:
 
716
          pthread_mutex_unlock(log_lock);
 
717
          fatal_error = 1;
 
718
          break;
 
719
        }
 
720
 
 
721
        if (read_packet)
 
722
        {
 
723
          session->set_proc_info("Sending binlog event to slave");
 
724
          if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) )
 
725
          {
 
726
            errmsg = "Failed on my_net_write()";
 
727
            my_errno= ER_UNKNOWN_ERROR;
 
728
            goto err;
 
729
          }
 
730
 
 
731
          if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
 
732
          {
 
733
            if (send_file(session))
 
734
            {
 
735
              errmsg = "failed in send_file()";
 
736
              my_errno= ER_UNKNOWN_ERROR;
 
737
              goto err;
 
738
            }
 
739
          }
 
740
          packet->set("\0", 1, &my_charset_bin);
 
741
          /*
 
742
            No need to net_flush because we will get to flush later when
 
743
            we hit EOF pretty quick
 
744
          */
 
745
        }
 
746
 
 
747
        if (fatal_error)
 
748
        {
 
749
          errmsg = "error reading log entry";
 
750
          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
751
          goto err;
 
752
        }
 
753
        log.error=0;
 
754
      }
 
755
    }
 
756
    else
 
757
    {
 
758
      bool loop_breaker = 0;
 
759
      /* need this to break out of the for loop from switch */
 
760
 
 
761
      session->set_proc_info("Finished reading one binlog; switching to next binlog");
 
762
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
 
763
      case LOG_INFO_EOF:
 
764
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
 
765
        break;
 
766
      case 0:
 
767
        break;
 
768
      default:
 
769
        errmsg = "could not find next log";
 
770
        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
771
        goto err;
 
772
      }
 
773
 
 
774
      if (loop_breaker)
 
775
        break;
 
776
 
 
777
      end_io_cache(&log);
 
778
      (void) my_close(file, MYF(MY_WME));
 
779
 
 
780
      /*
 
781
        Call fake_rotate_event() in case the previous log (the one which
 
782
        we have just finished reading) did not contain a Rotate event
 
783
        (for example (I don't know any other example) the previous log
 
784
        was the last one before the master was shutdown & restarted).
 
785
        This way we tell the slave about the new log's name and
 
786
        position.  If the binlog is 5.0, the next event we are going to
 
787
        read and send is Format_description_log_event.
 
788
      */
 
789
      if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
 
790
          fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
 
791
                            &errmsg))
 
792
      {
 
793
        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
794
        goto err;
 
795
      }
 
796
 
 
797
      packet->length(0);
 
798
      packet->append('\0');
 
799
      if (coord)
 
800
        coord->file_name= log_file_name; // reset to the next
 
801
    }
 
802
  }
 
803
 
 
804
end:
 
805
  end_io_cache(&log);
 
806
  (void)my_close(file, MYF(MY_WME));
 
807
 
 
808
  my_eof(session);
 
809
  session->set_proc_info("Waiting to finalize termination");
 
810
  pthread_mutex_lock(&LOCK_thread_count);
 
811
  session->current_linfo = 0;
 
812
  pthread_mutex_unlock(&LOCK_thread_count);
 
813
  return;
 
814
 
 
815
err:
 
816
  session->set_proc_info("Waiting to finalize termination");
 
817
  end_io_cache(&log);
 
818
  /*
 
819
    Exclude  iteration through thread list
 
820
    this is needed for purge_logs() - it will iterate through
 
821
    thread list and update session->current_linfo->index_file_offset
 
822
    this mutex will make sure that it never tried to update our linfo
 
823
    after we return from this stack frame
 
824
  */
 
825
  pthread_mutex_lock(&LOCK_thread_count);
 
826
  session->current_linfo = 0;
 
827
  pthread_mutex_unlock(&LOCK_thread_count);
 
828
  if (file >= 0)
 
829
    (void) my_close(file, MYF(MY_WME));
 
830
 
 
831
  my_message(my_errno, errmsg, MYF(0));
 
832
  return;
 
833
}
 
834
 
 
835
int start_slave(Session* session , Master_info* mi,  bool net_report)
 
836
{
 
837
  int slave_errno= 0;
 
838
  int thread_mask;
 
839
 
 
840
  lock_slave_threads(mi);  // this allows us to cleanly read slave_running
 
841
  // Get a mask of _stopped_ threads
 
842
  init_thread_mask(&thread_mask,mi,1 /* inverse */);
 
843
  /*
 
844
    Below we will start all stopped threads.  But if the user wants to
 
845
    start only one thread, do as if the other thread was running (as we
 
846
    don't wan't to touch the other thread), so set the bit to 0 for the
 
847
    other thread
 
848
  */
 
849
  if (session->lex->slave_session_opt)
 
850
    thread_mask&= session->lex->slave_session_opt;
 
851
  if (thread_mask) //some threads are stopped, start them
 
852
  {
 
853
    if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
 
854
      slave_errno=ER_MASTER_INFO;
 
855
    else if (server_id_supplied && *mi->getHostname())
 
856
    {
 
857
      /*
 
858
        If we will start SQL thread we will care about UNTIL options If
 
859
        not and they are specified we will ignore them and warn user
 
860
        about this fact.
 
861
      */
 
862
      if (thread_mask & SLAVE_SQL)
 
863
      {
 
864
        pthread_mutex_lock(&mi->rli.data_lock);
 
865
 
 
866
        if (session->lex->mi.pos)
 
867
        {
 
868
          mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
 
869
          mi->rli.until_log_pos= session->lex->mi.pos;
 
870
          /*
 
871
             We don't check session->lex->mi.log_file_name for NULL here
 
872
             since it is checked in sql_yacc.yy
 
873
          */
 
874
          strmake(mi->rli.until_log_name, session->lex->mi.log_file_name,
 
875
                  sizeof(mi->rli.until_log_name)-1);
 
876
        }
 
877
        else if (session->lex->mi.relay_log_pos)
 
878
        {
 
879
          mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
 
880
          mi->rli.until_log_pos= session->lex->mi.relay_log_pos;
 
881
          strmake(mi->rli.until_log_name, session->lex->mi.relay_log_name,
 
882
                  sizeof(mi->rli.until_log_name)-1);
 
883
        }
 
884
        else
 
885
          mi->rli.clear_until_condition();
 
886
 
 
887
        if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
 
888
        {
 
889
          /* Preparing members for effective until condition checking */
 
890
          const char *p= fn_ext(mi->rli.until_log_name);
 
891
          char *p_end;
 
892
          if (*p)
 
893
          {
 
894
            //p points to '.'
 
895
            mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
 
896
            /*
 
897
              p_end points to the first invalid character. If it equals
 
898
              to p, no digits were found, error. If it contains '\0' it
 
899
              means  conversion went ok.
 
900
            */
 
901
            if (p_end==p || *p_end)
 
902
              slave_errno=ER_BAD_SLAVE_UNTIL_COND;
 
903
          }
 
904
          else
 
905
            slave_errno=ER_BAD_SLAVE_UNTIL_COND;
 
906
 
 
907
          /* mark the cached result of the UNTIL comparison as "undefined" */
 
908
          mi->rli.until_log_names_cmp_result=
 
909
            Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
 
910
 
 
911
          /* Issuing warning then started without --skip-slave-start */
 
912
          if (!opt_skip_slave_start)
 
913
            push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
914
                         ER_MISSING_SKIP_SLAVE,
 
915
                         ER(ER_MISSING_SKIP_SLAVE));
 
916
        }
 
917
 
 
918
        pthread_mutex_unlock(&mi->rli.data_lock);
 
919
      }
 
920
      else if (session->lex->mi.pos || session->lex->mi.relay_log_pos)
 
921
        push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
 
922
                     ER(ER_UNTIL_COND_IGNORED));
 
923
 
 
924
      if (!slave_errno)
 
925
        slave_errno = start_slave_threads(0 /*no mutex */,
 
926
                                        1 /* wait for start */,
 
927
                                        mi,
 
928
                                        master_info_file,relay_log_info_file,
 
929
                                        thread_mask);
 
930
    }
 
931
    else
 
932
      slave_errno = ER_BAD_SLAVE;
 
933
  }
 
934
  else
 
935
  {
 
936
    /* no error if all threads are already started, only a warning */
 
937
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
 
938
                 ER(ER_SLAVE_WAS_RUNNING));
 
939
  }
 
940
 
 
941
  unlock_slave_threads(mi);
 
942
 
 
943
  if (slave_errno)
 
944
  {
 
945
    if (net_report)
 
946
      my_message(slave_errno, ER(slave_errno), MYF(0));
 
947
    return(1);
 
948
  }
 
949
  else if (net_report)
 
950
    my_ok(session);
 
951
 
 
952
  return(0);
 
953
}
 
954
 
 
955
 
 
956
int stop_slave(Session* session, Master_info* mi, bool net_report )
 
957
{
 
958
  int slave_errno;
 
959
  if (!session)
 
960
    session = current_session;
 
961
 
 
962
  session->set_proc_info("Killing slave");
 
963
  int thread_mask;
 
964
  lock_slave_threads(mi);
 
965
  // Get a mask of _running_ threads
 
966
  init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
 
967
  /*
 
968
    Below we will stop all running threads.
 
969
    But if the user wants to stop only one thread, do as if the other thread
 
970
    was stopped (as we don't wan't to touch the other thread), so set the
 
971
    bit to 0 for the other thread
 
972
  */
 
973
  if (session->lex->slave_session_opt)
 
974
    thread_mask &= session->lex->slave_session_opt;
 
975
 
 
976
  if (thread_mask)
 
977
  {
 
978
    slave_errno= terminate_slave_threads(mi,thread_mask,
 
979
                                         1 /*skip lock */);
 
980
  }
 
981
  else
 
982
  {
 
983
    //no error if both threads are already stopped, only a warning
 
984
    slave_errno= 0;
 
985
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
 
986
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
 
987
  }
 
988
  unlock_slave_threads(mi);
 
989
  session->set_proc_info(0);
 
990
 
 
991
  if (slave_errno)
 
992
  {
 
993
    if (net_report)
 
994
      my_message(slave_errno, ER(slave_errno), MYF(0));
 
995
    return(1);
 
996
  }
 
997
  else if (net_report)
 
998
    my_ok(session);
 
999
 
 
1000
  return(0);
 
1001
}
 
1002
 
 
1003
 
 
1004
/*
 
1005
  Remove all relay logs and start replication from the start
 
1006
 
 
1007
  SYNOPSIS
 
1008
    reset_slave()
 
1009
    session                     Thread handler
 
1010
    mi                  Master info for the slave
 
1011
 
 
1012
  RETURN
 
1013
    0   ok
 
1014
    1   error
 
1015
*/
 
1016
 
 
1017
 
 
1018
int reset_slave(Session *session, Master_info* mi)
 
1019
{
 
1020
  struct stat stat_area;
 
1021
  char fname[FN_REFLEN];
 
1022
  int thread_mask= 0, error= 0;
 
1023
  uint32_t sql_errno=0;
 
1024
  const char* errmsg=0;
 
1025
 
 
1026
  lock_slave_threads(mi);
 
1027
  init_thread_mask(&thread_mask,mi,0 /* not inverse */);
 
1028
  if (thread_mask) // We refuse if any slave thread is running
 
1029
  {
 
1030
    sql_errno= ER_SLAVE_MUST_STOP;
 
1031
    error=1;
 
1032
    goto err;
 
1033
  }
 
1034
 
 
1035
  // delete relay logs, clear relay log coordinates
 
1036
  if ((error= purge_relay_logs(&mi->rli, session,
 
1037
                               1 /* just reset */,
 
1038
                               &errmsg)))
 
1039
    goto err;
 
1040
 
 
1041
  /* Clear master's log coordinates */
 
1042
  mi->reset();
 
1043
  /*
 
1044
     Reset errors (the idea is that we forget about the
 
1045
     old master).
 
1046
  */
 
1047
  mi->rli.clear_error();
 
1048
  mi->rli.clear_until_condition();
 
1049
 
 
1050
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
 
1051
  mi->end_master_info();
 
1052
  // and delete these two files
 
1053
  fn_format(fname, master_info_file, drizzle_data_home, "", 4+32);
 
1054
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
 
1055
  {
 
1056
    error=1;
 
1057
    goto err;
 
1058
  }
 
1059
  // delete relay_log_info_file
 
1060
  fn_format(fname, relay_log_info_file, drizzle_data_home, "", 4+32);
 
1061
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
 
1062
  {
 
1063
    error=1;
 
1064
    goto err;
 
1065
  }
 
1066
 
 
1067
err:
 
1068
  unlock_slave_threads(mi);
 
1069
  if (error)
 
1070
    my_error(sql_errno, MYF(0), errmsg);
 
1071
  return(error);
 
1072
}
 
1073
 
 
1074
/*
 
1075
 
 
1076
  Kill all Binlog_dump threads which previously talked to the same slave
 
1077
  ("same" means with the same server id). Indeed, if the slave stops, if the
 
1078
  Binlog_dump thread is waiting (pthread_cond_wait) for binlog update, then it
 
1079
  will keep existing until a query is written to the binlog. If the master is
 
1080
  idle, then this could last long, and if the slave reconnects, we could have 2
 
1081
  Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
 
1082
  binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
 
1083
  the master kills any existing thread with the slave's server id (if this id is
 
1084
  not zero; it will be true for real slaves, but false for mysqlbinlog when it
 
1085
  sends COM_BINLOG_DUMP to get a remote binlog dump).
 
1086
 
 
1087
  SYNOPSIS
 
1088
    kill_zombie_dump_threads()
 
1089
    slave_server_id     the slave's server id
 
1090
 
 
1091
*/
 
1092
 
 
1093
 
 
1094
void kill_zombie_dump_threads(uint32_t slave_server_id)
 
1095
{
 
1096
  pthread_mutex_lock(&LOCK_thread_count);
 
1097
  I_List_iterator<Session> it(threads);
 
1098
  Session *tmp;
 
1099
 
 
1100
  while ((tmp=it++))
 
1101
  {
 
1102
    if (tmp->command == COM_BINLOG_DUMP &&
 
1103
       tmp->server_id == slave_server_id)
 
1104
    {
 
1105
      pthread_mutex_lock(&tmp->LOCK_delete);    // Lock from delete
 
1106
      break;
 
1107
    }
 
1108
  }
 
1109
  pthread_mutex_unlock(&LOCK_thread_count);
 
1110
  if (tmp)
 
1111
  {
 
1112
    /*
 
1113
      Here we do not call kill_one_thread() as
 
1114
      it will be slow because it will iterate through the list
 
1115
      again. We just to do kill the thread ourselves.
 
1116
    */
 
1117
    tmp->awake(Session::KILL_QUERY);
 
1118
    pthread_mutex_unlock(&tmp->LOCK_delete);
 
1119
  }
 
1120
}
 
1121
 
 
1122
 
 
1123
bool change_master(Session* session, Master_info* mi)
 
1124
{
 
1125
  int thread_mask;
 
1126
  const char* errmsg= 0;
 
1127
  bool need_relay_log_purge= 1;
 
1128
 
 
1129
  lock_slave_threads(mi);
 
1130
  init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
 
1131
  if (thread_mask) // We refuse if any slave thread is running
 
1132
  {
 
1133
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
 
1134
    unlock_slave_threads(mi);
 
1135
    return(true);
 
1136
  }
 
1137
 
 
1138
  session->set_proc_info("Changing master");
 
1139
  LEX_MASTER_INFO* lex_mi= &session->lex->mi;
 
1140
  // TODO: see if needs re-write
 
1141
  if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
 
1142
  {
 
1143
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
 
1144
    unlock_slave_threads(mi);
 
1145
    return(true);
 
1146
  }
 
1147
 
 
1148
  /*
 
1149
    Data lock not needed since we have already stopped the running threads,
 
1150
    and we have the hold on the run locks which will keep all threads that
 
1151
    could possibly modify the data structures from running
 
1152
  */
 
1153
 
 
1154
  /*
 
1155
    If the user specified host or port without binlog or position,
 
1156
    reset binlog's name to FIRST and position to 4.
 
1157
  */
 
1158
 
 
1159
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
 
1160
    mi->reset();
 
1161
 
 
1162
  if (lex_mi->log_file_name)
 
1163
    mi->setLogName(lex_mi->log_file_name);
 
1164
  if (lex_mi->pos)
 
1165
  {
 
1166
    mi->setLogPosition(lex_mi->pos);
 
1167
  }
 
1168
 
 
1169
  if (lex_mi->host)
 
1170
    mi->setHost(lex_mi->host, lex_mi->port);
 
1171
  if (lex_mi->user)
 
1172
    mi->setUsername(lex_mi->user);
 
1173
  if (lex_mi->password)
 
1174
    mi->setPassword(lex_mi->password);
 
1175
  if (lex_mi->connect_retry)
 
1176
    mi->connect_retry = lex_mi->connect_retry;
 
1177
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1178
    mi->heartbeat_period = lex_mi->heartbeat_period;
 
1179
  else
 
1180
    mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
 
1181
                                      (slave_net_timeout/2.0));
 
1182
  mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
 
1183
 
 
1184
  if (lex_mi->relay_log_name)
 
1185
  {
 
1186
    need_relay_log_purge= 0;
 
1187
    mi->rli.event_relay_log_name.assign(lex_mi->relay_log_name);
 
1188
  }
 
1189
 
 
1190
  if (lex_mi->relay_log_pos)
 
1191
  {
 
1192
    need_relay_log_purge= 0;
 
1193
    mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
 
1194
  }
 
1195
 
 
1196
  /*
 
1197
    If user did specify neither host nor port nor any log name nor any log
 
1198
    pos, i.e. he specified only user/password/master_connect_retry, he probably
 
1199
    wants replication to resume from where it had left, i.e. from the
 
1200
    coordinates of the **SQL** thread (imagine the case where the I/O is ahead
 
1201
    of the SQL; restarting from the coordinates of the I/O would lose some
 
1202
    events which is probably unwanted when you are just doing minor changes
 
1203
    like changing master_connect_retry).
 
1204
    A side-effect is that if only the I/O thread was started, this thread may
 
1205
    restart from ''/4 after the CHANGE MASTER. That's a minor problem (it is a
 
1206
    much more unlikely situation than the one we are fixing here).
 
1207
    Note: coordinates of the SQL thread must be read here, before the
 
1208
    'if (need_relay_log_purge)' block which resets them.
 
1209
  */
 
1210
  if (!lex_mi->host && !lex_mi->port &&
 
1211
      !lex_mi->log_file_name && !lex_mi->pos &&
 
1212
      need_relay_log_purge)
 
1213
   {
 
1214
     /*
 
1215
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
 
1216
       not initialized), so we use a cmax().
 
1217
       What happens to mi->rli.master_log_pos during the initialization stages
 
1218
       of replication is not 100% clear, so we guard against problems using
 
1219
       cmax().
 
1220
      */
 
1221
     mi->setLogPosition(((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
 
1222
                         ? BIN_LOG_HEADER_SIZE
 
1223
                         : mi->rli.group_master_log_pos));
 
1224
     mi->setLogName(mi->rli.group_master_log_name.c_str());
 
1225
  }
 
1226
  /*
 
1227
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
 
1228
    a slave before).
 
1229
  */
 
1230
  if (mi->flush())
 
1231
  {
 
1232
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
 
1233
    unlock_slave_threads(mi);
 
1234
    return(true);
 
1235
  }
 
1236
  if (need_relay_log_purge)
 
1237
  {
 
1238
    relay_log_purge= 1;
 
1239
    session->set_proc_info("Purging old relay logs");
 
1240
    if (purge_relay_logs(&mi->rli, session,
 
1241
                         0 /* not only reset, but also reinit */,
 
1242
                         &errmsg))
 
1243
    {
 
1244
      my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
 
1245
      unlock_slave_threads(mi);
 
1246
      return(true);
 
1247
    }
 
1248
  }
 
1249
  else
 
1250
  {
 
1251
    const char* msg;
 
1252
    relay_log_purge= 0;
 
1253
    /* Relay log is already initialized */
 
1254
    if (init_relay_log_pos(&mi->rli,
 
1255
                           mi->rli.group_relay_log_name.c_str(),
 
1256
                           mi->rli.group_relay_log_pos,
 
1257
                           0 /*no data lock*/,
 
1258
                           &msg, 0))
 
1259
    {
 
1260
      my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
 
1261
      unlock_slave_threads(mi);
 
1262
      return(true);
 
1263
    }
 
1264
  }
 
1265
  /*
 
1266
    Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block,
 
1267
    so restore them to good values. If we left them to ''/0, that would work;
 
1268
    but that would fail in the case of 2 successive CHANGE MASTER (without a
 
1269
    START SLAVE in between): because first one would set the coords in mi to
 
1270
    the good values of those in rli, the set those in rli to ''/0, then
 
1271
    second CHANGE MASTER would set the coords in mi to those of rli, i.e. to
 
1272
    ''/0: we have lost all copies of the original good coordinates.
 
1273
    That's why we always save good coords in rli.
 
1274
  */
 
1275
  mi->rli.group_master_log_pos= mi->getLogPosition();
 
1276
  mi->rli.group_master_log_name.assign(mi->getLogName());
 
1277
 
 
1278
  if (mi->rli.group_master_log_name.size() == 0) // uninitialized case
 
1279
    mi->rli.group_master_log_pos= 0;
 
1280
 
 
1281
  pthread_mutex_lock(&mi->rli.data_lock);
 
1282
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
 
1283
  /* Clear the errors, for a clean start */
 
1284
  mi->rli.clear_error();
 
1285
  mi->rli.clear_until_condition();
 
1286
  /*
 
1287
    If we don't write new coordinates to disk now, then old will remain in
 
1288
    relay-log.info until START SLAVE is issued; but if mysqld is shutdown
 
1289
    before START SLAVE, then old will remain in relay-log.info, and will be the
 
1290
    in-memory value at restart (thus causing errors, as the old relay log does
 
1291
    not exist anymore).
 
1292
  */
 
1293
  flush_relay_log_info(&mi->rli);
 
1294
  pthread_cond_broadcast(&mi->data_cond);
 
1295
  pthread_mutex_unlock(&mi->rli.data_lock);
 
1296
 
 
1297
  unlock_slave_threads(mi);
 
1298
  session->set_proc_info(0);
 
1299
  my_ok(session);
 
1300
  return(false);
 
1301
}
 
1302
 
 
1303
int reset_master(Session* session)
 
1304
{
 
1305
  if (!mysql_bin_log.is_open())
 
1306
  {
 
1307
    my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
 
1308
               ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
 
1309
    return 1;
 
1310
  }
 
1311
  return mysql_bin_log.reset_logs(session);
 
1312
}
 
1313
 
 
1314
int cmp_master_pos(const char* log_file_name1, uint64_t log_pos1,
 
1315
                   const char* log_file_name2, uint64_t log_pos2)
 
1316
{
 
1317
  int res;
 
1318
  uint32_t log_file_name1_len=  strlen(log_file_name1);
 
1319
  uint32_t log_file_name2_len=  strlen(log_file_name2);
 
1320
 
 
1321
  //  We assume that both log names match up to '.'
 
1322
  if (log_file_name1_len == log_file_name2_len)
 
1323
  {
 
1324
    if ((res= strcmp(log_file_name1, log_file_name2)))
 
1325
      return res;
 
1326
    return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
 
1327
  }
 
1328
  return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
 
1329
}
 
1330
 
 
1331
 
 
1332
bool show_binlog_info(Session* session)
 
1333
{
 
1334
  Protocol *protocol= session->protocol;
 
1335
  List<Item> field_list;
 
1336
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
 
1337
  field_list.push_back(new Item_return_int("Position",20,
 
1338
                                           DRIZZLE_TYPE_LONGLONG));
 
1339
  field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
 
1340
  field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
 
1341
 
 
1342
  if (protocol->send_fields(&field_list,
 
1343
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1344
    return(true);
 
1345
  protocol->prepare_for_resend();
 
1346
 
 
1347
  if (mysql_bin_log.is_open())
 
1348
  {
 
1349
    LOG_INFO li;
 
1350
    mysql_bin_log.get_current_log(&li);
 
1351
    int dir_len = dirname_length(li.log_file_name);
 
1352
    protocol->store(li.log_file_name + dir_len, &my_charset_bin);
 
1353
    protocol->store((uint64_t) li.pos);
 
1354
    protocol->store(binlog_filter->get_do_db());
 
1355
    protocol->store(binlog_filter->get_ignore_db());
 
1356
    if (protocol->write())
 
1357
      return(true);
 
1358
  }
 
1359
  my_eof(session);
 
1360
  return(false);
 
1361
}
 
1362
 
 
1363
 
 
1364
/*
 
1365
  Send a list of all binary logs to client
 
1366
 
 
1367
  SYNOPSIS
 
1368
    show_binlogs()
 
1369
    session             Thread specific variable
 
1370
 
 
1371
  RETURN VALUES
 
1372
    false OK
 
1373
    true  error
 
1374
*/
 
1375
 
 
1376
bool show_binlogs(Session* session)
 
1377
{
 
1378
  IO_CACHE *index_file;
 
1379
  LOG_INFO cur;
 
1380
  File file;
 
1381
  char fname[FN_REFLEN];
 
1382
  List<Item> field_list;
 
1383
  uint32_t length;
 
1384
  int cur_dir_len;
 
1385
  Protocol *protocol= session->protocol;
 
1386
 
 
1387
  if (!mysql_bin_log.is_open())
 
1388
  {
 
1389
    my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0));
 
1390
    return 1;
 
1391
  }
 
1392
 
 
1393
  field_list.push_back(new Item_empty_string("Log_name", 255));
 
1394
  field_list.push_back(new Item_return_int("File_size", 20,
 
1395
                                           DRIZZLE_TYPE_LONGLONG));
 
1396
  if (protocol->send_fields(&field_list,
 
1397
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1398
    return(true);
 
1399
  
 
1400
  pthread_mutex_lock(mysql_bin_log.get_log_lock());
 
1401
  mysql_bin_log.lock_index();
 
1402
  index_file=mysql_bin_log.get_index_file();
 
1403
  
 
1404
  mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
 
1405
  pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
 
1406
  
 
1407
  cur_dir_len= dirname_length(cur.log_file_name);
 
1408
 
 
1409
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
 
1410
 
 
1411
  /* The file ends with EOF or empty line */
 
1412
  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
 
1413
  {
 
1414
    int dir_len;
 
1415
    uint64_t file_length= 0;                   // Length if open fails
 
1416
    fname[--length] = '\0';                     // remove the newline
 
1417
 
 
1418
    protocol->prepare_for_resend();
 
1419
    dir_len= dirname_length(fname);
 
1420
    length-= dir_len;
 
1421
    protocol->store(fname + dir_len, length, &my_charset_bin);
 
1422
 
 
1423
    if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
 
1424
      file_length= cur.pos;  /* The active log, use the active position */
 
1425
    else
 
1426
    {
 
1427
      /* this is an old log, open it and find the size */
 
1428
      if ((file= my_open(fname, O_RDONLY,
 
1429
                         MYF(0))) >= 0)
 
1430
      {
 
1431
        file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));
 
1432
        my_close(file, MYF(0));
 
1433
      }
 
1434
    }
 
1435
    protocol->store(file_length);
 
1436
    if (protocol->write())
 
1437
      goto err;
 
1438
  }
 
1439
  mysql_bin_log.unlock_index();
 
1440
  my_eof(session);
 
1441
  return(false);
 
1442
 
 
1443
err:
 
1444
  mysql_bin_log.unlock_index();
 
1445
  return(true);
 
1446
}
 
1447
 
 
1448
/**
 
1449
   Load data's io cache specific hook to be executed
 
1450
   before a chunk of data is being read into the cache's buffer
 
1451
   The fuction instantianates and writes into the binlog
 
1452
   replication events along LOAD DATA processing.
 
1453
   
 
1454
   @param file  pointer to io-cache
 
1455
   @return 0
 
1456
*/
 
1457
int log_loaded_block(IO_CACHE* file)
 
1458
{
 
1459
  LOAD_FILE_INFO *lf_info;
 
1460
  uint32_t block_len;
 
1461
  /* buffer contains position where we started last read */
 
1462
  unsigned char* buffer= (unsigned char*) my_b_get_buffer_start(file);
 
1463
  uint32_t max_event_size= current_session->variables.max_allowed_packet;
 
1464
  lf_info= (LOAD_FILE_INFO*) file->arg;
 
1465
  if (lf_info->session->current_stmt_binlog_row_based)
 
1466
    return(0);
 
1467
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
 
1468
      lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
 
1469
    return(0);
 
1470
  
 
1471
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
 
1472
       buffer += cmin(block_len, max_event_size),
 
1473
       block_len -= cmin(block_len, max_event_size))
 
1474
  {
 
1475
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
 
1476
    if (lf_info->wrote_create_file)
 
1477
    {
 
1478
      Append_block_log_event a(lf_info->session, lf_info->session->db, buffer,
 
1479
                               cmin(block_len, max_event_size),
 
1480
                               lf_info->log_delayed);
 
1481
      mysql_bin_log.write(&a);
 
1482
    }
 
1483
    else
 
1484
    {
 
1485
      Begin_load_query_log_event b(lf_info->session, lf_info->session->db,
 
1486
                                   buffer,
 
1487
                                   cmin(block_len, max_event_size),
 
1488
                                   lf_info->log_delayed);
 
1489
      mysql_bin_log.write(&b);
 
1490
      lf_info->wrote_create_file= 1;
 
1491
    }
 
1492
  }
 
1493
  return(0);
 
1494
}
 
1495
 
 
1496
/*
 
1497
  Replication System Variables
 
1498
*/
 
1499
 
 
1500
class sys_var_slave_skip_counter :public sys_var
 
1501
{
 
1502
public:
 
1503
  sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
 
1504
    :sys_var(name_arg)
 
1505
  { chain_sys_var(chain); }
 
1506
  bool check(Session *session, set_var *var);
 
1507
  bool update(Session *session, set_var *var);
 
1508
  bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
 
1509
  /*
 
1510
    We can't retrieve the value of this, so we don't have to define
 
1511
    type() or value_ptr()
 
1512
  */
 
1513
};
 
1514
 
 
1515
class sys_var_sync_binlog_period :public sys_var_long_ptr
 
1516
{
 
1517
public:
 
1518
  sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
 
1519
                             ulong *value_ptr)
 
1520
    :sys_var_long_ptr(chain, name_arg,value_ptr) {}
 
1521
  bool update(Session *session, set_var *var);
 
1522
};
 
1523
 
 
1524
static void fix_slave_net_timeout(Session *session,
 
1525
                                  enum_var_type type __attribute__((unused)))
 
1526
{
 
1527
  pthread_mutex_lock(&LOCK_active_mi);
 
1528
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
 
1529
    push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1530
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
 
1531
                        "The currect value for master_heartbeat_period"
 
1532
                        " exceeds the new value of `slave_net_timeout' sec."
 
1533
                        " A sensible value for the period should be"
 
1534
                        " less than the timeout.");
 
1535
  pthread_mutex_unlock(&LOCK_active_mi);
 
1536
  return;
 
1537
}
 
1538
 
 
1539
static sys_var_chain vars = { NULL, NULL };
 
1540
 
 
1541
static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
 
1542
                                            &relay_log_purge);
 
1543
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
 
1544
                                              &slave_net_timeout,
 
1545
                                              fix_slave_net_timeout);
 
1546
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
 
1547
                                                &slave_trans_retries);
 
1548
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
 
1549
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
 
1550
 
 
1551
static int show_slave_skip_errors(Session *session, SHOW_VAR *var, char *buff);
 
1552
 
 
1553
 
 
1554
static int show_slave_skip_errors(Session *session __attribute__((unused)),
 
1555
                                  SHOW_VAR *var, char *buff)
 
1556
{
 
1557
  var->type=SHOW_CHAR;
 
1558
  var->value= buff;
 
1559
  if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
 
1560
  {
 
1561
    var->value= const_cast<char *>("OFF");
 
1562
  }
 
1563
  else if (bitmap_is_set_all(&slave_error_mask))
 
1564
  {
 
1565
    var->value= const_cast<char *>("ALL");
 
1566
  }
 
1567
  else
 
1568
  {
 
1569
    /* 10 is enough assuming errors are max 4 digits */
 
1570
    int i;
 
1571
    var->value= buff;
 
1572
    for (i= 1;
 
1573
         i < MAX_SLAVE_ERROR &&
 
1574
         (buff - var->value) < SHOW_VAR_FUNC_BUFF_SIZE;
 
1575
         i++)
 
1576
    {
 
1577
      if (bitmap_is_set(&slave_error_mask, i))
 
1578
      {
 
1579
        buff= int10_to_str(i, buff, 10);
 
1580
        *buff++= ',';
 
1581
      }
 
1582
    }
 
1583
    if (var->value != buff)
 
1584
      buff--;                           // Remove last ','
 
1585
    if (i < MAX_SLAVE_ERROR)
 
1586
      buff= my_stpcpy(buff, "...");  // Couldn't show all errors
 
1587
    *buff=0;
 
1588
  }
 
1589
  return 0;
 
1590
}
 
1591
 
 
1592
static st_show_var_func_container
 
1593
show_slave_skip_errors_cont = { &show_slave_skip_errors };
 
1594
 
 
1595
 
 
1596
static SHOW_VAR fixed_vars[]= {
 
1597
  {"log_slave_updates",       (char*) &opt_log_slave_updates,       SHOW_MY_BOOL},
 
1598
  {"relay_log" , (char*) &opt_relay_logname, SHOW_CHAR_PTR},
 
1599
  {"relay_log_index", (char*) &opt_relaylog_index_name, SHOW_CHAR_PTR},
 
1600
  {"relay_log_info_file", (char*) &relay_log_info_file, SHOW_CHAR_PTR},
 
1601
  {"relay_log_space_limit",   (char*) &relay_log_space_limit,       SHOW_LONGLONG},
 
1602
  {"slave_load_tmpdir",       (char*) &slave_load_tmpdir,           SHOW_CHAR_PTR},
 
1603
  {"slave_skip_errors",       (char*) &show_slave_skip_errors_cont,      SHOW_FUNC},
 
1604
};
 
1605
 
 
1606
bool sys_var_slave_skip_counter::check(Session *session __attribute__((unused)),
 
1607
                                       set_var *var)
 
1608
{
 
1609
  int result= 0;
 
1610
  pthread_mutex_lock(&LOCK_active_mi);
 
1611
  pthread_mutex_lock(&active_mi->rli.run_lock);
 
1612
  if (active_mi->rli.slave_running)
 
1613
  {
 
1614
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
 
1615
    result=1;
 
1616
  }
 
1617
  pthread_mutex_unlock(&active_mi->rli.run_lock);
 
1618
  pthread_mutex_unlock(&LOCK_active_mi);
 
1619
  var->save_result.uint32_t_value= (ulong) var->value->val_int();
 
1620
  return result;
 
1621
}
 
1622
 
 
1623
 
 
1624
bool sys_var_slave_skip_counter::update(Session *session __attribute__((unused)),
 
1625
                                        set_var *var)
 
1626
{
 
1627
  pthread_mutex_lock(&LOCK_active_mi);
 
1628
  pthread_mutex_lock(&active_mi->rli.run_lock);
 
1629
  /*
 
1630
    The following test should normally never be true as we test this
 
1631
    in the check function;  To be safe against multiple
 
1632
    SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
 
1633
  */
 
1634
  if (!active_mi->rli.slave_running)
 
1635
  {
 
1636
    pthread_mutex_lock(&active_mi->rli.data_lock);
 
1637
    active_mi->rli.slave_skip_counter= var->save_result.uint32_t_value;
 
1638
    pthread_mutex_unlock(&active_mi->rli.data_lock);
 
1639
  }
 
1640
  pthread_mutex_unlock(&active_mi->rli.run_lock);
 
1641
  pthread_mutex_unlock(&LOCK_active_mi);
 
1642
  return 0;
 
1643
}
 
1644
 
 
1645
 
 
1646
bool sys_var_sync_binlog_period::update(Session *session __attribute__((unused)),
 
1647
                                        set_var *var)
 
1648
{
 
1649
  sync_binlog_period= (uint32_t) var->save_result.uint64_t_value;
 
1650
  return 0;
 
1651
}
 
1652
 
 
1653
int init_replication_sys_vars()
 
1654
{
 
1655
  mysql_append_static_vars(fixed_vars, sizeof(fixed_vars) / sizeof(SHOW_VAR));
 
1656
 
 
1657
  if (mysql_add_sys_var_chain(vars.first, my_long_options))
 
1658
  {
 
1659
    /* should not happen */
 
1660
    fprintf(stderr, "failed to initialize replication system variables");
 
1661
    unireg_abort(1);
 
1662
  }
 
1663
  return 0;
 
1664
}