~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_repl.cc

  • Committer: Andy Lester
  • Date: 2008-08-10 02:15:48 UTC
  • mto: (266.1.31 use-replace-funcs)
  • mto: This revision was merged to the branch mainline in revision 295.
  • Revision ID: andy@petdance.com-20080810021548-0zx8nhzva6al10k3
Added a proper const qualifer.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2006 MySQL AB & Sasha
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <drizzled/server_includes.h>
 
17
 
 
18
#ifdef HAVE_REPLICATION
 
19
#include "rpl_mi.h"
 
20
#include "sql_repl.h"
 
21
#include "log_event.h"
 
22
#include "rpl_filter.h"
 
23
#include <drizzled/drizzled_error_messages.h>
 
24
 
 
25
int max_binlog_dump_events = 0; // unlimited
 
26
 
 
27
/*
 
28
    fake_rotate_event() builds a fake (=which does not exist physically in any
 
29
    binlog) Rotate event, which contains the name of the binlog we are going to
 
30
    send to the slave (because the slave may not know it if it just asked for
 
31
    MASTER_LOG_FILE='', MASTER_LOG_POS=4).
 
32
    < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
 
33
    After this version we always call it, so that a 3.23.58 slave can rely on
 
34
    it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
 
35
    zeros in the good positions which, by chance, make it possible for the 3.23
 
36
    slave to detect that this event is unexpected) (this is luck which happens
 
37
    because the master and slave disagree on the size of the header of
 
38
    Log_event).
 
39
 
 
40
    Relying on the event length of the Rotate event instead of these
 
41
    well-placed zeros was not possible as Rotate events have a variable-length
 
42
    part.
 
43
*/
 
44
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
 
45
                             uint64_t position, const char** errmsg)
 
46
{
 
47
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
 
48
  /*
 
49
    'when' (the timestamp) is set to 0 so that slave could distinguish between
 
50
    real and fake Rotate events (if necessary)
 
51
  */
 
52
  memset(header, 0, 4);
 
53
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
 
54
 
 
55
  char* p = log_file_name+dirname_length(log_file_name);
 
56
  uint ident_len = (uint) strlen(p);
 
57
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
 
58
  int4store(header + SERVER_ID_OFFSET, server_id);
 
59
  int4store(header + EVENT_LEN_OFFSET, event_len);
 
60
  int2store(header + FLAGS_OFFSET, 0);
 
61
 
 
62
  // TODO: check what problems this may cause and fix them
 
63
  int4store(header + LOG_POS_OFFSET, 0);
 
64
 
 
65
  packet->append(header, sizeof(header));
 
66
  int8store(buf+R_POS_OFFSET,position);
 
67
  packet->append(buf, ROTATE_HEADER_LEN);
 
68
  packet->append(p,ident_len);
 
69
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
 
70
  {
 
71
    *errmsg = "failed on my_net_write()";
 
72
    return(-1);
 
73
  }
 
74
  return(0);
 
75
}
 
76
 
 
77
static int send_file(THD *thd)
 
78
{
 
79
  NET* net = &thd->net;
 
80
  int fd = -1, error = 1;
 
81
  size_t bytes;
 
82
  char fname[FN_REFLEN+1];
 
83
  const char *errmsg = 0;
 
84
  int old_timeout;
 
85
  unsigned long packet_len;
 
86
  uchar buf[IO_SIZE];                           // It's safe to alloc this
 
87
 
 
88
  /*
 
89
    The client might be slow loading the data, give him wait_timeout to do
 
90
    the job
 
91
  */
 
92
  old_timeout= net->read_timeout;
 
93
  my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
 
94
 
 
95
  /*
 
96
    We need net_flush here because the client will not know it needs to send
 
97
    us the file name until it has processed the load event entry
 
98
  */
 
99
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
 
100
  {
 
101
    errmsg = _("Failed in send_file() while reading file name");
 
102
    goto err;
 
103
  }
 
104
 
 
105
  // terminate with \0 for fn_format
 
106
  *((char*)net->read_pos +  packet_len) = 0;
 
107
  fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
 
108
  // this is needed to make replicate-ignore-db
 
109
  if (!strcmp(fname,"/dev/null"))
 
110
    goto end;
 
111
 
 
112
  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
 
113
  {
 
114
    errmsg = _("Failed in send_file() on open of file");
 
115
    goto err;
 
116
  }
 
117
 
 
118
  while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0)
 
119
  {
 
120
    if (my_net_write(net, buf, bytes))
 
121
    {
 
122
      errmsg = _("Failed in send_file() while writing data to client");
 
123
      goto err;
 
124
    }
 
125
  }
 
126
 
 
127
 end:
 
128
  if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
 
129
      (my_net_read(net) == packet_error))
 
130
  {
 
131
    errmsg = _("Failed in send_file() while negotiating file transfer close");
 
132
    goto err;
 
133
  }
 
134
  error = 0;
 
135
 
 
136
 err:
 
137
  my_net_set_read_timeout(net, old_timeout);
 
138
  if (fd >= 0)
 
139
    (void) my_close(fd, MYF(0));
 
140
  if (errmsg)
 
141
  {
 
142
    sql_print_error(errmsg);
 
143
  }
 
144
  return(error);
 
145
}
 
146
 
 
147
 
 
148
/*
 
149
  Adjust the position pointer in the binary log file for all running slaves
 
150
 
 
151
  SYNOPSIS
 
152
    adjust_linfo_offsets()
 
153
    purge_offset        Number of bytes removed from start of log index file
 
154
 
 
155
  NOTES
 
156
    - This is called when doing a PURGE when we delete lines from the
 
157
      index log file
 
158
 
 
159
  REQUIREMENTS
 
160
    - Before calling this function, we have to ensure that no threads are
 
161
      using any binary log file before purge_offset.a
 
162
 
 
163
  TODO
 
164
    - Inform the slave threads that they should sync the position
 
165
      in the binary log file with flush_relay_log_info.
 
166
      Now they sync is done for next read.
 
167
*/
 
168
 
 
169
void adjust_linfo_offsets(my_off_t purge_offset)
 
170
{
 
171
  THD *tmp;
 
172
 
 
173
  pthread_mutex_lock(&LOCK_thread_count);
 
174
  I_List_iterator<THD> it(threads);
 
175
 
 
176
  while ((tmp=it++))
 
177
  {
 
178
    LOG_INFO* linfo;
 
179
    if ((linfo = tmp->current_linfo))
 
180
    {
 
181
      pthread_mutex_lock(&linfo->lock);
 
182
      /*
 
183
        Index file offset can be less that purge offset only if
 
184
        we just started reading the index file. In that case
 
185
        we have nothing to adjust
 
186
      */
 
187
      if (linfo->index_file_offset < purge_offset)
 
188
        linfo->fatal = (linfo->index_file_offset != 0);
 
189
      else
 
190
        linfo->index_file_offset -= purge_offset;
 
191
      pthread_mutex_unlock(&linfo->lock);
 
192
    }
 
193
  }
 
194
  pthread_mutex_unlock(&LOCK_thread_count);
 
195
}
 
196
 
 
197
 
 
198
bool log_in_use(const char* log_name)
 
199
{
 
200
  int log_name_len = strlen(log_name) + 1;
 
201
  THD *tmp;
 
202
  bool result = 0;
 
203
 
 
204
  pthread_mutex_lock(&LOCK_thread_count);
 
205
  I_List_iterator<THD> it(threads);
 
206
 
 
207
  while ((tmp=it++))
 
208
  {
 
209
    LOG_INFO* linfo;
 
210
    if ((linfo = tmp->current_linfo))
 
211
    {
 
212
      pthread_mutex_lock(&linfo->lock);
 
213
      result = !memcmp(log_name, linfo->log_file_name, log_name_len);
 
214
      pthread_mutex_unlock(&linfo->lock);
 
215
      if (result)
 
216
        break;
 
217
    }
 
218
  }
 
219
 
 
220
  pthread_mutex_unlock(&LOCK_thread_count);
 
221
  return result;
 
222
}
 
223
 
 
224
bool purge_error_message(THD* thd, int res)
 
225
{
 
226
  uint errmsg= 0;
 
227
 
 
228
  switch (res)  {
 
229
  case 0: break;
 
230
  case LOG_INFO_EOF:    errmsg= ER_UNKNOWN_TARGET_BINLOG; break;
 
231
  case LOG_INFO_IO:     errmsg= ER_IO_ERR_LOG_INDEX_READ; break;
 
232
  case LOG_INFO_INVALID:errmsg= ER_BINLOG_PURGE_PROHIBITED; break;
 
233
  case LOG_INFO_SEEK:   errmsg= ER_FSEEK_FAIL; break;
 
234
  case LOG_INFO_MEM:    errmsg= ER_OUT_OF_RESOURCES; break;
 
235
  case LOG_INFO_FATAL:  errmsg= ER_BINLOG_PURGE_FATAL_ERR; break;
 
236
  case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break;
 
237
  case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break;
 
238
  default:              errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break;
 
239
  }
 
240
 
 
241
  if (errmsg)
 
242
  {
 
243
    my_message(errmsg, ER(errmsg), MYF(0));
 
244
    return true;
 
245
  }
 
246
  my_ok(thd);
 
247
  return false;
 
248
}
 
249
 
 
250
 
 
251
bool purge_master_logs(THD* thd, const char* to_log)
 
252
{
 
253
  char search_file_name[FN_REFLEN];
 
254
  if (!mysql_bin_log.is_open())
 
255
  {
 
256
    my_ok(thd);
 
257
    return false;
 
258
  }
 
259
 
 
260
  mysql_bin_log.make_log_name(search_file_name, to_log);
 
261
  return purge_error_message(thd,
 
262
                             mysql_bin_log.purge_logs(search_file_name, 0, 1,
 
263
                                                      1, NULL));
 
264
}
 
265
 
 
266
 
 
267
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
 
268
{
 
269
  if (!mysql_bin_log.is_open())
 
270
  {
 
271
    my_ok(thd);
 
272
    return 0;
 
273
  }
 
274
  return purge_error_message(thd,
 
275
                             mysql_bin_log.purge_logs_before_date(purge_time));
 
276
}
 
277
 
 
278
int test_for_non_eof_log_read_errors(int error, const char **errmsg)
 
279
{
 
280
  if (error == LOG_READ_EOF)
 
281
    return 0;
 
282
  my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
283
  switch (error) {
 
284
  case LOG_READ_BOGUS:
 
285
    *errmsg = "bogus data in log event";
 
286
    break;
 
287
  case LOG_READ_TOO_LARGE:
 
288
    *errmsg = "log event entry exceeded max_allowed_packet; \
 
289
Increase max_allowed_packet on master";
 
290
    break;
 
291
  case LOG_READ_IO:
 
292
    *errmsg = "I/O error reading log event";
 
293
    break;
 
294
  case LOG_READ_MEM:
 
295
    *errmsg = "memory allocation failed reading log event";
 
296
    break;
 
297
  case LOG_READ_TRUNC:
 
298
    *errmsg = "binlog truncated in the middle of event";
 
299
    break;
 
300
  default:
 
301
    *errmsg = "unknown error reading log event on the master";
 
302
    break;
 
303
  }
 
304
  return error;
 
305
}
 
306
 
 
307
 
 
308
/**
 
309
  An auxiliary function for calling in mysql_binlog_send
 
310
  to initialize the heartbeat timeout in waiting for a binlogged event.
 
311
 
 
312
  @param[in]    thd  THD to access a user variable
 
313
 
 
314
  @return        heartbeat period an uint64_t of nanoseconds
 
315
                 or zero if heartbeat was not demanded by slave
 
316
*/ 
 
317
static uint64_t get_heartbeat_period(THD * thd)
 
318
{
 
319
  bool null_value;
 
320
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
 
321
  user_var_entry *entry= 
 
322
    (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
 
323
                                  name.length);
 
324
  return entry? entry->val_int(&null_value) : 0;
 
325
}
 
326
 
 
327
/*
 
328
  Function prepares and sends repliation heartbeat event.
 
329
 
 
330
  @param net                net object of THD
 
331
  @param packet             buffer to store the heartbeat instance
 
332
  @param event_coordinates  binlog file name and position of the last
 
333
                            real event master sent from binlog
 
334
 
 
335
  @note 
 
336
    Among three essential pieces of heartbeat data Log_event::when
 
337
    is computed locally.
 
338
    The  error to send is serious and should force terminating
 
339
    the dump thread.
 
340
*/
 
341
static int send_heartbeat_event(NET* net, String* packet,
 
342
                                const struct event_coordinates *coord)
 
343
{
 
344
  char header[LOG_EVENT_HEADER_LEN];
 
345
  /*
 
346
    'when' (the timestamp) is set to 0 so that slave could distinguish between
 
347
    real and fake Rotate events (if necessary)
 
348
  */
 
349
  memset(header, 0, 4);  // when
 
350
 
 
351
  header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
 
352
 
 
353
  char* p= coord->file_name + dirname_length(coord->file_name);
 
354
 
 
355
  uint ident_len = strlen(p);
 
356
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
 
357
  int4store(header + SERVER_ID_OFFSET, server_id);
 
358
  int4store(header + EVENT_LEN_OFFSET, event_len);
 
359
  int2store(header + FLAGS_OFFSET, 0);
 
360
 
 
361
  int4store(header + LOG_POS_OFFSET, coord->pos);  // log_pos
 
362
 
 
363
  packet->append(header, sizeof(header));
 
364
  packet->append(p, ident_len);             // log_file_name
 
365
 
 
366
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
 
367
      net_flush(net))
 
368
  {
 
369
    return(-1);
 
370
  }
 
371
  packet->set("\0", 1, &my_charset_bin);
 
372
  return(0);
 
373
}
 
374
 
 
375
/*
 
376
  TODO: Clean up loop to only have one call to send_file()
 
377
*/
 
378
 
 
379
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
 
380
                       ushort flags)
 
381
{
 
382
  LOG_INFO linfo;
 
383
  char *log_file_name = linfo.log_file_name;
 
384
  char search_file_name[FN_REFLEN], *name;
 
385
  IO_CACHE log;
 
386
  File file = -1;
 
387
  String* packet = &thd->packet;
 
388
  int error;
 
389
  const char *errmsg = "Unknown error";
 
390
  NET* net = &thd->net;
 
391
  pthread_mutex_t *log_lock;
 
392
  bool binlog_can_be_corrupted= false;
 
393
 
 
394
  memset(&log, 0, sizeof(log));
 
395
  /* 
 
396
     heartbeat_period from @master_heartbeat_period user variable
 
397
  */
 
398
  uint64_t heartbeat_period= get_heartbeat_period(thd);
 
399
  struct timespec heartbeat_buf;
 
400
  struct event_coordinates coord_buf;
 
401
  struct timespec *heartbeat_ts= NULL;
 
402
  struct event_coordinates *coord= NULL;
 
403
  if (heartbeat_period != 0LL)
 
404
  {
 
405
    heartbeat_ts= &heartbeat_buf;
 
406
    set_timespec_nsec(*heartbeat_ts, 0);
 
407
    coord= &coord_buf;
 
408
    coord->file_name= log_file_name; // initialization basing on what slave remembers
 
409
    coord->pos= pos;
 
410
  }
 
411
 
 
412
  if (!mysql_bin_log.is_open())
 
413
  {
 
414
    errmsg = "Binary log is not open";
 
415
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
416
    goto err;
 
417
  }
 
418
  if (!server_id_supplied)
 
419
  {
 
420
    errmsg = "Misconfigured master - server id was not set";
 
421
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
422
    goto err;
 
423
  }
 
424
 
 
425
  name=search_file_name;
 
426
  if (log_ident[0])
 
427
    mysql_bin_log.make_log_name(search_file_name, log_ident);
 
428
  else
 
429
    name=0;                                     // Find first log
 
430
 
 
431
  linfo.index_file_offset = 0;
 
432
 
 
433
  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
 
434
  {
 
435
    errmsg = "Could not find first log file name in binary log index file";
 
436
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
437
    goto err;
 
438
  }
 
439
 
 
440
  pthread_mutex_lock(&LOCK_thread_count);
 
441
  thd->current_linfo = &linfo;
 
442
  pthread_mutex_unlock(&LOCK_thread_count);
 
443
 
 
444
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
 
445
  {
 
446
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
447
    goto err;
 
448
  }
 
449
  if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
 
450
  {
 
451
    errmsg= "Client requested master to start replication from \
 
452
impossible position";
 
453
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
454
    goto err;
 
455
  }
 
456
 
 
457
  /*
 
458
    We need to start a packet with something other than 255
 
459
    to distinguish it from error
 
460
  */
 
461
  packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
 
462
 
 
463
  /*
 
464
    Tell the client about the log name with a fake Rotate event;
 
465
    this is needed even if we also send a Format_description_log_event
 
466
    just after, because that event does not contain the binlog's name.
 
467
    Note that as this Rotate event is sent before
 
468
    Format_description_log_event, the slave cannot have any info to
 
469
    understand this event's format, so the header len of
 
470
    Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
 
471
    than other events except FORMAT_DESCRIPTION_EVENT).
 
472
    Before 4.0.14 we called fake_rotate_event below only if (pos ==
 
473
    BIN_LOG_HEADER_SIZE), because if this is false then the slave
 
474
    already knows the binlog's name.
 
475
    Since, we always call fake_rotate_event; if the slave already knew
 
476
    the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
 
477
    useless but does not harm much. It is nice for 3.23 (>=.58) slaves
 
478
    which test Rotate events to see if the master is 4.0 (then they
 
479
    choose to stop because they can't replicate 4.0); by always calling
 
480
    fake_rotate_event we are sure that 3.23.58 and newer will detect the
 
481
    problem as soon as replication starts (BUG#198).
 
482
    Always calling fake_rotate_event makes sending of normal
 
483
    (=from-binlog) Rotate events a priori unneeded, but it is not so
 
484
    simple: the 2 Rotate events are not equivalent, the normal one is
 
485
    before the Stop event, the fake one is after. If we don't send the
 
486
    normal one, then the Stop event will be interpreted (by existing 4.0
 
487
    slaves) as "the master stopped", which is wrong. So for safety,
 
488
    given that we want minimum modification of 4.0, we send the normal
 
489
    and fake Rotates.
 
490
  */
 
491
  if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
 
492
  {
 
493
    /*
 
494
       This error code is not perfect, as fake_rotate_event() does not
 
495
       read anything from the binlog; if it fails it's because of an
 
496
       error in my_net_write(), fortunately it will say so in errmsg.
 
497
    */
 
498
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
499
    goto err;
 
500
  }
 
501
  packet->set("\0", 1, &my_charset_bin);
 
502
  /*
 
503
    Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
 
504
    this larger than the corresponding packet (query) sent 
 
505
    from client to master.
 
506
  */
 
507
  thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
 
508
 
 
509
  /*
 
510
    We can set log_lock now, it does not move (it's a member of
 
511
    mysql_bin_log, and it's already inited, and it will be destroyed
 
512
    only at shutdown).
 
513
  */
 
514
  log_lock = mysql_bin_log.get_log_lock();
 
515
  if (pos > BIN_LOG_HEADER_SIZE)
 
516
  {
 
517
     /*
 
518
       Try to find a Format_description_log_event at the beginning of
 
519
       the binlog
 
520
     */
 
521
     if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
 
522
     {
 
523
       /*
 
524
         The packet has offsets equal to the normal offsets in a binlog
 
525
         event +1 (the first character is \0).
 
526
       */
 
527
       if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
 
528
       {
 
529
         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
 
530
                                       LOG_EVENT_BINLOG_IN_USE_F);
 
531
         (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
 
532
         /*
 
533
           mark that this event with "log_pos=0", so the slave
 
534
           should not increment master's binlog position
 
535
           (rli->group_master_log_pos)
 
536
         */
 
537
         int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
 
538
         /*
 
539
           if reconnect master sends FD event with `created' as 0
 
540
           to avoid destroying temp tables.
 
541
          */
 
542
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
 
543
                   ST_CREATED_OFFSET+1, (ulong) 0);
 
544
         /* send it */
 
545
         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
 
546
         {
 
547
           errmsg = "Failed on my_net_write()";
 
548
           my_errno= ER_UNKNOWN_ERROR;
 
549
           goto err;
 
550
         }
 
551
 
 
552
         /*
 
553
           No need to save this event. We are only doing simple reads
 
554
           (no real parsing of the events) so we don't need it. And so
 
555
           we don't need the artificial Format_description_log_event of
 
556
           3.23&4.x.
 
557
         */
 
558
       }
 
559
     }
 
560
     else
 
561
     {
 
562
       if (test_for_non_eof_log_read_errors(error, &errmsg))
 
563
         goto err;
 
564
       /*
 
565
         It's EOF, nothing to do, go on reading next events, the
 
566
         Format_description_log_event will be found naturally if it is written.
 
567
       */
 
568
     }
 
569
     /* reset the packet as we wrote to it in any case */
 
570
     packet->set("\0", 1, &my_charset_bin);
 
571
  } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
 
572
  else
 
573
  {
 
574
    /* The Format_description_log_event event will be found naturally. */
 
575
  }
 
576
 
 
577
  /* seek to the requested position, to start the requested dump */
 
578
  my_b_seek(&log, pos);                 // Seek will done on next read
 
579
 
 
580
  while (!net->error && net->vio != 0 && !thd->killed)
 
581
  {
 
582
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
 
583
    {
 
584
      /*
 
585
        log's filename does not change while it's active
 
586
      */
 
587
      if (coord)
 
588
        coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
 
589
 
 
590
      if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
 
591
      {
 
592
        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
 
593
                                      LOG_EVENT_BINLOG_IN_USE_F);
 
594
        (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
 
595
      }
 
596
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
 
597
        binlog_can_be_corrupted= false;
 
598
 
 
599
      if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
 
600
      {
 
601
        errmsg = "Failed on my_net_write()";
 
602
        my_errno= ER_UNKNOWN_ERROR;
 
603
        goto err;
 
604
      }
 
605
 
 
606
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
 
607
      {
 
608
        if (send_file(thd))
 
609
        {
 
610
          errmsg = "failed in send_file()";
 
611
          my_errno= ER_UNKNOWN_ERROR;
 
612
          goto err;
 
613
        }
 
614
      }
 
615
      packet->set("\0", 1, &my_charset_bin);
 
616
    }
 
617
 
 
618
    /*
 
619
      here we were reading binlog that was not closed properly (as a result
 
620
      of a crash ?). treat any corruption as EOF
 
621
    */
 
622
    if (binlog_can_be_corrupted && error != LOG_READ_MEM)
 
623
      error=LOG_READ_EOF;
 
624
    /*
 
625
      TODO: now that we are logging the offset, check to make sure
 
626
      the recorded offset and the actual match.
 
627
      Guilhem 2003-06: this is not true if this master is a slave
 
628
      <4.0.15 running with --log-slave-updates, because then log_pos may
 
629
      be the offset in the-master-of-this-master's binlog.
 
630
    */
 
631
    if (test_for_non_eof_log_read_errors(error, &errmsg))
 
632
      goto err;
 
633
 
 
634
    if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
 
635
        mysql_bin_log.is_active(log_file_name))
 
636
    {
 
637
      /*
 
638
        Block until there is more data in the log
 
639
      */
 
640
      if (net_flush(net))
 
641
      {
 
642
        errmsg = "failed on net_flush()";
 
643
        my_errno= ER_UNKNOWN_ERROR;
 
644
        goto err;
 
645
      }
 
646
 
 
647
      /*
 
648
        We may have missed the update broadcast from the log
 
649
        that has just happened, let's try to catch it if it did.
 
650
        If we did not miss anything, we just wait for other threads
 
651
        to signal us.
 
652
      */
 
653
      {
 
654
        log.error=0;
 
655
        bool read_packet = 0, fatal_error = 0;
 
656
 
 
657
        /*
 
658
          No one will update the log while we are reading
 
659
          now, but we'll be quick and just read one record
 
660
 
 
661
          TODO:
 
662
          Add an counter that is incremented for each time we update the
 
663
          binary log.  We can avoid the following read if the counter
 
664
          has not been updated since last read.
 
665
        */
 
666
 
 
667
        pthread_mutex_lock(log_lock);
 
668
        switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
 
669
        case 0:
 
670
          /* we read successfully, so we'll need to send it to the slave */
 
671
          pthread_mutex_unlock(log_lock);
 
672
          read_packet = 1;
 
673
          if (coord)
 
674
            coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
 
675
          break;
 
676
 
 
677
        case LOG_READ_EOF:
 
678
        {
 
679
          int ret;
 
680
          if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
 
681
          {
 
682
            pthread_mutex_unlock(log_lock);
 
683
            goto end;
 
684
          }
 
685
 
 
686
          do 
 
687
          {
 
688
            if (coord)
 
689
            {
 
690
              assert(heartbeat_ts && heartbeat_period != 0LL);
 
691
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
 
692
            }
 
693
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
 
694
            assert(ret == 0 || (heartbeat_period != 0LL && coord != NULL));
 
695
            if (ret == ETIMEDOUT || ret == ETIME)
 
696
            {
 
697
              if (send_heartbeat_event(net, packet, coord))
 
698
              {
 
699
                errmsg = "Failed on my_net_write()";
 
700
                my_errno= ER_UNKNOWN_ERROR;
 
701
                pthread_mutex_unlock(log_lock);
 
702
                goto err;
 
703
              }
 
704
            }
 
705
            else
 
706
            {
 
707
              assert(ret == 0);
 
708
            }
 
709
          } while (ret != 0 && coord != NULL && !thd->killed);
 
710
          pthread_mutex_unlock(log_lock);
 
711
        }    
 
712
        break;
 
713
            
 
714
        default:
 
715
          pthread_mutex_unlock(log_lock);
 
716
          fatal_error = 1;
 
717
          break;
 
718
        }
 
719
 
 
720
        if (read_packet)
 
721
        {
 
722
          thd_proc_info(thd, "Sending binlog event to slave");
 
723
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
 
724
          {
 
725
            errmsg = "Failed on my_net_write()";
 
726
            my_errno= ER_UNKNOWN_ERROR;
 
727
            goto err;
 
728
          }
 
729
 
 
730
          if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
 
731
          {
 
732
            if (send_file(thd))
 
733
            {
 
734
              errmsg = "failed in send_file()";
 
735
              my_errno= ER_UNKNOWN_ERROR;
 
736
              goto err;
 
737
            }
 
738
          }
 
739
          packet->set("\0", 1, &my_charset_bin);
 
740
          /*
 
741
            No need to net_flush because we will get to flush later when
 
742
            we hit EOF pretty quick
 
743
          */
 
744
        }
 
745
 
 
746
        if (fatal_error)
 
747
        {
 
748
          errmsg = "error reading log entry";
 
749
          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
750
          goto err;
 
751
        }
 
752
        log.error=0;
 
753
      }
 
754
    }
 
755
    else
 
756
    {
 
757
      bool loop_breaker = 0;
 
758
      /* need this to break out of the for loop from switch */
 
759
 
 
760
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
 
761
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
 
762
      case LOG_INFO_EOF:
 
763
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
 
764
        break;
 
765
      case 0:
 
766
        break;
 
767
      default:
 
768
        errmsg = "could not find next log";
 
769
        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
770
        goto err;
 
771
      }
 
772
 
 
773
      if (loop_breaker)
 
774
        break;
 
775
 
 
776
      end_io_cache(&log);
 
777
      (void) my_close(file, MYF(MY_WME));
 
778
 
 
779
      /*
 
780
        Call fake_rotate_event() in case the previous log (the one which
 
781
        we have just finished reading) did not contain a Rotate event
 
782
        (for example (I don't know any other example) the previous log
 
783
        was the last one before the master was shutdown & restarted).
 
784
        This way we tell the slave about the new log's name and
 
785
        position.  If the binlog is 5.0, the next event we are going to
 
786
        read and send is Format_description_log_event.
 
787
      */
 
788
      if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
 
789
          fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
 
790
                            &errmsg))
 
791
      {
 
792
        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
793
        goto err;
 
794
      }
 
795
 
 
796
      packet->length(0);
 
797
      packet->append('\0');
 
798
      if (coord)
 
799
        coord->file_name= log_file_name; // reset to the next
 
800
    }
 
801
  }
 
802
 
 
803
end:
 
804
  end_io_cache(&log);
 
805
  (void)my_close(file, MYF(MY_WME));
 
806
 
 
807
  my_eof(thd);
 
808
  thd_proc_info(thd, "Waiting to finalize termination");
 
809
  pthread_mutex_lock(&LOCK_thread_count);
 
810
  thd->current_linfo = 0;
 
811
  pthread_mutex_unlock(&LOCK_thread_count);
 
812
  return;
 
813
 
 
814
err:
 
815
  thd_proc_info(thd, "Waiting to finalize termination");
 
816
  end_io_cache(&log);
 
817
  /*
 
818
    Exclude  iteration through thread list
 
819
    this is needed for purge_logs() - it will iterate through
 
820
    thread list and update thd->current_linfo->index_file_offset
 
821
    this mutex will make sure that it never tried to update our linfo
 
822
    after we return from this stack frame
 
823
  */
 
824
  pthread_mutex_lock(&LOCK_thread_count);
 
825
  thd->current_linfo = 0;
 
826
  pthread_mutex_unlock(&LOCK_thread_count);
 
827
  if (file >= 0)
 
828
    (void) my_close(file, MYF(MY_WME));
 
829
 
 
830
  my_message(my_errno, errmsg, MYF(0));
 
831
  return;
 
832
}
 
833
 
 
834
int start_slave(THD* thd , Master_info* mi,  bool net_report)
 
835
{
 
836
  int slave_errno= 0;
 
837
  int thread_mask;
 
838
 
 
839
  lock_slave_threads(mi);  // this allows us to cleanly read slave_running
 
840
  // Get a mask of _stopped_ threads
 
841
  init_thread_mask(&thread_mask,mi,1 /* inverse */);
 
842
  /*
 
843
    Below we will start all stopped threads.  But if the user wants to
 
844
    start only one thread, do as if the other thread was running (as we
 
845
    don't wan't to touch the other thread), so set the bit to 0 for the
 
846
    other thread
 
847
  */
 
848
  if (thd->lex->slave_thd_opt)
 
849
    thread_mask&= thd->lex->slave_thd_opt;
 
850
  if (thread_mask) //some threads are stopped, start them
 
851
  {
 
852
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
 
853
                         thread_mask))
 
854
      slave_errno=ER_MASTER_INFO;
 
855
    else if (server_id_supplied && *mi->host)
 
856
    {
 
857
      /*
 
858
        If we will start SQL thread we will care about UNTIL options If
 
859
        not and they are specified we will ignore them and warn user
 
860
        about this fact.
 
861
      */
 
862
      if (thread_mask & SLAVE_SQL)
 
863
      {
 
864
        pthread_mutex_lock(&mi->rli.data_lock);
 
865
 
 
866
        if (thd->lex->mi.pos)
 
867
        {
 
868
          mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
 
869
          mi->rli.until_log_pos= thd->lex->mi.pos;
 
870
          /*
 
871
             We don't check thd->lex->mi.log_file_name for NULL here
 
872
             since it is checked in sql_yacc.yy
 
873
          */
 
874
          strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
 
875
                  sizeof(mi->rli.until_log_name)-1);
 
876
        }
 
877
        else if (thd->lex->mi.relay_log_pos)
 
878
        {
 
879
          mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
 
880
          mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
 
881
          strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
 
882
                  sizeof(mi->rli.until_log_name)-1);
 
883
        }
 
884
        else
 
885
          mi->rli.clear_until_condition();
 
886
 
 
887
        if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
 
888
        {
 
889
          /* Preparing members for effective until condition checking */
 
890
          const char *p= fn_ext(mi->rli.until_log_name);
 
891
          char *p_end;
 
892
          if (*p)
 
893
          {
 
894
            //p points to '.'
 
895
            mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
 
896
            /*
 
897
              p_end points to the first invalid character. If it equals
 
898
              to p, no digits were found, error. If it contains '\0' it
 
899
              means  conversion went ok.
 
900
            */
 
901
            if (p_end==p || *p_end)
 
902
              slave_errno=ER_BAD_SLAVE_UNTIL_COND;
 
903
          }
 
904
          else
 
905
            slave_errno=ER_BAD_SLAVE_UNTIL_COND;
 
906
 
 
907
          /* mark the cached result of the UNTIL comparison as "undefined" */
 
908
          mi->rli.until_log_names_cmp_result=
 
909
            Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
 
910
 
 
911
          /* Issuing warning then started without --skip-slave-start */
 
912
          if (!opt_skip_slave_start)
 
913
            push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
914
                         ER_MISSING_SKIP_SLAVE,
 
915
                         ER(ER_MISSING_SKIP_SLAVE));
 
916
        }
 
917
 
 
918
        pthread_mutex_unlock(&mi->rli.data_lock);
 
919
      }
 
920
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
 
921
        push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
 
922
                     ER(ER_UNTIL_COND_IGNORED));
 
923
 
 
924
      if (!slave_errno)
 
925
        slave_errno = start_slave_threads(0 /*no mutex */,
 
926
                                        1 /* wait for start */,
 
927
                                        mi,
 
928
                                        master_info_file,relay_log_info_file,
 
929
                                        thread_mask);
 
930
    }
 
931
    else
 
932
      slave_errno = ER_BAD_SLAVE;
 
933
  }
 
934
  else
 
935
  {
 
936
    /* no error if all threads are already started, only a warning */
 
937
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
 
938
                 ER(ER_SLAVE_WAS_RUNNING));
 
939
  }
 
940
 
 
941
  unlock_slave_threads(mi);
 
942
 
 
943
  if (slave_errno)
 
944
  {
 
945
    if (net_report)
 
946
      my_message(slave_errno, ER(slave_errno), MYF(0));
 
947
    return(1);
 
948
  }
 
949
  else if (net_report)
 
950
    my_ok(thd);
 
951
 
 
952
  return(0);
 
953
}
 
954
 
 
955
 
 
956
int stop_slave(THD* thd, Master_info* mi, bool net_report )
 
957
{
 
958
  int slave_errno;
 
959
  if (!thd)
 
960
    thd = current_thd;
 
961
 
 
962
  thd_proc_info(thd, "Killing slave");
 
963
  int thread_mask;
 
964
  lock_slave_threads(mi);
 
965
  // Get a mask of _running_ threads
 
966
  init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
 
967
  /*
 
968
    Below we will stop all running threads.
 
969
    But if the user wants to stop only one thread, do as if the other thread
 
970
    was stopped (as we don't wan't to touch the other thread), so set the
 
971
    bit to 0 for the other thread
 
972
  */
 
973
  if (thd->lex->slave_thd_opt)
 
974
    thread_mask &= thd->lex->slave_thd_opt;
 
975
 
 
976
  if (thread_mask)
 
977
  {
 
978
    slave_errno= terminate_slave_threads(mi,thread_mask,
 
979
                                         1 /*skip lock */);
 
980
  }
 
981
  else
 
982
  {
 
983
    //no error if both threads are already stopped, only a warning
 
984
    slave_errno= 0;
 
985
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
 
986
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
 
987
  }
 
988
  unlock_slave_threads(mi);
 
989
  thd_proc_info(thd, 0);
 
990
 
 
991
  if (slave_errno)
 
992
  {
 
993
    if (net_report)
 
994
      my_message(slave_errno, ER(slave_errno), MYF(0));
 
995
    return(1);
 
996
  }
 
997
  else if (net_report)
 
998
    my_ok(thd);
 
999
 
 
1000
  return(0);
 
1001
}
 
1002
 
 
1003
 
 
1004
/*
 
1005
  Remove all relay logs and start replication from the start
 
1006
 
 
1007
  SYNOPSIS
 
1008
    reset_slave()
 
1009
    thd                 Thread handler
 
1010
    mi                  Master info for the slave
 
1011
 
 
1012
  RETURN
 
1013
    0   ok
 
1014
    1   error
 
1015
*/
 
1016
 
 
1017
 
 
1018
int reset_slave(THD *thd, Master_info* mi)
 
1019
{
 
1020
  struct stat stat_area;
 
1021
  char fname[FN_REFLEN];
 
1022
  int thread_mask= 0, error= 0;
 
1023
  uint sql_errno=0;
 
1024
  const char* errmsg=0;
 
1025
 
 
1026
  lock_slave_threads(mi);
 
1027
  init_thread_mask(&thread_mask,mi,0 /* not inverse */);
 
1028
  if (thread_mask) // We refuse if any slave thread is running
 
1029
  {
 
1030
    sql_errno= ER_SLAVE_MUST_STOP;
 
1031
    error=1;
 
1032
    goto err;
 
1033
  }
 
1034
 
 
1035
  ha_reset_slave(thd);
 
1036
 
 
1037
  // delete relay logs, clear relay log coordinates
 
1038
  if ((error= purge_relay_logs(&mi->rli, thd,
 
1039
                               1 /* just reset */,
 
1040
                               &errmsg)))
 
1041
    goto err;
 
1042
 
 
1043
  /* Clear master's log coordinates */
 
1044
  init_master_log_pos(mi);
 
1045
  /*
 
1046
     Reset errors (the idea is that we forget about the
 
1047
     old master).
 
1048
  */
 
1049
  mi->rli.clear_error();
 
1050
  mi->rli.clear_until_condition();
 
1051
 
 
1052
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
 
1053
  end_master_info(mi);
 
1054
  // and delete these two files
 
1055
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
 
1056
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
 
1057
  {
 
1058
    error=1;
 
1059
    goto err;
 
1060
  }
 
1061
  // delete relay_log_info_file
 
1062
  fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
 
1063
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
 
1064
  {
 
1065
    error=1;
 
1066
    goto err;
 
1067
  }
 
1068
 
 
1069
err:
 
1070
  unlock_slave_threads(mi);
 
1071
  if (error)
 
1072
    my_error(sql_errno, MYF(0), errmsg);
 
1073
  return(error);
 
1074
}
 
1075
 
 
1076
/*
 
1077
 
 
1078
  Kill all Binlog_dump threads which previously talked to the same slave
 
1079
  ("same" means with the same server id). Indeed, if the slave stops, if the
 
1080
  Binlog_dump thread is waiting (pthread_cond_wait) for binlog update, then it
 
1081
  will keep existing until a query is written to the binlog. If the master is
 
1082
  idle, then this could last long, and if the slave reconnects, we could have 2
 
1083
  Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
 
1084
  binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
 
1085
  the master kills any existing thread with the slave's server id (if this id is
 
1086
  not zero; it will be true for real slaves, but false for mysqlbinlog when it
 
1087
  sends COM_BINLOG_DUMP to get a remote binlog dump).
 
1088
 
 
1089
  SYNOPSIS
 
1090
    kill_zombie_dump_threads()
 
1091
    slave_server_id     the slave's server id
 
1092
 
 
1093
*/
 
1094
 
 
1095
 
 
1096
void kill_zombie_dump_threads(uint32_t slave_server_id)
 
1097
{
 
1098
  pthread_mutex_lock(&LOCK_thread_count);
 
1099
  I_List_iterator<THD> it(threads);
 
1100
  THD *tmp;
 
1101
 
 
1102
  while ((tmp=it++))
 
1103
  {
 
1104
    if (tmp->command == COM_BINLOG_DUMP &&
 
1105
       tmp->server_id == slave_server_id)
 
1106
    {
 
1107
      pthread_mutex_lock(&tmp->LOCK_delete);    // Lock from delete
 
1108
      break;
 
1109
    }
 
1110
  }
 
1111
  pthread_mutex_unlock(&LOCK_thread_count);
 
1112
  if (tmp)
 
1113
  {
 
1114
    /*
 
1115
      Here we do not call kill_one_thread() as
 
1116
      it will be slow because it will iterate through the list
 
1117
      again. We just to do kill the thread ourselves.
 
1118
    */
 
1119
    tmp->awake(THD::KILL_QUERY);
 
1120
    pthread_mutex_unlock(&tmp->LOCK_delete);
 
1121
  }
 
1122
}
 
1123
 
 
1124
 
 
1125
bool change_master(THD* thd, Master_info* mi)
 
1126
{
 
1127
  int thread_mask;
 
1128
  const char* errmsg= 0;
 
1129
  bool need_relay_log_purge= 1;
 
1130
 
 
1131
  lock_slave_threads(mi);
 
1132
  init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
 
1133
  if (thread_mask) // We refuse if any slave thread is running
 
1134
  {
 
1135
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
 
1136
    unlock_slave_threads(mi);
 
1137
    return(true);
 
1138
  }
 
1139
 
 
1140
  thd_proc_info(thd, "Changing master");
 
1141
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
 
1142
  // TODO: see if needs re-write
 
1143
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
 
1144
                       thread_mask))
 
1145
  {
 
1146
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
 
1147
    unlock_slave_threads(mi);
 
1148
    return(true);
 
1149
  }
 
1150
 
 
1151
  /*
 
1152
    Data lock not needed since we have already stopped the running threads,
 
1153
    and we have the hold on the run locks which will keep all threads that
 
1154
    could possibly modify the data structures from running
 
1155
  */
 
1156
 
 
1157
  /*
 
1158
    If the user specified host or port without binlog or position,
 
1159
    reset binlog's name to FIRST and position to 4.
 
1160
  */
 
1161
 
 
1162
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
 
1163
  {
 
1164
    mi->master_log_name[0] = 0;
 
1165
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
 
1166
  }
 
1167
 
 
1168
  if (lex_mi->log_file_name)
 
1169
    strmake(mi->master_log_name, lex_mi->log_file_name,
 
1170
            sizeof(mi->master_log_name)-1);
 
1171
  if (lex_mi->pos)
 
1172
  {
 
1173
    mi->master_log_pos= lex_mi->pos;
 
1174
  }
 
1175
 
 
1176
  if (lex_mi->host)
 
1177
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
 
1178
  if (lex_mi->user)
 
1179
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
 
1180
  if (lex_mi->password)
 
1181
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
 
1182
  if (lex_mi->port)
 
1183
    mi->port = lex_mi->port;
 
1184
  if (lex_mi->connect_retry)
 
1185
    mi->connect_retry = lex_mi->connect_retry;
 
1186
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1187
    mi->heartbeat_period = lex_mi->heartbeat_period;
 
1188
  else
 
1189
    mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
 
1190
                                      (slave_net_timeout/2.0));
 
1191
  mi->received_heartbeats= 0LL; // counter lives until master is CHANGEd
 
1192
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1193
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
1194
 
 
1195
  if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1196
    mi->ssl_verify_server_cert=
 
1197
      (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
1198
 
 
1199
  if (lex_mi->ssl_ca)
 
1200
    strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
 
1201
  if (lex_mi->ssl_capath)
 
1202
    strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
 
1203
  if (lex_mi->ssl_cert)
 
1204
    strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
 
1205
  if (lex_mi->ssl_cipher)
 
1206
    strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
 
1207
  if (lex_mi->ssl_key)
 
1208
    strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
 
1209
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
 
1210
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
 
1211
      lex_mi->ssl_verify_server_cert )
 
1212
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
1213
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
 
1214
 
 
1215
  if (lex_mi->relay_log_name)
 
1216
  {
 
1217
    need_relay_log_purge= 0;
 
1218
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
 
1219
            sizeof(mi->rli.group_relay_log_name)-1);
 
1220
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
 
1221
            sizeof(mi->rli.event_relay_log_name)-1);
 
1222
  }
 
1223
 
 
1224
  if (lex_mi->relay_log_pos)
 
1225
  {
 
1226
    need_relay_log_purge= 0;
 
1227
    mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
 
1228
  }
 
1229
 
 
1230
  /*
 
1231
    If user did specify neither host nor port nor any log name nor any log
 
1232
    pos, i.e. he specified only user/password/master_connect_retry, he probably
 
1233
    wants replication to resume from where it had left, i.e. from the
 
1234
    coordinates of the **SQL** thread (imagine the case where the I/O is ahead
 
1235
    of the SQL; restarting from the coordinates of the I/O would lose some
 
1236
    events which is probably unwanted when you are just doing minor changes
 
1237
    like changing master_connect_retry).
 
1238
    A side-effect is that if only the I/O thread was started, this thread may
 
1239
    restart from ''/4 after the CHANGE MASTER. That's a minor problem (it is a
 
1240
    much more unlikely situation than the one we are fixing here).
 
1241
    Note: coordinates of the SQL thread must be read here, before the
 
1242
    'if (need_relay_log_purge)' block which resets them.
 
1243
  */
 
1244
  if (!lex_mi->host && !lex_mi->port &&
 
1245
      !lex_mi->log_file_name && !lex_mi->pos &&
 
1246
      need_relay_log_purge)
 
1247
   {
 
1248
     /*
 
1249
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
 
1250
       not initialized), so we use a max().
 
1251
       What happens to mi->rli.master_log_pos during the initialization stages
 
1252
       of replication is not 100% clear, so we guard against problems using
 
1253
       max().
 
1254
      */
 
1255
     mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
 
1256
                              mi->rli.group_master_log_pos);
 
1257
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
 
1258
             sizeof(mi->master_log_name)-1);
 
1259
  }
 
1260
  /*
 
1261
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
 
1262
    a slave before).
 
1263
  */
 
1264
  if (flush_master_info(mi, 0))
 
1265
  {
 
1266
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
 
1267
    unlock_slave_threads(mi);
 
1268
    return(true);
 
1269
  }
 
1270
  if (need_relay_log_purge)
 
1271
  {
 
1272
    relay_log_purge= 1;
 
1273
    thd_proc_info(thd, "Purging old relay logs");
 
1274
    if (purge_relay_logs(&mi->rli, thd,
 
1275
                         0 /* not only reset, but also reinit */,
 
1276
                         &errmsg))
 
1277
    {
 
1278
      my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
 
1279
      unlock_slave_threads(mi);
 
1280
      return(true);
 
1281
    }
 
1282
  }
 
1283
  else
 
1284
  {
 
1285
    const char* msg;
 
1286
    relay_log_purge= 0;
 
1287
    /* Relay log is already initialized */
 
1288
    if (init_relay_log_pos(&mi->rli,
 
1289
                           mi->rli.group_relay_log_name,
 
1290
                           mi->rli.group_relay_log_pos,
 
1291
                           0 /*no data lock*/,
 
1292
                           &msg, 0))
 
1293
    {
 
1294
      my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
 
1295
      unlock_slave_threads(mi);
 
1296
      return(true);
 
1297
    }
 
1298
  }
 
1299
  /*
 
1300
    Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block,
 
1301
    so restore them to good values. If we left them to ''/0, that would work;
 
1302
    but that would fail in the case of 2 successive CHANGE MASTER (without a
 
1303
    START SLAVE in between): because first one would set the coords in mi to
 
1304
    the good values of those in rli, the set those in rli to ''/0, then
 
1305
    second CHANGE MASTER would set the coords in mi to those of rli, i.e. to
 
1306
    ''/0: we have lost all copies of the original good coordinates.
 
1307
    That's why we always save good coords in rli.
 
1308
  */
 
1309
  mi->rli.group_master_log_pos= mi->master_log_pos;
 
1310
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
 
1311
          sizeof(mi->rli.group_master_log_name)-1);
 
1312
 
 
1313
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
 
1314
    mi->rli.group_master_log_pos=0;
 
1315
 
 
1316
  pthread_mutex_lock(&mi->rli.data_lock);
 
1317
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
 
1318
  /* Clear the errors, for a clean start */
 
1319
  mi->rli.clear_error();
 
1320
  mi->rli.clear_until_condition();
 
1321
  /*
 
1322
    If we don't write new coordinates to disk now, then old will remain in
 
1323
    relay-log.info until START SLAVE is issued; but if mysqld is shutdown
 
1324
    before START SLAVE, then old will remain in relay-log.info, and will be the
 
1325
    in-memory value at restart (thus causing errors, as the old relay log does
 
1326
    not exist anymore).
 
1327
  */
 
1328
  flush_relay_log_info(&mi->rli);
 
1329
  pthread_cond_broadcast(&mi->data_cond);
 
1330
  pthread_mutex_unlock(&mi->rli.data_lock);
 
1331
 
 
1332
  unlock_slave_threads(mi);
 
1333
  thd_proc_info(thd, 0);
 
1334
  my_ok(thd);
 
1335
  return(false);
 
1336
}
 
1337
 
 
1338
int reset_master(THD* thd)
 
1339
{
 
1340
  if (!mysql_bin_log.is_open())
 
1341
  {
 
1342
    my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
 
1343
               ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
 
1344
    return 1;
 
1345
  }
 
1346
  return mysql_bin_log.reset_logs(thd);
 
1347
}
 
1348
 
 
1349
int cmp_master_pos(const char* log_file_name1, uint64_t log_pos1,
 
1350
                   const char* log_file_name2, uint64_t log_pos2)
 
1351
{
 
1352
  int res;
 
1353
  uint log_file_name1_len=  strlen(log_file_name1);
 
1354
  uint log_file_name2_len=  strlen(log_file_name2);
 
1355
 
 
1356
  //  We assume that both log names match up to '.'
 
1357
  if (log_file_name1_len == log_file_name2_len)
 
1358
  {
 
1359
    if ((res= strcmp(log_file_name1, log_file_name2)))
 
1360
      return res;
 
1361
    return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
 
1362
  }
 
1363
  return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
 
1364
}
 
1365
 
 
1366
 
 
1367
bool mysql_show_binlog_events(THD* thd)
 
1368
{
 
1369
  Protocol *protocol= thd->protocol;
 
1370
  List<Item> field_list;
 
1371
  const char *errmsg= 0;
 
1372
  bool ret= true;
 
1373
  IO_CACHE log;
 
1374
  File file= -1;
 
1375
 
 
1376
  Log_event::init_show_field_list(&field_list);
 
1377
  if (protocol->send_fields(&field_list,
 
1378
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1379
    return(true);
 
1380
 
 
1381
  Format_description_log_event *description_event= new
 
1382
    Format_description_log_event(3); /* MySQL 4.0 by default */
 
1383
 
 
1384
  /*
 
1385
    Wait for handlers to insert any pending information
 
1386
    into the binlog.  For e.g. ndb which updates the binlog asynchronously
 
1387
    this is needed so that the uses sees all its own commands in the binlog
 
1388
  */
 
1389
  ha_binlog_wait(thd);
 
1390
 
 
1391
  if (mysql_bin_log.is_open())
 
1392
  {
 
1393
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
 
1394
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
 
1395
    ha_rows event_count, limit_start, limit_end;
 
1396
    my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
 
1397
    char search_file_name[FN_REFLEN], *name;
 
1398
    const char *log_file_name = lex_mi->log_file_name;
 
1399
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
 
1400
    LOG_INFO linfo;
 
1401
    Log_event* ev;
 
1402
 
 
1403
    unit->set_limit(thd->lex->current_select);
 
1404
    limit_start= unit->offset_limit_cnt;
 
1405
    limit_end= unit->select_limit_cnt;
 
1406
 
 
1407
    name= search_file_name;
 
1408
    if (log_file_name)
 
1409
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
 
1410
    else
 
1411
      name=0;                                   // Find first log
 
1412
 
 
1413
    linfo.index_file_offset = 0;
 
1414
 
 
1415
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
 
1416
    {
 
1417
      errmsg = "Could not find target log";
 
1418
      goto err;
 
1419
    }
 
1420
 
 
1421
    pthread_mutex_lock(&LOCK_thread_count);
 
1422
    thd->current_linfo = &linfo;
 
1423
    pthread_mutex_unlock(&LOCK_thread_count);
 
1424
 
 
1425
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
 
1426
      goto err;
 
1427
 
 
1428
    /*
 
1429
      to account binlog event header size
 
1430
    */
 
1431
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
 
1432
 
 
1433
    pthread_mutex_lock(log_lock);
 
1434
 
 
1435
    /*
 
1436
      open_binlog() sought to position 4.
 
1437
      Read the first event in case it's a Format_description_log_event, to
 
1438
      know the format. If there's no such event, we are 3.23 or 4.x. This
 
1439
      code, like before, can't read 3.23 binlogs.
 
1440
      This code will fail on a mixed relay log (one which has Format_desc then
 
1441
      Rotate then Format_desc).
 
1442
    */
 
1443
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
 
1444
    if (ev)
 
1445
    {
 
1446
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
 
1447
      {
 
1448
        delete description_event;
 
1449
        description_event= (Format_description_log_event*) ev;
 
1450
      }
 
1451
      else
 
1452
        delete ev;
 
1453
    }
 
1454
 
 
1455
    my_b_seek(&log, pos);
 
1456
 
 
1457
    if (!description_event->is_valid())
 
1458
    {
 
1459
      errmsg="Invalid Format_description event; could be out of memory";
 
1460
      goto err;
 
1461
    }
 
1462
 
 
1463
    for (event_count = 0;
 
1464
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
 
1465
                                         description_event)); )
 
1466
    {
 
1467
      if (event_count >= limit_start &&
 
1468
          ev->net_send(protocol, linfo.log_file_name, pos))
 
1469
      {
 
1470
        errmsg = "Net error";
 
1471
        delete ev;
 
1472
        pthread_mutex_unlock(log_lock);
 
1473
        goto err;
 
1474
      }
 
1475
 
 
1476
      pos = my_b_tell(&log);
 
1477
      delete ev;
 
1478
 
 
1479
      if (++event_count >= limit_end)
 
1480
        break;
 
1481
    }
 
1482
 
 
1483
    if (event_count < limit_end && log.error)
 
1484
    {
 
1485
      errmsg = "Wrong offset or I/O error";
 
1486
      pthread_mutex_unlock(log_lock);
 
1487
      goto err;
 
1488
    }
 
1489
 
 
1490
    pthread_mutex_unlock(log_lock);
 
1491
  }
 
1492
 
 
1493
  ret= false;
 
1494
 
 
1495
err:
 
1496
  delete description_event;
 
1497
  if (file >= 0)
 
1498
  {
 
1499
    end_io_cache(&log);
 
1500
    (void) my_close(file, MYF(MY_WME));
 
1501
  }
 
1502
 
 
1503
  if (errmsg)
 
1504
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
 
1505
             "SHOW BINLOG EVENTS", errmsg);
 
1506
  else
 
1507
    my_eof(thd);
 
1508
 
 
1509
  pthread_mutex_lock(&LOCK_thread_count);
 
1510
  thd->current_linfo = 0;
 
1511
  pthread_mutex_unlock(&LOCK_thread_count);
 
1512
  return(ret);
 
1513
}
 
1514
 
 
1515
 
 
1516
bool show_binlog_info(THD* thd)
 
1517
{
 
1518
  Protocol *protocol= thd->protocol;
 
1519
  List<Item> field_list;
 
1520
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
 
1521
  field_list.push_back(new Item_return_int("Position",20,
 
1522
                                           DRIZZLE_TYPE_LONGLONG));
 
1523
  field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
 
1524
  field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
 
1525
 
 
1526
  if (protocol->send_fields(&field_list,
 
1527
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1528
    return(true);
 
1529
  protocol->prepare_for_resend();
 
1530
 
 
1531
  if (mysql_bin_log.is_open())
 
1532
  {
 
1533
    LOG_INFO li;
 
1534
    mysql_bin_log.get_current_log(&li);
 
1535
    int dir_len = dirname_length(li.log_file_name);
 
1536
    protocol->store(li.log_file_name + dir_len, &my_charset_bin);
 
1537
    protocol->store((uint64_t) li.pos);
 
1538
    protocol->store(binlog_filter->get_do_db());
 
1539
    protocol->store(binlog_filter->get_ignore_db());
 
1540
    if (protocol->write())
 
1541
      return(true);
 
1542
  }
 
1543
  my_eof(thd);
 
1544
  return(false);
 
1545
}
 
1546
 
 
1547
 
 
1548
/*
 
1549
  Send a list of all binary logs to client
 
1550
 
 
1551
  SYNOPSIS
 
1552
    show_binlogs()
 
1553
    thd         Thread specific variable
 
1554
 
 
1555
  RETURN VALUES
 
1556
    false OK
 
1557
    true  error
 
1558
*/
 
1559
 
 
1560
bool show_binlogs(THD* thd)
 
1561
{
 
1562
  IO_CACHE *index_file;
 
1563
  LOG_INFO cur;
 
1564
  File file;
 
1565
  char fname[FN_REFLEN];
 
1566
  List<Item> field_list;
 
1567
  uint length;
 
1568
  int cur_dir_len;
 
1569
  Protocol *protocol= thd->protocol;
 
1570
 
 
1571
  if (!mysql_bin_log.is_open())
 
1572
  {
 
1573
    my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0));
 
1574
    return 1;
 
1575
  }
 
1576
 
 
1577
  field_list.push_back(new Item_empty_string("Log_name", 255));
 
1578
  field_list.push_back(new Item_return_int("File_size", 20,
 
1579
                                           DRIZZLE_TYPE_LONGLONG));
 
1580
  if (protocol->send_fields(&field_list,
 
1581
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1582
    return(true);
 
1583
  
 
1584
  pthread_mutex_lock(mysql_bin_log.get_log_lock());
 
1585
  mysql_bin_log.lock_index();
 
1586
  index_file=mysql_bin_log.get_index_file();
 
1587
  
 
1588
  mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
 
1589
  pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
 
1590
  
 
1591
  cur_dir_len= dirname_length(cur.log_file_name);
 
1592
 
 
1593
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
 
1594
 
 
1595
  /* The file ends with EOF or empty line */
 
1596
  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
 
1597
  {
 
1598
    int dir_len;
 
1599
    uint64_t file_length= 0;                   // Length if open fails
 
1600
    fname[--length] = '\0';                     // remove the newline
 
1601
 
 
1602
    protocol->prepare_for_resend();
 
1603
    dir_len= dirname_length(fname);
 
1604
    length-= dir_len;
 
1605
    protocol->store(fname + dir_len, length, &my_charset_bin);
 
1606
 
 
1607
    if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
 
1608
      file_length= cur.pos;  /* The active log, use the active position */
 
1609
    else
 
1610
    {
 
1611
      /* this is an old log, open it and find the size */
 
1612
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
 
1613
                         MYF(0))) >= 0)
 
1614
      {
 
1615
        file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));
 
1616
        my_close(file, MYF(0));
 
1617
      }
 
1618
    }
 
1619
    protocol->store(file_length);
 
1620
    if (protocol->write())
 
1621
      goto err;
 
1622
  }
 
1623
  mysql_bin_log.unlock_index();
 
1624
  my_eof(thd);
 
1625
  return(false);
 
1626
 
 
1627
err:
 
1628
  mysql_bin_log.unlock_index();
 
1629
  return(true);
 
1630
}
 
1631
 
 
1632
/**
 
1633
   Load data's io cache specific hook to be executed
 
1634
   before a chunk of data is being read into the cache's buffer
 
1635
   The fuction instantianates and writes into the binlog
 
1636
   replication events along LOAD DATA processing.
 
1637
   
 
1638
   @param file  pointer to io-cache
 
1639
   @return 0
 
1640
*/
 
1641
int log_loaded_block(IO_CACHE* file)
 
1642
{
 
1643
  LOAD_FILE_INFO *lf_info;
 
1644
  uint block_len;
 
1645
  /* buffer contains position where we started last read */
 
1646
  uchar* buffer= (uchar*) my_b_get_buffer_start(file);
 
1647
  uint max_event_size= current_thd->variables.max_allowed_packet;
 
1648
  lf_info= (LOAD_FILE_INFO*) file->arg;
 
1649
  if (lf_info->thd->current_stmt_binlog_row_based)
 
1650
    return(0);
 
1651
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
 
1652
      lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
 
1653
    return(0);
 
1654
  
 
1655
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
 
1656
       buffer += min(block_len, max_event_size),
 
1657
       block_len -= min(block_len, max_event_size))
 
1658
  {
 
1659
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
 
1660
    if (lf_info->wrote_create_file)
 
1661
    {
 
1662
      Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
 
1663
                               min(block_len, max_event_size),
 
1664
                               lf_info->log_delayed);
 
1665
      mysql_bin_log.write(&a);
 
1666
    }
 
1667
    else
 
1668
    {
 
1669
      Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
 
1670
                                   buffer,
 
1671
                                   min(block_len, max_event_size),
 
1672
                                   lf_info->log_delayed);
 
1673
      mysql_bin_log.write(&b);
 
1674
      lf_info->wrote_create_file= 1;
 
1675
    }
 
1676
  }
 
1677
  return(0);
 
1678
}
 
1679
 
 
1680
/*
 
1681
  Replication System Variables
 
1682
*/
 
1683
 
 
1684
class sys_var_slave_skip_counter :public sys_var
 
1685
{
 
1686
public:
 
1687
  sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
 
1688
    :sys_var(name_arg)
 
1689
  { chain_sys_var(chain); }
 
1690
  bool check(THD *thd, set_var *var);
 
1691
  bool update(THD *thd, set_var *var);
 
1692
  bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
 
1693
  /*
 
1694
    We can't retrieve the value of this, so we don't have to define
 
1695
    type() or value_ptr()
 
1696
  */
 
1697
};
 
1698
 
 
1699
class sys_var_sync_binlog_period :public sys_var_long_ptr
 
1700
{
 
1701
public:
 
1702
  sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
 
1703
                             ulong *value_ptr)
 
1704
    :sys_var_long_ptr(chain, name_arg,value_ptr) {}
 
1705
  bool update(THD *thd, set_var *var);
 
1706
};
 
1707
 
 
1708
static void fix_slave_net_timeout(THD *thd,
 
1709
                                  enum_var_type type __attribute__((unused)))
 
1710
{
 
1711
#ifdef HAVE_REPLICATION
 
1712
  pthread_mutex_lock(&LOCK_active_mi);
 
1713
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
 
1714
    push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1715
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
 
1716
                        "The currect value for master_heartbeat_period"
 
1717
                        " exceeds the new value of `slave_net_timeout' sec."
 
1718
                        " A sensible value for the period should be"
 
1719
                        " less than the timeout.");
 
1720
  pthread_mutex_unlock(&LOCK_active_mi);
 
1721
#endif
 
1722
  return;
 
1723
}
 
1724
 
 
1725
static sys_var_chain vars = { NULL, NULL };
 
1726
 
 
1727
static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
 
1728
                                            &relay_log_purge);
 
1729
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
 
1730
                                              &slave_net_timeout,
 
1731
                                              fix_slave_net_timeout);
 
1732
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
 
1733
                                                &slave_trans_retries);
 
1734
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
 
1735
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
 
1736
 
 
1737
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
 
1738
 
 
1739
 
 
1740
static int show_slave_skip_errors(THD *thd __attribute__((unused)),
 
1741
                                  SHOW_VAR *var, char *buff)
 
1742
{
 
1743
  var->type=SHOW_CHAR;
 
1744
  var->value= buff;
 
1745
  if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
 
1746
  {
 
1747
    var->value= const_cast<char *>("OFF");
 
1748
  }
 
1749
  else if (bitmap_is_set_all(&slave_error_mask))
 
1750
  {
 
1751
    var->value= const_cast<char *>("ALL");
 
1752
  }
 
1753
  else
 
1754
  {
 
1755
    /* 10 is enough assuming errors are max 4 digits */
 
1756
    int i;
 
1757
    var->value= buff;
 
1758
    for (i= 1;
 
1759
         i < MAX_SLAVE_ERROR &&
 
1760
         (buff - var->value) < SHOW_VAR_FUNC_BUFF_SIZE;
 
1761
         i++)
 
1762
    {
 
1763
      if (bitmap_is_set(&slave_error_mask, i))
 
1764
      {
 
1765
        buff= int10_to_str(i, buff, 10);
 
1766
        *buff++= ',';
 
1767
      }
 
1768
    }
 
1769
    if (var->value != buff)
 
1770
      buff--;                           // Remove last ','
 
1771
    if (i < MAX_SLAVE_ERROR)
 
1772
      buff= stpcpy(buff, "...");  // Couldn't show all errors
 
1773
    *buff=0;
 
1774
  }
 
1775
  return 0;
 
1776
}
 
1777
 
 
1778
static st_show_var_func_container
 
1779
show_slave_skip_errors_cont = { &show_slave_skip_errors };
 
1780
 
 
1781
 
 
1782
static SHOW_VAR fixed_vars[]= {
 
1783
  {"log_slave_updates",       (char*) &opt_log_slave_updates,       SHOW_MY_BOOL},
 
1784
  {"relay_log" , (char*) &opt_relay_logname, SHOW_CHAR_PTR},
 
1785
  {"relay_log_index", (char*) &opt_relaylog_index_name, SHOW_CHAR_PTR},
 
1786
  {"relay_log_info_file", (char*) &relay_log_info_file, SHOW_CHAR_PTR},
 
1787
  {"relay_log_space_limit",   (char*) &relay_log_space_limit,       SHOW_LONGLONG},
 
1788
  {"slave_load_tmpdir",       (char*) &slave_load_tmpdir,           SHOW_CHAR_PTR},
 
1789
  {"slave_skip_errors",       (char*) &show_slave_skip_errors_cont,      SHOW_FUNC},
 
1790
};
 
1791
 
 
1792
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((unused)),
 
1793
                                       set_var *var)
 
1794
{
 
1795
  int result= 0;
 
1796
  pthread_mutex_lock(&LOCK_active_mi);
 
1797
  pthread_mutex_lock(&active_mi->rli.run_lock);
 
1798
  if (active_mi->rli.slave_running)
 
1799
  {
 
1800
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
 
1801
    result=1;
 
1802
  }
 
1803
  pthread_mutex_unlock(&active_mi->rli.run_lock);
 
1804
  pthread_mutex_unlock(&LOCK_active_mi);
 
1805
  var->save_result.ulong_value= (ulong) var->value->val_int();
 
1806
  return result;
 
1807
}
 
1808
 
 
1809
 
 
1810
bool sys_var_slave_skip_counter::update(THD *thd __attribute__((unused)),
 
1811
                                        set_var *var)
 
1812
{
 
1813
  pthread_mutex_lock(&LOCK_active_mi);
 
1814
  pthread_mutex_lock(&active_mi->rli.run_lock);
 
1815
  /*
 
1816
    The following test should normally never be true as we test this
 
1817
    in the check function;  To be safe against multiple
 
1818
    SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
 
1819
  */
 
1820
  if (!active_mi->rli.slave_running)
 
1821
  {
 
1822
    pthread_mutex_lock(&active_mi->rli.data_lock);
 
1823
    active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
 
1824
    pthread_mutex_unlock(&active_mi->rli.data_lock);
 
1825
  }
 
1826
  pthread_mutex_unlock(&active_mi->rli.run_lock);
 
1827
  pthread_mutex_unlock(&LOCK_active_mi);
 
1828
  return 0;
 
1829
}
 
1830
 
 
1831
 
 
1832
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((unused)),
 
1833
                                        set_var *var)
 
1834
{
 
1835
  sync_binlog_period= (ulong) var->save_result.uint64_t_value;
 
1836
  return 0;
 
1837
}
 
1838
 
 
1839
int init_replication_sys_vars()
 
1840
{
 
1841
  mysql_append_static_vars(fixed_vars, sizeof(fixed_vars) / sizeof(SHOW_VAR));
 
1842
 
 
1843
  if (mysql_add_sys_var_chain(vars.first, my_long_options))
 
1844
  {
 
1845
    /* should not happen */
 
1846
    fprintf(stderr, "failed to initialize replication system variables");
 
1847
    unireg_abort(1);
 
1848
  }
 
1849
  return 0;
 
1850
}
 
1851
 
 
1852
#endif /* HAVE_REPLICATION */