~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to sql/sql_repl.cc

  • Committer: Olaf van der Spek
  • Date: 2011-08-13 15:08:14 UTC
  • mto: This revision was merged to the branch mainline in revision 2407.
  • Revision ID: olafvdspek@gmail.com-20110813150814-x12xd0c230a9bgtb
Refactor

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2000-2006 MySQL AB & Sasha
2
 
 
3
 
   This program is free software; you can redistribute it and/or modify
4
 
   it under the terms of the GNU General Public License as published by
5
 
   the Free Software Foundation; version 2 of the License.
6
 
 
7
 
   This program is distributed in the hope that it will be useful,
8
 
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 
   GNU General Public License for more details.
11
 
 
12
 
   You should have received a copy of the GNU General Public License
13
 
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
 
 
16
 
#include "mysql_priv.h"
17
 
#ifdef HAVE_REPLICATION
18
 
 
19
 
#include "rpl_mi.h"
20
 
#include "sql_repl.h"
21
 
#include "log_event.h"
22
 
#include "rpl_filter.h"
23
 
#include <my_dir.h>
24
 
 
25
 
int max_binlog_dump_events = 0; // unlimited
26
 
my_bool opt_sporadic_binlog_dump_fail = 0;
27
 
#ifndef DBUG_OFF
28
 
static int binlog_dump_count = 0;
29
 
#endif
30
 
 
31
 
/*
32
 
    fake_rotate_event() builds a fake (=which does not exist physically in any
33
 
    binlog) Rotate event, which contains the name of the binlog we are going to
34
 
    send to the slave (because the slave may not know it if it just asked for
35
 
    MASTER_LOG_FILE='', MASTER_LOG_POS=4).
36
 
    < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
37
 
    After this version we always call it, so that a 3.23.58 slave can rely on
38
 
    it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
39
 
    zeros in the good positions which, by chance, make it possible for the 3.23
40
 
    slave to detect that this event is unexpected) (this is luck which happens
41
 
    because the master and slave disagree on the size of the header of
42
 
    Log_event).
43
 
 
44
 
    Relying on the event length of the Rotate event instead of these
45
 
    well-placed zeros was not possible as Rotate events have a variable-length
46
 
    part.
47
 
*/
48
 
 
49
 
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
50
 
                             ulonglong position, const char** errmsg)
51
 
{
52
 
  DBUG_ENTER("fake_rotate_event");
53
 
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
54
 
  /*
55
 
    'when' (the timestamp) is set to 0 so that slave could distinguish between
56
 
    real and fake Rotate events (if necessary)
57
 
  */
58
 
  memset(header, 0, 4);
59
 
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
60
 
 
61
 
  char* p = log_file_name+dirname_length(log_file_name);
62
 
  uint ident_len = (uint) strlen(p);
63
 
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
64
 
  int4store(header + SERVER_ID_OFFSET, server_id);
65
 
  int4store(header + EVENT_LEN_OFFSET, event_len);
66
 
  int2store(header + FLAGS_OFFSET, 0);
67
 
 
68
 
  // TODO: check what problems this may cause and fix them
69
 
  int4store(header + LOG_POS_OFFSET, 0);
70
 
 
71
 
  packet->append(header, sizeof(header));
72
 
  int8store(buf+R_POS_OFFSET,position);
73
 
  packet->append(buf, ROTATE_HEADER_LEN);
74
 
  packet->append(p,ident_len);
75
 
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
76
 
  {
77
 
    *errmsg = "failed on my_net_write()";
78
 
    DBUG_RETURN(-1);
79
 
  }
80
 
  DBUG_RETURN(0);
81
 
}
82
 
 
83
 
static int send_file(THD *thd)
84
 
{
85
 
  NET* net = &thd->net;
86
 
  int fd = -1, error = 1;
87
 
  size_t bytes;
88
 
  char fname[FN_REFLEN+1];
89
 
  const char *errmsg = 0;
90
 
  int old_timeout;
91
 
  unsigned long packet_len;
92
 
  uchar buf[IO_SIZE];                           // It's safe to alloc this
93
 
  DBUG_ENTER("send_file");
94
 
 
95
 
  /*
96
 
    The client might be slow loading the data, give him wait_timeout to do
97
 
    the job
98
 
  */
99
 
  old_timeout= net->read_timeout;
100
 
  my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
101
 
 
102
 
  /*
103
 
    We need net_flush here because the client will not know it needs to send
104
 
    us the file name until it has processed the load event entry
105
 
  */
106
 
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
107
 
  {
108
 
    errmsg = "while reading file name";
109
 
    goto err;
110
 
  }
111
 
 
112
 
  // terminate with \0 for fn_format
113
 
  *((char*)net->read_pos +  packet_len) = 0;
114
 
  fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
115
 
  // this is needed to make replicate-ignore-db
116
 
  if (!strcmp(fname,"/dev/null"))
117
 
    goto end;
118
 
 
119
 
  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
120
 
  {
121
 
    errmsg = "on open of file";
122
 
    goto err;
123
 
  }
124
 
 
125
 
  while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0)
126
 
  {
127
 
    if (my_net_write(net, buf, bytes))
128
 
    {
129
 
      errmsg = "while writing data to client";
130
 
      goto err;
131
 
    }
132
 
  }
133
 
 
134
 
 end:
135
 
  if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
136
 
      (my_net_read(net) == packet_error))
137
 
  {
138
 
    errmsg = "while negotiating file transfer close";
139
 
    goto err;
140
 
  }
141
 
  error = 0;
142
 
 
143
 
 err:
144
 
  my_net_set_read_timeout(net, old_timeout);
145
 
  if (fd >= 0)
146
 
    (void) my_close(fd, MYF(0));
147
 
  if (errmsg)
148
 
  {
149
 
    sql_print_error("Failed in send_file() %s", errmsg);
150
 
    DBUG_PRINT("error", (errmsg));
151
 
  }
152
 
  DBUG_RETURN(error);
153
 
}
154
 
 
155
 
 
156
 
/*
157
 
  Adjust the position pointer in the binary log file for all running slaves
158
 
 
159
 
  SYNOPSIS
160
 
    adjust_linfo_offsets()
161
 
    purge_offset        Number of bytes removed from start of log index file
162
 
 
163
 
  NOTES
164
 
    - This is called when doing a PURGE when we delete lines from the
165
 
      index log file
166
 
 
167
 
  REQUIREMENTS
168
 
    - Before calling this function, we have to ensure that no threads are
169
 
      using any binary log file before purge_offset.a
170
 
 
171
 
  TODO
172
 
    - Inform the slave threads that they should sync the position
173
 
      in the binary log file with flush_relay_log_info.
174
 
      Now they sync is done for next read.
175
 
*/
176
 
 
177
 
void adjust_linfo_offsets(my_off_t purge_offset)
178
 
{
179
 
  THD *tmp;
180
 
 
181
 
  pthread_mutex_lock(&LOCK_thread_count);
182
 
  I_List_iterator<THD> it(threads);
183
 
 
184
 
  while ((tmp=it++))
185
 
  {
186
 
    LOG_INFO* linfo;
187
 
    if ((linfo = tmp->current_linfo))
188
 
    {
189
 
      pthread_mutex_lock(&linfo->lock);
190
 
      /*
191
 
        Index file offset can be less that purge offset only if
192
 
        we just started reading the index file. In that case
193
 
        we have nothing to adjust
194
 
      */
195
 
      if (linfo->index_file_offset < purge_offset)
196
 
        linfo->fatal = (linfo->index_file_offset != 0);
197
 
      else
198
 
        linfo->index_file_offset -= purge_offset;
199
 
      pthread_mutex_unlock(&linfo->lock);
200
 
    }
201
 
  }
202
 
  pthread_mutex_unlock(&LOCK_thread_count);
203
 
}
204
 
 
205
 
 
206
 
bool log_in_use(const char* log_name)
207
 
{
208
 
  int log_name_len = strlen(log_name) + 1;
209
 
  THD *tmp;
210
 
  bool result = 0;
211
 
 
212
 
  pthread_mutex_lock(&LOCK_thread_count);
213
 
  I_List_iterator<THD> it(threads);
214
 
 
215
 
  while ((tmp=it++))
216
 
  {
217
 
    LOG_INFO* linfo;
218
 
    if ((linfo = tmp->current_linfo))
219
 
    {
220
 
      pthread_mutex_lock(&linfo->lock);
221
 
      result = !bcmp((uchar*) log_name, (uchar*) linfo->log_file_name,
222
 
                     log_name_len);
223
 
      pthread_mutex_unlock(&linfo->lock);
224
 
      if (result)
225
 
        break;
226
 
    }
227
 
  }
228
 
 
229
 
  pthread_mutex_unlock(&LOCK_thread_count);
230
 
  return result;
231
 
}
232
 
 
233
 
bool purge_error_message(THD* thd, int res)
234
 
{
235
 
  uint errmsg= 0;
236
 
 
237
 
  switch (res)  {
238
 
  case 0: break;
239
 
  case LOG_INFO_EOF:    errmsg= ER_UNKNOWN_TARGET_BINLOG; break;
240
 
  case LOG_INFO_IO:     errmsg= ER_IO_ERR_LOG_INDEX_READ; break;
241
 
  case LOG_INFO_INVALID:errmsg= ER_BINLOG_PURGE_PROHIBITED; break;
242
 
  case LOG_INFO_SEEK:   errmsg= ER_FSEEK_FAIL; break;
243
 
  case LOG_INFO_MEM:    errmsg= ER_OUT_OF_RESOURCES; break;
244
 
  case LOG_INFO_FATAL:  errmsg= ER_BINLOG_PURGE_FATAL_ERR; break;
245
 
  case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break;
246
 
  case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break;
247
 
  default:              errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break;
248
 
  }
249
 
 
250
 
  if (errmsg)
251
 
  {
252
 
    my_message(errmsg, ER(errmsg), MYF(0));
253
 
    return TRUE;
254
 
  }
255
 
  my_ok(thd);
256
 
  return FALSE;
257
 
}
258
 
 
259
 
 
260
 
bool purge_master_logs(THD* thd, const char* to_log)
261
 
{
262
 
  char search_file_name[FN_REFLEN];
263
 
  if (!mysql_bin_log.is_open())
264
 
  {
265
 
    my_ok(thd);
266
 
    return FALSE;
267
 
  }
268
 
 
269
 
  mysql_bin_log.make_log_name(search_file_name, to_log);
270
 
  return purge_error_message(thd,
271
 
                             mysql_bin_log.purge_logs(search_file_name, 0, 1,
272
 
                                                      1, NULL));
273
 
}
274
 
 
275
 
 
276
 
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
277
 
{
278
 
  if (!mysql_bin_log.is_open())
279
 
  {
280
 
    my_ok(thd);
281
 
    return 0;
282
 
  }
283
 
  return purge_error_message(thd,
284
 
                             mysql_bin_log.purge_logs_before_date(purge_time));
285
 
}
286
 
 
287
 
int test_for_non_eof_log_read_errors(int error, const char **errmsg)
288
 
{
289
 
  if (error == LOG_READ_EOF)
290
 
    return 0;
291
 
  my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
292
 
  switch (error) {
293
 
  case LOG_READ_BOGUS:
294
 
    *errmsg = "bogus data in log event";
295
 
    break;
296
 
  case LOG_READ_TOO_LARGE:
297
 
    *errmsg = "log event entry exceeded max_allowed_packet; \
298
 
Increase max_allowed_packet on master";
299
 
    break;
300
 
  case LOG_READ_IO:
301
 
    *errmsg = "I/O error reading log event";
302
 
    break;
303
 
  case LOG_READ_MEM:
304
 
    *errmsg = "memory allocation failed reading log event";
305
 
    break;
306
 
  case LOG_READ_TRUNC:
307
 
    *errmsg = "binlog truncated in the middle of event";
308
 
    break;
309
 
  default:
310
 
    *errmsg = "unknown error reading log event on the master";
311
 
    break;
312
 
  }
313
 
  return error;
314
 
}
315
 
 
316
 
 
317
 
/**
318
 
  An auxiliary function for calling in mysql_binlog_send
319
 
  to initialize the heartbeat timeout in waiting for a binlogged event.
320
 
 
321
 
  @param[in]    thd  THD to access a user variable
322
 
 
323
 
  @return        heartbeat period an ulonglong of nanoseconds
324
 
                 or zero if heartbeat was not demanded by slave
325
 
*/ 
326
 
static ulonglong get_heartbeat_period(THD * thd)
327
 
{
328
 
  my_bool null_value;
329
 
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
330
 
  user_var_entry *entry= 
331
 
    (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
332
 
                                  name.length);
333
 
  return entry? entry->val_int(&null_value) : 0;
334
 
}
335
 
 
336
 
/*
337
 
  Function prepares and sends repliation heartbeat event.
338
 
 
339
 
  @param net                net object of THD
340
 
  @param packet             buffer to store the heartbeat instance
341
 
  @param event_coordinates  binlog file name and position of the last
342
 
                            real event master sent from binlog
343
 
 
344
 
  @note 
345
 
    Among three essential pieces of heartbeat data Log_event::when
346
 
    is computed locally.
347
 
    The  error to send is serious and should force terminating
348
 
    the dump thread.
349
 
*/
350
 
static int send_heartbeat_event(NET* net, String* packet,
351
 
                                const struct event_coordinates *coord)
352
 
{
353
 
  DBUG_ENTER("send_heartbeat_event");
354
 
  char header[LOG_EVENT_HEADER_LEN];
355
 
  /*
356
 
    'when' (the timestamp) is set to 0 so that slave could distinguish between
357
 
    real and fake Rotate events (if necessary)
358
 
  */
359
 
  memset(header, 0, 4);  // when
360
 
 
361
 
  header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
362
 
 
363
 
  char* p= coord->file_name + dirname_length(coord->file_name);
364
 
 
365
 
  uint ident_len = strlen(p);
366
 
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
367
 
  int4store(header + SERVER_ID_OFFSET, server_id);
368
 
  int4store(header + EVENT_LEN_OFFSET, event_len);
369
 
  int2store(header + FLAGS_OFFSET, 0);
370
 
 
371
 
  int4store(header + LOG_POS_OFFSET, coord->pos);  // log_pos
372
 
 
373
 
  packet->append(header, sizeof(header));
374
 
  packet->append(p, ident_len);             // log_file_name
375
 
 
376
 
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
377
 
      net_flush(net))
378
 
  {
379
 
    DBUG_RETURN(-1);
380
 
  }
381
 
  packet->set("\0", 1, &my_charset_bin);
382
 
  DBUG_RETURN(0);
383
 
}
384
 
 
385
 
/*
386
 
  TODO: Clean up loop to only have one call to send_file()
387
 
*/
388
 
 
389
 
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
390
 
                       ushort flags)
391
 
{
392
 
  LOG_INFO linfo;
393
 
  char *log_file_name = linfo.log_file_name;
394
 
  char search_file_name[FN_REFLEN], *name;
395
 
  IO_CACHE log;
396
 
  File file = -1;
397
 
  String* packet = &thd->packet;
398
 
  int error;
399
 
  const char *errmsg = "Unknown error";
400
 
  NET* net = &thd->net;
401
 
  pthread_mutex_t *log_lock;
402
 
  bool binlog_can_be_corrupted= FALSE;
403
 
#ifndef DBUG_OFF
404
 
  int left_events = max_binlog_dump_events;
405
 
#endif
406
 
  DBUG_ENTER("mysql_binlog_send");
407
 
  DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));
408
 
 
409
 
  bzero((char*) &log,sizeof(log));
410
 
  /* 
411
 
     heartbeat_period from @master_heartbeat_period user variable
412
 
  */
413
 
  ulonglong heartbeat_period= get_heartbeat_period(thd);
414
 
  struct timespec heartbeat_buf;
415
 
  struct event_coordinates coord_buf;
416
 
  struct timespec *heartbeat_ts= NULL;
417
 
  struct event_coordinates *coord= NULL;
418
 
  if (heartbeat_period != LL(0))
419
 
  {
420
 
    heartbeat_ts= &heartbeat_buf;
421
 
    set_timespec_nsec(*heartbeat_ts, 0);
422
 
    coord= &coord_buf;
423
 
    coord->file_name= log_file_name; // initialization basing on what slave remembers
424
 
    coord->pos= pos;
425
 
  }
426
 
#ifndef DBUG_OFF
427
 
  if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
428
 
  {
429
 
    errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover";
430
 
    my_errno= ER_UNKNOWN_ERROR;
431
 
    goto err;
432
 
  }
433
 
#endif
434
 
 
435
 
  if (!mysql_bin_log.is_open())
436
 
  {
437
 
    errmsg = "Binary log is not open";
438
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
439
 
    goto err;
440
 
  }
441
 
  if (!server_id_supplied)
442
 
  {
443
 
    errmsg = "Misconfigured master - server id was not set";
444
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
445
 
    goto err;
446
 
  }
447
 
 
448
 
  name=search_file_name;
449
 
  if (log_ident[0])
450
 
    mysql_bin_log.make_log_name(search_file_name, log_ident);
451
 
  else
452
 
    name=0;                                     // Find first log
453
 
 
454
 
  linfo.index_file_offset = 0;
455
 
 
456
 
  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
457
 
  {
458
 
    errmsg = "Could not find first log file name in binary log index file";
459
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
460
 
    goto err;
461
 
  }
462
 
 
463
 
  pthread_mutex_lock(&LOCK_thread_count);
464
 
  thd->current_linfo = &linfo;
465
 
  pthread_mutex_unlock(&LOCK_thread_count);
466
 
 
467
 
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
468
 
  {
469
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
470
 
    goto err;
471
 
  }
472
 
  if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
473
 
  {
474
 
    errmsg= "Client requested master to start replication from \
475
 
impossible position";
476
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
477
 
    goto err;
478
 
  }
479
 
 
480
 
  /*
481
 
    We need to start a packet with something other than 255
482
 
    to distinguish it from error
483
 
  */
484
 
  packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
485
 
 
486
 
  /*
487
 
    Tell the client about the log name with a fake Rotate event;
488
 
    this is needed even if we also send a Format_description_log_event
489
 
    just after, because that event does not contain the binlog's name.
490
 
    Note that as this Rotate event is sent before
491
 
    Format_description_log_event, the slave cannot have any info to
492
 
    understand this event's format, so the header len of
493
 
    Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
494
 
    than other events except FORMAT_DESCRIPTION_EVENT).
495
 
    Before 4.0.14 we called fake_rotate_event below only if (pos ==
496
 
    BIN_LOG_HEADER_SIZE), because if this is false then the slave
497
 
    already knows the binlog's name.
498
 
    Since, we always call fake_rotate_event; if the slave already knew
499
 
    the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
500
 
    useless but does not harm much. It is nice for 3.23 (>=.58) slaves
501
 
    which test Rotate events to see if the master is 4.0 (then they
502
 
    choose to stop because they can't replicate 4.0); by always calling
503
 
    fake_rotate_event we are sure that 3.23.58 and newer will detect the
504
 
    problem as soon as replication starts (BUG#198).
505
 
    Always calling fake_rotate_event makes sending of normal
506
 
    (=from-binlog) Rotate events a priori unneeded, but it is not so
507
 
    simple: the 2 Rotate events are not equivalent, the normal one is
508
 
    before the Stop event, the fake one is after. If we don't send the
509
 
    normal one, then the Stop event will be interpreted (by existing 4.0
510
 
    slaves) as "the master stopped", which is wrong. So for safety,
511
 
    given that we want minimum modification of 4.0, we send the normal
512
 
    and fake Rotates.
513
 
  */
514
 
  if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
515
 
  {
516
 
    /*
517
 
       This error code is not perfect, as fake_rotate_event() does not
518
 
       read anything from the binlog; if it fails it's because of an
519
 
       error in my_net_write(), fortunately it will say so in errmsg.
520
 
    */
521
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
522
 
    goto err;
523
 
  }
524
 
  packet->set("\0", 1, &my_charset_bin);
525
 
  /*
526
 
    Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
527
 
    this larger than the corresponding packet (query) sent 
528
 
    from client to master.
529
 
  */
530
 
  thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
531
 
 
532
 
  /*
533
 
    We can set log_lock now, it does not move (it's a member of
534
 
    mysql_bin_log, and it's already inited, and it will be destroyed
535
 
    only at shutdown).
536
 
  */
537
 
  log_lock = mysql_bin_log.get_log_lock();
538
 
  if (pos > BIN_LOG_HEADER_SIZE)
539
 
  {
540
 
     /*
541
 
       Try to find a Format_description_log_event at the beginning of
542
 
       the binlog
543
 
     */
544
 
     if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
545
 
     {
546
 
       /*
547
 
         The packet has offsets equal to the normal offsets in a binlog
548
 
         event +1 (the first character is \0).
549
 
       */
550
 
       DBUG_PRINT("info",
551
 
                  ("Looked for a Format_description_log_event, found event type %d",
552
 
                   (*packet)[EVENT_TYPE_OFFSET+1]));
553
 
       if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
554
 
       {
555
 
         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
556
 
                                       LOG_EVENT_BINLOG_IN_USE_F);
557
 
         (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
558
 
         /*
559
 
           mark that this event with "log_pos=0", so the slave
560
 
           should not increment master's binlog position
561
 
           (rli->group_master_log_pos)
562
 
         */
563
 
         int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
564
 
         /*
565
 
           if reconnect master sends FD event with `created' as 0
566
 
           to avoid destroying temp tables.
567
 
          */
568
 
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
569
 
                   ST_CREATED_OFFSET+1, (ulong) 0);
570
 
         /* send it */
571
 
         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
572
 
         {
573
 
           errmsg = "Failed on my_net_write()";
574
 
           my_errno= ER_UNKNOWN_ERROR;
575
 
           goto err;
576
 
         }
577
 
 
578
 
         /*
579
 
           No need to save this event. We are only doing simple reads
580
 
           (no real parsing of the events) so we don't need it. And so
581
 
           we don't need the artificial Format_description_log_event of
582
 
           3.23&4.x.
583
 
         */
584
 
       }
585
 
     }
586
 
     else
587
 
     {
588
 
       if (test_for_non_eof_log_read_errors(error, &errmsg))
589
 
         goto err;
590
 
       /*
591
 
         It's EOF, nothing to do, go on reading next events, the
592
 
         Format_description_log_event will be found naturally if it is written.
593
 
       */
594
 
     }
595
 
     /* reset the packet as we wrote to it in any case */
596
 
     packet->set("\0", 1, &my_charset_bin);
597
 
  } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
598
 
  else
599
 
  {
600
 
    /* The Format_description_log_event event will be found naturally. */
601
 
  }
602
 
 
603
 
  /* seek to the requested position, to start the requested dump */
604
 
  my_b_seek(&log, pos);                 // Seek will done on next read
605
 
 
606
 
  while (!net->error && net->vio != 0 && !thd->killed)
607
 
  {
608
 
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
609
 
    {
610
 
#ifndef DBUG_OFF
611
 
      if (max_binlog_dump_events && !left_events--)
612
 
      {
613
 
        net_flush(net);
614
 
        errmsg = "Debugging binlog dump abort";
615
 
        my_errno= ER_UNKNOWN_ERROR;
616
 
        goto err;
617
 
      }
618
 
#endif
619
 
      /*
620
 
        log's filename does not change while it's active
621
 
      */
622
 
      if (coord)
623
 
        coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
624
 
 
625
 
      if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
626
 
      {
627
 
        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
628
 
                                      LOG_EVENT_BINLOG_IN_USE_F);
629
 
        (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
630
 
      }
631
 
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
632
 
        binlog_can_be_corrupted= FALSE;
633
 
 
634
 
      if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
635
 
      {
636
 
        errmsg = "Failed on my_net_write()";
637
 
        my_errno= ER_UNKNOWN_ERROR;
638
 
        goto err;
639
 
      }
640
 
 
641
 
      DBUG_PRINT("info", ("log event code %d",
642
 
                          (*packet)[LOG_EVENT_OFFSET+1] ));
643
 
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
644
 
      {
645
 
        if (send_file(thd))
646
 
        {
647
 
          errmsg = "failed in send_file()";
648
 
          my_errno= ER_UNKNOWN_ERROR;
649
 
          goto err;
650
 
        }
651
 
      }
652
 
      packet->set("\0", 1, &my_charset_bin);
653
 
    }
654
 
 
655
 
    /*
656
 
      here we were reading binlog that was not closed properly (as a result
657
 
      of a crash ?). treat any corruption as EOF
658
 
    */
659
 
    if (binlog_can_be_corrupted && error != LOG_READ_MEM)
660
 
      error=LOG_READ_EOF;
661
 
    /*
662
 
      TODO: now that we are logging the offset, check to make sure
663
 
      the recorded offset and the actual match.
664
 
      Guilhem 2003-06: this is not true if this master is a slave
665
 
      <4.0.15 running with --log-slave-updates, because then log_pos may
666
 
      be the offset in the-master-of-this-master's binlog.
667
 
    */
668
 
    if (test_for_non_eof_log_read_errors(error, &errmsg))
669
 
      goto err;
670
 
 
671
 
    if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
672
 
        mysql_bin_log.is_active(log_file_name))
673
 
    {
674
 
      /*
675
 
        Block until there is more data in the log
676
 
      */
677
 
      if (net_flush(net))
678
 
      {
679
 
        errmsg = "failed on net_flush()";
680
 
        my_errno= ER_UNKNOWN_ERROR;
681
 
        goto err;
682
 
      }
683
 
 
684
 
      /*
685
 
        We may have missed the update broadcast from the log
686
 
        that has just happened, let's try to catch it if it did.
687
 
        If we did not miss anything, we just wait for other threads
688
 
        to signal us.
689
 
      */
690
 
      {
691
 
        log.error=0;
692
 
        bool read_packet = 0, fatal_error = 0;
693
 
 
694
 
#ifndef DBUG_OFF
695
 
        if (max_binlog_dump_events && !left_events--)
696
 
        {
697
 
          errmsg = "Debugging binlog dump abort";
698
 
          my_errno= ER_UNKNOWN_ERROR;
699
 
          goto err;
700
 
        }
701
 
#endif
702
 
 
703
 
        /*
704
 
          No one will update the log while we are reading
705
 
          now, but we'll be quick and just read one record
706
 
 
707
 
          TODO:
708
 
          Add an counter that is incremented for each time we update the
709
 
          binary log.  We can avoid the following read if the counter
710
 
          has not been updated since last read.
711
 
        */
712
 
 
713
 
        pthread_mutex_lock(log_lock);
714
 
        switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
715
 
        case 0:
716
 
          /* we read successfully, so we'll need to send it to the slave */
717
 
          pthread_mutex_unlock(log_lock);
718
 
          read_packet = 1;
719
 
          if (coord)
720
 
            coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
721
 
          break;
722
 
 
723
 
        case LOG_READ_EOF:
724
 
        {
725
 
          int ret;
726
 
          DBUG_PRINT("wait",("waiting for data in binary log"));
727
 
          if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
728
 
          {
729
 
            pthread_mutex_unlock(log_lock);
730
 
            goto end;
731
 
          }
732
 
 
733
 
#ifndef DBUG_OFF
734
 
          ulong hb_info_counter= 0;
735
 
#endif
736
 
          do 
737
 
          {
738
 
            if (coord)
739
 
            {
740
 
              DBUG_ASSERT(heartbeat_ts && heartbeat_period != LL(0));
741
 
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
742
 
            }
743
 
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
744
 
            DBUG_ASSERT(ret == 0 || heartbeat_period != LL(0) && coord != NULL);
745
 
            if (ret == ETIMEDOUT || ret == ETIME)
746
 
            {
747
 
#ifndef DBUG_OFF
748
 
              if (hb_info_counter < 3)
749
 
              {
750
 
                sql_print_information("master sends heartbeat message");
751
 
                hb_info_counter++;
752
 
                if (hb_info_counter == 3)
753
 
                  sql_print_information("the rest of heartbeat info skipped ...");
754
 
              }
755
 
#endif
756
 
              if (send_heartbeat_event(net, packet, coord))
757
 
              {
758
 
                errmsg = "Failed on my_net_write()";
759
 
                my_errno= ER_UNKNOWN_ERROR;
760
 
                pthread_mutex_unlock(log_lock);
761
 
                goto err;
762
 
              }
763
 
            }
764
 
            else
765
 
            {
766
 
              DBUG_ASSERT(ret == 0);
767
 
              DBUG_PRINT("wait",("binary log received update"));
768
 
            }
769
 
          } while (ret != 0 && coord != NULL && !thd->killed);
770
 
          pthread_mutex_unlock(log_lock);
771
 
        }    
772
 
        break;
773
 
            
774
 
        default:
775
 
          pthread_mutex_unlock(log_lock);
776
 
          fatal_error = 1;
777
 
          break;
778
 
        }
779
 
 
780
 
        if (read_packet)
781
 
        {
782
 
          thd_proc_info(thd, "Sending binlog event to slave");
783
 
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
784
 
          {
785
 
            errmsg = "Failed on my_net_write()";
786
 
            my_errno= ER_UNKNOWN_ERROR;
787
 
            goto err;
788
 
          }
789
 
 
790
 
          if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
791
 
          {
792
 
            if (send_file(thd))
793
 
            {
794
 
              errmsg = "failed in send_file()";
795
 
              my_errno= ER_UNKNOWN_ERROR;
796
 
              goto err;
797
 
            }
798
 
          }
799
 
          packet->set("\0", 1, &my_charset_bin);
800
 
          /*
801
 
            No need to net_flush because we will get to flush later when
802
 
            we hit EOF pretty quick
803
 
          */
804
 
        }
805
 
 
806
 
        if (fatal_error)
807
 
        {
808
 
          errmsg = "error reading log entry";
809
 
          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
810
 
          goto err;
811
 
        }
812
 
        log.error=0;
813
 
      }
814
 
    }
815
 
    else
816
 
    {
817
 
      bool loop_breaker = 0;
818
 
      /* need this to break out of the for loop from switch */
819
 
 
820
 
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
821
 
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
822
 
      case LOG_INFO_EOF:
823
 
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
824
 
        break;
825
 
      case 0:
826
 
        break;
827
 
      default:
828
 
        errmsg = "could not find next log";
829
 
        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
830
 
        goto err;
831
 
      }
832
 
 
833
 
      if (loop_breaker)
834
 
        break;
835
 
 
836
 
      end_io_cache(&log);
837
 
      (void) my_close(file, MYF(MY_WME));
838
 
 
839
 
      /*
840
 
        Call fake_rotate_event() in case the previous log (the one which
841
 
        we have just finished reading) did not contain a Rotate event
842
 
        (for example (I don't know any other example) the previous log
843
 
        was the last one before the master was shutdown & restarted).
844
 
        This way we tell the slave about the new log's name and
845
 
        position.  If the binlog is 5.0, the next event we are going to
846
 
        read and send is Format_description_log_event.
847
 
      */
848
 
      if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
849
 
          fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
850
 
                            &errmsg))
851
 
      {
852
 
        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
853
 
        goto err;
854
 
      }
855
 
 
856
 
      packet->length(0);
857
 
      packet->append('\0');
858
 
      if (coord)
859
 
        coord->file_name= log_file_name; // reset to the next
860
 
    }
861
 
  }
862
 
 
863
 
end:
864
 
  end_io_cache(&log);
865
 
  (void)my_close(file, MYF(MY_WME));
866
 
 
867
 
  my_eof(thd);
868
 
  thd_proc_info(thd, "Waiting to finalize termination");
869
 
  pthread_mutex_lock(&LOCK_thread_count);
870
 
  thd->current_linfo = 0;
871
 
  pthread_mutex_unlock(&LOCK_thread_count);
872
 
  DBUG_VOID_RETURN;
873
 
 
874
 
err:
875
 
  thd_proc_info(thd, "Waiting to finalize termination");
876
 
  end_io_cache(&log);
877
 
  /*
878
 
    Exclude  iteration through thread list
879
 
    this is needed for purge_logs() - it will iterate through
880
 
    thread list and update thd->current_linfo->index_file_offset
881
 
    this mutex will make sure that it never tried to update our linfo
882
 
    after we return from this stack frame
883
 
  */
884
 
  pthread_mutex_lock(&LOCK_thread_count);
885
 
  thd->current_linfo = 0;
886
 
  pthread_mutex_unlock(&LOCK_thread_count);
887
 
  if (file >= 0)
888
 
    (void) my_close(file, MYF(MY_WME));
889
 
 
890
 
  my_message(my_errno, errmsg, MYF(0));
891
 
  DBUG_VOID_RETURN;
892
 
}
893
 
 
894
 
int start_slave(THD* thd , Master_info* mi,  bool net_report)
895
 
{
896
 
  int slave_errno= 0;
897
 
  int thread_mask;
898
 
  DBUG_ENTER("start_slave");
899
 
 
900
 
  lock_slave_threads(mi);  // this allows us to cleanly read slave_running
901
 
  // Get a mask of _stopped_ threads
902
 
  init_thread_mask(&thread_mask,mi,1 /* inverse */);
903
 
  /*
904
 
    Below we will start all stopped threads.  But if the user wants to
905
 
    start only one thread, do as if the other thread was running (as we
906
 
    don't wan't to touch the other thread), so set the bit to 0 for the
907
 
    other thread
908
 
  */
909
 
  if (thd->lex->slave_thd_opt)
910
 
    thread_mask&= thd->lex->slave_thd_opt;
911
 
  if (thread_mask) //some threads are stopped, start them
912
 
  {
913
 
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
914
 
                         thread_mask))
915
 
      slave_errno=ER_MASTER_INFO;
916
 
    else if (server_id_supplied && *mi->host)
917
 
    {
918
 
      /*
919
 
        If we will start SQL thread we will care about UNTIL options If
920
 
        not and they are specified we will ignore them and warn user
921
 
        about this fact.
922
 
      */
923
 
      if (thread_mask & SLAVE_SQL)
924
 
      {
925
 
        pthread_mutex_lock(&mi->rli.data_lock);
926
 
 
927
 
        if (thd->lex->mi.pos)
928
 
        {
929
 
          mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
930
 
          mi->rli.until_log_pos= thd->lex->mi.pos;
931
 
          /*
932
 
             We don't check thd->lex->mi.log_file_name for NULL here
933
 
             since it is checked in sql_yacc.yy
934
 
          */
935
 
          strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
936
 
                  sizeof(mi->rli.until_log_name)-1);
937
 
        }
938
 
        else if (thd->lex->mi.relay_log_pos)
939
 
        {
940
 
          mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
941
 
          mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
942
 
          strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
943
 
                  sizeof(mi->rli.until_log_name)-1);
944
 
        }
945
 
        else
946
 
          mi->rli.clear_until_condition();
947
 
 
948
 
        if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
949
 
        {
950
 
          /* Preparing members for effective until condition checking */
951
 
          const char *p= fn_ext(mi->rli.until_log_name);
952
 
          char *p_end;
953
 
          if (*p)
954
 
          {
955
 
            //p points to '.'
956
 
            mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
957
 
            /*
958
 
              p_end points to the first invalid character. If it equals
959
 
              to p, no digits were found, error. If it contains '\0' it
960
 
              means  conversion went ok.
961
 
            */
962
 
            if (p_end==p || *p_end)
963
 
              slave_errno=ER_BAD_SLAVE_UNTIL_COND;
964
 
          }
965
 
          else
966
 
            slave_errno=ER_BAD_SLAVE_UNTIL_COND;
967
 
 
968
 
          /* mark the cached result of the UNTIL comparison as "undefined" */
969
 
          mi->rli.until_log_names_cmp_result=
970
 
            Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
971
 
 
972
 
          /* Issuing warning then started without --skip-slave-start */
973
 
          if (!opt_skip_slave_start)
974
 
            push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
975
 
                         ER_MISSING_SKIP_SLAVE,
976
 
                         ER(ER_MISSING_SKIP_SLAVE));
977
 
        }
978
 
 
979
 
        pthread_mutex_unlock(&mi->rli.data_lock);
980
 
      }
981
 
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
982
 
        push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
983
 
                     ER(ER_UNTIL_COND_IGNORED));
984
 
 
985
 
      if (!slave_errno)
986
 
        slave_errno = start_slave_threads(0 /*no mutex */,
987
 
                                        1 /* wait for start */,
988
 
                                        mi,
989
 
                                        master_info_file,relay_log_info_file,
990
 
                                        thread_mask);
991
 
    }
992
 
    else
993
 
      slave_errno = ER_BAD_SLAVE;
994
 
  }
995
 
  else
996
 
  {
997
 
    /* no error if all threads are already started, only a warning */
998
 
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
999
 
                 ER(ER_SLAVE_WAS_RUNNING));
1000
 
  }
1001
 
 
1002
 
  unlock_slave_threads(mi);
1003
 
 
1004
 
  if (slave_errno)
1005
 
  {
1006
 
    if (net_report)
1007
 
      my_message(slave_errno, ER(slave_errno), MYF(0));
1008
 
    DBUG_RETURN(1);
1009
 
  }
1010
 
  else if (net_report)
1011
 
    my_ok(thd);
1012
 
 
1013
 
  DBUG_RETURN(0);
1014
 
}
1015
 
 
1016
 
 
1017
 
int stop_slave(THD* thd, Master_info* mi, bool net_report )
1018
 
{
1019
 
  DBUG_ENTER("stop_slave");
1020
 
  
1021
 
  int slave_errno;
1022
 
  if (!thd)
1023
 
    thd = current_thd;
1024
 
 
1025
 
  thd_proc_info(thd, "Killing slave");
1026
 
  int thread_mask;
1027
 
  lock_slave_threads(mi);
1028
 
  // Get a mask of _running_ threads
1029
 
  init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
1030
 
  /*
1031
 
    Below we will stop all running threads.
1032
 
    But if the user wants to stop only one thread, do as if the other thread
1033
 
    was stopped (as we don't wan't to touch the other thread), so set the
1034
 
    bit to 0 for the other thread
1035
 
  */
1036
 
  if (thd->lex->slave_thd_opt)
1037
 
    thread_mask &= thd->lex->slave_thd_opt;
1038
 
 
1039
 
  if (thread_mask)
1040
 
  {
1041
 
    slave_errno= terminate_slave_threads(mi,thread_mask,
1042
 
                                         1 /*skip lock */);
1043
 
  }
1044
 
  else
1045
 
  {
1046
 
    //no error if both threads are already stopped, only a warning
1047
 
    slave_errno= 0;
1048
 
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
1049
 
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
1050
 
  }
1051
 
  unlock_slave_threads(mi);
1052
 
  thd_proc_info(thd, 0);
1053
 
 
1054
 
  if (slave_errno)
1055
 
  {
1056
 
    if (net_report)
1057
 
      my_message(slave_errno, ER(slave_errno), MYF(0));
1058
 
    DBUG_RETURN(1);
1059
 
  }
1060
 
  else if (net_report)
1061
 
    my_ok(thd);
1062
 
 
1063
 
  DBUG_RETURN(0);
1064
 
}
1065
 
 
1066
 
 
1067
 
/*
1068
 
  Remove all relay logs and start replication from the start
1069
 
 
1070
 
  SYNOPSIS
1071
 
    reset_slave()
1072
 
    thd                 Thread handler
1073
 
    mi                  Master info for the slave
1074
 
 
1075
 
  RETURN
1076
 
    0   ok
1077
 
    1   error
1078
 
*/
1079
 
 
1080
 
 
1081
 
int reset_slave(THD *thd, Master_info* mi)
1082
 
{
1083
 
  struct stat stat_area;
1084
 
  char fname[FN_REFLEN];
1085
 
  int thread_mask= 0, error= 0;
1086
 
  uint sql_errno=0;
1087
 
  const char* errmsg=0;
1088
 
  DBUG_ENTER("reset_slave");
1089
 
 
1090
 
  lock_slave_threads(mi);
1091
 
  init_thread_mask(&thread_mask,mi,0 /* not inverse */);
1092
 
  if (thread_mask) // We refuse if any slave thread is running
1093
 
  {
1094
 
    sql_errno= ER_SLAVE_MUST_STOP;
1095
 
    error=1;
1096
 
    goto err;
1097
 
  }
1098
 
 
1099
 
  ha_reset_slave(thd);
1100
 
 
1101
 
  // delete relay logs, clear relay log coordinates
1102
 
  if ((error= purge_relay_logs(&mi->rli, thd,
1103
 
                               1 /* just reset */,
1104
 
                               &errmsg)))
1105
 
    goto err;
1106
 
 
1107
 
  /* Clear master's log coordinates */
1108
 
  init_master_log_pos(mi);
1109
 
  /*
1110
 
     Reset errors (the idea is that we forget about the
1111
 
     old master).
1112
 
  */
1113
 
  mi->rli.clear_error();
1114
 
  mi->rli.clear_until_condition();
1115
 
 
1116
 
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1117
 
  end_master_info(mi);
1118
 
  // and delete these two files
1119
 
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
1120
 
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1121
 
  {
1122
 
    error=1;
1123
 
    goto err;
1124
 
  }
1125
 
  // delete relay_log_info_file
1126
 
  fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
1127
 
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1128
 
  {
1129
 
    error=1;
1130
 
    goto err;
1131
 
  }
1132
 
 
1133
 
err:
1134
 
  unlock_slave_threads(mi);
1135
 
  if (error)
1136
 
    my_error(sql_errno, MYF(0), errmsg);
1137
 
  DBUG_RETURN(error);
1138
 
}
1139
 
 
1140
 
/*
1141
 
 
1142
 
  Kill all Binlog_dump threads which previously talked to the same slave
1143
 
  ("same" means with the same server id). Indeed, if the slave stops, if the
1144
 
  Binlog_dump thread is waiting (pthread_cond_wait) for binlog update, then it
1145
 
  will keep existing until a query is written to the binlog. If the master is
1146
 
  idle, then this could last long, and if the slave reconnects, we could have 2
1147
 
  Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
1148
 
  binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
1149
 
  the master kills any existing thread with the slave's server id (if this id is
1150
 
  not zero; it will be true for real slaves, but false for mysqlbinlog when it
1151
 
  sends COM_BINLOG_DUMP to get a remote binlog dump).
1152
 
 
1153
 
  SYNOPSIS
1154
 
    kill_zombie_dump_threads()
1155
 
    slave_server_id     the slave's server id
1156
 
 
1157
 
*/
1158
 
 
1159
 
 
1160
 
void kill_zombie_dump_threads(uint32 slave_server_id)
1161
 
{
1162
 
  pthread_mutex_lock(&LOCK_thread_count);
1163
 
  I_List_iterator<THD> it(threads);
1164
 
  THD *tmp;
1165
 
 
1166
 
  while ((tmp=it++))
1167
 
  {
1168
 
    if (tmp->command == COM_BINLOG_DUMP &&
1169
 
       tmp->server_id == slave_server_id)
1170
 
    {
1171
 
      pthread_mutex_lock(&tmp->LOCK_delete);    // Lock from delete
1172
 
      break;
1173
 
    }
1174
 
  }
1175
 
  pthread_mutex_unlock(&LOCK_thread_count);
1176
 
  if (tmp)
1177
 
  {
1178
 
    /*
1179
 
      Here we do not call kill_one_thread() as
1180
 
      it will be slow because it will iterate through the list
1181
 
      again. We just to do kill the thread ourselves.
1182
 
    */
1183
 
    tmp->awake(THD::KILL_QUERY);
1184
 
    pthread_mutex_unlock(&tmp->LOCK_delete);
1185
 
  }
1186
 
}
1187
 
 
1188
 
 
1189
 
bool change_master(THD* thd, Master_info* mi)
1190
 
{
1191
 
  int thread_mask;
1192
 
  const char* errmsg= 0;
1193
 
  bool need_relay_log_purge= 1;
1194
 
  DBUG_ENTER("change_master");
1195
 
 
1196
 
  lock_slave_threads(mi);
1197
 
  init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
1198
 
  if (thread_mask) // We refuse if any slave thread is running
1199
 
  {
1200
 
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1201
 
    unlock_slave_threads(mi);
1202
 
    DBUG_RETURN(TRUE);
1203
 
  }
1204
 
 
1205
 
  thd_proc_info(thd, "Changing master");
1206
 
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1207
 
  // TODO: see if needs re-write
1208
 
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
1209
 
                       thread_mask))
1210
 
  {
1211
 
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1212
 
    unlock_slave_threads(mi);
1213
 
    DBUG_RETURN(TRUE);
1214
 
  }
1215
 
 
1216
 
  /*
1217
 
    Data lock not needed since we have already stopped the running threads,
1218
 
    and we have the hold on the run locks which will keep all threads that
1219
 
    could possibly modify the data structures from running
1220
 
  */
1221
 
 
1222
 
  /*
1223
 
    If the user specified host or port without binlog or position,
1224
 
    reset binlog's name to FIRST and position to 4.
1225
 
  */
1226
 
 
1227
 
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1228
 
  {
1229
 
    mi->master_log_name[0] = 0;
1230
 
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
1231
 
  }
1232
 
 
1233
 
  if (lex_mi->log_file_name)
1234
 
    strmake(mi->master_log_name, lex_mi->log_file_name,
1235
 
            sizeof(mi->master_log_name)-1);
1236
 
  if (lex_mi->pos)
1237
 
  {
1238
 
    mi->master_log_pos= lex_mi->pos;
1239
 
  }
1240
 
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
1241
 
 
1242
 
  if (lex_mi->host)
1243
 
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1244
 
  if (lex_mi->user)
1245
 
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
1246
 
  if (lex_mi->password)
1247
 
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
1248
 
  if (lex_mi->port)
1249
 
    mi->port = lex_mi->port;
1250
 
  if (lex_mi->connect_retry)
1251
 
    mi->connect_retry = lex_mi->connect_retry;
1252
 
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1253
 
    mi->heartbeat_period = lex_mi->heartbeat_period;
1254
 
  else
1255
 
    mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
1256
 
                                      (slave_net_timeout/2.0));
1257
 
  mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd
1258
 
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1259
 
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
1260
 
 
1261
 
  if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1262
 
    mi->ssl_verify_server_cert=
1263
 
      (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
1264
 
 
1265
 
  if (lex_mi->ssl_ca)
1266
 
    strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
1267
 
  if (lex_mi->ssl_capath)
1268
 
    strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
1269
 
  if (lex_mi->ssl_cert)
1270
 
    strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
1271
 
  if (lex_mi->ssl_cipher)
1272
 
    strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
1273
 
  if (lex_mi->ssl_key)
1274
 
    strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
1275
 
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
1276
 
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
1277
 
      lex_mi->ssl_verify_server_cert )
1278
 
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
1279
 
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1280
 
 
1281
 
  if (lex_mi->relay_log_name)
1282
 
  {
1283
 
    need_relay_log_purge= 0;
1284
 
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
1285
 
            sizeof(mi->rli.group_relay_log_name)-1);
1286
 
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
1287
 
            sizeof(mi->rli.event_relay_log_name)-1);
1288
 
  }
1289
 
 
1290
 
  if (lex_mi->relay_log_pos)
1291
 
  {
1292
 
    need_relay_log_purge= 0;
1293
 
    mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
1294
 
  }
1295
 
 
1296
 
  /*
1297
 
    If user did specify neither host nor port nor any log name nor any log
1298
 
    pos, i.e. he specified only user/password/master_connect_retry, he probably
1299
 
    wants replication to resume from where it had left, i.e. from the
1300
 
    coordinates of the **SQL** thread (imagine the case where the I/O is ahead
1301
 
    of the SQL; restarting from the coordinates of the I/O would lose some
1302
 
    events which is probably unwanted when you are just doing minor changes
1303
 
    like changing master_connect_retry).
1304
 
    A side-effect is that if only the I/O thread was started, this thread may
1305
 
    restart from ''/4 after the CHANGE MASTER. That's a minor problem (it is a
1306
 
    much more unlikely situation than the one we are fixing here).
1307
 
    Note: coordinates of the SQL thread must be read here, before the
1308
 
    'if (need_relay_log_purge)' block which resets them.
1309
 
  */
1310
 
  if (!lex_mi->host && !lex_mi->port &&
1311
 
      !lex_mi->log_file_name && !lex_mi->pos &&
1312
 
      need_relay_log_purge)
1313
 
   {
1314
 
     /*
1315
 
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1316
 
       not initialized), so we use a max().
1317
 
       What happens to mi->rli.master_log_pos during the initialization stages
1318
 
       of replication is not 100% clear, so we guard against problems using
1319
 
       max().
1320
 
      */
1321
 
     mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
1322
 
                              mi->rli.group_master_log_pos);
1323
 
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
1324
 
             sizeof(mi->master_log_name)-1);
1325
 
  }
1326
 
  /*
1327
 
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1328
 
    a slave before).
1329
 
  */
1330
 
  if (flush_master_info(mi, 0))
1331
 
  {
1332
 
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1333
 
    unlock_slave_threads(mi);
1334
 
    DBUG_RETURN(TRUE);
1335
 
  }
1336
 
  if (need_relay_log_purge)
1337
 
  {
1338
 
    relay_log_purge= 1;
1339
 
    thd_proc_info(thd, "Purging old relay logs");
1340
 
    if (purge_relay_logs(&mi->rli, thd,
1341
 
                         0 /* not only reset, but also reinit */,
1342
 
                         &errmsg))
1343
 
    {
1344
 
      my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
1345
 
      unlock_slave_threads(mi);
1346
 
      DBUG_RETURN(TRUE);
1347
 
    }
1348
 
  }
1349
 
  else
1350
 
  {
1351
 
    const char* msg;
1352
 
    relay_log_purge= 0;
1353
 
    /* Relay log is already initialized */
1354
 
    if (init_relay_log_pos(&mi->rli,
1355
 
                           mi->rli.group_relay_log_name,
1356
 
                           mi->rli.group_relay_log_pos,
1357
 
                           0 /*no data lock*/,
1358
 
                           &msg, 0))
1359
 
    {
1360
 
      my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
1361
 
      unlock_slave_threads(mi);
1362
 
      DBUG_RETURN(TRUE);
1363
 
    }
1364
 
  }
1365
 
  /*
1366
 
    Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block,
1367
 
    so restore them to good values. If we left them to ''/0, that would work;
1368
 
    but that would fail in the case of 2 successive CHANGE MASTER (without a
1369
 
    START SLAVE in between): because first one would set the coords in mi to
1370
 
    the good values of those in rli, the set those in rli to ''/0, then
1371
 
    second CHANGE MASTER would set the coords in mi to those of rli, i.e. to
1372
 
    ''/0: we have lost all copies of the original good coordinates.
1373
 
    That's why we always save good coords in rli.
1374
 
  */
1375
 
  mi->rli.group_master_log_pos= mi->master_log_pos;
1376
 
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
1377
 
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
1378
 
          sizeof(mi->rli.group_master_log_name)-1);
1379
 
 
1380
 
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
1381
 
    mi->rli.group_master_log_pos=0;
1382
 
 
1383
 
  pthread_mutex_lock(&mi->rli.data_lock);
1384
 
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1385
 
  /* Clear the errors, for a clean start */
1386
 
  mi->rli.clear_error();
1387
 
  mi->rli.clear_until_condition();
1388
 
  /*
1389
 
    If we don't write new coordinates to disk now, then old will remain in
1390
 
    relay-log.info until START SLAVE is issued; but if mysqld is shutdown
1391
 
    before START SLAVE, then old will remain in relay-log.info, and will be the
1392
 
    in-memory value at restart (thus causing errors, as the old relay log does
1393
 
    not exist anymore).
1394
 
  */
1395
 
  flush_relay_log_info(&mi->rli);
1396
 
  pthread_cond_broadcast(&mi->data_cond);
1397
 
  pthread_mutex_unlock(&mi->rli.data_lock);
1398
 
 
1399
 
  unlock_slave_threads(mi);
1400
 
  thd_proc_info(thd, 0);
1401
 
  my_ok(thd);
1402
 
  DBUG_RETURN(FALSE);
1403
 
}
1404
 
 
1405
 
int reset_master(THD* thd)
1406
 
{
1407
 
  if (!mysql_bin_log.is_open())
1408
 
  {
1409
 
    my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
1410
 
               ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
1411
 
    return 1;
1412
 
  }
1413
 
  return mysql_bin_log.reset_logs(thd);
1414
 
}
1415
 
 
1416
 
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
1417
 
                   const char* log_file_name2, ulonglong log_pos2)
1418
 
{
1419
 
  int res;
1420
 
  uint log_file_name1_len=  strlen(log_file_name1);
1421
 
  uint log_file_name2_len=  strlen(log_file_name2);
1422
 
 
1423
 
  //  We assume that both log names match up to '.'
1424
 
  if (log_file_name1_len == log_file_name2_len)
1425
 
  {
1426
 
    if ((res= strcmp(log_file_name1, log_file_name2)))
1427
 
      return res;
1428
 
    return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
1429
 
  }
1430
 
  return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
1431
 
}
1432
 
 
1433
 
 
1434
 
bool mysql_show_binlog_events(THD* thd)
1435
 
{
1436
 
  Protocol *protocol= thd->protocol;
1437
 
  List<Item> field_list;
1438
 
  const char *errmsg = 0;
1439
 
  bool ret = TRUE;
1440
 
  IO_CACHE log;
1441
 
  File file = -1;
1442
 
  DBUG_ENTER("mysql_show_binlog_events");
1443
 
 
1444
 
  Log_event::init_show_field_list(&field_list);
1445
 
  if (protocol->send_fields(&field_list,
1446
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1447
 
    DBUG_RETURN(TRUE);
1448
 
 
1449
 
  Format_description_log_event *description_event= new
1450
 
    Format_description_log_event(3); /* MySQL 4.0 by default */
1451
 
 
1452
 
  /*
1453
 
    Wait for handlers to insert any pending information
1454
 
    into the binlog.  For e.g. ndb which updates the binlog asynchronously
1455
 
    this is needed so that the uses sees all its own commands in the binlog
1456
 
  */
1457
 
  ha_binlog_wait(thd);
1458
 
 
1459
 
  if (mysql_bin_log.is_open())
1460
 
  {
1461
 
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1462
 
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
1463
 
    ha_rows event_count, limit_start, limit_end;
1464
 
    my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1465
 
    char search_file_name[FN_REFLEN], *name;
1466
 
    const char *log_file_name = lex_mi->log_file_name;
1467
 
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1468
 
    LOG_INFO linfo;
1469
 
    Log_event* ev;
1470
 
 
1471
 
    unit->set_limit(thd->lex->current_select);
1472
 
    limit_start= unit->offset_limit_cnt;
1473
 
    limit_end= unit->select_limit_cnt;
1474
 
 
1475
 
    name= search_file_name;
1476
 
    if (log_file_name)
1477
 
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
1478
 
    else
1479
 
      name=0;                                   // Find first log
1480
 
 
1481
 
    linfo.index_file_offset = 0;
1482
 
 
1483
 
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1484
 
    {
1485
 
      errmsg = "Could not find target log";
1486
 
      goto err;
1487
 
    }
1488
 
 
1489
 
    pthread_mutex_lock(&LOCK_thread_count);
1490
 
    thd->current_linfo = &linfo;
1491
 
    pthread_mutex_unlock(&LOCK_thread_count);
1492
 
 
1493
 
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
1494
 
      goto err;
1495
 
 
1496
 
    /*
1497
 
      to account binlog event header size
1498
 
    */
1499
 
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
1500
 
 
1501
 
    pthread_mutex_lock(log_lock);
1502
 
 
1503
 
    /*
1504
 
      open_binlog() sought to position 4.
1505
 
      Read the first event in case it's a Format_description_log_event, to
1506
 
      know the format. If there's no such event, we are 3.23 or 4.x. This
1507
 
      code, like before, can't read 3.23 binlogs.
1508
 
      This code will fail on a mixed relay log (one which has Format_desc then
1509
 
      Rotate then Format_desc).
1510
 
    */
1511
 
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
1512
 
    if (ev)
1513
 
    {
1514
 
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
1515
 
      {
1516
 
        delete description_event;
1517
 
        description_event= (Format_description_log_event*) ev;
1518
 
      }
1519
 
      else
1520
 
        delete ev;
1521
 
    }
1522
 
 
1523
 
    my_b_seek(&log, pos);
1524
 
 
1525
 
    if (!description_event->is_valid())
1526
 
    {
1527
 
      errmsg="Invalid Format_description event; could be out of memory";
1528
 
      goto err;
1529
 
    }
1530
 
 
1531
 
    for (event_count = 0;
1532
 
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
1533
 
                                         description_event)); )
1534
 
    {
1535
 
      if (event_count >= limit_start &&
1536
 
          ev->net_send(protocol, linfo.log_file_name, pos))
1537
 
      {
1538
 
        errmsg = "Net error";
1539
 
        delete ev;
1540
 
        pthread_mutex_unlock(log_lock);
1541
 
        goto err;
1542
 
      }
1543
 
 
1544
 
      pos = my_b_tell(&log);
1545
 
      delete ev;
1546
 
 
1547
 
      if (++event_count >= limit_end)
1548
 
        break;
1549
 
    }
1550
 
 
1551
 
    if (event_count < limit_end && log.error)
1552
 
    {
1553
 
      errmsg = "Wrong offset or I/O error";
1554
 
      pthread_mutex_unlock(log_lock);
1555
 
      goto err;
1556
 
    }
1557
 
 
1558
 
    pthread_mutex_unlock(log_lock);
1559
 
  }
1560
 
 
1561
 
  ret= FALSE;
1562
 
 
1563
 
err:
1564
 
  delete description_event;
1565
 
  if (file >= 0)
1566
 
  {
1567
 
    end_io_cache(&log);
1568
 
    (void) my_close(file, MYF(MY_WME));
1569
 
  }
1570
 
 
1571
 
  if (errmsg)
1572
 
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
1573
 
             "SHOW BINLOG EVENTS", errmsg);
1574
 
  else
1575
 
    my_eof(thd);
1576
 
 
1577
 
  pthread_mutex_lock(&LOCK_thread_count);
1578
 
  thd->current_linfo = 0;
1579
 
  pthread_mutex_unlock(&LOCK_thread_count);
1580
 
  DBUG_RETURN(ret);
1581
 
}
1582
 
 
1583
 
 
1584
 
bool show_binlog_info(THD* thd)
1585
 
{
1586
 
  Protocol *protocol= thd->protocol;
1587
 
  DBUG_ENTER("show_binlog_info");
1588
 
  List<Item> field_list;
1589
 
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1590
 
  field_list.push_back(new Item_return_int("Position",20,
1591
 
                                           MYSQL_TYPE_LONGLONG));
1592
 
  field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
1593
 
  field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
1594
 
 
1595
 
  if (protocol->send_fields(&field_list,
1596
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1597
 
    DBUG_RETURN(TRUE);
1598
 
  protocol->prepare_for_resend();
1599
 
 
1600
 
  if (mysql_bin_log.is_open())
1601
 
  {
1602
 
    LOG_INFO li;
1603
 
    mysql_bin_log.get_current_log(&li);
1604
 
    int dir_len = dirname_length(li.log_file_name);
1605
 
    protocol->store(li.log_file_name + dir_len, &my_charset_bin);
1606
 
    protocol->store((ulonglong) li.pos);
1607
 
    protocol->store(binlog_filter->get_do_db());
1608
 
    protocol->store(binlog_filter->get_ignore_db());
1609
 
    if (protocol->write())
1610
 
      DBUG_RETURN(TRUE);
1611
 
  }
1612
 
  my_eof(thd);
1613
 
  DBUG_RETURN(FALSE);
1614
 
}
1615
 
 
1616
 
 
1617
 
/*
1618
 
  Send a list of all binary logs to client
1619
 
 
1620
 
  SYNOPSIS
1621
 
    show_binlogs()
1622
 
    thd         Thread specific variable
1623
 
 
1624
 
  RETURN VALUES
1625
 
    FALSE OK
1626
 
    TRUE  error
1627
 
*/
1628
 
 
1629
 
bool show_binlogs(THD* thd)
1630
 
{
1631
 
  IO_CACHE *index_file;
1632
 
  LOG_INFO cur;
1633
 
  File file;
1634
 
  char fname[FN_REFLEN];
1635
 
  List<Item> field_list;
1636
 
  uint length;
1637
 
  int cur_dir_len;
1638
 
  Protocol *protocol= thd->protocol;
1639
 
  DBUG_ENTER("show_binlogs");
1640
 
 
1641
 
  if (!mysql_bin_log.is_open())
1642
 
  {
1643
 
    my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0));
1644
 
    return 1;
1645
 
  }
1646
 
 
1647
 
  field_list.push_back(new Item_empty_string("Log_name", 255));
1648
 
  field_list.push_back(new Item_return_int("File_size", 20,
1649
 
                                           MYSQL_TYPE_LONGLONG));
1650
 
  if (protocol->send_fields(&field_list,
1651
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1652
 
    DBUG_RETURN(TRUE);
1653
 
  
1654
 
  pthread_mutex_lock(mysql_bin_log.get_log_lock());
1655
 
  mysql_bin_log.lock_index();
1656
 
  index_file=mysql_bin_log.get_index_file();
1657
 
  
1658
 
  mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
1659
 
  pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
1660
 
  
1661
 
  cur_dir_len= dirname_length(cur.log_file_name);
1662
 
 
1663
 
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
1664
 
 
1665
 
  /* The file ends with EOF or empty line */
1666
 
  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
1667
 
  {
1668
 
    int dir_len;
1669
 
    ulonglong file_length= 0;                   // Length if open fails
1670
 
    fname[--length] = '\0';                     // remove the newline
1671
 
 
1672
 
    protocol->prepare_for_resend();
1673
 
    dir_len= dirname_length(fname);
1674
 
    length-= dir_len;
1675
 
    protocol->store(fname + dir_len, length, &my_charset_bin);
1676
 
 
1677
 
    if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
1678
 
      file_length= cur.pos;  /* The active log, use the active position */
1679
 
    else
1680
 
    {
1681
 
      /* this is an old log, open it and find the size */
1682
 
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
1683
 
                         MYF(0))) >= 0)
1684
 
      {
1685
 
        file_length= (ulonglong) my_seek(file, 0L, MY_SEEK_END, MYF(0));
1686
 
        my_close(file, MYF(0));
1687
 
      }
1688
 
    }
1689
 
    protocol->store(file_length);
1690
 
    if (protocol->write())
1691
 
      goto err;
1692
 
  }
1693
 
  mysql_bin_log.unlock_index();
1694
 
  my_eof(thd);
1695
 
  DBUG_RETURN(FALSE);
1696
 
 
1697
 
err:
1698
 
  mysql_bin_log.unlock_index();
1699
 
  DBUG_RETURN(TRUE);
1700
 
}
1701
 
 
1702
 
/**
1703
 
   Load data's io cache specific hook to be executed
1704
 
   before a chunk of data is being read into the cache's buffer
1705
 
   The fuction instantianates and writes into the binlog
1706
 
   replication events along LOAD DATA processing.
1707
 
   
1708
 
   @param file  pointer to io-cache
1709
 
   @return 0
1710
 
*/
1711
 
int log_loaded_block(IO_CACHE* file)
1712
 
{
1713
 
  DBUG_ENTER("log_loaded_block");
1714
 
  LOAD_FILE_INFO *lf_info;
1715
 
  uint block_len;
1716
 
  /* buffer contains position where we started last read */
1717
 
  uchar* buffer= (uchar*) my_b_get_buffer_start(file);
1718
 
  uint max_event_size= current_thd->variables.max_allowed_packet;
1719
 
  lf_info= (LOAD_FILE_INFO*) file->arg;
1720
 
  if (lf_info->thd->current_stmt_binlog_row_based)
1721
 
    DBUG_RETURN(0);
1722
 
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1723
 
      lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1724
 
    DBUG_RETURN(0);
1725
 
  
1726
 
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1727
 
       buffer += min(block_len, max_event_size),
1728
 
       block_len -= min(block_len, max_event_size))
1729
 
  {
1730
 
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1731
 
    if (lf_info->wrote_create_file)
1732
 
    {
1733
 
      Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
1734
 
                               min(block_len, max_event_size),
1735
 
                               lf_info->log_delayed);
1736
 
      mysql_bin_log.write(&a);
1737
 
    }
1738
 
    else
1739
 
    {
1740
 
      Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1741
 
                                   buffer,
1742
 
                                   min(block_len, max_event_size),
1743
 
                                   lf_info->log_delayed);
1744
 
      mysql_bin_log.write(&b);
1745
 
      lf_info->wrote_create_file= 1;
1746
 
      DBUG_SYNC_POINT("debug_lock.created_file_event",10);
1747
 
    }
1748
 
  }
1749
 
  DBUG_RETURN(0);
1750
 
}
1751
 
 
1752
 
/*
1753
 
  Replication System Variables
1754
 
*/
1755
 
 
1756
 
class sys_var_slave_skip_counter :public sys_var
1757
 
{
1758
 
public:
1759
 
  sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
1760
 
    :sys_var(name_arg)
1761
 
  { chain_sys_var(chain); }
1762
 
  bool check(THD *thd, set_var *var);
1763
 
  bool update(THD *thd, set_var *var);
1764
 
  bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
1765
 
  /*
1766
 
    We can't retrieve the value of this, so we don't have to define
1767
 
    type() or value_ptr()
1768
 
  */
1769
 
};
1770
 
 
1771
 
class sys_var_sync_binlog_period :public sys_var_long_ptr
1772
 
{
1773
 
public:
1774
 
  sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg, 
1775
 
                             ulong *value_ptr)
1776
 
    :sys_var_long_ptr(chain, name_arg,value_ptr) {}
1777
 
  bool update(THD *thd, set_var *var);
1778
 
};
1779
 
 
1780
 
static void fix_slave_net_timeout(THD *thd, enum_var_type type)
1781
 
{
1782
 
  DBUG_ENTER("fix_slave_net_timeout");
1783
 
#ifdef HAVE_REPLICATION
1784
 
  pthread_mutex_lock(&LOCK_active_mi);
1785
 
  DBUG_PRINT("info",("slave_net_timeout=%lu mi->heartbeat_period=%.3f",
1786
 
                     slave_net_timeout,
1787
 
                     (active_mi? active_mi->heartbeat_period : 0.0)));
1788
 
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1789
 
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
1790
 
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1791
 
                        "The currect value for master_heartbeat_period"
1792
 
                        " exceeds the new value of `slave_net_timeout' sec."
1793
 
                        " A sensible value for the period should be"
1794
 
                        " less than the timeout.");
1795
 
  pthread_mutex_unlock(&LOCK_active_mi);
1796
 
#endif
1797
 
  DBUG_VOID_RETURN;
1798
 
}
1799
 
 
1800
 
static sys_var_chain vars = { NULL, NULL };
1801
 
 
1802
 
static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
1803
 
                                            &relay_log_purge);
1804
 
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
1805
 
                                              &slave_net_timeout,
1806
 
                                              fix_slave_net_timeout);
1807
 
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
1808
 
                                                &slave_trans_retries);
1809
 
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
1810
 
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
1811
 
 
1812
 
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
1813
 
 
1814
 
 
1815
 
static SHOW_VAR fixed_vars[]= {
1816
 
  {"log_slave_updates",       (char*) &opt_log_slave_updates,       SHOW_MY_BOOL},
1817
 
  {"relay_log" , (char*) &opt_relay_logname, SHOW_CHAR_PTR},
1818
 
  {"relay_log_index", (char*) &opt_relaylog_index_name, SHOW_CHAR_PTR},
1819
 
  {"relay_log_info_file", (char*) &relay_log_info_file, SHOW_CHAR_PTR},
1820
 
  {"relay_log_space_limit",   (char*) &relay_log_space_limit,       SHOW_LONGLONG},
1821
 
  {"slave_load_tmpdir",       (char*) &slave_load_tmpdir,           SHOW_CHAR_PTR},
1822
 
  {"slave_skip_errors",       (char*) &show_slave_skip_errors,      SHOW_FUNC},
1823
 
};
1824
 
 
1825
 
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff)
1826
 
{
1827
 
  var->type=SHOW_CHAR;
1828
 
  var->value= buff;
1829
 
  if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
1830
 
  {
1831
 
    var->value= const_cast<char *>("OFF");
1832
 
  }
1833
 
  else if (bitmap_is_set_all(&slave_error_mask))
1834
 
  {
1835
 
    var->value= const_cast<char *>("ALL");
1836
 
  }
1837
 
  else
1838
 
  {
1839
 
    /* 10 is enough assuming errors are max 4 digits */
1840
 
    int i;
1841
 
    var->value= buff;
1842
 
    for (i= 1;
1843
 
         i < MAX_SLAVE_ERROR &&
1844
 
         (buff - var->value) < SHOW_VAR_FUNC_BUFF_SIZE;
1845
 
         i++)
1846
 
    {
1847
 
      if (bitmap_is_set(&slave_error_mask, i))
1848
 
      {
1849
 
        buff= int10_to_str(i, buff, 10);
1850
 
        *buff++= ',';
1851
 
      }
1852
 
    }
1853
 
    if (var->value != buff)
1854
 
      buff--;                           // Remove last ','
1855
 
    if (i < MAX_SLAVE_ERROR)
1856
 
      buff= strmov(buff, "...");  // Couldn't show all errors
1857
 
    *buff=0;
1858
 
  }
1859
 
  return 0;
1860
 
}
1861
 
 
1862
 
bool sys_var_slave_skip_counter::check(THD *thd, set_var *var)
1863
 
{
1864
 
  int result= 0;
1865
 
  pthread_mutex_lock(&LOCK_active_mi);
1866
 
  pthread_mutex_lock(&active_mi->rli.run_lock);
1867
 
  if (active_mi->rli.slave_running)
1868
 
  {
1869
 
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1870
 
    result=1;
1871
 
  }
1872
 
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1873
 
  pthread_mutex_unlock(&LOCK_active_mi);
1874
 
  var->save_result.ulong_value= (ulong) var->value->val_int();
1875
 
  return result;
1876
 
}
1877
 
 
1878
 
 
1879
 
bool sys_var_slave_skip_counter::update(THD *thd, set_var *var)
1880
 
{
1881
 
  pthread_mutex_lock(&LOCK_active_mi);
1882
 
  pthread_mutex_lock(&active_mi->rli.run_lock);
1883
 
  /*
1884
 
    The following test should normally never be true as we test this
1885
 
    in the check function;  To be safe against multiple
1886
 
    SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
1887
 
  */
1888
 
  if (!active_mi->rli.slave_running)
1889
 
  {
1890
 
    pthread_mutex_lock(&active_mi->rli.data_lock);
1891
 
    active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
1892
 
    pthread_mutex_unlock(&active_mi->rli.data_lock);
1893
 
  }
1894
 
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1895
 
  pthread_mutex_unlock(&LOCK_active_mi);
1896
 
  return 0;
1897
 
}
1898
 
 
1899
 
 
1900
 
bool sys_var_sync_binlog_period::update(THD *thd, set_var *var)
1901
 
{
1902
 
  sync_binlog_period= (ulong) var->save_result.ulonglong_value;
1903
 
  return 0;
1904
 
}
1905
 
 
1906
 
int init_replication_sys_vars()
1907
 
{
1908
 
  mysql_append_static_vars(fixed_vars, sizeof(fixed_vars) / sizeof(SHOW_VAR));
1909
 
 
1910
 
  if (mysql_add_sys_var_chain(vars.first, my_long_options))
1911
 
  {
1912
 
    /* should not happen */
1913
 
    fprintf(stderr, "failed to initialize replication system variables");
1914
 
    unireg_abort(1);
1915
 
  }
1916
 
  return 0;
1917
 
}
1918
 
 
1919
 
#endif /* HAVE_REPLICATION */
1920
 
 
1921