~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to sql/sql_repl.cc

  • Committer: brian
  • Date: 2008-06-25 05:29:13 UTC
  • Revision ID: brian@localhost.localdomain-20080625052913-6upwo0jsrl4lnapl
clean slate

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2006 MySQL AB & Sasha
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include "mysql_priv.h"
 
17
#ifdef HAVE_REPLICATION
 
18
 
 
19
#include "rpl_mi.h"
 
20
#include "sql_repl.h"
 
21
#include "log_event.h"
 
22
#include "rpl_filter.h"
 
23
#include <my_dir.h>
 
24
 
 
25
int max_binlog_dump_events = 0; // unlimited
 
26
my_bool opt_sporadic_binlog_dump_fail = 0;
 
27
#ifndef DBUG_OFF
 
28
static int binlog_dump_count = 0;
 
29
#endif
 
30
 
 
31
/*
 
32
    fake_rotate_event() builds a fake (=which does not exist physically in any
 
33
    binlog) Rotate event, which contains the name of the binlog we are going to
 
34
    send to the slave (because the slave may not know it if it just asked for
 
35
    MASTER_LOG_FILE='', MASTER_LOG_POS=4).
 
36
    < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
 
37
    After this version we always call it, so that a 3.23.58 slave can rely on
 
38
    it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
 
39
    zeros in the good positions which, by chance, make it possible for the 3.23
 
40
    slave to detect that this event is unexpected) (this is luck which happens
 
41
    because the master and slave disagree on the size of the header of
 
42
    Log_event).
 
43
 
 
44
    Relying on the event length of the Rotate event instead of these
 
45
    well-placed zeros was not possible as Rotate events have a variable-length
 
46
    part.
 
47
*/
 
48
 
 
49
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
 
50
                             ulonglong position, const char** errmsg)
 
51
{
 
52
  DBUG_ENTER("fake_rotate_event");
 
53
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
 
54
  /*
 
55
    'when' (the timestamp) is set to 0 so that slave could distinguish between
 
56
    real and fake Rotate events (if necessary)
 
57
  */
 
58
  memset(header, 0, 4);
 
59
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
 
60
 
 
61
  char* p = log_file_name+dirname_length(log_file_name);
 
62
  uint ident_len = (uint) strlen(p);
 
63
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
 
64
  int4store(header + SERVER_ID_OFFSET, server_id);
 
65
  int4store(header + EVENT_LEN_OFFSET, event_len);
 
66
  int2store(header + FLAGS_OFFSET, 0);
 
67
 
 
68
  // TODO: check what problems this may cause and fix them
 
69
  int4store(header + LOG_POS_OFFSET, 0);
 
70
 
 
71
  packet->append(header, sizeof(header));
 
72
  int8store(buf+R_POS_OFFSET,position);
 
73
  packet->append(buf, ROTATE_HEADER_LEN);
 
74
  packet->append(p,ident_len);
 
75
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
 
76
  {
 
77
    *errmsg = "failed on my_net_write()";
 
78
    DBUG_RETURN(-1);
 
79
  }
 
80
  DBUG_RETURN(0);
 
81
}
 
82
 
 
83
static int send_file(THD *thd)
 
84
{
 
85
  NET* net = &thd->net;
 
86
  int fd = -1, error = 1;
 
87
  size_t bytes;
 
88
  char fname[FN_REFLEN+1];
 
89
  const char *errmsg = 0;
 
90
  int old_timeout;
 
91
  unsigned long packet_len;
 
92
  uchar buf[IO_SIZE];                           // It's safe to alloc this
 
93
  DBUG_ENTER("send_file");
 
94
 
 
95
  /*
 
96
    The client might be slow loading the data, give him wait_timeout to do
 
97
    the job
 
98
  */
 
99
  old_timeout= net->read_timeout;
 
100
  my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
 
101
 
 
102
  /*
 
103
    We need net_flush here because the client will not know it needs to send
 
104
    us the file name until it has processed the load event entry
 
105
  */
 
106
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
 
107
  {
 
108
    errmsg = "while reading file name";
 
109
    goto err;
 
110
  }
 
111
 
 
112
  // terminate with \0 for fn_format
 
113
  *((char*)net->read_pos +  packet_len) = 0;
 
114
  fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
 
115
  // this is needed to make replicate-ignore-db
 
116
  if (!strcmp(fname,"/dev/null"))
 
117
    goto end;
 
118
 
 
119
  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
 
120
  {
 
121
    errmsg = "on open of file";
 
122
    goto err;
 
123
  }
 
124
 
 
125
  while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0)
 
126
  {
 
127
    if (my_net_write(net, buf, bytes))
 
128
    {
 
129
      errmsg = "while writing data to client";
 
130
      goto err;
 
131
    }
 
132
  }
 
133
 
 
134
 end:
 
135
  if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
 
136
      (my_net_read(net) == packet_error))
 
137
  {
 
138
    errmsg = "while negotiating file transfer close";
 
139
    goto err;
 
140
  }
 
141
  error = 0;
 
142
 
 
143
 err:
 
144
  my_net_set_read_timeout(net, old_timeout);
 
145
  if (fd >= 0)
 
146
    (void) my_close(fd, MYF(0));
 
147
  if (errmsg)
 
148
  {
 
149
    sql_print_error("Failed in send_file() %s", errmsg);
 
150
    DBUG_PRINT("error", (errmsg));
 
151
  }
 
152
  DBUG_RETURN(error);
 
153
}
 
154
 
 
155
 
 
156
/*
 
157
  Adjust the position pointer in the binary log file for all running slaves
 
158
 
 
159
  SYNOPSIS
 
160
    adjust_linfo_offsets()
 
161
    purge_offset        Number of bytes removed from start of log index file
 
162
 
 
163
  NOTES
 
164
    - This is called when doing a PURGE when we delete lines from the
 
165
      index log file
 
166
 
 
167
  REQUIREMENTS
 
168
    - Before calling this function, we have to ensure that no threads are
 
169
      using any binary log file before purge_offset.a
 
170
 
 
171
  TODO
 
172
    - Inform the slave threads that they should sync the position
 
173
      in the binary log file with flush_relay_log_info.
 
174
      Now they sync is done for next read.
 
175
*/
 
176
 
 
177
void adjust_linfo_offsets(my_off_t purge_offset)
 
178
{
 
179
  THD *tmp;
 
180
 
 
181
  pthread_mutex_lock(&LOCK_thread_count);
 
182
  I_List_iterator<THD> it(threads);
 
183
 
 
184
  while ((tmp=it++))
 
185
  {
 
186
    LOG_INFO* linfo;
 
187
    if ((linfo = tmp->current_linfo))
 
188
    {
 
189
      pthread_mutex_lock(&linfo->lock);
 
190
      /*
 
191
        Index file offset can be less that purge offset only if
 
192
        we just started reading the index file. In that case
 
193
        we have nothing to adjust
 
194
      */
 
195
      if (linfo->index_file_offset < purge_offset)
 
196
        linfo->fatal = (linfo->index_file_offset != 0);
 
197
      else
 
198
        linfo->index_file_offset -= purge_offset;
 
199
      pthread_mutex_unlock(&linfo->lock);
 
200
    }
 
201
  }
 
202
  pthread_mutex_unlock(&LOCK_thread_count);
 
203
}
 
204
 
 
205
 
 
206
bool log_in_use(const char* log_name)
 
207
{
 
208
  int log_name_len = strlen(log_name) + 1;
 
209
  THD *tmp;
 
210
  bool result = 0;
 
211
 
 
212
  pthread_mutex_lock(&LOCK_thread_count);
 
213
  I_List_iterator<THD> it(threads);
 
214
 
 
215
  while ((tmp=it++))
 
216
  {
 
217
    LOG_INFO* linfo;
 
218
    if ((linfo = tmp->current_linfo))
 
219
    {
 
220
      pthread_mutex_lock(&linfo->lock);
 
221
      result = !bcmp((uchar*) log_name, (uchar*) linfo->log_file_name,
 
222
                     log_name_len);
 
223
      pthread_mutex_unlock(&linfo->lock);
 
224
      if (result)
 
225
        break;
 
226
    }
 
227
  }
 
228
 
 
229
  pthread_mutex_unlock(&LOCK_thread_count);
 
230
  return result;
 
231
}
 
232
 
 
233
bool purge_error_message(THD* thd, int res)
 
234
{
 
235
  uint errmsg= 0;
 
236
 
 
237
  switch (res)  {
 
238
  case 0: break;
 
239
  case LOG_INFO_EOF:    errmsg= ER_UNKNOWN_TARGET_BINLOG; break;
 
240
  case LOG_INFO_IO:     errmsg= ER_IO_ERR_LOG_INDEX_READ; break;
 
241
  case LOG_INFO_INVALID:errmsg= ER_BINLOG_PURGE_PROHIBITED; break;
 
242
  case LOG_INFO_SEEK:   errmsg= ER_FSEEK_FAIL; break;
 
243
  case LOG_INFO_MEM:    errmsg= ER_OUT_OF_RESOURCES; break;
 
244
  case LOG_INFO_FATAL:  errmsg= ER_BINLOG_PURGE_FATAL_ERR; break;
 
245
  case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break;
 
246
  case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break;
 
247
  default:              errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break;
 
248
  }
 
249
 
 
250
  if (errmsg)
 
251
  {
 
252
    my_message(errmsg, ER(errmsg), MYF(0));
 
253
    return TRUE;
 
254
  }
 
255
  my_ok(thd);
 
256
  return FALSE;
 
257
}
 
258
 
 
259
 
 
260
bool purge_master_logs(THD* thd, const char* to_log)
 
261
{
 
262
  char search_file_name[FN_REFLEN];
 
263
  if (!mysql_bin_log.is_open())
 
264
  {
 
265
    my_ok(thd);
 
266
    return FALSE;
 
267
  }
 
268
 
 
269
  mysql_bin_log.make_log_name(search_file_name, to_log);
 
270
  return purge_error_message(thd,
 
271
                             mysql_bin_log.purge_logs(search_file_name, 0, 1,
 
272
                                                      1, NULL));
 
273
}
 
274
 
 
275
 
 
276
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
 
277
{
 
278
  if (!mysql_bin_log.is_open())
 
279
  {
 
280
    my_ok(thd);
 
281
    return 0;
 
282
  }
 
283
  return purge_error_message(thd,
 
284
                             mysql_bin_log.purge_logs_before_date(purge_time));
 
285
}
 
286
 
 
287
int test_for_non_eof_log_read_errors(int error, const char **errmsg)
 
288
{
 
289
  if (error == LOG_READ_EOF)
 
290
    return 0;
 
291
  my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
292
  switch (error) {
 
293
  case LOG_READ_BOGUS:
 
294
    *errmsg = "bogus data in log event";
 
295
    break;
 
296
  case LOG_READ_TOO_LARGE:
 
297
    *errmsg = "log event entry exceeded max_allowed_packet; \
 
298
Increase max_allowed_packet on master";
 
299
    break;
 
300
  case LOG_READ_IO:
 
301
    *errmsg = "I/O error reading log event";
 
302
    break;
 
303
  case LOG_READ_MEM:
 
304
    *errmsg = "memory allocation failed reading log event";
 
305
    break;
 
306
  case LOG_READ_TRUNC:
 
307
    *errmsg = "binlog truncated in the middle of event";
 
308
    break;
 
309
  default:
 
310
    *errmsg = "unknown error reading log event on the master";
 
311
    break;
 
312
  }
 
313
  return error;
 
314
}
 
315
 
 
316
 
 
317
/**
 
318
  An auxiliary function for calling in mysql_binlog_send
 
319
  to initialize the heartbeat timeout in waiting for a binlogged event.
 
320
 
 
321
  @param[in]    thd  THD to access a user variable
 
322
 
 
323
  @return        heartbeat period an ulonglong of nanoseconds
 
324
                 or zero if heartbeat was not demanded by slave
 
325
*/ 
 
326
static ulonglong get_heartbeat_period(THD * thd)
 
327
{
 
328
  my_bool null_value;
 
329
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
 
330
  user_var_entry *entry= 
 
331
    (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
 
332
                                  name.length);
 
333
  return entry? entry->val_int(&null_value) : 0;
 
334
}
 
335
 
 
336
/*
 
337
  Function prepares and sends repliation heartbeat event.
 
338
 
 
339
  @param net                net object of THD
 
340
  @param packet             buffer to store the heartbeat instance
 
341
  @param event_coordinates  binlog file name and position of the last
 
342
                            real event master sent from binlog
 
343
 
 
344
  @note 
 
345
    Among three essential pieces of heartbeat data Log_event::when
 
346
    is computed locally.
 
347
    The  error to send is serious and should force terminating
 
348
    the dump thread.
 
349
*/
 
350
static int send_heartbeat_event(NET* net, String* packet,
 
351
                                const struct event_coordinates *coord)
 
352
{
 
353
  DBUG_ENTER("send_heartbeat_event");
 
354
  char header[LOG_EVENT_HEADER_LEN];
 
355
  /*
 
356
    'when' (the timestamp) is set to 0 so that slave could distinguish between
 
357
    real and fake Rotate events (if necessary)
 
358
  */
 
359
  memset(header, 0, 4);  // when
 
360
 
 
361
  header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
 
362
 
 
363
  char* p= coord->file_name + dirname_length(coord->file_name);
 
364
 
 
365
  uint ident_len = strlen(p);
 
366
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
 
367
  int4store(header + SERVER_ID_OFFSET, server_id);
 
368
  int4store(header + EVENT_LEN_OFFSET, event_len);
 
369
  int2store(header + FLAGS_OFFSET, 0);
 
370
 
 
371
  int4store(header + LOG_POS_OFFSET, coord->pos);  // log_pos
 
372
 
 
373
  packet->append(header, sizeof(header));
 
374
  packet->append(p, ident_len);             // log_file_name
 
375
 
 
376
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
 
377
      net_flush(net))
 
378
  {
 
379
    DBUG_RETURN(-1);
 
380
  }
 
381
  packet->set("\0", 1, &my_charset_bin);
 
382
  DBUG_RETURN(0);
 
383
}
 
384
 
 
385
/*
 
386
  TODO: Clean up loop to only have one call to send_file()
 
387
*/
 
388
 
 
389
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
 
390
                       ushort flags)
 
391
{
 
392
  LOG_INFO linfo;
 
393
  char *log_file_name = linfo.log_file_name;
 
394
  char search_file_name[FN_REFLEN], *name;
 
395
  IO_CACHE log;
 
396
  File file = -1;
 
397
  String* packet = &thd->packet;
 
398
  int error;
 
399
  const char *errmsg = "Unknown error";
 
400
  NET* net = &thd->net;
 
401
  pthread_mutex_t *log_lock;
 
402
  bool binlog_can_be_corrupted= FALSE;
 
403
#ifndef DBUG_OFF
 
404
  int left_events = max_binlog_dump_events;
 
405
#endif
 
406
  DBUG_ENTER("mysql_binlog_send");
 
407
  DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));
 
408
 
 
409
  bzero((char*) &log,sizeof(log));
 
410
  /* 
 
411
     heartbeat_period from @master_heartbeat_period user variable
 
412
  */
 
413
  ulonglong heartbeat_period= get_heartbeat_period(thd);
 
414
  struct timespec heartbeat_buf;
 
415
  struct event_coordinates coord_buf;
 
416
  struct timespec *heartbeat_ts= NULL;
 
417
  struct event_coordinates *coord= NULL;
 
418
  if (heartbeat_period != LL(0))
 
419
  {
 
420
    heartbeat_ts= &heartbeat_buf;
 
421
    set_timespec_nsec(*heartbeat_ts, 0);
 
422
    coord= &coord_buf;
 
423
    coord->file_name= log_file_name; // initialization basing on what slave remembers
 
424
    coord->pos= pos;
 
425
  }
 
426
#ifndef DBUG_OFF
 
427
  if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
 
428
  {
 
429
    errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover";
 
430
    my_errno= ER_UNKNOWN_ERROR;
 
431
    goto err;
 
432
  }
 
433
#endif
 
434
 
 
435
  if (!mysql_bin_log.is_open())
 
436
  {
 
437
    errmsg = "Binary log is not open";
 
438
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
439
    goto err;
 
440
  }
 
441
  if (!server_id_supplied)
 
442
  {
 
443
    errmsg = "Misconfigured master - server id was not set";
 
444
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
445
    goto err;
 
446
  }
 
447
 
 
448
  name=search_file_name;
 
449
  if (log_ident[0])
 
450
    mysql_bin_log.make_log_name(search_file_name, log_ident);
 
451
  else
 
452
    name=0;                                     // Find first log
 
453
 
 
454
  linfo.index_file_offset = 0;
 
455
 
 
456
  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
 
457
  {
 
458
    errmsg = "Could not find first log file name in binary log index file";
 
459
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
460
    goto err;
 
461
  }
 
462
 
 
463
  pthread_mutex_lock(&LOCK_thread_count);
 
464
  thd->current_linfo = &linfo;
 
465
  pthread_mutex_unlock(&LOCK_thread_count);
 
466
 
 
467
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
 
468
  {
 
469
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
470
    goto err;
 
471
  }
 
472
  if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
 
473
  {
 
474
    errmsg= "Client requested master to start replication from \
 
475
impossible position";
 
476
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
477
    goto err;
 
478
  }
 
479
 
 
480
  /*
 
481
    We need to start a packet with something other than 255
 
482
    to distinguish it from error
 
483
  */
 
484
  packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
 
485
 
 
486
  /*
 
487
    Tell the client about the log name with a fake Rotate event;
 
488
    this is needed even if we also send a Format_description_log_event
 
489
    just after, because that event does not contain the binlog's name.
 
490
    Note that as this Rotate event is sent before
 
491
    Format_description_log_event, the slave cannot have any info to
 
492
    understand this event's format, so the header len of
 
493
    Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
 
494
    than other events except FORMAT_DESCRIPTION_EVENT).
 
495
    Before 4.0.14 we called fake_rotate_event below only if (pos ==
 
496
    BIN_LOG_HEADER_SIZE), because if this is false then the slave
 
497
    already knows the binlog's name.
 
498
    Since, we always call fake_rotate_event; if the slave already knew
 
499
    the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
 
500
    useless but does not harm much. It is nice for 3.23 (>=.58) slaves
 
501
    which test Rotate events to see if the master is 4.0 (then they
 
502
    choose to stop because they can't replicate 4.0); by always calling
 
503
    fake_rotate_event we are sure that 3.23.58 and newer will detect the
 
504
    problem as soon as replication starts (BUG#198).
 
505
    Always calling fake_rotate_event makes sending of normal
 
506
    (=from-binlog) Rotate events a priori unneeded, but it is not so
 
507
    simple: the 2 Rotate events are not equivalent, the normal one is
 
508
    before the Stop event, the fake one is after. If we don't send the
 
509
    normal one, then the Stop event will be interpreted (by existing 4.0
 
510
    slaves) as "the master stopped", which is wrong. So for safety,
 
511
    given that we want minimum modification of 4.0, we send the normal
 
512
    and fake Rotates.
 
513
  */
 
514
  if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
 
515
  {
 
516
    /*
 
517
       This error code is not perfect, as fake_rotate_event() does not
 
518
       read anything from the binlog; if it fails it's because of an
 
519
       error in my_net_write(), fortunately it will say so in errmsg.
 
520
    */
 
521
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
522
    goto err;
 
523
  }
 
524
  packet->set("\0", 1, &my_charset_bin);
 
525
  /*
 
526
    Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
 
527
    this larger than the corresponding packet (query) sent 
 
528
    from client to master.
 
529
  */
 
530
  thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
 
531
 
 
532
  /*
 
533
    We can set log_lock now, it does not move (it's a member of
 
534
    mysql_bin_log, and it's already inited, and it will be destroyed
 
535
    only at shutdown).
 
536
  */
 
537
  log_lock = mysql_bin_log.get_log_lock();
 
538
  if (pos > BIN_LOG_HEADER_SIZE)
 
539
  {
 
540
     /*
 
541
       Try to find a Format_description_log_event at the beginning of
 
542
       the binlog
 
543
     */
 
544
     if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
 
545
     {
 
546
       /*
 
547
         The packet has offsets equal to the normal offsets in a binlog
 
548
         event +1 (the first character is \0).
 
549
       */
 
550
       DBUG_PRINT("info",
 
551
                  ("Looked for a Format_description_log_event, found event type %d",
 
552
                   (*packet)[EVENT_TYPE_OFFSET+1]));
 
553
       if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
 
554
       {
 
555
         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
 
556
                                       LOG_EVENT_BINLOG_IN_USE_F);
 
557
         (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
 
558
         /*
 
559
           mark that this event with "log_pos=0", so the slave
 
560
           should not increment master's binlog position
 
561
           (rli->group_master_log_pos)
 
562
         */
 
563
         int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
 
564
         /*
 
565
           if reconnect master sends FD event with `created' as 0
 
566
           to avoid destroying temp tables.
 
567
          */
 
568
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
 
569
                   ST_CREATED_OFFSET+1, (ulong) 0);
 
570
         /* send it */
 
571
         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
 
572
         {
 
573
           errmsg = "Failed on my_net_write()";
 
574
           my_errno= ER_UNKNOWN_ERROR;
 
575
           goto err;
 
576
         }
 
577
 
 
578
         /*
 
579
           No need to save this event. We are only doing simple reads
 
580
           (no real parsing of the events) so we don't need it. And so
 
581
           we don't need the artificial Format_description_log_event of
 
582
           3.23&4.x.
 
583
         */
 
584
       }
 
585
     }
 
586
     else
 
587
     {
 
588
       if (test_for_non_eof_log_read_errors(error, &errmsg))
 
589
         goto err;
 
590
       /*
 
591
         It's EOF, nothing to do, go on reading next events, the
 
592
         Format_description_log_event will be found naturally if it is written.
 
593
       */
 
594
     }
 
595
     /* reset the packet as we wrote to it in any case */
 
596
     packet->set("\0", 1, &my_charset_bin);
 
597
  } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
 
598
  else
 
599
  {
 
600
    /* The Format_description_log_event event will be found naturally. */
 
601
  }
 
602
 
 
603
  /* seek to the requested position, to start the requested dump */
 
604
  my_b_seek(&log, pos);                 // Seek will done on next read
 
605
 
 
606
  while (!net->error && net->vio != 0 && !thd->killed)
 
607
  {
 
608
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
 
609
    {
 
610
#ifndef DBUG_OFF
 
611
      if (max_binlog_dump_events && !left_events--)
 
612
      {
 
613
        net_flush(net);
 
614
        errmsg = "Debugging binlog dump abort";
 
615
        my_errno= ER_UNKNOWN_ERROR;
 
616
        goto err;
 
617
      }
 
618
#endif
 
619
      /*
 
620
        log's filename does not change while it's active
 
621
      */
 
622
      if (coord)
 
623
        coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
 
624
 
 
625
      if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
 
626
      {
 
627
        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
 
628
                                      LOG_EVENT_BINLOG_IN_USE_F);
 
629
        (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
 
630
      }
 
631
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
 
632
        binlog_can_be_corrupted= FALSE;
 
633
 
 
634
      if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
 
635
      {
 
636
        errmsg = "Failed on my_net_write()";
 
637
        my_errno= ER_UNKNOWN_ERROR;
 
638
        goto err;
 
639
      }
 
640
 
 
641
      DBUG_PRINT("info", ("log event code %d",
 
642
                          (*packet)[LOG_EVENT_OFFSET+1] ));
 
643
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
 
644
      {
 
645
        if (send_file(thd))
 
646
        {
 
647
          errmsg = "failed in send_file()";
 
648
          my_errno= ER_UNKNOWN_ERROR;
 
649
          goto err;
 
650
        }
 
651
      }
 
652
      packet->set("\0", 1, &my_charset_bin);
 
653
    }
 
654
 
 
655
    /*
 
656
      here we were reading binlog that was not closed properly (as a result
 
657
      of a crash ?). treat any corruption as EOF
 
658
    */
 
659
    if (binlog_can_be_corrupted && error != LOG_READ_MEM)
 
660
      error=LOG_READ_EOF;
 
661
    /*
 
662
      TODO: now that we are logging the offset, check to make sure
 
663
      the recorded offset and the actual match.
 
664
      Guilhem 2003-06: this is not true if this master is a slave
 
665
      <4.0.15 running with --log-slave-updates, because then log_pos may
 
666
      be the offset in the-master-of-this-master's binlog.
 
667
    */
 
668
    if (test_for_non_eof_log_read_errors(error, &errmsg))
 
669
      goto err;
 
670
 
 
671
    if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
 
672
        mysql_bin_log.is_active(log_file_name))
 
673
    {
 
674
      /*
 
675
        Block until there is more data in the log
 
676
      */
 
677
      if (net_flush(net))
 
678
      {
 
679
        errmsg = "failed on net_flush()";
 
680
        my_errno= ER_UNKNOWN_ERROR;
 
681
        goto err;
 
682
      }
 
683
 
 
684
      /*
 
685
        We may have missed the update broadcast from the log
 
686
        that has just happened, let's try to catch it if it did.
 
687
        If we did not miss anything, we just wait for other threads
 
688
        to signal us.
 
689
      */
 
690
      {
 
691
        log.error=0;
 
692
        bool read_packet = 0, fatal_error = 0;
 
693
 
 
694
#ifndef DBUG_OFF
 
695
        if (max_binlog_dump_events && !left_events--)
 
696
        {
 
697
          errmsg = "Debugging binlog dump abort";
 
698
          my_errno= ER_UNKNOWN_ERROR;
 
699
          goto err;
 
700
        }
 
701
#endif
 
702
 
 
703
        /*
 
704
          No one will update the log while we are reading
 
705
          now, but we'll be quick and just read one record
 
706
 
 
707
          TODO:
 
708
          Add an counter that is incremented for each time we update the
 
709
          binary log.  We can avoid the following read if the counter
 
710
          has not been updated since last read.
 
711
        */
 
712
 
 
713
        pthread_mutex_lock(log_lock);
 
714
        switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
 
715
        case 0:
 
716
          /* we read successfully, so we'll need to send it to the slave */
 
717
          pthread_mutex_unlock(log_lock);
 
718
          read_packet = 1;
 
719
          if (coord)
 
720
            coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
 
721
          break;
 
722
 
 
723
        case LOG_READ_EOF:
 
724
        {
 
725
          int ret;
 
726
          DBUG_PRINT("wait",("waiting for data in binary log"));
 
727
          if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
 
728
          {
 
729
            pthread_mutex_unlock(log_lock);
 
730
            goto end;
 
731
          }
 
732
 
 
733
#ifndef DBUG_OFF
 
734
          ulong hb_info_counter= 0;
 
735
#endif
 
736
          do 
 
737
          {
 
738
            if (coord)
 
739
            {
 
740
              DBUG_ASSERT(heartbeat_ts && heartbeat_period != LL(0));
 
741
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
 
742
            }
 
743
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
 
744
            DBUG_ASSERT(ret == 0 || heartbeat_period != LL(0) && coord != NULL);
 
745
            if (ret == ETIMEDOUT || ret == ETIME)
 
746
            {
 
747
#ifndef DBUG_OFF
 
748
              if (hb_info_counter < 3)
 
749
              {
 
750
                sql_print_information("master sends heartbeat message");
 
751
                hb_info_counter++;
 
752
                if (hb_info_counter == 3)
 
753
                  sql_print_information("the rest of heartbeat info skipped ...");
 
754
              }
 
755
#endif
 
756
              if (send_heartbeat_event(net, packet, coord))
 
757
              {
 
758
                errmsg = "Failed on my_net_write()";
 
759
                my_errno= ER_UNKNOWN_ERROR;
 
760
                pthread_mutex_unlock(log_lock);
 
761
                goto err;
 
762
              }
 
763
            }
 
764
            else
 
765
            {
 
766
              DBUG_ASSERT(ret == 0);
 
767
              DBUG_PRINT("wait",("binary log received update"));
 
768
            }
 
769
          } while (ret != 0 && coord != NULL && !thd->killed);
 
770
          pthread_mutex_unlock(log_lock);
 
771
        }    
 
772
        break;
 
773
            
 
774
        default:
 
775
          pthread_mutex_unlock(log_lock);
 
776
          fatal_error = 1;
 
777
          break;
 
778
        }
 
779
 
 
780
        if (read_packet)
 
781
        {
 
782
          thd_proc_info(thd, "Sending binlog event to slave");
 
783
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
 
784
          {
 
785
            errmsg = "Failed on my_net_write()";
 
786
            my_errno= ER_UNKNOWN_ERROR;
 
787
            goto err;
 
788
          }
 
789
 
 
790
          if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
 
791
          {
 
792
            if (send_file(thd))
 
793
            {
 
794
              errmsg = "failed in send_file()";
 
795
              my_errno= ER_UNKNOWN_ERROR;
 
796
              goto err;
 
797
            }
 
798
          }
 
799
          packet->set("\0", 1, &my_charset_bin);
 
800
          /*
 
801
            No need to net_flush because we will get to flush later when
 
802
            we hit EOF pretty quick
 
803
          */
 
804
        }
 
805
 
 
806
        if (fatal_error)
 
807
        {
 
808
          errmsg = "error reading log entry";
 
809
          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
810
          goto err;
 
811
        }
 
812
        log.error=0;
 
813
      }
 
814
    }
 
815
    else
 
816
    {
 
817
      bool loop_breaker = 0;
 
818
      /* need this to break out of the for loop from switch */
 
819
 
 
820
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
 
821
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
 
822
      case LOG_INFO_EOF:
 
823
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
 
824
        break;
 
825
      case 0:
 
826
        break;
 
827
      default:
 
828
        errmsg = "could not find next log";
 
829
        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
830
        goto err;
 
831
      }
 
832
 
 
833
      if (loop_breaker)
 
834
        break;
 
835
 
 
836
      end_io_cache(&log);
 
837
      (void) my_close(file, MYF(MY_WME));
 
838
 
 
839
      /*
 
840
        Call fake_rotate_event() in case the previous log (the one which
 
841
        we have just finished reading) did not contain a Rotate event
 
842
        (for example (I don't know any other example) the previous log
 
843
        was the last one before the master was shutdown & restarted).
 
844
        This way we tell the slave about the new log's name and
 
845
        position.  If the binlog is 5.0, the next event we are going to
 
846
        read and send is Format_description_log_event.
 
847
      */
 
848
      if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
 
849
          fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
 
850
                            &errmsg))
 
851
      {
 
852
        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
853
        goto err;
 
854
      }
 
855
 
 
856
      packet->length(0);
 
857
      packet->append('\0');
 
858
      if (coord)
 
859
        coord->file_name= log_file_name; // reset to the next
 
860
    }
 
861
  }
 
862
 
 
863
end:
 
864
  end_io_cache(&log);
 
865
  (void)my_close(file, MYF(MY_WME));
 
866
 
 
867
  my_eof(thd);
 
868
  thd_proc_info(thd, "Waiting to finalize termination");
 
869
  pthread_mutex_lock(&LOCK_thread_count);
 
870
  thd->current_linfo = 0;
 
871
  pthread_mutex_unlock(&LOCK_thread_count);
 
872
  DBUG_VOID_RETURN;
 
873
 
 
874
err:
 
875
  thd_proc_info(thd, "Waiting to finalize termination");
 
876
  end_io_cache(&log);
 
877
  /*
 
878
    Exclude  iteration through thread list
 
879
    this is needed for purge_logs() - it will iterate through
 
880
    thread list and update thd->current_linfo->index_file_offset
 
881
    this mutex will make sure that it never tried to update our linfo
 
882
    after we return from this stack frame
 
883
  */
 
884
  pthread_mutex_lock(&LOCK_thread_count);
 
885
  thd->current_linfo = 0;
 
886
  pthread_mutex_unlock(&LOCK_thread_count);
 
887
  if (file >= 0)
 
888
    (void) my_close(file, MYF(MY_WME));
 
889
 
 
890
  my_message(my_errno, errmsg, MYF(0));
 
891
  DBUG_VOID_RETURN;
 
892
}
 
893
 
 
894
int start_slave(THD* thd , Master_info* mi,  bool net_report)
 
895
{
 
896
  int slave_errno= 0;
 
897
  int thread_mask;
 
898
  DBUG_ENTER("start_slave");
 
899
 
 
900
  lock_slave_threads(mi);  // this allows us to cleanly read slave_running
 
901
  // Get a mask of _stopped_ threads
 
902
  init_thread_mask(&thread_mask,mi,1 /* inverse */);
 
903
  /*
 
904
    Below we will start all stopped threads.  But if the user wants to
 
905
    start only one thread, do as if the other thread was running (as we
 
906
    don't wan't to touch the other thread), so set the bit to 0 for the
 
907
    other thread
 
908
  */
 
909
  if (thd->lex->slave_thd_opt)
 
910
    thread_mask&= thd->lex->slave_thd_opt;
 
911
  if (thread_mask) //some threads are stopped, start them
 
912
  {
 
913
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
 
914
                         thread_mask))
 
915
      slave_errno=ER_MASTER_INFO;
 
916
    else if (server_id_supplied && *mi->host)
 
917
    {
 
918
      /*
 
919
        If we will start SQL thread we will care about UNTIL options If
 
920
        not and they are specified we will ignore them and warn user
 
921
        about this fact.
 
922
      */
 
923
      if (thread_mask & SLAVE_SQL)
 
924
      {
 
925
        pthread_mutex_lock(&mi->rli.data_lock);
 
926
 
 
927
        if (thd->lex->mi.pos)
 
928
        {
 
929
          mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
 
930
          mi->rli.until_log_pos= thd->lex->mi.pos;
 
931
          /*
 
932
             We don't check thd->lex->mi.log_file_name for NULL here
 
933
             since it is checked in sql_yacc.yy
 
934
          */
 
935
          strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
 
936
                  sizeof(mi->rli.until_log_name)-1);
 
937
        }
 
938
        else if (thd->lex->mi.relay_log_pos)
 
939
        {
 
940
          mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
 
941
          mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
 
942
          strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
 
943
                  sizeof(mi->rli.until_log_name)-1);
 
944
        }
 
945
        else
 
946
          mi->rli.clear_until_condition();
 
947
 
 
948
        if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
 
949
        {
 
950
          /* Preparing members for effective until condition checking */
 
951
          const char *p= fn_ext(mi->rli.until_log_name);
 
952
          char *p_end;
 
953
          if (*p)
 
954
          {
 
955
            //p points to '.'
 
956
            mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
 
957
            /*
 
958
              p_end points to the first invalid character. If it equals
 
959
              to p, no digits were found, error. If it contains '\0' it
 
960
              means  conversion went ok.
 
961
            */
 
962
            if (p_end==p || *p_end)
 
963
              slave_errno=ER_BAD_SLAVE_UNTIL_COND;
 
964
          }
 
965
          else
 
966
            slave_errno=ER_BAD_SLAVE_UNTIL_COND;
 
967
 
 
968
          /* mark the cached result of the UNTIL comparison as "undefined" */
 
969
          mi->rli.until_log_names_cmp_result=
 
970
            Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
 
971
 
 
972
          /* Issuing warning then started without --skip-slave-start */
 
973
          if (!opt_skip_slave_start)
 
974
            push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
 
975
                         ER_MISSING_SKIP_SLAVE,
 
976
                         ER(ER_MISSING_SKIP_SLAVE));
 
977
        }
 
978
 
 
979
        pthread_mutex_unlock(&mi->rli.data_lock);
 
980
      }
 
981
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
 
982
        push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
 
983
                     ER(ER_UNTIL_COND_IGNORED));
 
984
 
 
985
      if (!slave_errno)
 
986
        slave_errno = start_slave_threads(0 /*no mutex */,
 
987
                                        1 /* wait for start */,
 
988
                                        mi,
 
989
                                        master_info_file,relay_log_info_file,
 
990
                                        thread_mask);
 
991
    }
 
992
    else
 
993
      slave_errno = ER_BAD_SLAVE;
 
994
  }
 
995
  else
 
996
  {
 
997
    /* no error if all threads are already started, only a warning */
 
998
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
 
999
                 ER(ER_SLAVE_WAS_RUNNING));
 
1000
  }
 
1001
 
 
1002
  unlock_slave_threads(mi);
 
1003
 
 
1004
  if (slave_errno)
 
1005
  {
 
1006
    if (net_report)
 
1007
      my_message(slave_errno, ER(slave_errno), MYF(0));
 
1008
    DBUG_RETURN(1);
 
1009
  }
 
1010
  else if (net_report)
 
1011
    my_ok(thd);
 
1012
 
 
1013
  DBUG_RETURN(0);
 
1014
}
 
1015
 
 
1016
 
 
1017
int stop_slave(THD* thd, Master_info* mi, bool net_report )
 
1018
{
 
1019
  DBUG_ENTER("stop_slave");
 
1020
  
 
1021
  int slave_errno;
 
1022
  if (!thd)
 
1023
    thd = current_thd;
 
1024
 
 
1025
  thd_proc_info(thd, "Killing slave");
 
1026
  int thread_mask;
 
1027
  lock_slave_threads(mi);
 
1028
  // Get a mask of _running_ threads
 
1029
  init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
 
1030
  /*
 
1031
    Below we will stop all running threads.
 
1032
    But if the user wants to stop only one thread, do as if the other thread
 
1033
    was stopped (as we don't wan't to touch the other thread), so set the
 
1034
    bit to 0 for the other thread
 
1035
  */
 
1036
  if (thd->lex->slave_thd_opt)
 
1037
    thread_mask &= thd->lex->slave_thd_opt;
 
1038
 
 
1039
  if (thread_mask)
 
1040
  {
 
1041
    slave_errno= terminate_slave_threads(mi,thread_mask,
 
1042
                                         1 /*skip lock */);
 
1043
  }
 
1044
  else
 
1045
  {
 
1046
    //no error if both threads are already stopped, only a warning
 
1047
    slave_errno= 0;
 
1048
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
 
1049
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
 
1050
  }
 
1051
  unlock_slave_threads(mi);
 
1052
  thd_proc_info(thd, 0);
 
1053
 
 
1054
  if (slave_errno)
 
1055
  {
 
1056
    if (net_report)
 
1057
      my_message(slave_errno, ER(slave_errno), MYF(0));
 
1058
    DBUG_RETURN(1);
 
1059
  }
 
1060
  else if (net_report)
 
1061
    my_ok(thd);
 
1062
 
 
1063
  DBUG_RETURN(0);
 
1064
}
 
1065
 
 
1066
 
 
1067
/*
 
1068
  Remove all relay logs and start replication from the start
 
1069
 
 
1070
  SYNOPSIS
 
1071
    reset_slave()
 
1072
    thd                 Thread handler
 
1073
    mi                  Master info for the slave
 
1074
 
 
1075
  RETURN
 
1076
    0   ok
 
1077
    1   error
 
1078
*/
 
1079
 
 
1080
 
 
1081
int reset_slave(THD *thd, Master_info* mi)
 
1082
{
 
1083
  MY_STAT stat_area;
 
1084
  char fname[FN_REFLEN];
 
1085
  int thread_mask= 0, error= 0;
 
1086
  uint sql_errno=0;
 
1087
  const char* errmsg=0;
 
1088
  DBUG_ENTER("reset_slave");
 
1089
 
 
1090
  lock_slave_threads(mi);
 
1091
  init_thread_mask(&thread_mask,mi,0 /* not inverse */);
 
1092
  if (thread_mask) // We refuse if any slave thread is running
 
1093
  {
 
1094
    sql_errno= ER_SLAVE_MUST_STOP;
 
1095
    error=1;
 
1096
    goto err;
 
1097
  }
 
1098
 
 
1099
  ha_reset_slave(thd);
 
1100
 
 
1101
  // delete relay logs, clear relay log coordinates
 
1102
  if ((error= purge_relay_logs(&mi->rli, thd,
 
1103
                               1 /* just reset */,
 
1104
                               &errmsg)))
 
1105
    goto err;
 
1106
 
 
1107
  /* Clear master's log coordinates */
 
1108
  init_master_log_pos(mi);
 
1109
  /*
 
1110
     Reset errors (the idea is that we forget about the
 
1111
     old master).
 
1112
  */
 
1113
  mi->rli.clear_error();
 
1114
  mi->rli.clear_until_condition();
 
1115
 
 
1116
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
 
1117
  end_master_info(mi);
 
1118
  // and delete these two files
 
1119
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
 
1120
  if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
 
1121
  {
 
1122
    error=1;
 
1123
    goto err;
 
1124
  }
 
1125
  // delete relay_log_info_file
 
1126
  fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
 
1127
  if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
 
1128
  {
 
1129
    error=1;
 
1130
    goto err;
 
1131
  }
 
1132
 
 
1133
err:
 
1134
  unlock_slave_threads(mi);
 
1135
  if (error)
 
1136
    my_error(sql_errno, MYF(0), errmsg);
 
1137
  DBUG_RETURN(error);
 
1138
}
 
1139
 
 
1140
/*
 
1141
 
 
1142
  Kill all Binlog_dump threads which previously talked to the same slave
 
1143
  ("same" means with the same server id). Indeed, if the slave stops, if the
 
1144
  Binlog_dump thread is waiting (pthread_cond_wait) for binlog update, then it
 
1145
  will keep existing until a query is written to the binlog. If the master is
 
1146
  idle, then this could last long, and if the slave reconnects, we could have 2
 
1147
  Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
 
1148
  binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
 
1149
  the master kills any existing thread with the slave's server id (if this id is
 
1150
  not zero; it will be true for real slaves, but false for mysqlbinlog when it
 
1151
  sends COM_BINLOG_DUMP to get a remote binlog dump).
 
1152
 
 
1153
  SYNOPSIS
 
1154
    kill_zombie_dump_threads()
 
1155
    slave_server_id     the slave's server id
 
1156
 
 
1157
*/
 
1158
 
 
1159
 
 
1160
void kill_zombie_dump_threads(uint32 slave_server_id)
 
1161
{
 
1162
  pthread_mutex_lock(&LOCK_thread_count);
 
1163
  I_List_iterator<THD> it(threads);
 
1164
  THD *tmp;
 
1165
 
 
1166
  while ((tmp=it++))
 
1167
  {
 
1168
    if (tmp->command == COM_BINLOG_DUMP &&
 
1169
       tmp->server_id == slave_server_id)
 
1170
    {
 
1171
      pthread_mutex_lock(&tmp->LOCK_delete);    // Lock from delete
 
1172
      break;
 
1173
    }
 
1174
  }
 
1175
  pthread_mutex_unlock(&LOCK_thread_count);
 
1176
  if (tmp)
 
1177
  {
 
1178
    /*
 
1179
      Here we do not call kill_one_thread() as
 
1180
      it will be slow because it will iterate through the list
 
1181
      again. We just to do kill the thread ourselves.
 
1182
    */
 
1183
    tmp->awake(THD::KILL_QUERY);
 
1184
    pthread_mutex_unlock(&tmp->LOCK_delete);
 
1185
  }
 
1186
}
 
1187
 
 
1188
 
 
1189
bool change_master(THD* thd, Master_info* mi)
 
1190
{
 
1191
  int thread_mask;
 
1192
  const char* errmsg= 0;
 
1193
  bool need_relay_log_purge= 1;
 
1194
  DBUG_ENTER("change_master");
 
1195
 
 
1196
  lock_slave_threads(mi);
 
1197
  init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
 
1198
  if (thread_mask) // We refuse if any slave thread is running
 
1199
  {
 
1200
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
 
1201
    unlock_slave_threads(mi);
 
1202
    DBUG_RETURN(TRUE);
 
1203
  }
 
1204
 
 
1205
  thd_proc_info(thd, "Changing master");
 
1206
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
 
1207
  // TODO: see if needs re-write
 
1208
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
 
1209
                       thread_mask))
 
1210
  {
 
1211
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
 
1212
    unlock_slave_threads(mi);
 
1213
    DBUG_RETURN(TRUE);
 
1214
  }
 
1215
 
 
1216
  /*
 
1217
    Data lock not needed since we have already stopped the running threads,
 
1218
    and we have the hold on the run locks which will keep all threads that
 
1219
    could possibly modify the data structures from running
 
1220
  */
 
1221
 
 
1222
  /*
 
1223
    If the user specified host or port without binlog or position,
 
1224
    reset binlog's name to FIRST and position to 4.
 
1225
  */
 
1226
 
 
1227
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
 
1228
  {
 
1229
    mi->master_log_name[0] = 0;
 
1230
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
 
1231
  }
 
1232
 
 
1233
  if (lex_mi->log_file_name)
 
1234
    strmake(mi->master_log_name, lex_mi->log_file_name,
 
1235
            sizeof(mi->master_log_name)-1);
 
1236
  if (lex_mi->pos)
 
1237
  {
 
1238
    mi->master_log_pos= lex_mi->pos;
 
1239
  }
 
1240
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
 
1241
 
 
1242
  if (lex_mi->host)
 
1243
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
 
1244
  if (lex_mi->user)
 
1245
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
 
1246
  if (lex_mi->password)
 
1247
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
 
1248
  if (lex_mi->port)
 
1249
    mi->port = lex_mi->port;
 
1250
  if (lex_mi->connect_retry)
 
1251
    mi->connect_retry = lex_mi->connect_retry;
 
1252
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1253
    mi->heartbeat_period = lex_mi->heartbeat_period;
 
1254
  else
 
1255
    mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
 
1256
                                      (slave_net_timeout/2.0));
 
1257
  mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd
 
1258
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1259
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
1260
 
 
1261
  if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1262
    mi->ssl_verify_server_cert=
 
1263
      (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
1264
 
 
1265
  if (lex_mi->ssl_ca)
 
1266
    strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
 
1267
  if (lex_mi->ssl_capath)
 
1268
    strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
 
1269
  if (lex_mi->ssl_cert)
 
1270
    strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
 
1271
  if (lex_mi->ssl_cipher)
 
1272
    strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
 
1273
  if (lex_mi->ssl_key)
 
1274
    strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
 
1275
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
 
1276
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
 
1277
      lex_mi->ssl_verify_server_cert )
 
1278
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
 
1279
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
 
1280
 
 
1281
  if (lex_mi->relay_log_name)
 
1282
  {
 
1283
    need_relay_log_purge= 0;
 
1284
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
 
1285
            sizeof(mi->rli.group_relay_log_name)-1);
 
1286
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
 
1287
            sizeof(mi->rli.event_relay_log_name)-1);
 
1288
  }
 
1289
 
 
1290
  if (lex_mi->relay_log_pos)
 
1291
  {
 
1292
    need_relay_log_purge= 0;
 
1293
    mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
 
1294
  }
 
1295
 
 
1296
  /*
 
1297
    If user did specify neither host nor port nor any log name nor any log
 
1298
    pos, i.e. he specified only user/password/master_connect_retry, he probably
 
1299
    wants replication to resume from where it had left, i.e. from the
 
1300
    coordinates of the **SQL** thread (imagine the case where the I/O is ahead
 
1301
    of the SQL; restarting from the coordinates of the I/O would lose some
 
1302
    events which is probably unwanted when you are just doing minor changes
 
1303
    like changing master_connect_retry).
 
1304
    A side-effect is that if only the I/O thread was started, this thread may
 
1305
    restart from ''/4 after the CHANGE MASTER. That's a minor problem (it is a
 
1306
    much more unlikely situation than the one we are fixing here).
 
1307
    Note: coordinates of the SQL thread must be read here, before the
 
1308
    'if (need_relay_log_purge)' block which resets them.
 
1309
  */
 
1310
  if (!lex_mi->host && !lex_mi->port &&
 
1311
      !lex_mi->log_file_name && !lex_mi->pos &&
 
1312
      need_relay_log_purge)
 
1313
   {
 
1314
     /*
 
1315
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
 
1316
       not initialized), so we use a max().
 
1317
       What happens to mi->rli.master_log_pos during the initialization stages
 
1318
       of replication is not 100% clear, so we guard against problems using
 
1319
       max().
 
1320
      */
 
1321
     mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
 
1322
                              mi->rli.group_master_log_pos);
 
1323
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
 
1324
             sizeof(mi->master_log_name)-1);
 
1325
  }
 
1326
  /*
 
1327
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
 
1328
    a slave before).
 
1329
  */
 
1330
  if (flush_master_info(mi, 0))
 
1331
  {
 
1332
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
 
1333
    unlock_slave_threads(mi);
 
1334
    DBUG_RETURN(TRUE);
 
1335
  }
 
1336
  if (need_relay_log_purge)
 
1337
  {
 
1338
    relay_log_purge= 1;
 
1339
    thd_proc_info(thd, "Purging old relay logs");
 
1340
    if (purge_relay_logs(&mi->rli, thd,
 
1341
                         0 /* not only reset, but also reinit */,
 
1342
                         &errmsg))
 
1343
    {
 
1344
      my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
 
1345
      unlock_slave_threads(mi);
 
1346
      DBUG_RETURN(TRUE);
 
1347
    }
 
1348
  }
 
1349
  else
 
1350
  {
 
1351
    const char* msg;
 
1352
    relay_log_purge= 0;
 
1353
    /* Relay log is already initialized */
 
1354
    if (init_relay_log_pos(&mi->rli,
 
1355
                           mi->rli.group_relay_log_name,
 
1356
                           mi->rli.group_relay_log_pos,
 
1357
                           0 /*no data lock*/,
 
1358
                           &msg, 0))
 
1359
    {
 
1360
      my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
 
1361
      unlock_slave_threads(mi);
 
1362
      DBUG_RETURN(TRUE);
 
1363
    }
 
1364
  }
 
1365
  /*
 
1366
    Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block,
 
1367
    so restore them to good values. If we left them to ''/0, that would work;
 
1368
    but that would fail in the case of 2 successive CHANGE MASTER (without a
 
1369
    START SLAVE in between): because first one would set the coords in mi to
 
1370
    the good values of those in rli, the set those in rli to ''/0, then
 
1371
    second CHANGE MASTER would set the coords in mi to those of rli, i.e. to
 
1372
    ''/0: we have lost all copies of the original good coordinates.
 
1373
    That's why we always save good coords in rli.
 
1374
  */
 
1375
  mi->rli.group_master_log_pos= mi->master_log_pos;
 
1376
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
 
1377
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
 
1378
          sizeof(mi->rli.group_master_log_name)-1);
 
1379
 
 
1380
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
 
1381
    mi->rli.group_master_log_pos=0;
 
1382
 
 
1383
  pthread_mutex_lock(&mi->rli.data_lock);
 
1384
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
 
1385
  /* Clear the errors, for a clean start */
 
1386
  mi->rli.clear_error();
 
1387
  mi->rli.clear_until_condition();
 
1388
  /*
 
1389
    If we don't write new coordinates to disk now, then old will remain in
 
1390
    relay-log.info until START SLAVE is issued; but if mysqld is shutdown
 
1391
    before START SLAVE, then old will remain in relay-log.info, and will be the
 
1392
    in-memory value at restart (thus causing errors, as the old relay log does
 
1393
    not exist anymore).
 
1394
  */
 
1395
  flush_relay_log_info(&mi->rli);
 
1396
  pthread_cond_broadcast(&mi->data_cond);
 
1397
  pthread_mutex_unlock(&mi->rli.data_lock);
 
1398
 
 
1399
  unlock_slave_threads(mi);
 
1400
  thd_proc_info(thd, 0);
 
1401
  my_ok(thd);
 
1402
  DBUG_RETURN(FALSE);
 
1403
}
 
1404
 
 
1405
int reset_master(THD* thd)
 
1406
{
 
1407
  if (!mysql_bin_log.is_open())
 
1408
  {
 
1409
    my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
 
1410
               ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
 
1411
    return 1;
 
1412
  }
 
1413
  return mysql_bin_log.reset_logs(thd);
 
1414
}
 
1415
 
 
1416
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
 
1417
                   const char* log_file_name2, ulonglong log_pos2)
 
1418
{
 
1419
  int res;
 
1420
  uint log_file_name1_len=  strlen(log_file_name1);
 
1421
  uint log_file_name2_len=  strlen(log_file_name2);
 
1422
 
 
1423
  //  We assume that both log names match up to '.'
 
1424
  if (log_file_name1_len == log_file_name2_len)
 
1425
  {
 
1426
    if ((res= strcmp(log_file_name1, log_file_name2)))
 
1427
      return res;
 
1428
    return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
 
1429
  }
 
1430
  return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
 
1431
}
 
1432
 
 
1433
 
 
1434
bool mysql_show_binlog_events(THD* thd)
 
1435
{
 
1436
  Protocol *protocol= thd->protocol;
 
1437
  List<Item> field_list;
 
1438
  const char *errmsg = 0;
 
1439
  bool ret = TRUE;
 
1440
  IO_CACHE log;
 
1441
  File file = -1;
 
1442
  DBUG_ENTER("mysql_show_binlog_events");
 
1443
 
 
1444
  Log_event::init_show_field_list(&field_list);
 
1445
  if (protocol->send_fields(&field_list,
 
1446
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1447
    DBUG_RETURN(TRUE);
 
1448
 
 
1449
  Format_description_log_event *description_event= new
 
1450
    Format_description_log_event(3); /* MySQL 4.0 by default */
 
1451
 
 
1452
  /*
 
1453
    Wait for handlers to insert any pending information
 
1454
    into the binlog.  For e.g. ndb which updates the binlog asynchronously
 
1455
    this is needed so that the uses sees all its own commands in the binlog
 
1456
  */
 
1457
  ha_binlog_wait(thd);
 
1458
 
 
1459
  if (mysql_bin_log.is_open())
 
1460
  {
 
1461
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
 
1462
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
 
1463
    ha_rows event_count, limit_start, limit_end;
 
1464
    my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
 
1465
    char search_file_name[FN_REFLEN], *name;
 
1466
    const char *log_file_name = lex_mi->log_file_name;
 
1467
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
 
1468
    LOG_INFO linfo;
 
1469
    Log_event* ev;
 
1470
 
 
1471
    unit->set_limit(thd->lex->current_select);
 
1472
    limit_start= unit->offset_limit_cnt;
 
1473
    limit_end= unit->select_limit_cnt;
 
1474
 
 
1475
    name= search_file_name;
 
1476
    if (log_file_name)
 
1477
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
 
1478
    else
 
1479
      name=0;                                   // Find first log
 
1480
 
 
1481
    linfo.index_file_offset = 0;
 
1482
 
 
1483
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
 
1484
    {
 
1485
      errmsg = "Could not find target log";
 
1486
      goto err;
 
1487
    }
 
1488
 
 
1489
    pthread_mutex_lock(&LOCK_thread_count);
 
1490
    thd->current_linfo = &linfo;
 
1491
    pthread_mutex_unlock(&LOCK_thread_count);
 
1492
 
 
1493
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
 
1494
      goto err;
 
1495
 
 
1496
    /*
 
1497
      to account binlog event header size
 
1498
    */
 
1499
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
 
1500
 
 
1501
    pthread_mutex_lock(log_lock);
 
1502
 
 
1503
    /*
 
1504
      open_binlog() sought to position 4.
 
1505
      Read the first event in case it's a Format_description_log_event, to
 
1506
      know the format. If there's no such event, we are 3.23 or 4.x. This
 
1507
      code, like before, can't read 3.23 binlogs.
 
1508
      This code will fail on a mixed relay log (one which has Format_desc then
 
1509
      Rotate then Format_desc).
 
1510
    */
 
1511
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
 
1512
    if (ev)
 
1513
    {
 
1514
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
 
1515
      {
 
1516
        delete description_event;
 
1517
        description_event= (Format_description_log_event*) ev;
 
1518
      }
 
1519
      else
 
1520
        delete ev;
 
1521
    }
 
1522
 
 
1523
    my_b_seek(&log, pos);
 
1524
 
 
1525
    if (!description_event->is_valid())
 
1526
    {
 
1527
      errmsg="Invalid Format_description event; could be out of memory";
 
1528
      goto err;
 
1529
    }
 
1530
 
 
1531
    for (event_count = 0;
 
1532
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
 
1533
                                         description_event)); )
 
1534
    {
 
1535
      if (event_count >= limit_start &&
 
1536
          ev->net_send(protocol, linfo.log_file_name, pos))
 
1537
      {
 
1538
        errmsg = "Net error";
 
1539
        delete ev;
 
1540
        pthread_mutex_unlock(log_lock);
 
1541
        goto err;
 
1542
      }
 
1543
 
 
1544
      pos = my_b_tell(&log);
 
1545
      delete ev;
 
1546
 
 
1547
      if (++event_count >= limit_end)
 
1548
        break;
 
1549
    }
 
1550
 
 
1551
    if (event_count < limit_end && log.error)
 
1552
    {
 
1553
      errmsg = "Wrong offset or I/O error";
 
1554
      pthread_mutex_unlock(log_lock);
 
1555
      goto err;
 
1556
    }
 
1557
 
 
1558
    pthread_mutex_unlock(log_lock);
 
1559
  }
 
1560
 
 
1561
  ret= FALSE;
 
1562
 
 
1563
err:
 
1564
  delete description_event;
 
1565
  if (file >= 0)
 
1566
  {
 
1567
    end_io_cache(&log);
 
1568
    (void) my_close(file, MYF(MY_WME));
 
1569
  }
 
1570
 
 
1571
  if (errmsg)
 
1572
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
 
1573
             "SHOW BINLOG EVENTS", errmsg);
 
1574
  else
 
1575
    my_eof(thd);
 
1576
 
 
1577
  pthread_mutex_lock(&LOCK_thread_count);
 
1578
  thd->current_linfo = 0;
 
1579
  pthread_mutex_unlock(&LOCK_thread_count);
 
1580
  DBUG_RETURN(ret);
 
1581
}
 
1582
 
 
1583
 
 
1584
bool show_binlog_info(THD* thd)
 
1585
{
 
1586
  Protocol *protocol= thd->protocol;
 
1587
  DBUG_ENTER("show_binlog_info");
 
1588
  List<Item> field_list;
 
1589
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
 
1590
  field_list.push_back(new Item_return_int("Position",20,
 
1591
                                           MYSQL_TYPE_LONGLONG));
 
1592
  field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
 
1593
  field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
 
1594
 
 
1595
  if (protocol->send_fields(&field_list,
 
1596
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1597
    DBUG_RETURN(TRUE);
 
1598
  protocol->prepare_for_resend();
 
1599
 
 
1600
  if (mysql_bin_log.is_open())
 
1601
  {
 
1602
    LOG_INFO li;
 
1603
    mysql_bin_log.get_current_log(&li);
 
1604
    int dir_len = dirname_length(li.log_file_name);
 
1605
    protocol->store(li.log_file_name + dir_len, &my_charset_bin);
 
1606
    protocol->store((ulonglong) li.pos);
 
1607
    protocol->store(binlog_filter->get_do_db());
 
1608
    protocol->store(binlog_filter->get_ignore_db());
 
1609
    if (protocol->write())
 
1610
      DBUG_RETURN(TRUE);
 
1611
  }
 
1612
  my_eof(thd);
 
1613
  DBUG_RETURN(FALSE);
 
1614
}
 
1615
 
 
1616
 
 
1617
/*
 
1618
  Send a list of all binary logs to client
 
1619
 
 
1620
  SYNOPSIS
 
1621
    show_binlogs()
 
1622
    thd         Thread specific variable
 
1623
 
 
1624
  RETURN VALUES
 
1625
    FALSE OK
 
1626
    TRUE  error
 
1627
*/
 
1628
 
 
1629
bool show_binlogs(THD* thd)
 
1630
{
 
1631
  IO_CACHE *index_file;
 
1632
  LOG_INFO cur;
 
1633
  File file;
 
1634
  char fname[FN_REFLEN];
 
1635
  List<Item> field_list;
 
1636
  uint length;
 
1637
  int cur_dir_len;
 
1638
  Protocol *protocol= thd->protocol;
 
1639
  DBUG_ENTER("show_binlogs");
 
1640
 
 
1641
  if (!mysql_bin_log.is_open())
 
1642
  {
 
1643
    my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0));
 
1644
    return 1;
 
1645
  }
 
1646
 
 
1647
  field_list.push_back(new Item_empty_string("Log_name", 255));
 
1648
  field_list.push_back(new Item_return_int("File_size", 20,
 
1649
                                           MYSQL_TYPE_LONGLONG));
 
1650
  if (protocol->send_fields(&field_list,
 
1651
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1652
    DBUG_RETURN(TRUE);
 
1653
  
 
1654
  pthread_mutex_lock(mysql_bin_log.get_log_lock());
 
1655
  mysql_bin_log.lock_index();
 
1656
  index_file=mysql_bin_log.get_index_file();
 
1657
  
 
1658
  mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
 
1659
  pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
 
1660
  
 
1661
  cur_dir_len= dirname_length(cur.log_file_name);
 
1662
 
 
1663
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
 
1664
 
 
1665
  /* The file ends with EOF or empty line */
 
1666
  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
 
1667
  {
 
1668
    int dir_len;
 
1669
    ulonglong file_length= 0;                   // Length if open fails
 
1670
    fname[--length] = '\0';                     // remove the newline
 
1671
 
 
1672
    protocol->prepare_for_resend();
 
1673
    dir_len= dirname_length(fname);
 
1674
    length-= dir_len;
 
1675
    protocol->store(fname + dir_len, length, &my_charset_bin);
 
1676
 
 
1677
    if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
 
1678
      file_length= cur.pos;  /* The active log, use the active position */
 
1679
    else
 
1680
    {
 
1681
      /* this is an old log, open it and find the size */
 
1682
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
 
1683
                         MYF(0))) >= 0)
 
1684
      {
 
1685
        file_length= (ulonglong) my_seek(file, 0L, MY_SEEK_END, MYF(0));
 
1686
        my_close(file, MYF(0));
 
1687
      }
 
1688
    }
 
1689
    protocol->store(file_length);
 
1690
    if (protocol->write())
 
1691
      goto err;
 
1692
  }
 
1693
  mysql_bin_log.unlock_index();
 
1694
  my_eof(thd);
 
1695
  DBUG_RETURN(FALSE);
 
1696
 
 
1697
err:
 
1698
  mysql_bin_log.unlock_index();
 
1699
  DBUG_RETURN(TRUE);
 
1700
}
 
1701
 
 
1702
/**
 
1703
   Load data's io cache specific hook to be executed
 
1704
   before a chunk of data is being read into the cache's buffer
 
1705
   The fuction instantianates and writes into the binlog
 
1706
   replication events along LOAD DATA processing.
 
1707
   
 
1708
   @param file  pointer to io-cache
 
1709
   @return 0
 
1710
*/
 
1711
int log_loaded_block(IO_CACHE* file)
 
1712
{
 
1713
  DBUG_ENTER("log_loaded_block");
 
1714
  LOAD_FILE_INFO *lf_info;
 
1715
  uint block_len;
 
1716
  /* buffer contains position where we started last read */
 
1717
  uchar* buffer= (uchar*) my_b_get_buffer_start(file);
 
1718
  uint max_event_size= current_thd->variables.max_allowed_packet;
 
1719
  lf_info= (LOAD_FILE_INFO*) file->arg;
 
1720
  if (lf_info->thd->current_stmt_binlog_row_based)
 
1721
    DBUG_RETURN(0);
 
1722
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
 
1723
      lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
 
1724
    DBUG_RETURN(0);
 
1725
  
 
1726
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
 
1727
       buffer += min(block_len, max_event_size),
 
1728
       block_len -= min(block_len, max_event_size))
 
1729
  {
 
1730
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
 
1731
    if (lf_info->wrote_create_file)
 
1732
    {
 
1733
      Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
 
1734
                               min(block_len, max_event_size),
 
1735
                               lf_info->log_delayed);
 
1736
      mysql_bin_log.write(&a);
 
1737
    }
 
1738
    else
 
1739
    {
 
1740
      Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
 
1741
                                   buffer,
 
1742
                                   min(block_len, max_event_size),
 
1743
                                   lf_info->log_delayed);
 
1744
      mysql_bin_log.write(&b);
 
1745
      lf_info->wrote_create_file= 1;
 
1746
      DBUG_SYNC_POINT("debug_lock.created_file_event",10);
 
1747
    }
 
1748
  }
 
1749
  DBUG_RETURN(0);
 
1750
}
 
1751
 
 
1752
/*
 
1753
  Replication System Variables
 
1754
*/
 
1755
 
 
1756
class sys_var_slave_skip_counter :public sys_var
 
1757
{
 
1758
public:
 
1759
  sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
 
1760
    :sys_var(name_arg)
 
1761
  { chain_sys_var(chain); }
 
1762
  bool check(THD *thd, set_var *var);
 
1763
  bool update(THD *thd, set_var *var);
 
1764
  bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
 
1765
  /*
 
1766
    We can't retrieve the value of this, so we don't have to define
 
1767
    type() or value_ptr()
 
1768
  */
 
1769
};
 
1770
 
 
1771
class sys_var_sync_binlog_period :public sys_var_long_ptr
 
1772
{
 
1773
public:
 
1774
  sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg, 
 
1775
                             ulong *value_ptr)
 
1776
    :sys_var_long_ptr(chain, name_arg,value_ptr) {}
 
1777
  bool update(THD *thd, set_var *var);
 
1778
};
 
1779
 
 
1780
static void fix_slave_net_timeout(THD *thd, enum_var_type type)
 
1781
{
 
1782
  DBUG_ENTER("fix_slave_net_timeout");
 
1783
#ifdef HAVE_REPLICATION
 
1784
  pthread_mutex_lock(&LOCK_active_mi);
 
1785
  DBUG_PRINT("info",("slave_net_timeout=%lu mi->heartbeat_period=%.3f",
 
1786
                     slave_net_timeout,
 
1787
                     (active_mi? active_mi->heartbeat_period : 0.0)));
 
1788
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
 
1789
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
 
1790
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
 
1791
                        "The currect value for master_heartbeat_period"
 
1792
                        " exceeds the new value of `slave_net_timeout' sec."
 
1793
                        " A sensible value for the period should be"
 
1794
                        " less than the timeout.");
 
1795
  pthread_mutex_unlock(&LOCK_active_mi);
 
1796
#endif
 
1797
  DBUG_VOID_RETURN;
 
1798
}
 
1799
 
 
1800
static sys_var_chain vars = { NULL, NULL };
 
1801
 
 
1802
static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
 
1803
                                            &relay_log_purge);
 
1804
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
 
1805
                                              &slave_net_timeout,
 
1806
                                              fix_slave_net_timeout);
 
1807
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
 
1808
                                                &slave_trans_retries);
 
1809
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
 
1810
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
 
1811
 
 
1812
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
 
1813
 
 
1814
 
 
1815
static SHOW_VAR fixed_vars[]= {
 
1816
  {"log_slave_updates",       (char*) &opt_log_slave_updates,       SHOW_MY_BOOL},
 
1817
  {"relay_log" , (char*) &opt_relay_logname, SHOW_CHAR_PTR},
 
1818
  {"relay_log_index", (char*) &opt_relaylog_index_name, SHOW_CHAR_PTR},
 
1819
  {"relay_log_info_file", (char*) &relay_log_info_file, SHOW_CHAR_PTR},
 
1820
  {"relay_log_space_limit",   (char*) &relay_log_space_limit,       SHOW_LONGLONG},
 
1821
  {"slave_load_tmpdir",       (char*) &slave_load_tmpdir,           SHOW_CHAR_PTR},
 
1822
  {"slave_skip_errors",       (char*) &show_slave_skip_errors,      SHOW_FUNC},
 
1823
};
 
1824
 
 
1825
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff)
 
1826
{
 
1827
  var->type=SHOW_CHAR;
 
1828
  var->value= buff;
 
1829
  if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
 
1830
  {
 
1831
    var->value= const_cast<char *>("OFF");
 
1832
  }
 
1833
  else if (bitmap_is_set_all(&slave_error_mask))
 
1834
  {
 
1835
    var->value= const_cast<char *>("ALL");
 
1836
  }
 
1837
  else
 
1838
  {
 
1839
    /* 10 is enough assuming errors are max 4 digits */
 
1840
    int i;
 
1841
    var->value= buff;
 
1842
    for (i= 1;
 
1843
         i < MAX_SLAVE_ERROR &&
 
1844
         (buff - var->value) < SHOW_VAR_FUNC_BUFF_SIZE;
 
1845
         i++)
 
1846
    {
 
1847
      if (bitmap_is_set(&slave_error_mask, i))
 
1848
      {
 
1849
        buff= int10_to_str(i, buff, 10);
 
1850
        *buff++= ',';
 
1851
      }
 
1852
    }
 
1853
    if (var->value != buff)
 
1854
      buff--;                           // Remove last ','
 
1855
    if (i < MAX_SLAVE_ERROR)
 
1856
      buff= strmov(buff, "...");  // Couldn't show all errors
 
1857
    *buff=0;
 
1858
  }
 
1859
  return 0;
 
1860
}
 
1861
 
 
1862
bool sys_var_slave_skip_counter::check(THD *thd, set_var *var)
 
1863
{
 
1864
  int result= 0;
 
1865
  pthread_mutex_lock(&LOCK_active_mi);
 
1866
  pthread_mutex_lock(&active_mi->rli.run_lock);
 
1867
  if (active_mi->rli.slave_running)
 
1868
  {
 
1869
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
 
1870
    result=1;
 
1871
  }
 
1872
  pthread_mutex_unlock(&active_mi->rli.run_lock);
 
1873
  pthread_mutex_unlock(&LOCK_active_mi);
 
1874
  var->save_result.ulong_value= (ulong) var->value->val_int();
 
1875
  return result;
 
1876
}
 
1877
 
 
1878
 
 
1879
bool sys_var_slave_skip_counter::update(THD *thd, set_var *var)
 
1880
{
 
1881
  pthread_mutex_lock(&LOCK_active_mi);
 
1882
  pthread_mutex_lock(&active_mi->rli.run_lock);
 
1883
  /*
 
1884
    The following test should normally never be true as we test this
 
1885
    in the check function;  To be safe against multiple
 
1886
    SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
 
1887
  */
 
1888
  if (!active_mi->rli.slave_running)
 
1889
  {
 
1890
    pthread_mutex_lock(&active_mi->rli.data_lock);
 
1891
    active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
 
1892
    pthread_mutex_unlock(&active_mi->rli.data_lock);
 
1893
  }
 
1894
  pthread_mutex_unlock(&active_mi->rli.run_lock);
 
1895
  pthread_mutex_unlock(&LOCK_active_mi);
 
1896
  return 0;
 
1897
}
 
1898
 
 
1899
 
 
1900
bool sys_var_sync_binlog_period::update(THD *thd, set_var *var)
 
1901
{
 
1902
  sync_binlog_period= (ulong) var->save_result.ulonglong_value;
 
1903
  return 0;
 
1904
}
 
1905
 
 
1906
int init_replication_sys_vars()
 
1907
{
 
1908
  mysql_append_static_vars(fixed_vars, sizeof(fixed_vars) / sizeof(SHOW_VAR));
 
1909
 
 
1910
  if (mysql_add_sys_var_chain(vars.first, my_long_options))
 
1911
  {
 
1912
    /* should not happen */
 
1913
    fprintf(stderr, "failed to initialize replication system variables");
 
1914
    unireg_abort(1);
 
1915
  }
 
1916
  return 0;
 
1917
}
 
1918
 
 
1919
#endif /* HAVE_REPLICATION */
 
1920
 
 
1921