~drizzle-trunk/drizzle/development

1 by brian
clean slate
1
/* Copyright (C) 2000-2006 MySQL AB & Sasha
2
3
   This program is free software; you can redistribute it and/or modify
4
   it under the terms of the GNU General Public License as published by
5
   the Free Software Foundation; version 2 of the License.
6
7
   This program is distributed in the hope that it will be useful,
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
   GNU General Public License for more details.
11
12
   You should have received a copy of the GNU General Public License
13
   along with this program; if not, write to the Free Software
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
16
#include "mysql_priv.h"
17
#ifdef HAVE_REPLICATION
18
19
#include "rpl_mi.h"
20
#include "sql_repl.h"
21
#include "log_event.h"
22
#include "rpl_filter.h"
23
#include <my_dir.h>
24
25
int max_binlog_dump_events = 0; // unlimited
26
my_bool opt_sporadic_binlog_dump_fail = 0;
27
#ifndef DBUG_OFF
28
static int binlog_dump_count = 0;
29
#endif
30
31
/*
32
    fake_rotate_event() builds a fake (=which does not exist physically in any
33
    binlog) Rotate event, which contains the name of the binlog we are going to
34
    send to the slave (because the slave may not know it if it just asked for
35
    MASTER_LOG_FILE='', MASTER_LOG_POS=4).
36
    < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
37
    After this version we always call it, so that a 3.23.58 slave can rely on
38
    it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
39
    zeros in the good positions which, by chance, make it possible for the 3.23
40
    slave to detect that this event is unexpected) (this is luck which happens
41
    because the master and slave disagree on the size of the header of
42
    Log_event).
43
44
    Relying on the event length of the Rotate event instead of these
45
    well-placed zeros was not possible as Rotate events have a variable-length
46
    part.
47
*/
48
49
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
50
                             ulonglong position, const char** errmsg)
51
{
52
  DBUG_ENTER("fake_rotate_event");
53
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
54
  /*
55
    'when' (the timestamp) is set to 0 so that slave could distinguish between
56
    real and fake Rotate events (if necessary)
57
  */
58
  memset(header, 0, 4);
59
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
60
61
  char* p = log_file_name+dirname_length(log_file_name);
62
  uint ident_len = (uint) strlen(p);
63
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
64
  int4store(header + SERVER_ID_OFFSET, server_id);
65
  int4store(header + EVENT_LEN_OFFSET, event_len);
66
  int2store(header + FLAGS_OFFSET, 0);
67
68
  // TODO: check what problems this may cause and fix them
69
  int4store(header + LOG_POS_OFFSET, 0);
70
71
  packet->append(header, sizeof(header));
72
  int8store(buf+R_POS_OFFSET,position);
73
  packet->append(buf, ROTATE_HEADER_LEN);
74
  packet->append(p,ident_len);
75
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
76
  {
77
    *errmsg = "failed on my_net_write()";
78
    DBUG_RETURN(-1);
79
  }
80
  DBUG_RETURN(0);
81
}
82
83
static int send_file(THD *thd)
84
{
85
  NET* net = &thd->net;
86
  int fd = -1, error = 1;
87
  size_t bytes;
88
  char fname[FN_REFLEN+1];
89
  const char *errmsg = 0;
90
  int old_timeout;
91
  unsigned long packet_len;
92
  uchar buf[IO_SIZE];				// It's safe to alloc this
93
  DBUG_ENTER("send_file");
94
95
  /*
96
    The client might be slow loading the data, give him wait_timeout to do
97
    the job
98
  */
99
  old_timeout= net->read_timeout;
100
  my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
101
102
  /*
103
    We need net_flush here because the client will not know it needs to send
104
    us the file name until it has processed the load event entry
105
  */
106
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
107
  {
108
    errmsg = "while reading file name";
109
    goto err;
110
  }
111
112
  // terminate with \0 for fn_format
113
  *((char*)net->read_pos +  packet_len) = 0;
114
  fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
115
  // this is needed to make replicate-ignore-db
116
  if (!strcmp(fname,"/dev/null"))
117
    goto end;
118
119
  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
120
  {
121
    errmsg = "on open of file";
122
    goto err;
123
  }
124
125
  while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0)
126
  {
127
    if (my_net_write(net, buf, bytes))
128
    {
129
      errmsg = "while writing data to client";
130
      goto err;
131
    }
132
  }
133
134
 end:
135
  if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
136
      (my_net_read(net) == packet_error))
137
  {
138
    errmsg = "while negotiating file transfer close";
139
    goto err;
140
  }
141
  error = 0;
142
143
 err:
144
  my_net_set_read_timeout(net, old_timeout);
145
  if (fd >= 0)
146
    (void) my_close(fd, MYF(0));
147
  if (errmsg)
148
  {
149
    sql_print_error("Failed in send_file() %s", errmsg);
150
    DBUG_PRINT("error", (errmsg));
151
  }
152
  DBUG_RETURN(error);
153
}
154
155
156
/*
157
  Adjust the position pointer in the binary log file for all running slaves
158
159
  SYNOPSIS
160
    adjust_linfo_offsets()
161
    purge_offset	Number of bytes removed from start of log index file
162
163
  NOTES
164
    - This is called when doing a PURGE when we delete lines from the
165
      index log file
166
167
  REQUIREMENTS
168
    - Before calling this function, we have to ensure that no threads are
169
      using any binary log file before purge_offset.a
170
171
  TODO
172
    - Inform the slave threads that they should sync the position
173
      in the binary log file with flush_relay_log_info.
174
      Now they sync is done for next read.
175
*/
176
177
void adjust_linfo_offsets(my_off_t purge_offset)
178
{
179
  THD *tmp;
180
181
  pthread_mutex_lock(&LOCK_thread_count);
182
  I_List_iterator<THD> it(threads);
183
184
  while ((tmp=it++))
185
  {
186
    LOG_INFO* linfo;
187
    if ((linfo = tmp->current_linfo))
188
    {
189
      pthread_mutex_lock(&linfo->lock);
190
      /*
191
	Index file offset can be less that purge offset only if
192
	we just started reading the index file. In that case
193
	we have nothing to adjust
194
      */
195
      if (linfo->index_file_offset < purge_offset)
196
	linfo->fatal = (linfo->index_file_offset != 0);
197
      else
198
	linfo->index_file_offset -= purge_offset;
199
      pthread_mutex_unlock(&linfo->lock);
200
    }
201
  }
202
  pthread_mutex_unlock(&LOCK_thread_count);
203
}
204
205
206
bool log_in_use(const char* log_name)
207
{
208
  int log_name_len = strlen(log_name) + 1;
209
  THD *tmp;
210
  bool result = 0;
211
212
  pthread_mutex_lock(&LOCK_thread_count);
213
  I_List_iterator<THD> it(threads);
214
215
  while ((tmp=it++))
216
  {
217
    LOG_INFO* linfo;
218
    if ((linfo = tmp->current_linfo))
219
    {
220
      pthread_mutex_lock(&linfo->lock);
221
      result = !bcmp((uchar*) log_name, (uchar*) linfo->log_file_name,
222
                     log_name_len);
223
      pthread_mutex_unlock(&linfo->lock);
224
      if (result)
225
	break;
226
    }
227
  }
228
229
  pthread_mutex_unlock(&LOCK_thread_count);
230
  return result;
231
}
232
233
bool purge_error_message(THD* thd, int res)
234
{
235
  uint errmsg= 0;
236
237
  switch (res)  {
238
  case 0: break;
239
  case LOG_INFO_EOF:	errmsg= ER_UNKNOWN_TARGET_BINLOG; break;
240
  case LOG_INFO_IO:	errmsg= ER_IO_ERR_LOG_INDEX_READ; break;
241
  case LOG_INFO_INVALID:errmsg= ER_BINLOG_PURGE_PROHIBITED; break;
242
  case LOG_INFO_SEEK:	errmsg= ER_FSEEK_FAIL; break;
243
  case LOG_INFO_MEM:	errmsg= ER_OUT_OF_RESOURCES; break;
244
  case LOG_INFO_FATAL:	errmsg= ER_BINLOG_PURGE_FATAL_ERR; break;
245
  case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break;
246
  case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break;
247
  default:		errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break;
248
  }
249
250
  if (errmsg)
251
  {
252
    my_message(errmsg, ER(errmsg), MYF(0));
253
    return TRUE;
254
  }
255
  my_ok(thd);
256
  return FALSE;
257
}
258
259
260
bool purge_master_logs(THD* thd, const char* to_log)
261
{
262
  char search_file_name[FN_REFLEN];
263
  if (!mysql_bin_log.is_open())
264
  {
265
    my_ok(thd);
266
    return FALSE;
267
  }
268
269
  mysql_bin_log.make_log_name(search_file_name, to_log);
270
  return purge_error_message(thd,
271
			     mysql_bin_log.purge_logs(search_file_name, 0, 1,
272
						      1, NULL));
273
}
274
275
276
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
277
{
278
  if (!mysql_bin_log.is_open())
279
  {
280
    my_ok(thd);
281
    return 0;
282
  }
283
  return purge_error_message(thd,
284
                             mysql_bin_log.purge_logs_before_date(purge_time));
285
}
286
287
int test_for_non_eof_log_read_errors(int error, const char **errmsg)
288
{
289
  if (error == LOG_READ_EOF)
290
    return 0;
291
  my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
292
  switch (error) {
293
  case LOG_READ_BOGUS:
294
    *errmsg = "bogus data in log event";
295
    break;
296
  case LOG_READ_TOO_LARGE:
297
    *errmsg = "log event entry exceeded max_allowed_packet; \
298
Increase max_allowed_packet on master";
299
    break;
300
  case LOG_READ_IO:
301
    *errmsg = "I/O error reading log event";
302
    break;
303
  case LOG_READ_MEM:
304
    *errmsg = "memory allocation failed reading log event";
305
    break;
306
  case LOG_READ_TRUNC:
307
    *errmsg = "binlog truncated in the middle of event";
308
    break;
309
  default:
310
    *errmsg = "unknown error reading log event on the master";
311
    break;
312
  }
313
  return error;
314
}
315
316
317
/**
318
  An auxiliary function for calling in mysql_binlog_send
319
  to initialize the heartbeat timeout in waiting for a binlogged event.
320
321
  @param[in]    thd  THD to access a user variable
322
323
  @return        heartbeat period an ulonglong of nanoseconds
324
                 or zero if heartbeat was not demanded by slave
325
*/ 
326
static ulonglong get_heartbeat_period(THD * thd)
327
{
328
  my_bool null_value;
329
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
330
  user_var_entry *entry= 
331
    (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
332
                                  name.length);
333
  return entry? entry->val_int(&null_value) : 0;
334
}
335
336
/*
337
  Function prepares and sends repliation heartbeat event.
338
339
  @param net                net object of THD
340
  @param packet             buffer to store the heartbeat instance
341
  @param event_coordinates  binlog file name and position of the last
342
                            real event master sent from binlog
343
344
  @note 
345
    Among three essential pieces of heartbeat data Log_event::when
346
    is computed locally.
347
    The  error to send is serious and should force terminating
348
    the dump thread.
349
*/
350
static int send_heartbeat_event(NET* net, String* packet,
351
                                const struct event_coordinates *coord)
352
{
353
  DBUG_ENTER("send_heartbeat_event");
354
  char header[LOG_EVENT_HEADER_LEN];
355
  /*
356
    'when' (the timestamp) is set to 0 so that slave could distinguish between
357
    real and fake Rotate events (if necessary)
358
  */
359
  memset(header, 0, 4);  // when
360
361
  header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
362
363
  char* p= coord->file_name + dirname_length(coord->file_name);
364
365
  uint ident_len = strlen(p);
366
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
367
  int4store(header + SERVER_ID_OFFSET, server_id);
368
  int4store(header + EVENT_LEN_OFFSET, event_len);
369
  int2store(header + FLAGS_OFFSET, 0);
370
371
  int4store(header + LOG_POS_OFFSET, coord->pos);  // log_pos
372
373
  packet->append(header, sizeof(header));
374
  packet->append(p, ident_len);             // log_file_name
375
376
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
377
      net_flush(net))
378
  {
379
    DBUG_RETURN(-1);
380
  }
381
  packet->set("\0", 1, &my_charset_bin);
382
  DBUG_RETURN(0);
383
}
384
385
/*
386
  TODO: Clean up loop to only have one call to send_file()
387
*/
388
389
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
390
		       ushort flags)
391
{
392
  LOG_INFO linfo;
393
  char *log_file_name = linfo.log_file_name;
394
  char search_file_name[FN_REFLEN], *name;
395
  IO_CACHE log;
396
  File file = -1;
397
  String* packet = &thd->packet;
398
  int error;
399
  const char *errmsg = "Unknown error";
400
  NET* net = &thd->net;
401
  pthread_mutex_t *log_lock;
402
  bool binlog_can_be_corrupted= FALSE;
403
#ifndef DBUG_OFF
404
  int left_events = max_binlog_dump_events;
405
#endif
406
  DBUG_ENTER("mysql_binlog_send");
407
  DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));
408
409
  bzero((char*) &log,sizeof(log));
410
  /* 
411
     heartbeat_period from @master_heartbeat_period user variable
412
  */
413
  ulonglong heartbeat_period= get_heartbeat_period(thd);
414
  struct timespec heartbeat_buf;
415
  struct event_coordinates coord_buf;
416
  struct timespec *heartbeat_ts= NULL;
417
  struct event_coordinates *coord= NULL;
80.1.1 by Brian Aker
LL() cleanup
418
  if (heartbeat_period != 0LL)
1 by brian
clean slate
419
  {
420
    heartbeat_ts= &heartbeat_buf;
421
    set_timespec_nsec(*heartbeat_ts, 0);
422
    coord= &coord_buf;
423
    coord->file_name= log_file_name; // initialization basing on what slave remembers
424
    coord->pos= pos;
425
  }
426
#ifndef DBUG_OFF
427
  if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
428
  {
429
    errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover";
430
    my_errno= ER_UNKNOWN_ERROR;
431
    goto err;
432
  }
433
#endif
434
435
  if (!mysql_bin_log.is_open())
436
  {
437
    errmsg = "Binary log is not open";
438
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
439
    goto err;
440
  }
441
  if (!server_id_supplied)
442
  {
443
    errmsg = "Misconfigured master - server id was not set";
444
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
445
    goto err;
446
  }
447
448
  name=search_file_name;
449
  if (log_ident[0])
450
    mysql_bin_log.make_log_name(search_file_name, log_ident);
451
  else
452
    name=0;					// Find first log
453
454
  linfo.index_file_offset = 0;
455
456
  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
457
  {
458
    errmsg = "Could not find first log file name in binary log index file";
459
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
460
    goto err;
461
  }
462
463
  pthread_mutex_lock(&LOCK_thread_count);
464
  thd->current_linfo = &linfo;
465
  pthread_mutex_unlock(&LOCK_thread_count);
466
467
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
468
  {
469
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
470
    goto err;
471
  }
472
  if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
473
  {
474
    errmsg= "Client requested master to start replication from \
475
impossible position";
476
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
477
    goto err;
478
  }
479
480
  /*
481
    We need to start a packet with something other than 255
482
    to distinguish it from error
483
  */
484
  packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
485
486
  /*
487
    Tell the client about the log name with a fake Rotate event;
488
    this is needed even if we also send a Format_description_log_event
489
    just after, because that event does not contain the binlog's name.
490
    Note that as this Rotate event is sent before
491
    Format_description_log_event, the slave cannot have any info to
492
    understand this event's format, so the header len of
493
    Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
494
    than other events except FORMAT_DESCRIPTION_EVENT).
495
    Before 4.0.14 we called fake_rotate_event below only if (pos ==
496
    BIN_LOG_HEADER_SIZE), because if this is false then the slave
497
    already knows the binlog's name.
498
    Since, we always call fake_rotate_event; if the slave already knew
499
    the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
500
    useless but does not harm much. It is nice for 3.23 (>=.58) slaves
501
    which test Rotate events to see if the master is 4.0 (then they
502
    choose to stop because they can't replicate 4.0); by always calling
503
    fake_rotate_event we are sure that 3.23.58 and newer will detect the
504
    problem as soon as replication starts (BUG#198).
505
    Always calling fake_rotate_event makes sending of normal
506
    (=from-binlog) Rotate events a priori unneeded, but it is not so
507
    simple: the 2 Rotate events are not equivalent, the normal one is
508
    before the Stop event, the fake one is after. If we don't send the
509
    normal one, then the Stop event will be interpreted (by existing 4.0
510
    slaves) as "the master stopped", which is wrong. So for safety,
511
    given that we want minimum modification of 4.0, we send the normal
512
    and fake Rotates.
513
  */
514
  if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
515
  {
516
    /*
517
       This error code is not perfect, as fake_rotate_event() does not
518
       read anything from the binlog; if it fails it's because of an
519
       error in my_net_write(), fortunately it will say so in errmsg.
520
    */
521
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
522
    goto err;
523
  }
524
  packet->set("\0", 1, &my_charset_bin);
525
  /*
526
    Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
527
    this larger than the corresponding packet (query) sent 
528
    from client to master.
529
  */
530
  thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
531
532
  /*
533
    We can set log_lock now, it does not move (it's a member of
534
    mysql_bin_log, and it's already inited, and it will be destroyed
535
    only at shutdown).
536
  */
537
  log_lock = mysql_bin_log.get_log_lock();
538
  if (pos > BIN_LOG_HEADER_SIZE)
539
  {
540
     /*
541
       Try to find a Format_description_log_event at the beginning of
542
       the binlog
543
     */
544
     if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
545
     {
546
       /*
547
         The packet has offsets equal to the normal offsets in a binlog
548
         event +1 (the first character is \0).
549
       */
550
       if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
551
       {
552
         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
553
                                       LOG_EVENT_BINLOG_IN_USE_F);
554
         (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
555
         /*
556
           mark that this event with "log_pos=0", so the slave
557
           should not increment master's binlog position
558
           (rli->group_master_log_pos)
559
         */
560
         int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
561
         /*
562
           if reconnect master sends FD event with `created' as 0
563
           to avoid destroying temp tables.
564
          */
565
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
566
                   ST_CREATED_OFFSET+1, (ulong) 0);
567
         /* send it */
568
         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
569
         {
570
           errmsg = "Failed on my_net_write()";
571
           my_errno= ER_UNKNOWN_ERROR;
572
           goto err;
573
         }
574
575
         /*
576
           No need to save this event. We are only doing simple reads
577
           (no real parsing of the events) so we don't need it. And so
578
           we don't need the artificial Format_description_log_event of
579
           3.23&4.x.
580
         */
581
       }
582
     }
583
     else
584
     {
585
       if (test_for_non_eof_log_read_errors(error, &errmsg))
586
         goto err;
587
       /*
588
         It's EOF, nothing to do, go on reading next events, the
589
         Format_description_log_event will be found naturally if it is written.
590
       */
591
     }
592
     /* reset the packet as we wrote to it in any case */
593
     packet->set("\0", 1, &my_charset_bin);
594
  } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
595
  else
596
  {
597
    /* The Format_description_log_event event will be found naturally. */
598
  }
599
600
  /* seek to the requested position, to start the requested dump */
601
  my_b_seek(&log, pos);			// Seek will done on next read
602
603
  while (!net->error && net->vio != 0 && !thd->killed)
604
  {
605
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
606
    {
607
#ifndef DBUG_OFF
608
      if (max_binlog_dump_events && !left_events--)
609
      {
610
	net_flush(net);
611
	errmsg = "Debugging binlog dump abort";
612
	my_errno= ER_UNKNOWN_ERROR;
613
	goto err;
614
      }
615
#endif
616
      /*
617
        log's filename does not change while it's active
618
      */
619
      if (coord)
620
        coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
621
622
      if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
623
      {
624
        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
625
                                      LOG_EVENT_BINLOG_IN_USE_F);
626
        (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
627
      }
628
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
629
        binlog_can_be_corrupted= FALSE;
630
631
      if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
632
      {
633
	errmsg = "Failed on my_net_write()";
634
	my_errno= ER_UNKNOWN_ERROR;
635
	goto err;
636
      }
637
638
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
639
      {
640
	if (send_file(thd))
641
	{
642
	  errmsg = "failed in send_file()";
643
	  my_errno= ER_UNKNOWN_ERROR;
644
	  goto err;
645
	}
646
      }
647
      packet->set("\0", 1, &my_charset_bin);
648
    }
649
650
    /*
651
      here we were reading binlog that was not closed properly (as a result
652
      of a crash ?). treat any corruption as EOF
653
    */
654
    if (binlog_can_be_corrupted && error != LOG_READ_MEM)
655
      error=LOG_READ_EOF;
656
    /*
657
      TODO: now that we are logging the offset, check to make sure
658
      the recorded offset and the actual match.
659
      Guilhem 2003-06: this is not true if this master is a slave
660
      <4.0.15 running with --log-slave-updates, because then log_pos may
661
      be the offset in the-master-of-this-master's binlog.
662
    */
663
    if (test_for_non_eof_log_read_errors(error, &errmsg))
664
      goto err;
665
666
    if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
667
        mysql_bin_log.is_active(log_file_name))
668
    {
669
      /*
670
	Block until there is more data in the log
671
      */
672
      if (net_flush(net))
673
      {
674
	errmsg = "failed on net_flush()";
675
	my_errno= ER_UNKNOWN_ERROR;
676
	goto err;
677
      }
678
679
      /*
680
	We may have missed the update broadcast from the log
681
	that has just happened, let's try to catch it if it did.
682
	If we did not miss anything, we just wait for other threads
683
	to signal us.
684
      */
685
      {
686
	log.error=0;
687
	bool read_packet = 0, fatal_error = 0;
688
689
#ifndef DBUG_OFF
690
	if (max_binlog_dump_events && !left_events--)
691
	{
692
	  errmsg = "Debugging binlog dump abort";
693
	  my_errno= ER_UNKNOWN_ERROR;
694
	  goto err;
695
	}
696
#endif
697
698
	/*
699
	  No one will update the log while we are reading
700
	  now, but we'll be quick and just read one record
701
702
	  TODO:
703
          Add an counter that is incremented for each time we update the
704
          binary log.  We can avoid the following read if the counter
705
          has not been updated since last read.
706
	*/
707
708
	pthread_mutex_lock(log_lock);
709
	switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
710
	case 0:
711
	  /* we read successfully, so we'll need to send it to the slave */
712
	  pthread_mutex_unlock(log_lock);
713
	  read_packet = 1;
714
          if (coord)
715
            coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
716
	  break;
717
718
	case LOG_READ_EOF:
719
        {
720
          int ret;
721
	  DBUG_PRINT("wait",("waiting for data in binary log"));
722
	  if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
723
	  {
724
	    pthread_mutex_unlock(log_lock);
725
	    goto end;
726
	  }
727
728
#ifndef DBUG_OFF
729
          ulong hb_info_counter= 0;
730
#endif
731
          do 
732
          {
733
            if (coord)
734
            {
80.1.1 by Brian Aker
LL() cleanup
735
              DBUG_ASSERT(heartbeat_ts && heartbeat_period != 0LL);
1 by brian
clean slate
736
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
737
            }
738
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
80.1.1 by Brian Aker
LL() cleanup
739
            DBUG_ASSERT(ret == 0 || heartbeat_period != 0LL && coord != NULL);
1 by brian
clean slate
740
            if (ret == ETIMEDOUT || ret == ETIME)
741
            {
742
#ifndef DBUG_OFF
743
              if (hb_info_counter < 3)
744
              {
745
                sql_print_information("master sends heartbeat message");
746
                hb_info_counter++;
747
                if (hb_info_counter == 3)
748
                  sql_print_information("the rest of heartbeat info skipped ...");
749
              }
750
#endif
751
              if (send_heartbeat_event(net, packet, coord))
752
              {
753
                errmsg = "Failed on my_net_write()";
754
                my_errno= ER_UNKNOWN_ERROR;
755
                pthread_mutex_unlock(log_lock);
756
                goto err;
757
              }
758
            }
759
            else
760
            {
761
              DBUG_ASSERT(ret == 0);
762
              DBUG_PRINT("wait",("binary log received update"));
763
            }
764
          } while (ret != 0 && coord != NULL && !thd->killed);
765
          pthread_mutex_unlock(log_lock);
766
        }    
767
        break;
768
            
769
        default:
770
	  pthread_mutex_unlock(log_lock);
771
	  fatal_error = 1;
772
	  break;
773
	}
774
775
	if (read_packet)
776
	{
777
	  thd_proc_info(thd, "Sending binlog event to slave");
778
	  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
779
	  {
780
	    errmsg = "Failed on my_net_write()";
781
	    my_errno= ER_UNKNOWN_ERROR;
782
	    goto err;
783
	  }
784
785
	  if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
786
	  {
787
	    if (send_file(thd))
788
	    {
789
	      errmsg = "failed in send_file()";
790
	      my_errno= ER_UNKNOWN_ERROR;
791
	      goto err;
792
	    }
793
	  }
794
	  packet->set("\0", 1, &my_charset_bin);
795
	  /*
796
	    No need to net_flush because we will get to flush later when
797
	    we hit EOF pretty quick
798
	  */
799
	}
800
801
	if (fatal_error)
802
	{
803
	  errmsg = "error reading log entry";
804
          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
805
	  goto err;
806
	}
807
	log.error=0;
808
      }
809
    }
810
    else
811
    {
812
      bool loop_breaker = 0;
813
      /* need this to break out of the for loop from switch */
814
815
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
816
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
817
      case LOG_INFO_EOF:
818
	loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
819
	break;
820
      case 0:
821
	break;
822
      default:
823
	errmsg = "could not find next log";
824
	my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
825
	goto err;
826
      }
827
828
      if (loop_breaker)
829
        break;
830
831
      end_io_cache(&log);
832
      (void) my_close(file, MYF(MY_WME));
833
834
      /*
835
        Call fake_rotate_event() in case the previous log (the one which
836
        we have just finished reading) did not contain a Rotate event
837
        (for example (I don't know any other example) the previous log
838
        was the last one before the master was shutdown & restarted).
839
        This way we tell the slave about the new log's name and
840
        position.  If the binlog is 5.0, the next event we are going to
841
        read and send is Format_description_log_event.
842
      */
843
      if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
844
	  fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
845
                            &errmsg))
846
      {
847
	my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
848
	goto err;
849
      }
850
851
      packet->length(0);
852
      packet->append('\0');
853
      if (coord)
854
        coord->file_name= log_file_name; // reset to the next
855
    }
856
  }
857
858
end:
859
  end_io_cache(&log);
860
  (void)my_close(file, MYF(MY_WME));
861
862
  my_eof(thd);
863
  thd_proc_info(thd, "Waiting to finalize termination");
864
  pthread_mutex_lock(&LOCK_thread_count);
865
  thd->current_linfo = 0;
866
  pthread_mutex_unlock(&LOCK_thread_count);
867
  DBUG_VOID_RETURN;
868
869
err:
870
  thd_proc_info(thd, "Waiting to finalize termination");
871
  end_io_cache(&log);
872
  /*
873
    Exclude  iteration through thread list
874
    this is needed for purge_logs() - it will iterate through
875
    thread list and update thd->current_linfo->index_file_offset
876
    this mutex will make sure that it never tried to update our linfo
877
    after we return from this stack frame
878
  */
879
  pthread_mutex_lock(&LOCK_thread_count);
880
  thd->current_linfo = 0;
881
  pthread_mutex_unlock(&LOCK_thread_count);
882
  if (file >= 0)
883
    (void) my_close(file, MYF(MY_WME));
884
885
  my_message(my_errno, errmsg, MYF(0));
886
  DBUG_VOID_RETURN;
887
}
888
889
int start_slave(THD* thd , Master_info* mi,  bool net_report)
890
{
891
  int slave_errno= 0;
892
  int thread_mask;
893
  DBUG_ENTER("start_slave");
894
895
  lock_slave_threads(mi);  // this allows us to cleanly read slave_running
896
  // Get a mask of _stopped_ threads
897
  init_thread_mask(&thread_mask,mi,1 /* inverse */);
898
  /*
899
    Below we will start all stopped threads.  But if the user wants to
900
    start only one thread, do as if the other thread was running (as we
901
    don't wan't to touch the other thread), so set the bit to 0 for the
902
    other thread
903
  */
904
  if (thd->lex->slave_thd_opt)
905
    thread_mask&= thd->lex->slave_thd_opt;
906
  if (thread_mask) //some threads are stopped, start them
907
  {
908
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
909
			 thread_mask))
910
      slave_errno=ER_MASTER_INFO;
911
    else if (server_id_supplied && *mi->host)
912
    {
913
      /*
914
        If we will start SQL thread we will care about UNTIL options If
915
        not and they are specified we will ignore them and warn user
916
        about this fact.
917
      */
918
      if (thread_mask & SLAVE_SQL)
919
      {
920
        pthread_mutex_lock(&mi->rli.data_lock);
921
922
        if (thd->lex->mi.pos)
923
        {
924
          mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
925
          mi->rli.until_log_pos= thd->lex->mi.pos;
926
          /*
927
             We don't check thd->lex->mi.log_file_name for NULL here
928
             since it is checked in sql_yacc.yy
929
          */
930
          strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
931
                  sizeof(mi->rli.until_log_name)-1);
932
        }
933
        else if (thd->lex->mi.relay_log_pos)
934
        {
935
          mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
936
          mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
937
          strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
938
                  sizeof(mi->rli.until_log_name)-1);
939
        }
940
        else
941
          mi->rli.clear_until_condition();
942
943
        if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
944
        {
945
          /* Preparing members for effective until condition checking */
946
          const char *p= fn_ext(mi->rli.until_log_name);
947
          char *p_end;
948
          if (*p)
949
          {
950
            //p points to '.'
951
            mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
952
            /*
953
              p_end points to the first invalid character. If it equals
954
              to p, no digits were found, error. If it contains '\0' it
955
              means  conversion went ok.
956
            */
957
            if (p_end==p || *p_end)
958
              slave_errno=ER_BAD_SLAVE_UNTIL_COND;
959
          }
960
          else
961
            slave_errno=ER_BAD_SLAVE_UNTIL_COND;
962
963
          /* mark the cached result of the UNTIL comparison as "undefined" */
964
          mi->rli.until_log_names_cmp_result=
965
            Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
966
967
          /* Issuing warning then started without --skip-slave-start */
968
          if (!opt_skip_slave_start)
969
            push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
970
                         ER_MISSING_SKIP_SLAVE,
971
                         ER(ER_MISSING_SKIP_SLAVE));
972
        }
973
974
        pthread_mutex_unlock(&mi->rli.data_lock);
975
      }
976
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
977
        push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
978
                     ER(ER_UNTIL_COND_IGNORED));
979
980
      if (!slave_errno)
981
        slave_errno = start_slave_threads(0 /*no mutex */,
982
					1 /* wait for start */,
983
					mi,
984
					master_info_file,relay_log_info_file,
985
					thread_mask);
986
    }
987
    else
988
      slave_errno = ER_BAD_SLAVE;
989
  }
990
  else
991
  {
992
    /* no error if all threads are already started, only a warning */
993
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
994
                 ER(ER_SLAVE_WAS_RUNNING));
995
  }
996
997
  unlock_slave_threads(mi);
998
999
  if (slave_errno)
1000
  {
1001
    if (net_report)
1002
      my_message(slave_errno, ER(slave_errno), MYF(0));
1003
    DBUG_RETURN(1);
1004
  }
1005
  else if (net_report)
1006
    my_ok(thd);
1007
1008
  DBUG_RETURN(0);
1009
}
1010
1011
1012
int stop_slave(THD* thd, Master_info* mi, bool net_report )
1013
{
1014
  DBUG_ENTER("stop_slave");
1015
  
1016
  int slave_errno;
1017
  if (!thd)
1018
    thd = current_thd;
1019
1020
  thd_proc_info(thd, "Killing slave");
1021
  int thread_mask;
1022
  lock_slave_threads(mi);
1023
  // Get a mask of _running_ threads
1024
  init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
1025
  /*
1026
    Below we will stop all running threads.
1027
    But if the user wants to stop only one thread, do as if the other thread
1028
    was stopped (as we don't wan't to touch the other thread), so set the
1029
    bit to 0 for the other thread
1030
  */
1031
  if (thd->lex->slave_thd_opt)
1032
    thread_mask &= thd->lex->slave_thd_opt;
1033
1034
  if (thread_mask)
1035
  {
1036
    slave_errno= terminate_slave_threads(mi,thread_mask,
1037
                                         1 /*skip lock */);
1038
  }
1039
  else
1040
  {
1041
    //no error if both threads are already stopped, only a warning
1042
    slave_errno= 0;
1043
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
1044
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
1045
  }
1046
  unlock_slave_threads(mi);
1047
  thd_proc_info(thd, 0);
1048
1049
  if (slave_errno)
1050
  {
1051
    if (net_report)
1052
      my_message(slave_errno, ER(slave_errno), MYF(0));
1053
    DBUG_RETURN(1);
1054
  }
1055
  else if (net_report)
1056
    my_ok(thd);
1057
1058
  DBUG_RETURN(0);
1059
}
1060
1061
1062
/*
1063
  Remove all relay logs and start replication from the start
1064
1065
  SYNOPSIS
1066
    reset_slave()
1067
    thd			Thread handler
1068
    mi			Master info for the slave
1069
1070
  RETURN
1071
    0	ok
1072
    1	error
1073
*/
1074
1075
1076
int reset_slave(THD *thd, Master_info* mi)
1077
{
15 by brian
Fix for stat, NETWARE removal
1078
  struct stat stat_area;
1 by brian
clean slate
1079
  char fname[FN_REFLEN];
1080
  int thread_mask= 0, error= 0;
1081
  uint sql_errno=0;
1082
  const char* errmsg=0;
1083
  DBUG_ENTER("reset_slave");
1084
1085
  lock_slave_threads(mi);
1086
  init_thread_mask(&thread_mask,mi,0 /* not inverse */);
1087
  if (thread_mask) // We refuse if any slave thread is running
1088
  {
1089
    sql_errno= ER_SLAVE_MUST_STOP;
1090
    error=1;
1091
    goto err;
1092
  }
1093
1094
  ha_reset_slave(thd);
1095
1096
  // delete relay logs, clear relay log coordinates
1097
  if ((error= purge_relay_logs(&mi->rli, thd,
1098
			       1 /* just reset */,
1099
			       &errmsg)))
1100
    goto err;
1101
1102
  /* Clear master's log coordinates */
1103
  init_master_log_pos(mi);
1104
  /*
1105
     Reset errors (the idea is that we forget about the
1106
     old master).
1107
  */
1108
  mi->rli.clear_error();
1109
  mi->rli.clear_until_condition();
1110
1111
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1112
  end_master_info(mi);
1113
  // and delete these two files
1114
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
15 by brian
Fix for stat, NETWARE removal
1115
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1 by brian
clean slate
1116
  {
1117
    error=1;
1118
    goto err;
1119
  }
1120
  // delete relay_log_info_file
1121
  fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
15 by brian
Fix for stat, NETWARE removal
1122
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1 by brian
clean slate
1123
  {
1124
    error=1;
1125
    goto err;
1126
  }
1127
1128
err:
1129
  unlock_slave_threads(mi);
1130
  if (error)
1131
    my_error(sql_errno, MYF(0), errmsg);
1132
  DBUG_RETURN(error);
1133
}
1134
1135
/*
1136
1137
  Kill all Binlog_dump threads which previously talked to the same slave
1138
  ("same" means with the same server id). Indeed, if the slave stops, if the
1139
  Binlog_dump thread is waiting (pthread_cond_wait) for binlog update, then it
1140
  will keep existing until a query is written to the binlog. If the master is
1141
  idle, then this could last long, and if the slave reconnects, we could have 2
1142
  Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
1143
  binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
1144
  the master kills any existing thread with the slave's server id (if this id is
1145
  not zero; it will be true for real slaves, but false for mysqlbinlog when it
1146
  sends COM_BINLOG_DUMP to get a remote binlog dump).
1147
1148
  SYNOPSIS
1149
    kill_zombie_dump_threads()
1150
    slave_server_id     the slave's server id
1151
1152
*/
1153
1154
1155
void kill_zombie_dump_threads(uint32 slave_server_id)
1156
{
1157
  pthread_mutex_lock(&LOCK_thread_count);
1158
  I_List_iterator<THD> it(threads);
1159
  THD *tmp;
1160
1161
  while ((tmp=it++))
1162
  {
1163
    if (tmp->command == COM_BINLOG_DUMP &&
1164
       tmp->server_id == slave_server_id)
1165
    {
1166
      pthread_mutex_lock(&tmp->LOCK_delete);	// Lock from delete
1167
      break;
1168
    }
1169
  }
1170
  pthread_mutex_unlock(&LOCK_thread_count);
1171
  if (tmp)
1172
  {
1173
    /*
1174
      Here we do not call kill_one_thread() as
1175
      it will be slow because it will iterate through the list
1176
      again. We just to do kill the thread ourselves.
1177
    */
1178
    tmp->awake(THD::KILL_QUERY);
1179
    pthread_mutex_unlock(&tmp->LOCK_delete);
1180
  }
1181
}
1182
1183
1184
bool change_master(THD* thd, Master_info* mi)
1185
{
1186
  int thread_mask;
1187
  const char* errmsg= 0;
1188
  bool need_relay_log_purge= 1;
1189
  DBUG_ENTER("change_master");
1190
1191
  lock_slave_threads(mi);
1192
  init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
1193
  if (thread_mask) // We refuse if any slave thread is running
1194
  {
1195
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1196
    unlock_slave_threads(mi);
1197
    DBUG_RETURN(TRUE);
1198
  }
1199
1200
  thd_proc_info(thd, "Changing master");
1201
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1202
  // TODO: see if needs re-write
1203
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
1204
		       thread_mask))
1205
  {
1206
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1207
    unlock_slave_threads(mi);
1208
    DBUG_RETURN(TRUE);
1209
  }
1210
1211
  /*
1212
    Data lock not needed since we have already stopped the running threads,
1213
    and we have the hold on the run locks which will keep all threads that
1214
    could possibly modify the data structures from running
1215
  */
1216
1217
  /*
1218
    If the user specified host or port without binlog or position,
1219
    reset binlog's name to FIRST and position to 4.
1220
  */
1221
1222
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1223
  {
1224
    mi->master_log_name[0] = 0;
1225
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
1226
  }
1227
1228
  if (lex_mi->log_file_name)
1229
    strmake(mi->master_log_name, lex_mi->log_file_name,
1230
	    sizeof(mi->master_log_name)-1);
1231
  if (lex_mi->pos)
1232
  {
1233
    mi->master_log_pos= lex_mi->pos;
1234
  }
1235
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
1236
1237
  if (lex_mi->host)
1238
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1239
  if (lex_mi->user)
1240
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
1241
  if (lex_mi->password)
1242
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
1243
  if (lex_mi->port)
1244
    mi->port = lex_mi->port;
1245
  if (lex_mi->connect_retry)
1246
    mi->connect_retry = lex_mi->connect_retry;
1247
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1248
    mi->heartbeat_period = lex_mi->heartbeat_period;
1249
  else
1250
    mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
1251
                                      (slave_net_timeout/2.0));
80.1.1 by Brian Aker
LL() cleanup
1252
  mi->received_heartbeats= 0LL; // counter lives until master is CHANGEd
1 by brian
clean slate
1253
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1254
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
1255
1256
  if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1257
    mi->ssl_verify_server_cert=
1258
      (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
1259
1260
  if (lex_mi->ssl_ca)
1261
    strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
1262
  if (lex_mi->ssl_capath)
1263
    strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
1264
  if (lex_mi->ssl_cert)
1265
    strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
1266
  if (lex_mi->ssl_cipher)
1267
    strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
1268
  if (lex_mi->ssl_key)
1269
    strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
1270
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
1271
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
1272
      lex_mi->ssl_verify_server_cert )
1273
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
1274
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1275
1276
  if (lex_mi->relay_log_name)
1277
  {
1278
    need_relay_log_purge= 0;
1279
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
1280
	    sizeof(mi->rli.group_relay_log_name)-1);
1281
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
1282
	    sizeof(mi->rli.event_relay_log_name)-1);
1283
  }
1284
1285
  if (lex_mi->relay_log_pos)
1286
  {
1287
    need_relay_log_purge= 0;
1288
    mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
1289
  }
1290
1291
  /*
1292
    If user did specify neither host nor port nor any log name nor any log
1293
    pos, i.e. he specified only user/password/master_connect_retry, he probably
1294
    wants replication to resume from where it had left, i.e. from the
1295
    coordinates of the **SQL** thread (imagine the case where the I/O is ahead
1296
    of the SQL; restarting from the coordinates of the I/O would lose some
1297
    events which is probably unwanted when you are just doing minor changes
1298
    like changing master_connect_retry).
1299
    A side-effect is that if only the I/O thread was started, this thread may
1300
    restart from ''/4 after the CHANGE MASTER. That's a minor problem (it is a
1301
    much more unlikely situation than the one we are fixing here).
1302
    Note: coordinates of the SQL thread must be read here, before the
1303
    'if (need_relay_log_purge)' block which resets them.
1304
  */
1305
  if (!lex_mi->host && !lex_mi->port &&
1306
      !lex_mi->log_file_name && !lex_mi->pos &&
1307
      need_relay_log_purge)
1308
   {
1309
     /*
1310
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1311
       not initialized), so we use a max().
1312
       What happens to mi->rli.master_log_pos during the initialization stages
1313
       of replication is not 100% clear, so we guard against problems using
1314
       max().
1315
      */
1316
     mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
1317
			      mi->rli.group_master_log_pos);
1318
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
1319
             sizeof(mi->master_log_name)-1);
1320
  }
1321
  /*
1322
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1323
    a slave before).
1324
  */
1325
  if (flush_master_info(mi, 0))
1326
  {
1327
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1328
    unlock_slave_threads(mi);
1329
    DBUG_RETURN(TRUE);
1330
  }
1331
  if (need_relay_log_purge)
1332
  {
1333
    relay_log_purge= 1;
1334
    thd_proc_info(thd, "Purging old relay logs");
1335
    if (purge_relay_logs(&mi->rli, thd,
1336
			 0 /* not only reset, but also reinit */,
1337
			 &errmsg))
1338
    {
1339
      my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
1340
      unlock_slave_threads(mi);
1341
      DBUG_RETURN(TRUE);
1342
    }
1343
  }
1344
  else
1345
  {
1346
    const char* msg;
1347
    relay_log_purge= 0;
1348
    /* Relay log is already initialized */
1349
    if (init_relay_log_pos(&mi->rli,
1350
			   mi->rli.group_relay_log_name,
1351
			   mi->rli.group_relay_log_pos,
1352
			   0 /*no data lock*/,
1353
			   &msg, 0))
1354
    {
1355
      my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
1356
      unlock_slave_threads(mi);
1357
      DBUG_RETURN(TRUE);
1358
    }
1359
  }
1360
  /*
1361
    Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block,
1362
    so restore them to good values. If we left them to ''/0, that would work;
1363
    but that would fail in the case of 2 successive CHANGE MASTER (without a
1364
    START SLAVE in between): because first one would set the coords in mi to
1365
    the good values of those in rli, the set those in rli to ''/0, then
1366
    second CHANGE MASTER would set the coords in mi to those of rli, i.e. to
1367
    ''/0: we have lost all copies of the original good coordinates.
1368
    That's why we always save good coords in rli.
1369
  */
1370
  mi->rli.group_master_log_pos= mi->master_log_pos;
1371
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
1372
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
1373
	  sizeof(mi->rli.group_master_log_name)-1);
1374
1375
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
1376
    mi->rli.group_master_log_pos=0;
1377
1378
  pthread_mutex_lock(&mi->rli.data_lock);
1379
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1380
  /* Clear the errors, for a clean start */
1381
  mi->rli.clear_error();
1382
  mi->rli.clear_until_condition();
1383
  /*
1384
    If we don't write new coordinates to disk now, then old will remain in
1385
    relay-log.info until START SLAVE is issued; but if mysqld is shutdown
1386
    before START SLAVE, then old will remain in relay-log.info, and will be the
1387
    in-memory value at restart (thus causing errors, as the old relay log does
1388
    not exist anymore).
1389
  */
1390
  flush_relay_log_info(&mi->rli);
1391
  pthread_cond_broadcast(&mi->data_cond);
1392
  pthread_mutex_unlock(&mi->rli.data_lock);
1393
1394
  unlock_slave_threads(mi);
1395
  thd_proc_info(thd, 0);
1396
  my_ok(thd);
1397
  DBUG_RETURN(FALSE);
1398
}
1399
1400
int reset_master(THD* thd)
1401
{
1402
  if (!mysql_bin_log.is_open())
1403
  {
1404
    my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
1405
               ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
1406
    return 1;
1407
  }
1408
  return mysql_bin_log.reset_logs(thd);
1409
}
1410
1411
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
1412
		   const char* log_file_name2, ulonglong log_pos2)
1413
{
1414
  int res;
1415
  uint log_file_name1_len=  strlen(log_file_name1);
1416
  uint log_file_name2_len=  strlen(log_file_name2);
1417
1418
  //  We assume that both log names match up to '.'
1419
  if (log_file_name1_len == log_file_name2_len)
1420
  {
1421
    if ((res= strcmp(log_file_name1, log_file_name2)))
1422
      return res;
1423
    return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
1424
  }
1425
  return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
1426
}
1427
1428
1429
bool mysql_show_binlog_events(THD* thd)
1430
{
1431
  Protocol *protocol= thd->protocol;
1432
  List<Item> field_list;
1433
  const char *errmsg = 0;
1434
  bool ret = TRUE;
1435
  IO_CACHE log;
1436
  File file = -1;
1437
  DBUG_ENTER("mysql_show_binlog_events");
1438
1439
  Log_event::init_show_field_list(&field_list);
1440
  if (protocol->send_fields(&field_list,
1441
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1442
    DBUG_RETURN(TRUE);
1443
1444
  Format_description_log_event *description_event= new
1445
    Format_description_log_event(3); /* MySQL 4.0 by default */
1446
1447
  /*
1448
    Wait for handlers to insert any pending information
1449
    into the binlog.  For e.g. ndb which updates the binlog asynchronously
1450
    this is needed so that the uses sees all its own commands in the binlog
1451
  */
1452
  ha_binlog_wait(thd);
1453
1454
  if (mysql_bin_log.is_open())
1455
  {
1456
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1457
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
1458
    ha_rows event_count, limit_start, limit_end;
1459
    my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1460
    char search_file_name[FN_REFLEN], *name;
1461
    const char *log_file_name = lex_mi->log_file_name;
1462
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1463
    LOG_INFO linfo;
1464
    Log_event* ev;
1465
1466
    unit->set_limit(thd->lex->current_select);
1467
    limit_start= unit->offset_limit_cnt;
1468
    limit_end= unit->select_limit_cnt;
1469
1470
    name= search_file_name;
1471
    if (log_file_name)
1472
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
1473
    else
1474
      name=0;					// Find first log
1475
1476
    linfo.index_file_offset = 0;
1477
1478
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1479
    {
1480
      errmsg = "Could not find target log";
1481
      goto err;
1482
    }
1483
1484
    pthread_mutex_lock(&LOCK_thread_count);
1485
    thd->current_linfo = &linfo;
1486
    pthread_mutex_unlock(&LOCK_thread_count);
1487
1488
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
1489
      goto err;
1490
1491
    /*
1492
      to account binlog event header size
1493
    */
1494
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
1495
1496
    pthread_mutex_lock(log_lock);
1497
1498
    /*
1499
      open_binlog() sought to position 4.
1500
      Read the first event in case it's a Format_description_log_event, to
1501
      know the format. If there's no such event, we are 3.23 or 4.x. This
1502
      code, like before, can't read 3.23 binlogs.
1503
      This code will fail on a mixed relay log (one which has Format_desc then
1504
      Rotate then Format_desc).
1505
    */
1506
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
1507
    if (ev)
1508
    {
1509
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
1510
      {
1511
        delete description_event;
1512
        description_event= (Format_description_log_event*) ev;
1513
      }
1514
      else
1515
        delete ev;
1516
    }
1517
1518
    my_b_seek(&log, pos);
1519
1520
    if (!description_event->is_valid())
1521
    {
1522
      errmsg="Invalid Format_description event; could be out of memory";
1523
      goto err;
1524
    }
1525
1526
    for (event_count = 0;
1527
	 (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
1528
                                         description_event)); )
1529
    {
1530
      if (event_count >= limit_start &&
1531
	  ev->net_send(protocol, linfo.log_file_name, pos))
1532
      {
1533
	errmsg = "Net error";
1534
	delete ev;
1535
	pthread_mutex_unlock(log_lock);
1536
	goto err;
1537
      }
1538
1539
      pos = my_b_tell(&log);
1540
      delete ev;
1541
1542
      if (++event_count >= limit_end)
1543
	break;
1544
    }
1545
1546
    if (event_count < limit_end && log.error)
1547
    {
1548
      errmsg = "Wrong offset or I/O error";
1549
      pthread_mutex_unlock(log_lock);
1550
      goto err;
1551
    }
1552
1553
    pthread_mutex_unlock(log_lock);
1554
  }
1555
1556
  ret= FALSE;
1557
1558
err:
1559
  delete description_event;
1560
  if (file >= 0)
1561
  {
1562
    end_io_cache(&log);
1563
    (void) my_close(file, MYF(MY_WME));
1564
  }
1565
1566
  if (errmsg)
1567
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
1568
             "SHOW BINLOG EVENTS", errmsg);
1569
  else
1570
    my_eof(thd);
1571
1572
  pthread_mutex_lock(&LOCK_thread_count);
1573
  thd->current_linfo = 0;
1574
  pthread_mutex_unlock(&LOCK_thread_count);
1575
  DBUG_RETURN(ret);
1576
}
1577
1578
1579
bool show_binlog_info(THD* thd)
1580
{
1581
  Protocol *protocol= thd->protocol;
1582
  DBUG_ENTER("show_binlog_info");
1583
  List<Item> field_list;
1584
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1585
  field_list.push_back(new Item_return_int("Position",20,
1586
					   MYSQL_TYPE_LONGLONG));
1587
  field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
1588
  field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
1589
1590
  if (protocol->send_fields(&field_list,
1591
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1592
    DBUG_RETURN(TRUE);
1593
  protocol->prepare_for_resend();
1594
1595
  if (mysql_bin_log.is_open())
1596
  {
1597
    LOG_INFO li;
1598
    mysql_bin_log.get_current_log(&li);
1599
    int dir_len = dirname_length(li.log_file_name);
1600
    protocol->store(li.log_file_name + dir_len, &my_charset_bin);
1601
    protocol->store((ulonglong) li.pos);
1602
    protocol->store(binlog_filter->get_do_db());
1603
    protocol->store(binlog_filter->get_ignore_db());
1604
    if (protocol->write())
1605
      DBUG_RETURN(TRUE);
1606
  }
1607
  my_eof(thd);
1608
  DBUG_RETURN(FALSE);
1609
}
1610
1611
1612
/*
1613
  Send a list of all binary logs to client
1614
1615
  SYNOPSIS
1616
    show_binlogs()
1617
    thd		Thread specific variable
1618
1619
  RETURN VALUES
1620
    FALSE OK
1621
    TRUE  error
1622
*/
1623
1624
bool show_binlogs(THD* thd)
1625
{
1626
  IO_CACHE *index_file;
1627
  LOG_INFO cur;
1628
  File file;
1629
  char fname[FN_REFLEN];
1630
  List<Item> field_list;
1631
  uint length;
1632
  int cur_dir_len;
1633
  Protocol *protocol= thd->protocol;
1634
  DBUG_ENTER("show_binlogs");
1635
1636
  if (!mysql_bin_log.is_open())
1637
  {
1638
    my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0));
1639
    return 1;
1640
  }
1641
1642
  field_list.push_back(new Item_empty_string("Log_name", 255));
1643
  field_list.push_back(new Item_return_int("File_size", 20,
1644
                                           MYSQL_TYPE_LONGLONG));
1645
  if (protocol->send_fields(&field_list,
1646
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1647
    DBUG_RETURN(TRUE);
1648
  
1649
  pthread_mutex_lock(mysql_bin_log.get_log_lock());
1650
  mysql_bin_log.lock_index();
1651
  index_file=mysql_bin_log.get_index_file();
1652
  
1653
  mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
1654
  pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
1655
  
1656
  cur_dir_len= dirname_length(cur.log_file_name);
1657
1658
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
1659
1660
  /* The file ends with EOF or empty line */
1661
  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
1662
  {
1663
    int dir_len;
1664
    ulonglong file_length= 0;                   // Length if open fails
1665
    fname[--length] = '\0';                     // remove the newline
1666
1667
    protocol->prepare_for_resend();
1668
    dir_len= dirname_length(fname);
1669
    length-= dir_len;
1670
    protocol->store(fname + dir_len, length, &my_charset_bin);
1671
1672
    if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
1673
      file_length= cur.pos;  /* The active log, use the active position */
1674
    else
1675
    {
1676
      /* this is an old log, open it and find the size */
1677
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
1678
                         MYF(0))) >= 0)
1679
      {
1680
        file_length= (ulonglong) my_seek(file, 0L, MY_SEEK_END, MYF(0));
1681
        my_close(file, MYF(0));
1682
      }
1683
    }
1684
    protocol->store(file_length);
1685
    if (protocol->write())
1686
      goto err;
1687
  }
1688
  mysql_bin_log.unlock_index();
1689
  my_eof(thd);
1690
  DBUG_RETURN(FALSE);
1691
1692
err:
1693
  mysql_bin_log.unlock_index();
1694
  DBUG_RETURN(TRUE);
1695
}
1696
1697
/**
1698
   Load data's io cache specific hook to be executed
1699
   before a chunk of data is being read into the cache's buffer
1700
   The fuction instantianates and writes into the binlog
1701
   replication events along LOAD DATA processing.
1702
   
1703
   @param file  pointer to io-cache
1704
   @return 0
1705
*/
1706
int log_loaded_block(IO_CACHE* file)
1707
{
1708
  DBUG_ENTER("log_loaded_block");
1709
  LOAD_FILE_INFO *lf_info;
1710
  uint block_len;
1711
  /* buffer contains position where we started last read */
1712
  uchar* buffer= (uchar*) my_b_get_buffer_start(file);
1713
  uint max_event_size= current_thd->variables.max_allowed_packet;
1714
  lf_info= (LOAD_FILE_INFO*) file->arg;
1715
  if (lf_info->thd->current_stmt_binlog_row_based)
1716
    DBUG_RETURN(0);
1717
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1718
      lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1719
    DBUG_RETURN(0);
1720
  
1721
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1722
       buffer += min(block_len, max_event_size),
1723
       block_len -= min(block_len, max_event_size))
1724
  {
1725
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1726
    if (lf_info->wrote_create_file)
1727
    {
1728
      Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
1729
                               min(block_len, max_event_size),
1730
                               lf_info->log_delayed);
1731
      mysql_bin_log.write(&a);
1732
    }
1733
    else
1734
    {
1735
      Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1736
                                   buffer,
1737
                                   min(block_len, max_event_size),
1738
                                   lf_info->log_delayed);
1739
      mysql_bin_log.write(&b);
1740
      lf_info->wrote_create_file= 1;
1741
      DBUG_SYNC_POINT("debug_lock.created_file_event",10);
1742
    }
1743
  }
1744
  DBUG_RETURN(0);
1745
}
1746
1747
/*
1748
  Replication System Variables
1749
*/
1750
1751
class sys_var_slave_skip_counter :public sys_var
1752
{
1753
public:
1754
  sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
1755
    :sys_var(name_arg)
1756
  { chain_sys_var(chain); }
1757
  bool check(THD *thd, set_var *var);
1758
  bool update(THD *thd, set_var *var);
1759
  bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
1760
  /*
1761
    We can't retrieve the value of this, so we don't have to define
1762
    type() or value_ptr()
1763
  */
1764
};
1765
1766
class sys_var_sync_binlog_period :public sys_var_long_ptr
1767
{
1768
public:
1769
  sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg, 
1770
                             ulong *value_ptr)
1771
    :sys_var_long_ptr(chain, name_arg,value_ptr) {}
1772
  bool update(THD *thd, set_var *var);
1773
};
1774
1775
static void fix_slave_net_timeout(THD *thd, enum_var_type type)
1776
{
1777
  DBUG_ENTER("fix_slave_net_timeout");
1778
#ifdef HAVE_REPLICATION
1779
  pthread_mutex_lock(&LOCK_active_mi);
1780
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1781
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
1782
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1783
                        "The currect value for master_heartbeat_period"
1784
                        " exceeds the new value of `slave_net_timeout' sec."
1785
                        " A sensible value for the period should be"
1786
                        " less than the timeout.");
1787
  pthread_mutex_unlock(&LOCK_active_mi);
1788
#endif
1789
  DBUG_VOID_RETURN;
1790
}
1791
1792
static sys_var_chain vars = { NULL, NULL };
1793
1794
static sys_var_bool_ptr	sys_relay_log_purge(&vars, "relay_log_purge",
1795
					    &relay_log_purge);
1796
static sys_var_long_ptr	sys_slave_net_timeout(&vars, "slave_net_timeout",
1797
					      &slave_net_timeout,
1798
                                              fix_slave_net_timeout);
1799
static sys_var_long_ptr	sys_slave_trans_retries(&vars, "slave_transaction_retries",
1800
						&slave_trans_retries);
1801
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
1802
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
1803
1804
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
1805
1806
1807
static SHOW_VAR fixed_vars[]= {
1808
  {"log_slave_updates",       (char*) &opt_log_slave_updates,       SHOW_MY_BOOL},
1809
  {"relay_log" , (char*) &opt_relay_logname, SHOW_CHAR_PTR},
1810
  {"relay_log_index", (char*) &opt_relaylog_index_name, SHOW_CHAR_PTR},
1811
  {"relay_log_info_file", (char*) &relay_log_info_file, SHOW_CHAR_PTR},
1812
  {"relay_log_space_limit",   (char*) &relay_log_space_limit,       SHOW_LONGLONG},
1813
  {"slave_load_tmpdir",       (char*) &slave_load_tmpdir,           SHOW_CHAR_PTR},
1814
  {"slave_skip_errors",       (char*) &show_slave_skip_errors,      SHOW_FUNC},
1815
};
1816
1817
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff)
1818
{
1819
  var->type=SHOW_CHAR;
1820
  var->value= buff;
1821
  if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
1822
  {
1823
    var->value= const_cast<char *>("OFF");
1824
  }
1825
  else if (bitmap_is_set_all(&slave_error_mask))
1826
  {
1827
    var->value= const_cast<char *>("ALL");
1828
  }
1829
  else
1830
  {
1831
    /* 10 is enough assuming errors are max 4 digits */
1832
    int i;
1833
    var->value= buff;
1834
    for (i= 1;
1835
         i < MAX_SLAVE_ERROR &&
1836
         (buff - var->value) < SHOW_VAR_FUNC_BUFF_SIZE;
1837
         i++)
1838
    {
1839
      if (bitmap_is_set(&slave_error_mask, i))
1840
      {
1841
        buff= int10_to_str(i, buff, 10);
1842
        *buff++= ',';
1843
      }
1844
    }
1845
    if (var->value != buff)
1846
      buff--;				// Remove last ','
1847
    if (i < MAX_SLAVE_ERROR)
1848
      buff= strmov(buff, "...");  // Couldn't show all errors
1849
    *buff=0;
1850
  }
1851
  return 0;
1852
}
1853
1854
bool sys_var_slave_skip_counter::check(THD *thd, set_var *var)
1855
{
1856
  int result= 0;
1857
  pthread_mutex_lock(&LOCK_active_mi);
1858
  pthread_mutex_lock(&active_mi->rli.run_lock);
1859
  if (active_mi->rli.slave_running)
1860
  {
1861
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1862
    result=1;
1863
  }
1864
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1865
  pthread_mutex_unlock(&LOCK_active_mi);
1866
  var->save_result.ulong_value= (ulong) var->value->val_int();
1867
  return result;
1868
}
1869
1870
1871
bool sys_var_slave_skip_counter::update(THD *thd, set_var *var)
1872
{
1873
  pthread_mutex_lock(&LOCK_active_mi);
1874
  pthread_mutex_lock(&active_mi->rli.run_lock);
1875
  /*
1876
    The following test should normally never be true as we test this
1877
    in the check function;  To be safe against multiple
1878
    SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
1879
  */
1880
  if (!active_mi->rli.slave_running)
1881
  {
1882
    pthread_mutex_lock(&active_mi->rli.data_lock);
1883
    active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
1884
    pthread_mutex_unlock(&active_mi->rli.data_lock);
1885
  }
1886
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1887
  pthread_mutex_unlock(&LOCK_active_mi);
1888
  return 0;
1889
}
1890
1891
1892
bool sys_var_sync_binlog_period::update(THD *thd, set_var *var)
1893
{
1894
  sync_binlog_period= (ulong) var->save_result.ulonglong_value;
1895
  return 0;
1896
}
1897
1898
int init_replication_sys_vars()
1899
{
1900
  mysql_append_static_vars(fixed_vars, sizeof(fixed_vars) / sizeof(SHOW_VAR));
1901
1902
  if (mysql_add_sys_var_chain(vars.first, my_long_options))
1903
  {
1904
    /* should not happen */
1905
    fprintf(stderr, "failed to initialize replication system variables");
1906
    unireg_abort(1);
1907
  }
1908
  return 0;
1909
}
1910
1911
#endif /* HAVE_REPLICATION */
1912
1913