~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_repl.cc

  • Committer: Brian Aker
  • Date: 2009-05-15 17:06:35 UTC
  • mto: This revision was merged to the branch mainline in revision 1023.
  • Revision ID: brian@gaz-20090515170635-croy1u63a3gqdn9n
Dead convert functions for character sets.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2000-2006 MySQL AB & Sasha
2
 
 
3
 
   This program is free software; you can redistribute it and/or modify
4
 
   it under the terms of the GNU General Public License as published by
5
 
   the Free Software Foundation; version 2 of the License.
6
 
 
7
 
   This program is distributed in the hope that it will be useful,
8
 
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 
   GNU General Public License for more details.
11
 
 
12
 
   You should have received a copy of the GNU General Public License
13
 
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
 
 
16
 
#include <drizzled/server_includes.h>
17
 
 
18
 
#include "rpl_mi.h"
19
 
#include "sql_repl.h"
20
 
#include "log_event.h"
21
 
#include "rpl_filter.h"
22
 
#include <drizzled/drizzled_error_messages.h>
23
 
 
24
 
int max_binlog_dump_events = 0; // unlimited
25
 
 
26
 
/*
27
 
    fake_rotate_event() builds a fake (=which does not exist physically in any
28
 
    binlog) Rotate event, which contains the name of the binlog we are going to
29
 
    send to the slave (because the slave may not know it if it just asked for
30
 
    MASTER_LOG_FILE='', MASTER_LOG_POS=4).
31
 
    < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
32
 
    After this version we always call it, so that a 3.23.58 slave can rely on
33
 
    it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
34
 
    zeros in the good positions which, by chance, make it possible for the 3.23
35
 
    slave to detect that this event is unexpected) (this is luck which happens
36
 
    because the master and slave disagree on the size of the header of
37
 
    Log_event).
38
 
 
39
 
    Relying on the event length of the Rotate event instead of these
40
 
    well-placed zeros was not possible as Rotate events have a variable-length
41
 
    part.
42
 
*/
43
 
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
44
 
                             uint64_t position, const char** errmsg)
45
 
{
46
 
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
47
 
  /*
48
 
    'when' (the timestamp) is set to 0 so that slave could distinguish between
49
 
    real and fake Rotate events (if necessary)
50
 
  */
51
 
  memset(header, 0, 4);
52
 
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
53
 
 
54
 
  char* p = log_file_name+dirname_length(log_file_name);
55
 
  uint32_t ident_len = (uint32_t) strlen(p);
56
 
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
57
 
  int4store(header + SERVER_ID_OFFSET, server_id);
58
 
  int4store(header + EVENT_LEN_OFFSET, event_len);
59
 
  int2store(header + FLAGS_OFFSET, 0);
60
 
 
61
 
  // TODO: check what problems this may cause and fix them
62
 
  int4store(header + LOG_POS_OFFSET, 0);
63
 
 
64
 
  packet->append(header, sizeof(header));
65
 
  int8store(buf+R_POS_OFFSET,position);
66
 
  packet->append(buf, ROTATE_HEADER_LEN);
67
 
  packet->append(p,ident_len);
68
 
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
69
 
  {
70
 
    *errmsg = "failed on my_net_write()";
71
 
    return(-1);
72
 
  }
73
 
  return(0);
74
 
}
75
 
 
76
 
static int send_file(THD *thd)
77
 
{
78
 
  NET* net = &thd->net;
79
 
  int fd = -1, error = 1;
80
 
  size_t bytes;
81
 
  char fname[FN_REFLEN+1];
82
 
  const char *errmsg = 0;
83
 
  int old_timeout;
84
 
  unsigned long packet_len;
85
 
  unsigned char buf[IO_SIZE];                           // It's safe to alloc this
86
 
 
87
 
  /*
88
 
    The client might be slow loading the data, give him wait_timeout to do
89
 
    the job
90
 
  */
91
 
  old_timeout= net->read_timeout;
92
 
  my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
93
 
 
94
 
  /*
95
 
    We need net_flush here because the client will not know it needs to send
96
 
    us the file name until it has processed the load event entry
97
 
  */
98
 
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
99
 
  {
100
 
    errmsg = _("Failed in send_file() while reading file name");
101
 
    goto err;
102
 
  }
103
 
 
104
 
  // terminate with \0 for fn_format
105
 
  *((char*)net->read_pos +  packet_len) = 0;
106
 
  fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
107
 
  // this is needed to make replicate-ignore-db
108
 
  if (!strcmp(fname,"/dev/null"))
109
 
    goto end;
110
 
 
111
 
  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
112
 
  {
113
 
    errmsg = _("Failed in send_file() on open of file");
114
 
    goto err;
115
 
  }
116
 
 
117
 
  while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0)
118
 
  {
119
 
    if (my_net_write(net, buf, bytes))
120
 
    {
121
 
      errmsg = _("Failed in send_file() while writing data to client");
122
 
      goto err;
123
 
    }
124
 
  }
125
 
 
126
 
 end:
127
 
  if (my_net_write(net, (unsigned char*) "", 0) || net_flush(net) ||
128
 
      (my_net_read(net) == packet_error))
129
 
  {
130
 
    errmsg = _("Failed in send_file() while negotiating file transfer close");
131
 
    goto err;
132
 
  }
133
 
  error = 0;
134
 
 
135
 
 err:
136
 
  my_net_set_read_timeout(net, old_timeout);
137
 
  if (fd >= 0)
138
 
    (void) my_close(fd, MYF(0));
139
 
  if (errmsg)
140
 
  {
141
 
    sql_print_error(errmsg);
142
 
  }
143
 
  return(error);
144
 
}
145
 
 
146
 
 
147
 
/*
148
 
  Adjust the position pointer in the binary log file for all running slaves
149
 
 
150
 
  SYNOPSIS
151
 
    adjust_linfo_offsets()
152
 
    purge_offset        Number of bytes removed from start of log index file
153
 
 
154
 
  NOTES
155
 
    - This is called when doing a PURGE when we delete lines from the
156
 
      index log file
157
 
 
158
 
  REQUIREMENTS
159
 
    - Before calling this function, we have to ensure that no threads are
160
 
      using any binary log file before purge_offset.a
161
 
 
162
 
  TODO
163
 
    - Inform the slave threads that they should sync the position
164
 
      in the binary log file with flush_relay_log_info.
165
 
      Now they sync is done for next read.
166
 
*/
167
 
 
168
 
void adjust_linfo_offsets(my_off_t purge_offset)
169
 
{
170
 
  THD *tmp;
171
 
 
172
 
  pthread_mutex_lock(&LOCK_thread_count);
173
 
  I_List_iterator<THD> it(threads);
174
 
 
175
 
  while ((tmp=it++))
176
 
  {
177
 
    LOG_INFO* linfo;
178
 
    if ((linfo = tmp->current_linfo))
179
 
    {
180
 
      pthread_mutex_lock(&linfo->lock);
181
 
      /*
182
 
        Index file offset can be less that purge offset only if
183
 
        we just started reading the index file. In that case
184
 
        we have nothing to adjust
185
 
      */
186
 
      if (linfo->index_file_offset < purge_offset)
187
 
        linfo->fatal = (linfo->index_file_offset != 0);
188
 
      else
189
 
        linfo->index_file_offset -= purge_offset;
190
 
      pthread_mutex_unlock(&linfo->lock);
191
 
    }
192
 
  }
193
 
  pthread_mutex_unlock(&LOCK_thread_count);
194
 
}
195
 
 
196
 
 
197
 
bool log_in_use(const char* log_name)
198
 
{
199
 
  int log_name_len = strlen(log_name) + 1;
200
 
  THD *tmp;
201
 
  bool result = 0;
202
 
 
203
 
  pthread_mutex_lock(&LOCK_thread_count);
204
 
  I_List_iterator<THD> it(threads);
205
 
 
206
 
  while ((tmp=it++))
207
 
  {
208
 
    LOG_INFO* linfo;
209
 
    if ((linfo = tmp->current_linfo))
210
 
    {
211
 
      pthread_mutex_lock(&linfo->lock);
212
 
      result = !memcmp(log_name, linfo->log_file_name, log_name_len);
213
 
      pthread_mutex_unlock(&linfo->lock);
214
 
      if (result)
215
 
        break;
216
 
    }
217
 
  }
218
 
 
219
 
  pthread_mutex_unlock(&LOCK_thread_count);
220
 
  return result;
221
 
}
222
 
 
223
 
bool purge_error_message(THD* thd, int res)
224
 
{
225
 
  uint32_t errmsg= 0;
226
 
 
227
 
  switch (res)  {
228
 
  case 0: break;
229
 
  case LOG_INFO_EOF:    errmsg= ER_UNKNOWN_TARGET_BINLOG; break;
230
 
  case LOG_INFO_IO:     errmsg= ER_IO_ERR_LOG_INDEX_READ; break;
231
 
  case LOG_INFO_INVALID:errmsg= ER_BINLOG_PURGE_PROHIBITED; break;
232
 
  case LOG_INFO_SEEK:   errmsg= ER_FSEEK_FAIL; break;
233
 
  case LOG_INFO_MEM:    errmsg= ER_OUT_OF_RESOURCES; break;
234
 
  case LOG_INFO_FATAL:  errmsg= ER_BINLOG_PURGE_FATAL_ERR; break;
235
 
  case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break;
236
 
  case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break;
237
 
  default:              errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break;
238
 
  }
239
 
 
240
 
  if (errmsg)
241
 
  {
242
 
    my_message(errmsg, ER(errmsg), MYF(0));
243
 
    return true;
244
 
  }
245
 
  my_ok(thd);
246
 
  return false;
247
 
}
248
 
 
249
 
 
250
 
bool purge_master_logs(THD* thd, const char* to_log)
251
 
{
252
 
  char search_file_name[FN_REFLEN];
253
 
  if (!mysql_bin_log.is_open())
254
 
  {
255
 
    my_ok(thd);
256
 
    return false;
257
 
  }
258
 
 
259
 
  mysql_bin_log.make_log_name(search_file_name, to_log);
260
 
  return purge_error_message(thd,
261
 
                             mysql_bin_log.purge_logs(search_file_name, 0, 1,
262
 
                                                      1, NULL));
263
 
}
264
 
 
265
 
 
266
 
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
267
 
{
268
 
  if (!mysql_bin_log.is_open())
269
 
  {
270
 
    my_ok(thd);
271
 
    return 0;
272
 
  }
273
 
  return purge_error_message(thd,
274
 
                             mysql_bin_log.purge_logs_before_date(purge_time));
275
 
}
276
 
 
277
 
int test_for_non_eof_log_read_errors(int error, const char **errmsg)
278
 
{
279
 
  if (error == LOG_READ_EOF)
280
 
    return 0;
281
 
  my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
282
 
  switch (error) {
283
 
  case LOG_READ_BOGUS:
284
 
    *errmsg = "bogus data in log event";
285
 
    break;
286
 
  case LOG_READ_TOO_LARGE:
287
 
    *errmsg = "log event entry exceeded max_allowed_packet; \
288
 
Increase max_allowed_packet on master";
289
 
    break;
290
 
  case LOG_READ_IO:
291
 
    *errmsg = "I/O error reading log event";
292
 
    break;
293
 
  case LOG_READ_MEM:
294
 
    *errmsg = "memory allocation failed reading log event";
295
 
    break;
296
 
  case LOG_READ_TRUNC:
297
 
    *errmsg = "binlog truncated in the middle of event";
298
 
    break;
299
 
  default:
300
 
    *errmsg = "unknown error reading log event on the master";
301
 
    break;
302
 
  }
303
 
  return error;
304
 
}
305
 
 
306
 
 
307
 
/**
308
 
  An auxiliary function for calling in mysql_binlog_send
309
 
  to initialize the heartbeat timeout in waiting for a binlogged event.
310
 
 
311
 
  @param[in]    thd  THD to access a user variable
312
 
 
313
 
  @return        heartbeat period an uint64_t of nanoseconds
314
 
                 or zero if heartbeat was not demanded by slave
315
 
*/ 
316
 
static uint64_t get_heartbeat_period(THD * thd)
317
 
{
318
 
  bool null_value;
319
 
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
320
 
  user_var_entry *entry= 
321
 
    (user_var_entry*) hash_search(&thd->user_vars, (unsigned char*) name.str,
322
 
                                  name.length);
323
 
  return entry? entry->val_int(&null_value) : 0;
324
 
}
325
 
 
326
 
/*
327
 
  Function prepares and sends repliation heartbeat event.
328
 
 
329
 
  @param net                net object of THD
330
 
  @param packet             buffer to store the heartbeat instance
331
 
  @param event_coordinates  binlog file name and position of the last
332
 
                            real event master sent from binlog
333
 
 
334
 
  @note 
335
 
    Among three essential pieces of heartbeat data Log_event::when
336
 
    is computed locally.
337
 
    The  error to send is serious and should force terminating
338
 
    the dump thread.
339
 
*/
340
 
static int send_heartbeat_event(NET* net, String* packet,
341
 
                                const struct event_coordinates *coord)
342
 
{
343
 
  char header[LOG_EVENT_HEADER_LEN];
344
 
  /*
345
 
    'when' (the timestamp) is set to 0 so that slave could distinguish between
346
 
    real and fake Rotate events (if necessary)
347
 
  */
348
 
  memset(header, 0, 4);  // when
349
 
 
350
 
  header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
351
 
 
352
 
  char* p= coord->file_name + dirname_length(coord->file_name);
353
 
 
354
 
  uint32_t ident_len = strlen(p);
355
 
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN;
356
 
  int4store(header + SERVER_ID_OFFSET, server_id);
357
 
  int4store(header + EVENT_LEN_OFFSET, event_len);
358
 
  int2store(header + FLAGS_OFFSET, 0);
359
 
 
360
 
  int4store(header + LOG_POS_OFFSET, coord->pos);  // log_pos
361
 
 
362
 
  packet->append(header, sizeof(header));
363
 
  packet->append(p, ident_len);             // log_file_name
364
 
 
365
 
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) ||
366
 
      net_flush(net))
367
 
  {
368
 
    return(-1);
369
 
  }
370
 
  packet->set("\0", 1, &my_charset_bin);
371
 
  return(0);
372
 
}
373
 
 
374
 
/*
375
 
  TODO: Clean up loop to only have one call to send_file()
376
 
*/
377
 
 
378
 
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
379
 
                       uint16_t flags)
380
 
{
381
 
  LOG_INFO linfo;
382
 
  char *log_file_name = linfo.log_file_name;
383
 
  char search_file_name[FN_REFLEN], *name;
384
 
  IO_CACHE log;
385
 
  File file = -1;
386
 
  String* packet = &thd->packet;
387
 
  int error;
388
 
  const char *errmsg = "Unknown error";
389
 
  NET* net = &thd->net;
390
 
  pthread_mutex_t *log_lock;
391
 
  bool binlog_can_be_corrupted= false;
392
 
 
393
 
  memset(&log, 0, sizeof(log));
394
 
  /* 
395
 
     heartbeat_period from @master_heartbeat_period user variable
396
 
  */
397
 
  uint64_t heartbeat_period= get_heartbeat_period(thd);
398
 
  struct timespec heartbeat_buf;
399
 
  struct event_coordinates coord_buf;
400
 
  struct timespec *heartbeat_ts= NULL;
401
 
  struct event_coordinates *coord= NULL;
402
 
  if (heartbeat_period != 0L)
403
 
  {
404
 
    heartbeat_ts= &heartbeat_buf;
405
 
    set_timespec_nsec(*heartbeat_ts, 0);
406
 
    coord= &coord_buf;
407
 
    coord->file_name= log_file_name; // initialization basing on what slave remembers
408
 
    coord->pos= pos;
409
 
  }
410
 
 
411
 
  if (!mysql_bin_log.is_open())
412
 
  {
413
 
    errmsg = "Binary log is not open";
414
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
415
 
    goto err;
416
 
  }
417
 
  if (!server_id_supplied)
418
 
  {
419
 
    errmsg = "Misconfigured master - server id was not set";
420
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
421
 
    goto err;
422
 
  }
423
 
 
424
 
  name=search_file_name;
425
 
  if (log_ident[0])
426
 
    mysql_bin_log.make_log_name(search_file_name, log_ident);
427
 
  else
428
 
    name=0;                                     // Find first log
429
 
 
430
 
  linfo.index_file_offset = 0;
431
 
 
432
 
  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
433
 
  {
434
 
    errmsg = "Could not find first log file name in binary log index file";
435
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
436
 
    goto err;
437
 
  }
438
 
 
439
 
  pthread_mutex_lock(&LOCK_thread_count);
440
 
  thd->current_linfo = &linfo;
441
 
  pthread_mutex_unlock(&LOCK_thread_count);
442
 
 
443
 
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
444
 
  {
445
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
446
 
    goto err;
447
 
  }
448
 
  if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
449
 
  {
450
 
    errmsg= "Client requested master to start replication from \
451
 
impossible position";
452
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
453
 
    goto err;
454
 
  }
455
 
 
456
 
  /*
457
 
    We need to start a packet with something other than 255
458
 
    to distinguish it from error
459
 
  */
460
 
  packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
461
 
 
462
 
  /*
463
 
    Tell the client about the log name with a fake Rotate event;
464
 
    this is needed even if we also send a Format_description_log_event
465
 
    just after, because that event does not contain the binlog's name.
466
 
    Note that as this Rotate event is sent before
467
 
    Format_description_log_event, the slave cannot have any info to
468
 
    understand this event's format, so the header len of
469
 
    Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
470
 
    than other events except FORMAT_DESCRIPTION_EVENT).
471
 
    Before 4.0.14 we called fake_rotate_event below only if (pos ==
472
 
    BIN_LOG_HEADER_SIZE), because if this is false then the slave
473
 
    already knows the binlog's name.
474
 
    Since, we always call fake_rotate_event; if the slave already knew
475
 
    the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
476
 
    useless but does not harm much. It is nice for 3.23 (>=.58) slaves
477
 
    which test Rotate events to see if the master is 4.0 (then they
478
 
    choose to stop because they can't replicate 4.0); by always calling
479
 
    fake_rotate_event we are sure that 3.23.58 and newer will detect the
480
 
    problem as soon as replication starts (BUG#198).
481
 
    Always calling fake_rotate_event makes sending of normal
482
 
    (=from-binlog) Rotate events a priori unneeded, but it is not so
483
 
    simple: the 2 Rotate events are not equivalent, the normal one is
484
 
    before the Stop event, the fake one is after. If we don't send the
485
 
    normal one, then the Stop event will be interpreted (by existing 4.0
486
 
    slaves) as "the master stopped", which is wrong. So for safety,
487
 
    given that we want minimum modification of 4.0, we send the normal
488
 
    and fake Rotates.
489
 
  */
490
 
  if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
491
 
  {
492
 
    /*
493
 
       This error code is not perfect, as fake_rotate_event() does not
494
 
       read anything from the binlog; if it fails it's because of an
495
 
       error in my_net_write(), fortunately it will say so in errmsg.
496
 
    */
497
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
498
 
    goto err;
499
 
  }
500
 
  packet->set("\0", 1, &my_charset_bin);
501
 
  /*
502
 
    Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
503
 
    this larger than the corresponding packet (query) sent 
504
 
    from client to master.
505
 
  */
506
 
  thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
507
 
 
508
 
  /*
509
 
    We can set log_lock now, it does not move (it's a member of
510
 
    mysql_bin_log, and it's already inited, and it will be destroyed
511
 
    only at shutdown).
512
 
  */
513
 
  log_lock = mysql_bin_log.get_log_lock();
514
 
  if (pos > BIN_LOG_HEADER_SIZE)
515
 
  {
516
 
     /*
517
 
       Try to find a Format_description_log_event at the beginning of
518
 
       the binlog
519
 
     */
520
 
     if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
521
 
     {
522
 
       /*
523
 
         The packet has offsets equal to the normal offsets in a binlog
524
 
         event +1 (the first character is \0).
525
 
       */
526
 
       if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
527
 
       {
528
 
         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
529
 
                                       LOG_EVENT_BINLOG_IN_USE_F);
530
 
         (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
531
 
         /*
532
 
           mark that this event with "log_pos=0", so the slave
533
 
           should not increment master's binlog position
534
 
           (rli->group_master_log_pos)
535
 
         */
536
 
         int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
537
 
         /*
538
 
           if reconnect master sends FD event with `created' as 0
539
 
           to avoid destroying temp tables.
540
 
          */
541
 
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
542
 
                   ST_CREATED_OFFSET+1, (uint32_t) 0);
543
 
         /* send it */
544
 
         if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
545
 
         {
546
 
           errmsg = "Failed on my_net_write()";
547
 
           my_errno= ER_UNKNOWN_ERROR;
548
 
           goto err;
549
 
         }
550
 
 
551
 
         /*
552
 
           No need to save this event. We are only doing simple reads
553
 
           (no real parsing of the events) so we don't need it. And so
554
 
           we don't need the artificial Format_description_log_event of
555
 
           3.23&4.x.
556
 
         */
557
 
       }
558
 
     }
559
 
     else
560
 
     {
561
 
       if (test_for_non_eof_log_read_errors(error, &errmsg))
562
 
         goto err;
563
 
       /*
564
 
         It's EOF, nothing to do, go on reading next events, the
565
 
         Format_description_log_event will be found naturally if it is written.
566
 
       */
567
 
     }
568
 
     /* reset the packet as we wrote to it in any case */
569
 
     packet->set("\0", 1, &my_charset_bin);
570
 
  } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
571
 
  else
572
 
  {
573
 
    /* The Format_description_log_event event will be found naturally. */
574
 
  }
575
 
 
576
 
  /* seek to the requested position, to start the requested dump */
577
 
  my_b_seek(&log, pos);                 // Seek will done on next read
578
 
 
579
 
  while (!net->error && net->vio != 0 && !thd->killed)
580
 
  {
581
 
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
582
 
    {
583
 
      /*
584
 
        log's filename does not change while it's active
585
 
      */
586
 
      if (coord)
587
 
        coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
588
 
 
589
 
      if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
590
 
      {
591
 
        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
592
 
                                      LOG_EVENT_BINLOG_IN_USE_F);
593
 
        (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
594
 
      }
595
 
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
596
 
        binlog_can_be_corrupted= false;
597
 
 
598
 
      if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
599
 
      {
600
 
        errmsg = "Failed on my_net_write()";
601
 
        my_errno= ER_UNKNOWN_ERROR;
602
 
        goto err;
603
 
      }
604
 
 
605
 
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
606
 
      {
607
 
        if (send_file(thd))
608
 
        {
609
 
          errmsg = "failed in send_file()";
610
 
          my_errno= ER_UNKNOWN_ERROR;
611
 
          goto err;
612
 
        }
613
 
      }
614
 
      packet->set("\0", 1, &my_charset_bin);
615
 
    }
616
 
 
617
 
    /*
618
 
      here we were reading binlog that was not closed properly (as a result
619
 
      of a crash ?). treat any corruption as EOF
620
 
    */
621
 
    if (binlog_can_be_corrupted && error != LOG_READ_MEM)
622
 
      error=LOG_READ_EOF;
623
 
    /*
624
 
      TODO: now that we are logging the offset, check to make sure
625
 
      the recorded offset and the actual match.
626
 
      Guilhem 2003-06: this is not true if this master is a slave
627
 
      <4.0.15 running with --log-slave-updates, because then log_pos may
628
 
      be the offset in the-master-of-this-master's binlog.
629
 
    */
630
 
    if (test_for_non_eof_log_read_errors(error, &errmsg))
631
 
      goto err;
632
 
 
633
 
    if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
634
 
        mysql_bin_log.is_active(log_file_name))
635
 
    {
636
 
      /*
637
 
        Block until there is more data in the log
638
 
      */
639
 
      if (net_flush(net))
640
 
      {
641
 
        errmsg = "failed on net_flush()";
642
 
        my_errno= ER_UNKNOWN_ERROR;
643
 
        goto err;
644
 
      }
645
 
 
646
 
      /*
647
 
        We may have missed the update broadcast from the log
648
 
        that has just happened, let's try to catch it if it did.
649
 
        If we did not miss anything, we just wait for other threads
650
 
        to signal us.
651
 
      */
652
 
      {
653
 
        log.error=0;
654
 
        bool read_packet = 0, fatal_error = 0;
655
 
 
656
 
        /*
657
 
          No one will update the log while we are reading
658
 
          now, but we'll be quick and just read one record
659
 
 
660
 
          TODO:
661
 
          Add an counter that is incremented for each time we update the
662
 
          binary log.  We can avoid the following read if the counter
663
 
          has not been updated since last read.
664
 
        */
665
 
 
666
 
        pthread_mutex_lock(log_lock);
667
 
        switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
668
 
        case 0:
669
 
          /* we read successfully, so we'll need to send it to the slave */
670
 
          pthread_mutex_unlock(log_lock);
671
 
          read_packet = 1;
672
 
          if (coord)
673
 
            coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
674
 
          break;
675
 
 
676
 
        case LOG_READ_EOF:
677
 
        {
678
 
          int ret;
679
 
          if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
680
 
          {
681
 
            pthread_mutex_unlock(log_lock);
682
 
            goto end;
683
 
          }
684
 
 
685
 
          do 
686
 
          {
687
 
            if (coord)
688
 
            {
689
 
              assert(heartbeat_ts && heartbeat_period != 0L);
690
 
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
691
 
            }
692
 
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
693
 
            assert(ret == 0 || (heartbeat_period != 0L && coord != NULL));
694
 
            if (ret == ETIMEDOUT || ret == ETIME)
695
 
            {
696
 
              if (send_heartbeat_event(net, packet, coord))
697
 
              {
698
 
                errmsg = "Failed on my_net_write()";
699
 
                my_errno= ER_UNKNOWN_ERROR;
700
 
                pthread_mutex_unlock(log_lock);
701
 
                goto err;
702
 
              }
703
 
            }
704
 
            else
705
 
            {
706
 
              assert(ret == 0);
707
 
            }
708
 
          } while (ret != 0 && coord != NULL && !thd->killed);
709
 
          pthread_mutex_unlock(log_lock);
710
 
        }    
711
 
        break;
712
 
            
713
 
        default:
714
 
          pthread_mutex_unlock(log_lock);
715
 
          fatal_error = 1;
716
 
          break;
717
 
        }
718
 
 
719
 
        if (read_packet)
720
 
        {
721
 
          thd_proc_info(thd, "Sending binlog event to slave");
722
 
          if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) )
723
 
          {
724
 
            errmsg = "Failed on my_net_write()";
725
 
            my_errno= ER_UNKNOWN_ERROR;
726
 
            goto err;
727
 
          }
728
 
 
729
 
          if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
730
 
          {
731
 
            if (send_file(thd))
732
 
            {
733
 
              errmsg = "failed in send_file()";
734
 
              my_errno= ER_UNKNOWN_ERROR;
735
 
              goto err;
736
 
            }
737
 
          }
738
 
          packet->set("\0", 1, &my_charset_bin);
739
 
          /*
740
 
            No need to net_flush because we will get to flush later when
741
 
            we hit EOF pretty quick
742
 
          */
743
 
        }
744
 
 
745
 
        if (fatal_error)
746
 
        {
747
 
          errmsg = "error reading log entry";
748
 
          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
749
 
          goto err;
750
 
        }
751
 
        log.error=0;
752
 
      }
753
 
    }
754
 
    else
755
 
    {
756
 
      bool loop_breaker = 0;
757
 
      /* need this to break out of the for loop from switch */
758
 
 
759
 
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
760
 
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
761
 
      case LOG_INFO_EOF:
762
 
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
763
 
        break;
764
 
      case 0:
765
 
        break;
766
 
      default:
767
 
        errmsg = "could not find next log";
768
 
        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
769
 
        goto err;
770
 
      }
771
 
 
772
 
      if (loop_breaker)
773
 
        break;
774
 
 
775
 
      end_io_cache(&log);
776
 
      (void) my_close(file, MYF(MY_WME));
777
 
 
778
 
      /*
779
 
        Call fake_rotate_event() in case the previous log (the one which
780
 
        we have just finished reading) did not contain a Rotate event
781
 
        (for example (I don't know any other example) the previous log
782
 
        was the last one before the master was shutdown & restarted).
783
 
        This way we tell the slave about the new log's name and
784
 
        position.  If the binlog is 5.0, the next event we are going to
785
 
        read and send is Format_description_log_event.
786
 
      */
787
 
      if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
788
 
          fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
789
 
                            &errmsg))
790
 
      {
791
 
        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
792
 
        goto err;
793
 
      }
794
 
 
795
 
      packet->length(0);
796
 
      packet->append('\0');
797
 
      if (coord)
798
 
        coord->file_name= log_file_name; // reset to the next
799
 
    }
800
 
  }
801
 
 
802
 
end:
803
 
  end_io_cache(&log);
804
 
  (void)my_close(file, MYF(MY_WME));
805
 
 
806
 
  my_eof(thd);
807
 
  thd_proc_info(thd, "Waiting to finalize termination");
808
 
  pthread_mutex_lock(&LOCK_thread_count);
809
 
  thd->current_linfo = 0;
810
 
  pthread_mutex_unlock(&LOCK_thread_count);
811
 
  return;
812
 
 
813
 
err:
814
 
  thd_proc_info(thd, "Waiting to finalize termination");
815
 
  end_io_cache(&log);
816
 
  /*
817
 
    Exclude  iteration through thread list
818
 
    this is needed for purge_logs() - it will iterate through
819
 
    thread list and update thd->current_linfo->index_file_offset
820
 
    this mutex will make sure that it never tried to update our linfo
821
 
    after we return from this stack frame
822
 
  */
823
 
  pthread_mutex_lock(&LOCK_thread_count);
824
 
  thd->current_linfo = 0;
825
 
  pthread_mutex_unlock(&LOCK_thread_count);
826
 
  if (file >= 0)
827
 
    (void) my_close(file, MYF(MY_WME));
828
 
 
829
 
  my_message(my_errno, errmsg, MYF(0));
830
 
  return;
831
 
}
832
 
 
833
 
int start_slave(THD* thd , Master_info* mi,  bool net_report)
834
 
{
835
 
  int slave_errno= 0;
836
 
  int thread_mask;
837
 
 
838
 
  lock_slave_threads(mi);  // this allows us to cleanly read slave_running
839
 
  // Get a mask of _stopped_ threads
840
 
  init_thread_mask(&thread_mask,mi,1 /* inverse */);
841
 
  /*
842
 
    Below we will start all stopped threads.  But if the user wants to
843
 
    start only one thread, do as if the other thread was running (as we
844
 
    don't wan't to touch the other thread), so set the bit to 0 for the
845
 
    other thread
846
 
  */
847
 
  if (thd->lex->slave_thd_opt)
848
 
    thread_mask&= thd->lex->slave_thd_opt;
849
 
  if (thread_mask) //some threads are stopped, start them
850
 
  {
851
 
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
852
 
                         thread_mask))
853
 
      slave_errno=ER_MASTER_INFO;
854
 
    else if (server_id_supplied && *mi->host)
855
 
    {
856
 
      /*
857
 
        If we will start SQL thread we will care about UNTIL options If
858
 
        not and they are specified we will ignore them and warn user
859
 
        about this fact.
860
 
      */
861
 
      if (thread_mask & SLAVE_SQL)
862
 
      {
863
 
        pthread_mutex_lock(&mi->rli.data_lock);
864
 
 
865
 
        if (thd->lex->mi.pos)
866
 
        {
867
 
          mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
868
 
          mi->rli.until_log_pos= thd->lex->mi.pos;
869
 
          /*
870
 
             We don't check thd->lex->mi.log_file_name for NULL here
871
 
             since it is checked in sql_yacc.yy
872
 
          */
873
 
          strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
874
 
                  sizeof(mi->rli.until_log_name)-1);
875
 
        }
876
 
        else if (thd->lex->mi.relay_log_pos)
877
 
        {
878
 
          mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
879
 
          mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
880
 
          strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
881
 
                  sizeof(mi->rli.until_log_name)-1);
882
 
        }
883
 
        else
884
 
          mi->rli.clear_until_condition();
885
 
 
886
 
        if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
887
 
        {
888
 
          /* Preparing members for effective until condition checking */
889
 
          const char *p= fn_ext(mi->rli.until_log_name);
890
 
          char *p_end;
891
 
          if (*p)
892
 
          {
893
 
            //p points to '.'
894
 
            mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
895
 
            /*
896
 
              p_end points to the first invalid character. If it equals
897
 
              to p, no digits were found, error. If it contains '\0' it
898
 
              means  conversion went ok.
899
 
            */
900
 
            if (p_end==p || *p_end)
901
 
              slave_errno=ER_BAD_SLAVE_UNTIL_COND;
902
 
          }
903
 
          else
904
 
            slave_errno=ER_BAD_SLAVE_UNTIL_COND;
905
 
 
906
 
          /* mark the cached result of the UNTIL comparison as "undefined" */
907
 
          mi->rli.until_log_names_cmp_result=
908
 
            Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
909
 
 
910
 
          /* Issuing warning then started without --skip-slave-start */
911
 
          if (!opt_skip_slave_start)
912
 
            push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
913
 
                         ER_MISSING_SKIP_SLAVE,
914
 
                         ER(ER_MISSING_SKIP_SLAVE));
915
 
        }
916
 
 
917
 
        pthread_mutex_unlock(&mi->rli.data_lock);
918
 
      }
919
 
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
920
 
        push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
921
 
                     ER(ER_UNTIL_COND_IGNORED));
922
 
 
923
 
      if (!slave_errno)
924
 
        slave_errno = start_slave_threads(0 /*no mutex */,
925
 
                                        1 /* wait for start */,
926
 
                                        mi,
927
 
                                        master_info_file,relay_log_info_file,
928
 
                                        thread_mask);
929
 
    }
930
 
    else
931
 
      slave_errno = ER_BAD_SLAVE;
932
 
  }
933
 
  else
934
 
  {
935
 
    /* no error if all threads are already started, only a warning */
936
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
937
 
                 ER(ER_SLAVE_WAS_RUNNING));
938
 
  }
939
 
 
940
 
  unlock_slave_threads(mi);
941
 
 
942
 
  if (slave_errno)
943
 
  {
944
 
    if (net_report)
945
 
      my_message(slave_errno, ER(slave_errno), MYF(0));
946
 
    return(1);
947
 
  }
948
 
  else if (net_report)
949
 
    my_ok(thd);
950
 
 
951
 
  return(0);
952
 
}
953
 
 
954
 
 
955
 
int stop_slave(THD* thd, Master_info* mi, bool net_report )
956
 
{
957
 
  int slave_errno;
958
 
  if (!thd)
959
 
    thd = current_thd;
960
 
 
961
 
  thd_proc_info(thd, "Killing slave");
962
 
  int thread_mask;
963
 
  lock_slave_threads(mi);
964
 
  // Get a mask of _running_ threads
965
 
  init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
966
 
  /*
967
 
    Below we will stop all running threads.
968
 
    But if the user wants to stop only one thread, do as if the other thread
969
 
    was stopped (as we don't wan't to touch the other thread), so set the
970
 
    bit to 0 for the other thread
971
 
  */
972
 
  if (thd->lex->slave_thd_opt)
973
 
    thread_mask &= thd->lex->slave_thd_opt;
974
 
 
975
 
  if (thread_mask)
976
 
  {
977
 
    slave_errno= terminate_slave_threads(mi,thread_mask,
978
 
                                         1 /*skip lock */);
979
 
  }
980
 
  else
981
 
  {
982
 
    //no error if both threads are already stopped, only a warning
983
 
    slave_errno= 0;
984
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
985
 
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
986
 
  }
987
 
  unlock_slave_threads(mi);
988
 
  thd_proc_info(thd, 0);
989
 
 
990
 
  if (slave_errno)
991
 
  {
992
 
    if (net_report)
993
 
      my_message(slave_errno, ER(slave_errno), MYF(0));
994
 
    return(1);
995
 
  }
996
 
  else if (net_report)
997
 
    my_ok(thd);
998
 
 
999
 
  return(0);
1000
 
}
1001
 
 
1002
 
 
1003
 
/*
1004
 
  Remove all relay logs and start replication from the start
1005
 
 
1006
 
  SYNOPSIS
1007
 
    reset_slave()
1008
 
    thd                 Thread handler
1009
 
    mi                  Master info for the slave
1010
 
 
1011
 
  RETURN
1012
 
    0   ok
1013
 
    1   error
1014
 
*/
1015
 
 
1016
 
 
1017
 
int reset_slave(THD *thd, Master_info* mi)
1018
 
{
1019
 
  struct stat stat_area;
1020
 
  char fname[FN_REFLEN];
1021
 
  int thread_mask= 0, error= 0;
1022
 
  uint32_t sql_errno=0;
1023
 
  const char* errmsg=0;
1024
 
 
1025
 
  lock_slave_threads(mi);
1026
 
  init_thread_mask(&thread_mask,mi,0 /* not inverse */);
1027
 
  if (thread_mask) // We refuse if any slave thread is running
1028
 
  {
1029
 
    sql_errno= ER_SLAVE_MUST_STOP;
1030
 
    error=1;
1031
 
    goto err;
1032
 
  }
1033
 
 
1034
 
  // delete relay logs, clear relay log coordinates
1035
 
  if ((error= purge_relay_logs(&mi->rli, thd,
1036
 
                               1 /* just reset */,
1037
 
                               &errmsg)))
1038
 
    goto err;
1039
 
 
1040
 
  /* Clear master's log coordinates */
1041
 
  init_master_log_pos(mi);
1042
 
  /*
1043
 
     Reset errors (the idea is that we forget about the
1044
 
     old master).
1045
 
  */
1046
 
  mi->rli.clear_error();
1047
 
  mi->rli.clear_until_condition();
1048
 
 
1049
 
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1050
 
  end_master_info(mi);
1051
 
  // and delete these two files
1052
 
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
1053
 
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1054
 
  {
1055
 
    error=1;
1056
 
    goto err;
1057
 
  }
1058
 
  // delete relay_log_info_file
1059
 
  fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
1060
 
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1061
 
  {
1062
 
    error=1;
1063
 
    goto err;
1064
 
  }
1065
 
 
1066
 
err:
1067
 
  unlock_slave_threads(mi);
1068
 
  if (error)
1069
 
    my_error(sql_errno, MYF(0), errmsg);
1070
 
  return(error);
1071
 
}
1072
 
 
1073
 
/*
1074
 
 
1075
 
  Kill all Binlog_dump threads which previously talked to the same slave
1076
 
  ("same" means with the same server id). Indeed, if the slave stops, if the
1077
 
  Binlog_dump thread is waiting (pthread_cond_wait) for binlog update, then it
1078
 
  will keep existing until a query is written to the binlog. If the master is
1079
 
  idle, then this could last long, and if the slave reconnects, we could have 2
1080
 
  Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
1081
 
  binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
1082
 
  the master kills any existing thread with the slave's server id (if this id is
1083
 
  not zero; it will be true for real slaves, but false for mysqlbinlog when it
1084
 
  sends COM_BINLOG_DUMP to get a remote binlog dump).
1085
 
 
1086
 
  SYNOPSIS
1087
 
    kill_zombie_dump_threads()
1088
 
    slave_server_id     the slave's server id
1089
 
 
1090
 
*/
1091
 
 
1092
 
 
1093
 
void kill_zombie_dump_threads(uint32_t slave_server_id)
1094
 
{
1095
 
  pthread_mutex_lock(&LOCK_thread_count);
1096
 
  I_List_iterator<THD> it(threads);
1097
 
  THD *tmp;
1098
 
 
1099
 
  while ((tmp=it++))
1100
 
  {
1101
 
    if (tmp->command == COM_BINLOG_DUMP &&
1102
 
       tmp->server_id == slave_server_id)
1103
 
    {
1104
 
      pthread_mutex_lock(&tmp->LOCK_delete);    // Lock from delete
1105
 
      break;
1106
 
    }
1107
 
  }
1108
 
  pthread_mutex_unlock(&LOCK_thread_count);
1109
 
  if (tmp)
1110
 
  {
1111
 
    /*
1112
 
      Here we do not call kill_one_thread() as
1113
 
      it will be slow because it will iterate through the list
1114
 
      again. We just to do kill the thread ourselves.
1115
 
    */
1116
 
    tmp->awake(THD::KILL_QUERY);
1117
 
    pthread_mutex_unlock(&tmp->LOCK_delete);
1118
 
  }
1119
 
}
1120
 
 
1121
 
 
1122
 
bool change_master(THD* thd, Master_info* mi)
1123
 
{
1124
 
  int thread_mask;
1125
 
  const char* errmsg= 0;
1126
 
  bool need_relay_log_purge= 1;
1127
 
 
1128
 
  lock_slave_threads(mi);
1129
 
  init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
1130
 
  if (thread_mask) // We refuse if any slave thread is running
1131
 
  {
1132
 
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1133
 
    unlock_slave_threads(mi);
1134
 
    return(true);
1135
 
  }
1136
 
 
1137
 
  thd_proc_info(thd, "Changing master");
1138
 
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1139
 
  // TODO: see if needs re-write
1140
 
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
1141
 
                       thread_mask))
1142
 
  {
1143
 
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1144
 
    unlock_slave_threads(mi);
1145
 
    return(true);
1146
 
  }
1147
 
 
1148
 
  /*
1149
 
    Data lock not needed since we have already stopped the running threads,
1150
 
    and we have the hold on the run locks which will keep all threads that
1151
 
    could possibly modify the data structures from running
1152
 
  */
1153
 
 
1154
 
  /*
1155
 
    If the user specified host or port without binlog or position,
1156
 
    reset binlog's name to FIRST and position to 4.
1157
 
  */
1158
 
 
1159
 
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1160
 
  {
1161
 
    mi->master_log_name[0] = 0;
1162
 
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
1163
 
  }
1164
 
 
1165
 
  if (lex_mi->log_file_name)
1166
 
    strmake(mi->master_log_name, lex_mi->log_file_name,
1167
 
            sizeof(mi->master_log_name)-1);
1168
 
  if (lex_mi->pos)
1169
 
  {
1170
 
    mi->master_log_pos= lex_mi->pos;
1171
 
  }
1172
 
 
1173
 
  if (lex_mi->host)
1174
 
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1175
 
  if (lex_mi->user)
1176
 
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
1177
 
  if (lex_mi->password)
1178
 
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
1179
 
  if (lex_mi->port)
1180
 
    mi->port = lex_mi->port;
1181
 
  if (lex_mi->connect_retry)
1182
 
    mi->connect_retry = lex_mi->connect_retry;
1183
 
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1184
 
    mi->heartbeat_period = lex_mi->heartbeat_period;
1185
 
  else
1186
 
    mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
1187
 
                                      (slave_net_timeout/2.0));
1188
 
  mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
1189
 
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1190
 
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
1191
 
 
1192
 
  if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1193
 
    mi->ssl_verify_server_cert=
1194
 
      (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
1195
 
 
1196
 
  if (lex_mi->ssl_ca)
1197
 
    strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
1198
 
  if (lex_mi->ssl_capath)
1199
 
    strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
1200
 
  if (lex_mi->ssl_cert)
1201
 
    strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
1202
 
  if (lex_mi->ssl_cipher)
1203
 
    strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
1204
 
  if (lex_mi->ssl_key)
1205
 
    strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
1206
 
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
1207
 
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
1208
 
      lex_mi->ssl_verify_server_cert )
1209
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1210
 
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1211
 
 
1212
 
  if (lex_mi->relay_log_name)
1213
 
  {
1214
 
    need_relay_log_purge= 0;
1215
 
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
1216
 
            sizeof(mi->rli.group_relay_log_name)-1);
1217
 
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
1218
 
            sizeof(mi->rli.event_relay_log_name)-1);
1219
 
  }
1220
 
 
1221
 
  if (lex_mi->relay_log_pos)
1222
 
  {
1223
 
    need_relay_log_purge= 0;
1224
 
    mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
1225
 
  }
1226
 
 
1227
 
  /*
1228
 
    If user did specify neither host nor port nor any log name nor any log
1229
 
    pos, i.e. he specified only user/password/master_connect_retry, he probably
1230
 
    wants replication to resume from where it had left, i.e. from the
1231
 
    coordinates of the **SQL** thread (imagine the case where the I/O is ahead
1232
 
    of the SQL; restarting from the coordinates of the I/O would lose some
1233
 
    events which is probably unwanted when you are just doing minor changes
1234
 
    like changing master_connect_retry).
1235
 
    A side-effect is that if only the I/O thread was started, this thread may
1236
 
    restart from ''/4 after the CHANGE MASTER. That's a minor problem (it is a
1237
 
    much more unlikely situation than the one we are fixing here).
1238
 
    Note: coordinates of the SQL thread must be read here, before the
1239
 
    'if (need_relay_log_purge)' block which resets them.
1240
 
  */
1241
 
  if (!lex_mi->host && !lex_mi->port &&
1242
 
      !lex_mi->log_file_name && !lex_mi->pos &&
1243
 
      need_relay_log_purge)
1244
 
   {
1245
 
     /*
1246
 
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1247
 
       not initialized), so we use a cmax().
1248
 
       What happens to mi->rli.master_log_pos during the initialization stages
1249
 
       of replication is not 100% clear, so we guard against problems using
1250
 
       cmax().
1251
 
      */
1252
 
     mi->master_log_pos = ((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1253
 
                           ? BIN_LOG_HEADER_SIZE
1254
 
                           : mi->rli.group_master_log_pos);
1255
 
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
1256
 
             sizeof(mi->master_log_name)-1);
1257
 
  }
1258
 
  /*
1259
 
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1260
 
    a slave before).
1261
 
  */
1262
 
  if (flush_master_info(mi, 0))
1263
 
  {
1264
 
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1265
 
    unlock_slave_threads(mi);
1266
 
    return(true);
1267
 
  }
1268
 
  if (need_relay_log_purge)
1269
 
  {
1270
 
    relay_log_purge= 1;
1271
 
    thd_proc_info(thd, "Purging old relay logs");
1272
 
    if (purge_relay_logs(&mi->rli, thd,
1273
 
                         0 /* not only reset, but also reinit */,
1274
 
                         &errmsg))
1275
 
    {
1276
 
      my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
1277
 
      unlock_slave_threads(mi);
1278
 
      return(true);
1279
 
    }
1280
 
  }
1281
 
  else
1282
 
  {
1283
 
    const char* msg;
1284
 
    relay_log_purge= 0;
1285
 
    /* Relay log is already initialized */
1286
 
    if (init_relay_log_pos(&mi->rli,
1287
 
                           mi->rli.group_relay_log_name,
1288
 
                           mi->rli.group_relay_log_pos,
1289
 
                           0 /*no data lock*/,
1290
 
                           &msg, 0))
1291
 
    {
1292
 
      my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
1293
 
      unlock_slave_threads(mi);
1294
 
      return(true);
1295
 
    }
1296
 
  }
1297
 
  /*
1298
 
    Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block,
1299
 
    so restore them to good values. If we left them to ''/0, that would work;
1300
 
    but that would fail in the case of 2 successive CHANGE MASTER (without a
1301
 
    START SLAVE in between): because first one would set the coords in mi to
1302
 
    the good values of those in rli, the set those in rli to ''/0, then
1303
 
    second CHANGE MASTER would set the coords in mi to those of rli, i.e. to
1304
 
    ''/0: we have lost all copies of the original good coordinates.
1305
 
    That's why we always save good coords in rli.
1306
 
  */
1307
 
  mi->rli.group_master_log_pos= mi->master_log_pos;
1308
 
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
1309
 
          sizeof(mi->rli.group_master_log_name)-1);
1310
 
 
1311
 
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
1312
 
    mi->rli.group_master_log_pos=0;
1313
 
 
1314
 
  pthread_mutex_lock(&mi->rli.data_lock);
1315
 
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1316
 
  /* Clear the errors, for a clean start */
1317
 
  mi->rli.clear_error();
1318
 
  mi->rli.clear_until_condition();
1319
 
  /*
1320
 
    If we don't write new coordinates to disk now, then old will remain in
1321
 
    relay-log.info until START SLAVE is issued; but if mysqld is shutdown
1322
 
    before START SLAVE, then old will remain in relay-log.info, and will be the
1323
 
    in-memory value at restart (thus causing errors, as the old relay log does
1324
 
    not exist anymore).
1325
 
  */
1326
 
  flush_relay_log_info(&mi->rli);
1327
 
  pthread_cond_broadcast(&mi->data_cond);
1328
 
  pthread_mutex_unlock(&mi->rli.data_lock);
1329
 
 
1330
 
  unlock_slave_threads(mi);
1331
 
  thd_proc_info(thd, 0);
1332
 
  my_ok(thd);
1333
 
  return(false);
1334
 
}
1335
 
 
1336
 
int reset_master(THD* thd)
1337
 
{
1338
 
  if (!mysql_bin_log.is_open())
1339
 
  {
1340
 
    my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
1341
 
               ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
1342
 
    return 1;
1343
 
  }
1344
 
  return mysql_bin_log.reset_logs(thd);
1345
 
}
1346
 
 
1347
 
int cmp_master_pos(const char* log_file_name1, uint64_t log_pos1,
1348
 
                   const char* log_file_name2, uint64_t log_pos2)
1349
 
{
1350
 
  int res;
1351
 
  uint32_t log_file_name1_len=  strlen(log_file_name1);
1352
 
  uint32_t log_file_name2_len=  strlen(log_file_name2);
1353
 
 
1354
 
  //  We assume that both log names match up to '.'
1355
 
  if (log_file_name1_len == log_file_name2_len)
1356
 
  {
1357
 
    if ((res= strcmp(log_file_name1, log_file_name2)))
1358
 
      return res;
1359
 
    return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
1360
 
  }
1361
 
  return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
1362
 
}
1363
 
 
1364
 
 
1365
 
bool mysql_show_binlog_events(THD* thd)
1366
 
{
1367
 
  Protocol *protocol= thd->protocol;
1368
 
  List<Item> field_list;
1369
 
  const char *errmsg= 0;
1370
 
  bool ret= true;
1371
 
  IO_CACHE log;
1372
 
  File file= -1;
1373
 
 
1374
 
  Log_event::init_show_field_list(&field_list);
1375
 
  if (protocol->send_fields(&field_list,
1376
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1377
 
    return(true);
1378
 
 
1379
 
  Format_description_log_event *description_event= new
1380
 
    Format_description_log_event(3); /* MySQL 4.0 by default */
1381
 
 
1382
 
  if (mysql_bin_log.is_open())
1383
 
  {
1384
 
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1385
 
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
1386
 
    ha_rows event_count, limit_start, limit_end;
1387
 
    my_off_t pos = cmax((uint64_t)BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1388
 
    char search_file_name[FN_REFLEN], *name;
1389
 
    const char *log_file_name = lex_mi->log_file_name;
1390
 
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1391
 
    LOG_INFO linfo;
1392
 
    Log_event* ev;
1393
 
 
1394
 
    unit->set_limit(thd->lex->current_select);
1395
 
    limit_start= unit->offset_limit_cnt;
1396
 
    limit_end= unit->select_limit_cnt;
1397
 
 
1398
 
    name= search_file_name;
1399
 
    if (log_file_name)
1400
 
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
1401
 
    else
1402
 
      name=0;                                   // Find first log
1403
 
 
1404
 
    linfo.index_file_offset = 0;
1405
 
 
1406
 
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1407
 
    {
1408
 
      errmsg = "Could not find target log";
1409
 
      goto err;
1410
 
    }
1411
 
 
1412
 
    pthread_mutex_lock(&LOCK_thread_count);
1413
 
    thd->current_linfo = &linfo;
1414
 
    pthread_mutex_unlock(&LOCK_thread_count);
1415
 
 
1416
 
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
1417
 
      goto err;
1418
 
 
1419
 
    /*
1420
 
      to account binlog event header size
1421
 
    */
1422
 
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
1423
 
 
1424
 
    pthread_mutex_lock(log_lock);
1425
 
 
1426
 
    /*
1427
 
      open_binlog() sought to position 4.
1428
 
      Read the first event in case it's a Format_description_log_event, to
1429
 
      know the format. If there's no such event, we are 3.23 or 4.x. This
1430
 
      code, like before, can't read 3.23 binlogs.
1431
 
      This code will fail on a mixed relay log (one which has Format_desc then
1432
 
      Rotate then Format_desc).
1433
 
    */
1434
 
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
1435
 
    if (ev)
1436
 
    {
1437
 
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
1438
 
      {
1439
 
        delete description_event;
1440
 
        description_event= (Format_description_log_event*) ev;
1441
 
      }
1442
 
      else
1443
 
        delete ev;
1444
 
    }
1445
 
 
1446
 
    my_b_seek(&log, pos);
1447
 
 
1448
 
    if (!description_event->is_valid())
1449
 
    {
1450
 
      errmsg="Invalid Format_description event; could be out of memory";
1451
 
      goto err;
1452
 
    }
1453
 
 
1454
 
    for (event_count = 0;
1455
 
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
1456
 
                                         description_event)); )
1457
 
    {
1458
 
      if (event_count >= limit_start &&
1459
 
          ev->net_send(protocol, linfo.log_file_name, pos))
1460
 
      {
1461
 
        errmsg = "Net error";
1462
 
        delete ev;
1463
 
        pthread_mutex_unlock(log_lock);
1464
 
        goto err;
1465
 
      }
1466
 
 
1467
 
      pos = my_b_tell(&log);
1468
 
      delete ev;
1469
 
 
1470
 
      if (++event_count >= limit_end)
1471
 
        break;
1472
 
    }
1473
 
 
1474
 
    if (event_count < limit_end && log.error)
1475
 
    {
1476
 
      errmsg = "Wrong offset or I/O error";
1477
 
      pthread_mutex_unlock(log_lock);
1478
 
      goto err;
1479
 
    }
1480
 
 
1481
 
    pthread_mutex_unlock(log_lock);
1482
 
  }
1483
 
 
1484
 
  ret= false;
1485
 
 
1486
 
err:
1487
 
  delete description_event;
1488
 
  if (file >= 0)
1489
 
  {
1490
 
    end_io_cache(&log);
1491
 
    (void) my_close(file, MYF(MY_WME));
1492
 
  }
1493
 
 
1494
 
  if (errmsg)
1495
 
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
1496
 
             "SHOW BINLOG EVENTS", errmsg);
1497
 
  else
1498
 
    my_eof(thd);
1499
 
 
1500
 
  pthread_mutex_lock(&LOCK_thread_count);
1501
 
  thd->current_linfo = 0;
1502
 
  pthread_mutex_unlock(&LOCK_thread_count);
1503
 
  return(ret);
1504
 
}
1505
 
 
1506
 
 
1507
 
bool show_binlog_info(THD* thd)
1508
 
{
1509
 
  Protocol *protocol= thd->protocol;
1510
 
  List<Item> field_list;
1511
 
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1512
 
  field_list.push_back(new Item_return_int("Position",20,
1513
 
                                           DRIZZLE_TYPE_LONGLONG));
1514
 
  field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
1515
 
  field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
1516
 
 
1517
 
  if (protocol->send_fields(&field_list,
1518
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1519
 
    return(true);
1520
 
  protocol->prepare_for_resend();
1521
 
 
1522
 
  if (mysql_bin_log.is_open())
1523
 
  {
1524
 
    LOG_INFO li;
1525
 
    mysql_bin_log.get_current_log(&li);
1526
 
    int dir_len = dirname_length(li.log_file_name);
1527
 
    protocol->store(li.log_file_name + dir_len, &my_charset_bin);
1528
 
    protocol->store((uint64_t) li.pos);
1529
 
    protocol->store(binlog_filter->get_do_db());
1530
 
    protocol->store(binlog_filter->get_ignore_db());
1531
 
    if (protocol->write())
1532
 
      return(true);
1533
 
  }
1534
 
  my_eof(thd);
1535
 
  return(false);
1536
 
}
1537
 
 
1538
 
 
1539
 
/*
1540
 
  Send a list of all binary logs to client
1541
 
 
1542
 
  SYNOPSIS
1543
 
    show_binlogs()
1544
 
    thd         Thread specific variable
1545
 
 
1546
 
  RETURN VALUES
1547
 
    false OK
1548
 
    true  error
1549
 
*/
1550
 
 
1551
 
bool show_binlogs(THD* thd)
1552
 
{
1553
 
  IO_CACHE *index_file;
1554
 
  LOG_INFO cur;
1555
 
  File file;
1556
 
  char fname[FN_REFLEN];
1557
 
  List<Item> field_list;
1558
 
  uint32_t length;
1559
 
  int cur_dir_len;
1560
 
  Protocol *protocol= thd->protocol;
1561
 
 
1562
 
  if (!mysql_bin_log.is_open())
1563
 
  {
1564
 
    my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0));
1565
 
    return 1;
1566
 
  }
1567
 
 
1568
 
  field_list.push_back(new Item_empty_string("Log_name", 255));
1569
 
  field_list.push_back(new Item_return_int("File_size", 20,
1570
 
                                           DRIZZLE_TYPE_LONGLONG));
1571
 
  if (protocol->send_fields(&field_list,
1572
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1573
 
    return(true);
1574
 
  
1575
 
  pthread_mutex_lock(mysql_bin_log.get_log_lock());
1576
 
  mysql_bin_log.lock_index();
1577
 
  index_file=mysql_bin_log.get_index_file();
1578
 
  
1579
 
  mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
1580
 
  pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
1581
 
  
1582
 
  cur_dir_len= dirname_length(cur.log_file_name);
1583
 
 
1584
 
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
1585
 
 
1586
 
  /* The file ends with EOF or empty line */
1587
 
  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
1588
 
  {
1589
 
    int dir_len;
1590
 
    uint64_t file_length= 0;                   // Length if open fails
1591
 
    fname[--length] = '\0';                     // remove the newline
1592
 
 
1593
 
    protocol->prepare_for_resend();
1594
 
    dir_len= dirname_length(fname);
1595
 
    length-= dir_len;
1596
 
    protocol->store(fname + dir_len, length, &my_charset_bin);
1597
 
 
1598
 
    if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
1599
 
      file_length= cur.pos;  /* The active log, use the active position */
1600
 
    else
1601
 
    {
1602
 
      /* this is an old log, open it and find the size */
1603
 
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
1604
 
                         MYF(0))) >= 0)
1605
 
      {
1606
 
        file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));
1607
 
        my_close(file, MYF(0));
1608
 
      }
1609
 
    }
1610
 
    protocol->store(file_length);
1611
 
    if (protocol->write())
1612
 
      goto err;
1613
 
  }
1614
 
  mysql_bin_log.unlock_index();
1615
 
  my_eof(thd);
1616
 
  return(false);
1617
 
 
1618
 
err:
1619
 
  mysql_bin_log.unlock_index();
1620
 
  return(true);
1621
 
}
1622
 
 
1623
 
/**
1624
 
   Load data's io cache specific hook to be executed
1625
 
   before a chunk of data is being read into the cache's buffer
1626
 
   The fuction instantianates and writes into the binlog
1627
 
   replication events along LOAD DATA processing.
1628
 
   
1629
 
   @param file  pointer to io-cache
1630
 
   @return 0
1631
 
*/
1632
 
int log_loaded_block(IO_CACHE* file)
1633
 
{
1634
 
  LOAD_FILE_INFO *lf_info;
1635
 
  uint32_t block_len;
1636
 
  /* buffer contains position where we started last read */
1637
 
  unsigned char* buffer= (unsigned char*) my_b_get_buffer_start(file);
1638
 
  uint32_t max_event_size= current_thd->variables.max_allowed_packet;
1639
 
  lf_info= (LOAD_FILE_INFO*) file->arg;
1640
 
  if (lf_info->thd->current_stmt_binlog_row_based)
1641
 
    return(0);
1642
 
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1643
 
      lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1644
 
    return(0);
1645
 
  
1646
 
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1647
 
       buffer += cmin(block_len, max_event_size),
1648
 
       block_len -= cmin(block_len, max_event_size))
1649
 
  {
1650
 
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1651
 
    if (lf_info->wrote_create_file)
1652
 
    {
1653
 
      Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
1654
 
                               cmin(block_len, max_event_size),
1655
 
                               lf_info->log_delayed);
1656
 
      mysql_bin_log.write(&a);
1657
 
    }
1658
 
    else
1659
 
    {
1660
 
      Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1661
 
                                   buffer,
1662
 
                                   cmin(block_len, max_event_size),
1663
 
                                   lf_info->log_delayed);
1664
 
      mysql_bin_log.write(&b);
1665
 
      lf_info->wrote_create_file= 1;
1666
 
    }
1667
 
  }
1668
 
  return(0);
1669
 
}
1670
 
 
1671
 
/*
1672
 
  Replication System Variables
1673
 
*/
1674
 
 
1675
 
class sys_var_slave_skip_counter :public sys_var
1676
 
{
1677
 
public:
1678
 
  sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
1679
 
    :sys_var(name_arg)
1680
 
  { chain_sys_var(chain); }
1681
 
  bool check(THD *thd, set_var *var);
1682
 
  bool update(THD *thd, set_var *var);
1683
 
  bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
1684
 
  /*
1685
 
    We can't retrieve the value of this, so we don't have to define
1686
 
    type() or value_ptr()
1687
 
  */
1688
 
};
1689
 
 
1690
 
class sys_var_sync_binlog_period :public sys_var_long_ptr
1691
 
{
1692
 
public:
1693
 
  sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
1694
 
                             ulong *value_ptr)
1695
 
    :sys_var_long_ptr(chain, name_arg,value_ptr) {}
1696
 
  bool update(THD *thd, set_var *var);
1697
 
};
1698
 
 
1699
 
static void fix_slave_net_timeout(THD *thd,
1700
 
                                  enum_var_type type __attribute__((unused)))
1701
 
{
1702
 
  pthread_mutex_lock(&LOCK_active_mi);
1703
 
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1704
 
    push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1705
 
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1706
 
                        "The currect value for master_heartbeat_period"
1707
 
                        " exceeds the new value of `slave_net_timeout' sec."
1708
 
                        " A sensible value for the period should be"
1709
 
                        " less than the timeout.");
1710
 
  pthread_mutex_unlock(&LOCK_active_mi);
1711
 
  return;
1712
 
}
1713
 
 
1714
 
static sys_var_chain vars = { NULL, NULL };
1715
 
 
1716
 
static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
1717
 
                                            &relay_log_purge);
1718
 
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
1719
 
                                              &slave_net_timeout,
1720
 
                                              fix_slave_net_timeout);
1721
 
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
1722
 
                                                &slave_trans_retries);
1723
 
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
1724
 
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
1725
 
 
1726
 
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
1727
 
 
1728
 
 
1729
 
static int show_slave_skip_errors(THD *thd __attribute__((unused)),
1730
 
                                  SHOW_VAR *var, char *buff)
1731
 
{
1732
 
  var->type=SHOW_CHAR;
1733
 
  var->value= buff;
1734
 
  if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
1735
 
  {
1736
 
    var->value= const_cast<char *>("OFF");
1737
 
  }
1738
 
  else if (bitmap_is_set_all(&slave_error_mask))
1739
 
  {
1740
 
    var->value= const_cast<char *>("ALL");
1741
 
  }
1742
 
  else
1743
 
  {
1744
 
    /* 10 is enough assuming errors are max 4 digits */
1745
 
    int i;
1746
 
    var->value= buff;
1747
 
    for (i= 1;
1748
 
         i < MAX_SLAVE_ERROR &&
1749
 
         (buff - var->value) < SHOW_VAR_FUNC_BUFF_SIZE;
1750
 
         i++)
1751
 
    {
1752
 
      if (bitmap_is_set(&slave_error_mask, i))
1753
 
      {
1754
 
        buff= int10_to_str(i, buff, 10);
1755
 
        *buff++= ',';
1756
 
      }
1757
 
    }
1758
 
    if (var->value != buff)
1759
 
      buff--;                           // Remove last ','
1760
 
    if (i < MAX_SLAVE_ERROR)
1761
 
      buff= my_stpcpy(buff, "...");  // Couldn't show all errors
1762
 
    *buff=0;
1763
 
  }
1764
 
  return 0;
1765
 
}
1766
 
 
1767
 
static st_show_var_func_container
1768
 
show_slave_skip_errors_cont = { &show_slave_skip_errors };
1769
 
 
1770
 
 
1771
 
static SHOW_VAR fixed_vars[]= {
1772
 
  {"log_slave_updates",       (char*) &opt_log_slave_updates,       SHOW_MY_BOOL},
1773
 
  {"relay_log" , (char*) &opt_relay_logname, SHOW_CHAR_PTR},
1774
 
  {"relay_log_index", (char*) &opt_relaylog_index_name, SHOW_CHAR_PTR},
1775
 
  {"relay_log_info_file", (char*) &relay_log_info_file, SHOW_CHAR_PTR},
1776
 
  {"relay_log_space_limit",   (char*) &relay_log_space_limit,       SHOW_LONGLONG},
1777
 
  {"slave_load_tmpdir",       (char*) &slave_load_tmpdir,           SHOW_CHAR_PTR},
1778
 
  {"slave_skip_errors",       (char*) &show_slave_skip_errors_cont,      SHOW_FUNC},
1779
 
};
1780
 
 
1781
 
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((unused)),
1782
 
                                       set_var *var)
1783
 
{
1784
 
  int result= 0;
1785
 
  pthread_mutex_lock(&LOCK_active_mi);
1786
 
  pthread_mutex_lock(&active_mi->rli.run_lock);
1787
 
  if (active_mi->rli.slave_running)
1788
 
  {
1789
 
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1790
 
    result=1;
1791
 
  }
1792
 
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1793
 
  pthread_mutex_unlock(&LOCK_active_mi);
1794
 
  var->save_result.ulong_value= (ulong) var->value->val_int();
1795
 
  return result;
1796
 
}
1797
 
 
1798
 
 
1799
 
bool sys_var_slave_skip_counter::update(THD *thd __attribute__((unused)),
1800
 
                                        set_var *var)
1801
 
{
1802
 
  pthread_mutex_lock(&LOCK_active_mi);
1803
 
  pthread_mutex_lock(&active_mi->rli.run_lock);
1804
 
  /*
1805
 
    The following test should normally never be true as we test this
1806
 
    in the check function;  To be safe against multiple
1807
 
    SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
1808
 
  */
1809
 
  if (!active_mi->rli.slave_running)
1810
 
  {
1811
 
    pthread_mutex_lock(&active_mi->rli.data_lock);
1812
 
    active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
1813
 
    pthread_mutex_unlock(&active_mi->rli.data_lock);
1814
 
  }
1815
 
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1816
 
  pthread_mutex_unlock(&LOCK_active_mi);
1817
 
  return 0;
1818
 
}
1819
 
 
1820
 
 
1821
 
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((unused)),
1822
 
                                        set_var *var)
1823
 
{
1824
 
  sync_binlog_period= (uint32_t) var->save_result.uint64_t_value;
1825
 
  return 0;
1826
 
}
1827
 
 
1828
 
int init_replication_sys_vars()
1829
 
{
1830
 
  mysql_append_static_vars(fixed_vars, sizeof(fixed_vars) / sizeof(SHOW_VAR));
1831
 
 
1832
 
  if (mysql_add_sys_var_chain(vars.first, my_long_options))
1833
 
  {
1834
 
    /* should not happen */
1835
 
    fprintf(stderr, "failed to initialize replication system variables");
1836
 
    unireg_abort(1);
1837
 
  }
1838
 
  return 0;
1839
 
}