~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_repl.cc

  • Committer: Brian Aker
  • Date: 2008-10-06 06:47:29 UTC
  • Revision ID: brian@tangent.org-20081006064729-2i9mhjkzyvow9xsm
Remove uint.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2006 MySQL AB & Sasha
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <drizzled/server_includes.h>
 
17
 
 
18
#include "rpl_mi.h"
 
19
#include "sql_repl.h"
 
20
#include "log_event.h"
 
21
#include "rpl_filter.h"
 
22
#include <drizzled/drizzled_error_messages.h>
 
23
 
 
24
int max_binlog_dump_events = 0; // unlimited
 
25
 
 
26
/*
 
27
    fake_rotate_event() builds a fake (=which does not exist physically in any
 
28
    binlog) Rotate event, which contains the name of the binlog we are going to
 
29
    send to the slave (because the slave may not know it if it just asked for
 
30
    MASTER_LOG_FILE='', MASTER_LOG_POS=4).
 
31
    < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
 
32
    After this version we always call it, so that a 3.23.58 slave can rely on
 
33
    it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
 
34
    zeros in the good positions which, by chance, make it possible for the 3.23
 
35
    slave to detect that this event is unexpected) (this is luck which happens
 
36
    because the master and slave disagree on the size of the header of
 
37
    Log_event).
 
38
 
 
39
    Relying on the event length of the Rotate event instead of these
 
40
    well-placed zeros was not possible as Rotate events have a variable-length
 
41
    part.
 
42
*/
 
43
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
 
44
                             uint64_t position, const char** errmsg)
 
45
{
 
46
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
 
47
  /*
 
48
    'when' (the timestamp) is set to 0 so that slave could distinguish between
 
49
    real and fake Rotate events (if necessary)
 
50
  */
 
51
  memset(header, 0, 4);
 
52
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
 
53
 
 
54
  char* p = log_file_name+dirname_length(log_file_name);
 
55
  uint32_t ident_len = (uint32_t) strlen(p);
 
56
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
 
57
  int4store(header + SERVER_ID_OFFSET, server_id);
 
58
  int4store(header + EVENT_LEN_OFFSET, event_len);
 
59
  int2store(header + FLAGS_OFFSET, 0);
 
60
 
 
61
  // TODO: check what problems this may cause and fix them
 
62
  int4store(header + LOG_POS_OFFSET, 0);
 
63
 
 
64
  packet->append(header, sizeof(header));
 
65
  int8store(buf+R_POS_OFFSET,position);
 
66
  packet->append(buf, ROTATE_HEADER_LEN);
 
67
  packet->append(p,ident_len);
 
68
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
69
  {
 
70
    *errmsg = "failed on my_net_write()";
 
71
    return(-1);
 
72
  }
 
73
  return(0);
 
74
}
 
75
 
 
76
static int send_file(THD *thd)
 
77
{
 
78
  NET* net = &thd->net;
 
79
  int fd = -1, error = 1;
 
80
  size_t bytes;
 
81
  char fname[FN_REFLEN+1];
 
82
  const char *errmsg = 0;
 
83
  int old_timeout;
 
84
  unsigned long packet_len;
 
85
  unsigned char buf[IO_SIZE];                           // It's safe to alloc this
 
86
 
 
87
  /*
 
88
    The client might be slow loading the data, give him wait_timeout to do
 
89
    the job
 
90
  */
 
91
  old_timeout= net->read_timeout;
 
92
  my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
 
93
 
 
94
  /*
 
95
    We need net_flush here because the client will not know it needs to send
 
96
    us the file name until it has processed the load event entry
 
97
  */
 
98
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
 
99
  {
 
100
    errmsg = _("Failed in send_file() while reading file name");
 
101
    goto err;
 
102
  }
 
103
 
 
104
  // terminate with \0 for fn_format
 
105
  *((char*)net->read_pos +  packet_len) = 0;
 
106
  fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
 
107
  // this is needed to make replicate-ignore-db
 
108
  if (!strcmp(fname,"/dev/null"))
 
109
    goto end;
 
110
 
 
111
  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
 
112
  {
 
113
    errmsg = _("Failed in send_file() on open of file");
 
114
    goto err;
 
115
  }
 
116
 
 
117
  while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0)
 
118
  {
 
119
    if (my_net_write(net, buf, bytes))
 
120
    {
 
121
      errmsg = _("Failed in send_file() while writing data to client");
 
122
      goto err;
 
123
    }
 
124
  }
 
125
 
 
126
 end:
 
127
  if (my_net_write(net, (unsigned char*) "", 0) || net_flush(net) ||
 
128
      (my_net_read(net) == packet_error))
 
129
  {
 
130
    errmsg = _("Failed in send_file() while negotiating file transfer close");
 
131
    goto err;
 
132
  }
 
133
  error = 0;
 
134
 
 
135
 err:
 
136
  my_net_set_read_timeout(net, old_timeout);
 
137
  if (fd >= 0)
 
138
    (void) my_close(fd, MYF(0));
 
139
  if (errmsg)
 
140
  {
 
141
    sql_print_error(errmsg);
 
142
  }
 
143
  return(error);
 
144
}
 
145
 
 
146
 
 
147
/*
 
148
  Adjust the position pointer in the binary log file for all running slaves
 
149
 
 
150
  SYNOPSIS
 
151
    adjust_linfo_offsets()
 
152
    purge_offset        Number of bytes removed from start of log index file
 
153
 
 
154
  NOTES
 
155
    - This is called when doing a PURGE when we delete lines from the
 
156
      index log file
 
157
 
 
158
  REQUIREMENTS
 
159
    - Before calling this function, we have to ensure that no threads are
 
160
      using any binary log file before purge_offset.a
 
161
 
 
162
  TODO
 
163
    - Inform the slave threads that they should sync the position
 
164
      in the binary log file with flush_relay_log_info.
 
165
      Now they sync is done for next read.
 
166
*/
 
167
 
 
168
void adjust_linfo_offsets(my_off_t purge_offset)
 
169
{
 
170
  THD *tmp;
 
171
 
 
172
  pthread_mutex_lock(&LOCK_thread_count);
 
173
  I_List_iterator<THD> it(threads);
 
174
 
 
175
  while ((tmp=it++))
 
176
  {
 
177
    LOG_INFO* linfo;
 
178
    if ((linfo = tmp->current_linfo))
 
179
    {
 
180
      pthread_mutex_lock(&linfo->lock);
 
181
      /*
 
182
        Index file offset can be less that purge offset only if
 
183
        we just started reading the index file. In that case
 
184
        we have nothing to adjust
 
185
      */
 
186
      if (linfo->index_file_offset < purge_offset)
 
187
        linfo->fatal = (linfo->index_file_offset != 0);
 
188
      else
 
189
        linfo->index_file_offset -= purge_offset;
 
190
      pthread_mutex_unlock(&linfo->lock);
 
191
    }
 
192
  }
 
193
  pthread_mutex_unlock(&LOCK_thread_count);
 
194
}
 
195
 
 
196
 
 
197
bool log_in_use(const char* log_name)
 
198
{
 
199
  int log_name_len = strlen(log_name) + 1;
 
200
  THD *tmp;
 
201
  bool result = 0;
 
202
 
 
203
  pthread_mutex_lock(&LOCK_thread_count);
 
204
  I_List_iterator<THD> it(threads);
 
205
 
 
206
  while ((tmp=it++))
 
207
  {
 
208
    LOG_INFO* linfo;
 
209
    if ((linfo = tmp->current_linfo))
 
210
    {
 
211
      pthread_mutex_lock(&linfo->lock);
 
212
      result = !memcmp(log_name, linfo->log_file_name, log_name_len);
 
213
      pthread_mutex_unlock(&linfo->lock);
 
214
      if (result)
 
215
        break;
 
216
    }
 
217
  }
 
218
 
 
219
  pthread_mutex_unlock(&LOCK_thread_count);
 
220
  return result;
 
221
}
 
222
 
 
223
bool purge_error_message(THD* thd, int res)
 
224
{
 
225
  uint32_t errmsg= 0;
 
226
 
 
227
  switch (res)  {
 
228
  case 0: break;
 
229
  case LOG_INFO_EOF:    errmsg= ER_UNKNOWN_TARGET_BINLOG; break;
 
230
  case LOG_INFO_IO:     errmsg= ER_IO_ERR_LOG_INDEX_READ; break;
 
231
  case LOG_INFO_INVALID:errmsg= ER_BINLOG_PURGE_PROHIBITED; break;
 
232
  case LOG_INFO_SEEK:   errmsg= ER_FSEEK_FAIL; break;
 
233
  case LOG_INFO_MEM:    errmsg= ER_OUT_OF_RESOURCES; break;
 
234
  case LOG_INFO_FATAL:  errmsg= ER_BINLOG_PURGE_FATAL_ERR; break;
 
235
  case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break;
 
236
  case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break;
 
237
  default:              errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break;
 
238
  }
 
239
 
 
240
  if (errmsg)
 
241
  {
 
242
    my_message(errmsg, ER(errmsg), MYF(0));
 
243
    return true;
 
244
  }
 
245
  my_ok(thd);
 
246
  return false;
 
247
}
 
248
 
 
249
 
 
250
bool purge_master_logs(THD* thd, const char* to_log)
 
251
{
 
252
  char search_file_name[FN_REFLEN];
 
253
  if (!mysql_bin_log.is_open())
 
254
  {
 
255
    my_ok(thd);
 
256
    return false;
 
257
  }
 
258
 
 
259
  mysql_bin_log.make_log_name(search_file_name, to_log);
 
260
  return purge_error_message(thd,
 
261
                             mysql_bin_log.purge_logs(search_file_name, 0, 1,
 
262
                                                      1, NULL));
 
263
}
 
264
 
 
265
 
 
266
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
 
267
{
 
268
  if (!mysql_bin_log.is_open())
 
269
  {
 
270
    my_ok(thd);
 
271
    return 0;
 
272
  }
 
273
  return purge_error_message(thd,
 
274
                             mysql_bin_log.purge_logs_before_date(purge_time));
 
275
}
 
276
 
 
277
int test_for_non_eof_log_read_errors(int error, const char **errmsg)
 
278
{
 
279
  if (error == LOG_READ_EOF)
 
280
    return 0;
 
281
  my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
282
  switch (error) {
 
283
  case LOG_READ_BOGUS:
 
284
    *errmsg = "bogus data in log event";
 
285
    break;
 
286
  case LOG_READ_TOO_LARGE:
 
287
    *errmsg = "log event entry exceeded max_allowed_packet; \
 
288
Increase max_allowed_packet on master";
 
289
    break;
 
290
  case LOG_READ_IO:
 
291
    *errmsg = "I/O error reading log event";
 
292
    break;
 
293
  case LOG_READ_MEM:
 
294
    *errmsg = "memory allocation failed reading log event";
 
295
    break;
 
296
  case LOG_READ_TRUNC:
 
297
    *errmsg = "binlog truncated in the middle of event";
 
298
    break;
 
299
  default:
 
300
    *errmsg = "unknown error reading log event on the master";
 
301
    break;
 
302
  }
 
303
  return error;
 
304
}
 
305
 
 
306
 
 
307
/**
 
308
  An auxiliary function for calling in mysql_binlog_send
 
309
  to initialize the heartbeat timeout in waiting for a binlogged event.
 
310
 
 
311
  @param[in]    thd  THD to access a user variable
 
312
 
 
313
  @return        heartbeat period an uint64_t of nanoseconds
 
314
                 or zero if heartbeat was not demanded by slave
 
315
*/ 
 
316
static uint64_t get_heartbeat_period(THD * thd)
 
317
{
 
318
  bool null_value;
 
319
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
 
320
  user_var_entry *entry= 
 
321
    (user_var_entry*) hash_search(&thd->user_vars, (unsigned char*) name.str,
 
322
                                  name.length);
 
323
  return entry? entry->val_int(&null_value) : 0;
 
324
}
 
325
 
 
326
/*
 
327
  Function prepares and sends repliation heartbeat event.
 
328
 
 
329
  @param net                net object of THD
 
330
  @param packet             buffer to store the heartbeat instance
 
331
  @param event_coordinates  binlog file name and position of the last
 
332
                            real event master sent from binlog
 
333
 
 
334
  @note 
 
335
    Among three essential pieces of heartbeat data Log_event::when
 
336
    is computed locally.
 
337
    The  error to send is serious and should force terminating
 
338
    the dump thread.
 
339
*/
 
340
static int send_heartbeat_event(NET* net, String* packet,
 
341
                                const struct event_coordinates *coord)
 
342
{
 
343
  char header[LOG_EVENT_HEADER_LEN];
 
344
  /*
 
345
    'when' (the timestamp) is set to 0 so that slave could distinguish between
 
346
    real and fake Rotate events (if necessary)
 
347
  */
 
348
  memset(header, 0, 4);  // when
 
349
 
 
350
  header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
 
351
 
 
352
  char* p= coord->file_name + dirname_length(coord->file_name);
 
353
 
 
354
  uint32_t ident_len = strlen(p);
 
355
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN;
 
356
  int4store(header + SERVER_ID_OFFSET, server_id);
 
357
  int4store(header + EVENT_LEN_OFFSET, event_len);
 
358
  int2store(header + FLAGS_OFFSET, 0);
 
359
 
 
360
  int4store(header + LOG_POS_OFFSET, coord->pos);  // log_pos
 
361
 
 
362
  packet->append(header, sizeof(header));
 
363
  packet->append(p, ident_len);             // log_file_name
 
364
 
 
365
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) ||
 
366
      net_flush(net))
 
367
  {
 
368
    return(-1);
 
369
  }
 
370
  packet->set("\0", 1, &my_charset_bin);
 
371
  return(0);
 
372
}
 
373
 
 
374
/*
 
375
  TODO: Clean up loop to only have one call to send_file()
 
376
*/
 
377
 
 
378
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
 
379
                       uint16_t flags)
 
380
{
 
381
  LOG_INFO linfo;
 
382
  char *log_file_name = linfo.log_file_name;
 
383
  char search_file_name[FN_REFLEN], *name;
 
384
  IO_CACHE log;
 
385
  File file = -1;
 
386
  String* packet = &thd->packet;
 
387
  int error;
 
388
  const char *errmsg = "Unknown error";
 
389
  NET* net = &thd->net;
 
390
  pthread_mutex_t *log_lock;
 
391
  bool binlog_can_be_corrupted= false;
 
392
 
 
393
  memset(&log, 0, sizeof(log));
 
394
  /* 
 
395
     heartbeat_period from @master_heartbeat_period user variable
 
396
  */
 
397
  uint64_t heartbeat_period= get_heartbeat_period(thd);
 
398
  struct timespec heartbeat_buf;
 
399
  struct event_coordinates coord_buf;
 
400
  struct timespec *heartbeat_ts= NULL;
 
401
  struct event_coordinates *coord= NULL;
 
402
  if (heartbeat_period != 0L)
 
403
  {
 
404
    heartbeat_ts= &heartbeat_buf;
 
405
    set_timespec_nsec(*heartbeat_ts, 0);
 
406
    coord= &coord_buf;
 
407
    coord->file_name= log_file_name; // initialization basing on what slave remembers
 
408
    coord->pos= pos;
 
409
  }
 
410
 
 
411
  if (!mysql_bin_log.is_open())
 
412
  {
 
413
    errmsg = "Binary log is not open";
 
414
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
415
    goto err;
 
416
  }
 
417
  if (!server_id_supplied)
 
418
  {
 
419
    errmsg = "Misconfigured master - server id was not set";
 
420
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
421
    goto err;
 
422
  }
 
423
 
 
424
  name=search_file_name;
 
425
  if (log_ident[0])
 
426
    mysql_bin_log.make_log_name(search_file_name, log_ident);
 
427
  else
 
428
    name=0;                                     // Find first log
 
429
 
 
430
  linfo.index_file_offset = 0;
 
431
 
 
432
  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
 
433
  {
 
434
    errmsg = "Could not find first log file name in binary log index file";
 
435
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
436
    goto err;
 
437
  }
 
438
 
 
439
  pthread_mutex_lock(&LOCK_thread_count);
 
440
  thd->current_linfo = &linfo;
 
441
  pthread_mutex_unlock(&LOCK_thread_count);
 
442
 
 
443
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
 
444
  {
 
445
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
446
    goto err;
 
447
  }
 
448
  if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
 
449
  {
 
450
    errmsg= "Client requested master to start replication from \
 
451
impossible position";
 
452
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
453
    goto err;
 
454
  }
 
455
 
 
456
  /*
 
457
    We need to start a packet with something other than 255
 
458
    to distinguish it from error
 
459
  */
 
460
  packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
 
461
 
 
462
  /*
 
463
    Tell the client about the log name with a fake Rotate event;
 
464
    this is needed even if we also send a Format_description_log_event
 
465
    just after, because that event does not contain the binlog's name.
 
466
    Note that as this Rotate event is sent before
 
467
    Format_description_log_event, the slave cannot have any info to
 
468
    understand this event's format, so the header len of
 
469
    Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
 
470
    than other events except FORMAT_DESCRIPTION_EVENT).
 
471
    Before 4.0.14 we called fake_rotate_event below only if (pos ==
 
472
    BIN_LOG_HEADER_SIZE), because if this is false then the slave
 
473
    already knows the binlog's name.
 
474
    Since, we always call fake_rotate_event; if the slave already knew
 
475
    the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
 
476
    useless but does not harm much. It is nice for 3.23 (>=.58) slaves
 
477
    which test Rotate events to see if the master is 4.0 (then they
 
478
    choose to stop because they can't replicate 4.0); by always calling
 
479
    fake_rotate_event we are sure that 3.23.58 and newer will detect the
 
480
    problem as soon as replication starts (BUG#198).
 
481
    Always calling fake_rotate_event makes sending of normal
 
482
    (=from-binlog) Rotate events a priori unneeded, but it is not so
 
483
    simple: the 2 Rotate events are not equivalent, the normal one is
 
484
    before the Stop event, the fake one is after. If we don't send the
 
485
    normal one, then the Stop event will be interpreted (by existing 4.0
 
486
    slaves) as "the master stopped", which is wrong. So for safety,
 
487
    given that we want minimum modification of 4.0, we send the normal
 
488
    and fake Rotates.
 
489
  */
 
490
  if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
 
491
  {
 
492
    /*
 
493
       This error code is not perfect, as fake_rotate_event() does not
 
494
       read anything from the binlog; if it fails it's because of an
 
495
       error in my_net_write(), fortunately it will say so in errmsg.
 
496
    */
 
497
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
498
    goto err;
 
499
  }
 
500
  packet->set("\0", 1, &my_charset_bin);
 
501
  /*
 
502
    Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
 
503
    this larger than the corresponding packet (query) sent 
 
504
    from client to master.
 
505
  */
 
506
  thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
 
507
 
 
508
  /*
 
509
    We can set log_lock now, it does not move (it's a member of
 
510
    mysql_bin_log, and it's already inited, and it will be destroyed
 
511
    only at shutdown).
 
512
  */
 
513
  log_lock = mysql_bin_log.get_log_lock();
 
514
  if (pos > BIN_LOG_HEADER_SIZE)
 
515
  {
 
516
     /*
 
517
       Try to find a Format_description_log_event at the beginning of
 
518
       the binlog
 
519
     */
 
520
     if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
 
521
     {
 
522
       /*
 
523
         The packet has offsets equal to the normal offsets in a binlog
 
524
         event +1 (the first character is \0).
 
525
       */
 
526
       if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
 
527
       {
 
528
         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
 
529
                                       LOG_EVENT_BINLOG_IN_USE_F);
 
530
         (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
 
531
         /*
 
532
           mark that this event with "log_pos=0", so the slave
 
533
           should not increment master's binlog position
 
534
           (rli->group_master_log_pos)
 
535
         */
 
536
         int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
 
537
         /*
 
538
           if reconnect master sends FD event with `created' as 0
 
539
           to avoid destroying temp tables.
 
540
          */
 
541
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
 
542
                   ST_CREATED_OFFSET+1, (uint32_t) 0);
 
543
         /* send it */
 
544
         if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
545
         {
 
546
           errmsg = "Failed on my_net_write()";
 
547
           my_errno= ER_UNKNOWN_ERROR;
 
548
           goto err;
 
549
         }
 
550
 
 
551
         /*
 
552
           No need to save this event. We are only doing simple reads
 
553
           (no real parsing of the events) so we don't need it. And so
 
554
           we don't need the artificial Format_description_log_event of
 
555
           3.23&4.x.
 
556
         */
 
557
       }
 
558
     }
 
559
     else
 
560
     {
 
561
       if (test_for_non_eof_log_read_errors(error, &errmsg))
 
562
         goto err;
 
563
       /*
 
564
         It's EOF, nothing to do, go on reading next events, the
 
565
         Format_description_log_event will be found naturally if it is written.
 
566
       */
 
567
     }
 
568
     /* reset the packet as we wrote to it in any case */
 
569
     packet->set("\0", 1, &my_charset_bin);
 
570
  } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
 
571
  else
 
572
  {
 
573
    /* The Format_description_log_event event will be found naturally. */
 
574
  }
 
575
 
 
576
  /* seek to the requested position, to start the requested dump */
 
577
  my_b_seek(&log, pos);                 // Seek will done on next read
 
578
 
 
579
  while (!net->error && net->vio != 0 && !thd->killed)
 
580
  {
 
581
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
 
582
    {
 
583
      /*
 
584
        log's filename does not change while it's active
 
585
      */
 
586
      if (coord)
 
587
        coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
 
588
 
 
589
      if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
 
590
      {
 
591
        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
 
592
                                      LOG_EVENT_BINLOG_IN_USE_F);
 
593
        (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
 
594
      }
 
595
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
 
596
        binlog_can_be_corrupted= false;
 
597
 
 
598
      if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
599
      {
 
600
        errmsg = "Failed on my_net_write()";
 
601
        my_errno= ER_UNKNOWN_ERROR;
 
602
        goto err;
 
603
      }
 
604
 
 
605
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
 
606
      {
 
607
        if (send_file(thd))
 
608
        {
 
609
          errmsg = "failed in send_file()";
 
610
          my_errno= ER_UNKNOWN_ERROR;
 
611
          goto err;
 
612
        }
 
613
      }
 
614
      packet->set("\0", 1, &my_charset_bin);
 
615
    }
 
616
 
 
617
    /*
 
618
      here we were reading binlog that was not closed properly (as a result
 
619
      of a crash ?). treat any corruption as EOF
 
620
    */
 
621
    if (binlog_can_be_corrupted && error != LOG_READ_MEM)
 
622
      error=LOG_READ_EOF;
 
623
    /*
 
624
      TODO: now that we are logging the offset, check to make sure
 
625
      the recorded offset and the actual match.
 
626
      Guilhem 2003-06: this is not true if this master is a slave
 
627
      <4.0.15 running with --log-slave-updates, because then log_pos may
 
628
      be the offset in the-master-of-this-master's binlog.
 
629
    */
 
630
    if (test_for_non_eof_log_read_errors(error, &errmsg))
 
631
      goto err;
 
632
 
 
633
    if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
 
634
        mysql_bin_log.is_active(log_file_name))
 
635
    {
 
636
      /*
 
637
        Block until there is more data in the log
 
638
      */
 
639
      if (net_flush(net))
 
640
      {
 
641
        errmsg = "failed on net_flush()";
 
642
        my_errno= ER_UNKNOWN_ERROR;
 
643
        goto err;
 
644
      }
 
645
 
 
646
      /*
 
647
        We may have missed the update broadcast from the log
 
648
        that has just happened, let's try to catch it if it did.
 
649
        If we did not miss anything, we just wait for other threads
 
650
        to signal us.
 
651
      */
 
652
      {
 
653
        log.error=0;
 
654
        bool read_packet = 0, fatal_error = 0;
 
655
 
 
656
        /*
 
657
          No one will update the log while we are reading
 
658
          now, but we'll be quick and just read one record
 
659
 
 
660
          TODO:
 
661
          Add an counter that is incremented for each time we update the
 
662
          binary log.  We can avoid the following read if the counter
 
663
          has not been updated since last read.
 
664
        */
 
665
 
 
666
        pthread_mutex_lock(log_lock);
 
667
        switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
 
668
        case 0:
 
669
          /* we read successfully, so we'll need to send it to the slave */
 
670
          pthread_mutex_unlock(log_lock);
 
671
          read_packet = 1;
 
672
          if (coord)
 
673
            coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
 
674
          break;
 
675
 
 
676
        case LOG_READ_EOF:
 
677
        {
 
678
          int ret;
 
679
          if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
 
680
          {
 
681
            pthread_mutex_unlock(log_lock);
 
682
            goto end;
 
683
          }
 
684
 
 
685
          do 
 
686
          {
 
687
            if (coord)
 
688
            {
 
689
              assert(heartbeat_ts && heartbeat_period != 0L);
 
690
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
 
691
            }
 
692
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
 
693
            assert(ret == 0 || (heartbeat_period != 0L && coord != NULL));
 
694
            if (ret == ETIMEDOUT || ret == ETIME)
 
695
            {
 
696
              if (send_heartbeat_event(net, packet, coord))
 
697
              {
 
698
                errmsg = "Failed on my_net_write()";
 
699
                my_errno= ER_UNKNOWN_ERROR;
 
700
                pthread_mutex_unlock(log_lock);
 
701
                goto err;
 
702
              }
 
703
            }
 
704
            else
 
705
            {
 
706
              assert(ret == 0);
 
707
            }
 
708
          } while (ret != 0 && coord != NULL && !thd->killed);
 
709
          pthread_mutex_unlock(log_lock);
 
710
        }    
 
711
        break;
 
712
            
 
713
        default:
 
714
          pthread_mutex_unlock(log_lock);
 
715
          fatal_error = 1;
 
716
          break;
 
717
        }
 
718
 
 
719
        if (read_packet)
 
720
        {
 
721
          thd_proc_info(thd, "Sending binlog event to slave");
 
722
          if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) )
 
723
          {
 
724
            errmsg = "Failed on my_net_write()";
 
725
            my_errno= ER_UNKNOWN_ERROR;
 
726
            goto err;
 
727
          }
 
728
 
 
729
          if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
 
730
          {
 
731
            if (send_file(thd))
 
732
            {
 
733
              errmsg = "failed in send_file()";
 
734
              my_errno= ER_UNKNOWN_ERROR;
 
735
              goto err;
 
736
            }
 
737
          }
 
738
          packet->set("\0", 1, &my_charset_bin);
 
739
          /*
 
740
            No need to net_flush because we will get to flush later when
 
741
            we hit EOF pretty quick
 
742
          */
 
743
        }
 
744
 
 
745
        if (fatal_error)
 
746
        {
 
747
          errmsg = "error reading log entry";
 
748
          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
749
          goto err;
 
750
        }
 
751
        log.error=0;
 
752
      }
 
753
    }
 
754
    else
 
755
    {
 
756
      bool loop_breaker = 0;
 
757
      /* need this to break out of the for loop from switch */
 
758
 
 
759
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
 
760
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
 
761
      case LOG_INFO_EOF:
 
762
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
 
763
        break;
 
764
      case 0:
 
765
        break;
 
766
      default:
 
767
        errmsg = "could not find next log";
 
768
        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
769
        goto err;
 
770
      }
 
771
 
 
772
      if (loop_breaker)
 
773
        break;
 
774
 
 
775
      end_io_cache(&log);
 
776
      (void) my_close(file, MYF(MY_WME));
 
777
 
 
778
      /*
 
779
        Call fake_rotate_event() in case the previous log (the one which
 
780
        we have just finished reading) did not contain a Rotate event
 
781
        (for example (I don't know any other example) the previous log
 
782
        was the last one before the master was shutdown & restarted).
 
783
        This way we tell the slave about the new log's name and
 
784
        position.  If the binlog is 5.0, the next event we are going to
 
785
        read and send is Format_description_log_event.
 
786
      */
 
787
      if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
 
788
          fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
 
789
                            &errmsg))
 
790
      {
 
791
        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
 
792
        goto err;
 
793
      }
 
794
 
 
795
      packet->length(0);
 
796
      packet->append('\0');
 
797
      if (coord)
 
798
        coord->file_name= log_file_name; // reset to the next
 
799
    }
 
800
  }
 
801
 
 
802
end:
 
803
  end_io_cache(&log);
 
804
  (void)my_close(file, MYF(MY_WME));
 
805
 
 
806
  my_eof(thd);
 
807
  thd_proc_info(thd, "Waiting to finalize termination");
 
808
  pthread_mutex_lock(&LOCK_thread_count);
 
809
  thd->current_linfo = 0;
 
810
  pthread_mutex_unlock(&LOCK_thread_count);
 
811
  return;
 
812
 
 
813
err:
 
814
  thd_proc_info(thd, "Waiting to finalize termination");
 
815
  end_io_cache(&log);
 
816
  /*
 
817
    Exclude  iteration through thread list
 
818
    this is needed for purge_logs() - it will iterate through
 
819
    thread list and update thd->current_linfo->index_file_offset
 
820
    this mutex will make sure that it never tried to update our linfo
 
821
    after we return from this stack frame
 
822
  */
 
823
  pthread_mutex_lock(&LOCK_thread_count);
 
824
  thd->current_linfo = 0;
 
825
  pthread_mutex_unlock(&LOCK_thread_count);
 
826
  if (file >= 0)
 
827
    (void) my_close(file, MYF(MY_WME));
 
828
 
 
829
  my_message(my_errno, errmsg, MYF(0));
 
830
  return;
 
831
}
 
832
 
 
833
int start_slave(THD* thd , Master_info* mi,  bool net_report)
 
834
{
 
835
  int slave_errno= 0;
 
836
  int thread_mask;
 
837
 
 
838
  lock_slave_threads(mi);  // this allows us to cleanly read slave_running
 
839
  // Get a mask of _stopped_ threads
 
840
  init_thread_mask(&thread_mask,mi,1 /* inverse */);
 
841
  /*
 
842
    Below we will start all stopped threads.  But if the user wants to
 
843
    start only one thread, do as if the other thread was running (as we
 
844
    don't wan't to touch the other thread), so set the bit to 0 for the
 
845
    other thread
 
846
  */
 
847
  if (thd->lex->slave_thd_opt)
 
848
    thread_mask&= thd->lex->slave_thd_opt;
 
849
  if (thread_mask) //some threads are stopped, start them
 
850
  {
 
851
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
 
852
                         thread_mask))
 
853
      slave_errno=ER_MASTER_INFO;
 
854
    else if (server_id_supplied && *mi->host)
 
855
    {
 
856
      /*
 
857
        If we will start SQL thread we will care about UNTIL options If
 
858
        not and they are specified we will ignore them and warn user
 
859
        about this fact.
 
860
      */
 
861
      if (thread_mask & SLAVE_SQL)
 
862
      {
 
863
        pthread_mutex_lock(&mi->rli.data_lock);
 
864
 
 
865
        if (thd->lex->mi.pos)
 
866
        {
 
867
          mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
 
868
          mi->rli.until_log_pos= thd->lex->mi.pos;
 
869
          /*
 
870
             We don't check thd->lex->mi.log_file_name for NULL here
 
871
             since it is checked in sql_yacc.yy
 
872
          */
 
873
          strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
 
874
                  sizeof(mi->rli.until_log_name)-1);
 
875
        }
 
876
        else if (thd->lex->mi.relay_log_pos)
 
877
        {
 
878
          mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
 
879
          mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
 
880
          strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
 
881
                  sizeof(mi->rli.until_log_name)-1);
 
882
        }
 
883
        else
 
884
          mi->rli.clear_until_condition();
 
885
 
 
886
        if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
 
887
        {
 
888
          /* Preparing members for effective until condition checking */
 
889
          const char *p= fn_ext(mi->rli.until_log_name);
 
890
          char *p_end;
 
891
          if (*p)
 
892
          {
 
893
            //p points to '.'
 
894
            mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
 
895
            /*
 
896
              p_end points to the first invalid character. If it equals
 
897
              to p, no digits were found, error. If it contains '\0' it
 
898
              means  conversion went ok.
 
899
            */
 
900
            if (p_end==p || *p_end)
 
901
              slave_errno=ER_BAD_SLAVE_UNTIL_COND;
 
902
          }
 
903
          else
 
904
            slave_errno=ER_BAD_SLAVE_UNTIL_COND;
 
905
 
 
906
          /* mark the cached result of the UNTIL comparison as "undefined" */
 
907
          mi->rli.until_log_names_cmp_result=
 
908
            Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
 
909
 
 
910
          /* Issuing warning then started without --skip-slave-start */
 
911
          if (!opt_skip_slave_start)
 
912
            push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
913
                         ER_MISSING_SKIP_SLAVE,
 
914
                         ER(ER_MISSING_SKIP_SLAVE));
 
915
        }
 
916
 
 
917
        pthread_mutex_unlock(&mi->rli.data_lock);
 
918
      }
 
919
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
 
920
        push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
 
921
                     ER(ER_UNTIL_COND_IGNORED));
 
922
 
 
923
      if (!slave_errno)
 
924
        slave_errno = start_slave_threads(0 /*no mutex */,
 
925
                                        1 /* wait for start */,
 
926
                                        mi,
 
927
                                        master_info_file,relay_log_info_file,
 
928
                                        thread_mask);
 
929
    }
 
930
    else
 
931
      slave_errno = ER_BAD_SLAVE;
 
932
  }
 
933
  else
 
934
  {
 
935
    /* no error if all threads are already started, only a warning */
 
936
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
 
937
                 ER(ER_SLAVE_WAS_RUNNING));
 
938
  }
 
939
 
 
940
  unlock_slave_threads(mi);
 
941
 
 
942
  if (slave_errno)
 
943
  {
 
944
    if (net_report)
 
945
      my_message(slave_errno, ER(slave_errno), MYF(0));
 
946
    return(1);
 
947
  }
 
948
  else if (net_report)
 
949
    my_ok(thd);
 
950
 
 
951
  return(0);
 
952
}
 
953
 
 
954
 
 
955
int stop_slave(THD* thd, Master_info* mi, bool net_report )
 
956
{
 
957
  int slave_errno;
 
958
  if (!thd)
 
959
    thd = current_thd;
 
960
 
 
961
  thd_proc_info(thd, "Killing slave");
 
962
  int thread_mask;
 
963
  lock_slave_threads(mi);
 
964
  // Get a mask of _running_ threads
 
965
  init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
 
966
  /*
 
967
    Below we will stop all running threads.
 
968
    But if the user wants to stop only one thread, do as if the other thread
 
969
    was stopped (as we don't wan't to touch the other thread), so set the
 
970
    bit to 0 for the other thread
 
971
  */
 
972
  if (thd->lex->slave_thd_opt)
 
973
    thread_mask &= thd->lex->slave_thd_opt;
 
974
 
 
975
  if (thread_mask)
 
976
  {
 
977
    slave_errno= terminate_slave_threads(mi,thread_mask,
 
978
                                         1 /*skip lock */);
 
979
  }
 
980
  else
 
981
  {
 
982
    //no error if both threads are already stopped, only a warning
 
983
    slave_errno= 0;
 
984
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
 
985
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
 
986
  }
 
987
  unlock_slave_threads(mi);
 
988
  thd_proc_info(thd, 0);
 
989
 
 
990
  if (slave_errno)
 
991
  {
 
992
    if (net_report)
 
993
      my_message(slave_errno, ER(slave_errno), MYF(0));
 
994
    return(1);
 
995
  }
 
996
  else if (net_report)
 
997
    my_ok(thd);
 
998
 
 
999
  return(0);
 
1000
}
 
1001
 
 
1002
 
 
1003
/*
 
1004
  Remove all relay logs and start replication from the start
 
1005
 
 
1006
  SYNOPSIS
 
1007
    reset_slave()
 
1008
    thd                 Thread handler
 
1009
    mi                  Master info for the slave
 
1010
 
 
1011
  RETURN
 
1012
    0   ok
 
1013
    1   error
 
1014
*/
 
1015
 
 
1016
 
 
1017
int reset_slave(THD *thd, Master_info* mi)
 
1018
{
 
1019
  struct stat stat_area;
 
1020
  char fname[FN_REFLEN];
 
1021
  int thread_mask= 0, error= 0;
 
1022
  uint32_t sql_errno=0;
 
1023
  const char* errmsg=0;
 
1024
 
 
1025
  lock_slave_threads(mi);
 
1026
  init_thread_mask(&thread_mask,mi,0 /* not inverse */);
 
1027
  if (thread_mask) // We refuse if any slave thread is running
 
1028
  {
 
1029
    sql_errno= ER_SLAVE_MUST_STOP;
 
1030
    error=1;
 
1031
    goto err;
 
1032
  }
 
1033
 
 
1034
  // delete relay logs, clear relay log coordinates
 
1035
  if ((error= purge_relay_logs(&mi->rli, thd,
 
1036
                               1 /* just reset */,
 
1037
                               &errmsg)))
 
1038
    goto err;
 
1039
 
 
1040
  /* Clear master's log coordinates */
 
1041
  init_master_log_pos(mi);
 
1042
  /*
 
1043
     Reset errors (the idea is that we forget about the
 
1044
     old master).
 
1045
  */
 
1046
  mi->rli.clear_error();
 
1047
  mi->rli.clear_until_condition();
 
1048
 
 
1049
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
 
1050
  end_master_info(mi);
 
1051
  // and delete these two files
 
1052
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
 
1053
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
 
1054
  {
 
1055
    error=1;
 
1056
    goto err;
 
1057
  }
 
1058
  // delete relay_log_info_file
 
1059
  fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
 
1060
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
 
1061
  {
 
1062
    error=1;
 
1063
    goto err;
 
1064
  }
 
1065
 
 
1066
err:
 
1067
  unlock_slave_threads(mi);
 
1068
  if (error)
 
1069
    my_error(sql_errno, MYF(0), errmsg);
 
1070
  return(error);
 
1071
}
 
1072
 
 
1073
/*
 
1074
 
 
1075
  Kill all Binlog_dump threads which previously talked to the same slave
 
1076
  ("same" means with the same server id). Indeed, if the slave stops, if the
 
1077
  Binlog_dump thread is waiting (pthread_cond_wait) for binlog update, then it
 
1078
  will keep existing until a query is written to the binlog. If the master is
 
1079
  idle, then this could last long, and if the slave reconnects, we could have 2
 
1080
  Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
 
1081
  binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
 
1082
  the master kills any existing thread with the slave's server id (if this id is
 
1083
  not zero; it will be true for real slaves, but false for mysqlbinlog when it
 
1084
  sends COM_BINLOG_DUMP to get a remote binlog dump).
 
1085
 
 
1086
  SYNOPSIS
 
1087
    kill_zombie_dump_threads()
 
1088
    slave_server_id     the slave's server id
 
1089
 
 
1090
*/
 
1091
 
 
1092
 
 
1093
void kill_zombie_dump_threads(uint32_t slave_server_id)
 
1094
{
 
1095
  pthread_mutex_lock(&LOCK_thread_count);
 
1096
  I_List_iterator<THD> it(threads);
 
1097
  THD *tmp;
 
1098
 
 
1099
  while ((tmp=it++))
 
1100
  {
 
1101
    if (tmp->command == COM_BINLOG_DUMP &&
 
1102
       tmp->server_id == slave_server_id)
 
1103
    {
 
1104
      pthread_mutex_lock(&tmp->LOCK_delete);    // Lock from delete
 
1105
      break;
 
1106
    }
 
1107
  }
 
1108
  pthread_mutex_unlock(&LOCK_thread_count);
 
1109
  if (tmp)
 
1110
  {
 
1111
    /*
 
1112
      Here we do not call kill_one_thread() as
 
1113
      it will be slow because it will iterate through the list
 
1114
      again. We just to do kill the thread ourselves.
 
1115
    */
 
1116
    tmp->awake(THD::KILL_QUERY);
 
1117
    pthread_mutex_unlock(&tmp->LOCK_delete);
 
1118
  }
 
1119
}
 
1120
 
 
1121
 
 
1122
bool change_master(THD* thd, Master_info* mi)
 
1123
{
 
1124
  int thread_mask;
 
1125
  const char* errmsg= 0;
 
1126
  bool need_relay_log_purge= 1;
 
1127
 
 
1128
  lock_slave_threads(mi);
 
1129
  init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
 
1130
  if (thread_mask) // We refuse if any slave thread is running
 
1131
  {
 
1132
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
 
1133
    unlock_slave_threads(mi);
 
1134
    return(true);
 
1135
  }
 
1136
 
 
1137
  thd_proc_info(thd, "Changing master");
 
1138
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
 
1139
  // TODO: see if needs re-write
 
1140
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
 
1141
                       thread_mask))
 
1142
  {
 
1143
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
 
1144
    unlock_slave_threads(mi);
 
1145
    return(true);
 
1146
  }
 
1147
 
 
1148
  /*
 
1149
    Data lock not needed since we have already stopped the running threads,
 
1150
    and we have the hold on the run locks which will keep all threads that
 
1151
    could possibly modify the data structures from running
 
1152
  */
 
1153
 
 
1154
  /*
 
1155
    If the user specified host or port without binlog or position,
 
1156
    reset binlog's name to FIRST and position to 4.
 
1157
  */
 
1158
 
 
1159
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
 
1160
  {
 
1161
    mi->master_log_name[0] = 0;
 
1162
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
 
1163
  }
 
1164
 
 
1165
  if (lex_mi->log_file_name)
 
1166
    strmake(mi->master_log_name, lex_mi->log_file_name,
 
1167
            sizeof(mi->master_log_name)-1);
 
1168
  if (lex_mi->pos)
 
1169
  {
 
1170
    mi->master_log_pos= lex_mi->pos;
 
1171
  }
 
1172
 
 
1173
  if (lex_mi->host)
 
1174
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
 
1175
  if (lex_mi->user)
 
1176
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
 
1177
  if (lex_mi->password)
 
1178
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
 
1179
  if (lex_mi->port)
 
1180
    mi->port = lex_mi->port;
 
1181
  if (lex_mi->connect_retry)
 
1182
    mi->connect_retry = lex_mi->connect_retry;
 
1183
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1184
    mi->heartbeat_period = lex_mi->heartbeat_period;
 
1185
  else
 
1186
    mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
 
1187
                                      (slave_net_timeout/2.0));
 
1188
  mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
 
1189
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1190
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
1191
 
 
1192
  if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1193
    mi->ssl_verify_server_cert=
 
1194
      (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
1195
 
 
1196
  if (lex_mi->ssl_ca)
 
1197
    strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
 
1198
  if (lex_mi->ssl_capath)
 
1199
    strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
 
1200
  if (lex_mi->ssl_cert)
 
1201
    strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
 
1202
  if (lex_mi->ssl_cipher)
 
1203
    strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
 
1204
  if (lex_mi->ssl_key)
 
1205
    strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
 
1206
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
 
1207
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
 
1208
      lex_mi->ssl_verify_server_cert )
 
1209
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
1210
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
 
1211
 
 
1212
  if (lex_mi->relay_log_name)
 
1213
  {
 
1214
    need_relay_log_purge= 0;
 
1215
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
 
1216
            sizeof(mi->rli.group_relay_log_name)-1);
 
1217
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
 
1218
            sizeof(mi->rli.event_relay_log_name)-1);
 
1219
  }
 
1220
 
 
1221
  if (lex_mi->relay_log_pos)
 
1222
  {
 
1223
    need_relay_log_purge= 0;
 
1224
    mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
 
1225
  }
 
1226
 
 
1227
  /*
 
1228
    If user did specify neither host nor port nor any log name nor any log
 
1229
    pos, i.e. he specified only user/password/master_connect_retry, he probably
 
1230
    wants replication to resume from where it had left, i.e. from the
 
1231
    coordinates of the **SQL** thread (imagine the case where the I/O is ahead
 
1232
    of the SQL; restarting from the coordinates of the I/O would lose some
 
1233
    events which is probably unwanted when you are just doing minor changes
 
1234
    like changing master_connect_retry).
 
1235
    A side-effect is that if only the I/O thread was started, this thread may
 
1236
    restart from ''/4 after the CHANGE MASTER. That's a minor problem (it is a
 
1237
    much more unlikely situation than the one we are fixing here).
 
1238
    Note: coordinates of the SQL thread must be read here, before the
 
1239
    'if (need_relay_log_purge)' block which resets them.
 
1240
  */
 
1241
  if (!lex_mi->host && !lex_mi->port &&
 
1242
      !lex_mi->log_file_name && !lex_mi->pos &&
 
1243
      need_relay_log_purge)
 
1244
   {
 
1245
     /*
 
1246
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
 
1247
       not initialized), so we use a cmax().
 
1248
       What happens to mi->rli.master_log_pos during the initialization stages
 
1249
       of replication is not 100% clear, so we guard against problems using
 
1250
       cmax().
 
1251
      */
 
1252
     mi->master_log_pos = ((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
 
1253
                           ? BIN_LOG_HEADER_SIZE
 
1254
                           : mi->rli.group_master_log_pos);
 
1255
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
 
1256
             sizeof(mi->master_log_name)-1);
 
1257
  }
 
1258
  /*
 
1259
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
 
1260
    a slave before).
 
1261
  */
 
1262
  if (flush_master_info(mi, 0))
 
1263
  {
 
1264
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
 
1265
    unlock_slave_threads(mi);
 
1266
    return(true);
 
1267
  }
 
1268
  if (need_relay_log_purge)
 
1269
  {
 
1270
    relay_log_purge= 1;
 
1271
    thd_proc_info(thd, "Purging old relay logs");
 
1272
    if (purge_relay_logs(&mi->rli, thd,
 
1273
                         0 /* not only reset, but also reinit */,
 
1274
                         &errmsg))
 
1275
    {
 
1276
      my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
 
1277
      unlock_slave_threads(mi);
 
1278
      return(true);
 
1279
    }
 
1280
  }
 
1281
  else
 
1282
  {
 
1283
    const char* msg;
 
1284
    relay_log_purge= 0;
 
1285
    /* Relay log is already initialized */
 
1286
    if (init_relay_log_pos(&mi->rli,
 
1287
                           mi->rli.group_relay_log_name,
 
1288
                           mi->rli.group_relay_log_pos,
 
1289
                           0 /*no data lock*/,
 
1290
                           &msg, 0))
 
1291
    {
 
1292
      my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
 
1293
      unlock_slave_threads(mi);
 
1294
      return(true);
 
1295
    }
 
1296
  }
 
1297
  /*
 
1298
    Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block,
 
1299
    so restore them to good values. If we left them to ''/0, that would work;
 
1300
    but that would fail in the case of 2 successive CHANGE MASTER (without a
 
1301
    START SLAVE in between): because first one would set the coords in mi to
 
1302
    the good values of those in rli, the set those in rli to ''/0, then
 
1303
    second CHANGE MASTER would set the coords in mi to those of rli, i.e. to
 
1304
    ''/0: we have lost all copies of the original good coordinates.
 
1305
    That's why we always save good coords in rli.
 
1306
  */
 
1307
  mi->rli.group_master_log_pos= mi->master_log_pos;
 
1308
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
 
1309
          sizeof(mi->rli.group_master_log_name)-1);
 
1310
 
 
1311
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
 
1312
    mi->rli.group_master_log_pos=0;
 
1313
 
 
1314
  pthread_mutex_lock(&mi->rli.data_lock);
 
1315
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
 
1316
  /* Clear the errors, for a clean start */
 
1317
  mi->rli.clear_error();
 
1318
  mi->rli.clear_until_condition();
 
1319
  /*
 
1320
    If we don't write new coordinates to disk now, then old will remain in
 
1321
    relay-log.info until START SLAVE is issued; but if mysqld is shutdown
 
1322
    before START SLAVE, then old will remain in relay-log.info, and will be the
 
1323
    in-memory value at restart (thus causing errors, as the old relay log does
 
1324
    not exist anymore).
 
1325
  */
 
1326
  flush_relay_log_info(&mi->rli);
 
1327
  pthread_cond_broadcast(&mi->data_cond);
 
1328
  pthread_mutex_unlock(&mi->rli.data_lock);
 
1329
 
 
1330
  unlock_slave_threads(mi);
 
1331
  thd_proc_info(thd, 0);
 
1332
  my_ok(thd);
 
1333
  return(false);
 
1334
}
 
1335
 
 
1336
int reset_master(THD* thd)
 
1337
{
 
1338
  if (!mysql_bin_log.is_open())
 
1339
  {
 
1340
    my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
 
1341
               ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
 
1342
    return 1;
 
1343
  }
 
1344
  return mysql_bin_log.reset_logs(thd);
 
1345
}
 
1346
 
 
1347
int cmp_master_pos(const char* log_file_name1, uint64_t log_pos1,
 
1348
                   const char* log_file_name2, uint64_t log_pos2)
 
1349
{
 
1350
  int res;
 
1351
  uint32_t log_file_name1_len=  strlen(log_file_name1);
 
1352
  uint32_t log_file_name2_len=  strlen(log_file_name2);
 
1353
 
 
1354
  //  We assume that both log names match up to '.'
 
1355
  if (log_file_name1_len == log_file_name2_len)
 
1356
  {
 
1357
    if ((res= strcmp(log_file_name1, log_file_name2)))
 
1358
      return res;
 
1359
    return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
 
1360
  }
 
1361
  return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
 
1362
}
 
1363
 
 
1364
 
 
1365
bool mysql_show_binlog_events(THD* thd)
 
1366
{
 
1367
  Protocol *protocol= thd->protocol;
 
1368
  List<Item> field_list;
 
1369
  const char *errmsg= 0;
 
1370
  bool ret= true;
 
1371
  IO_CACHE log;
 
1372
  File file= -1;
 
1373
 
 
1374
  Log_event::init_show_field_list(&field_list);
 
1375
  if (protocol->send_fields(&field_list,
 
1376
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1377
    return(true);
 
1378
 
 
1379
  Format_description_log_event *description_event= new
 
1380
    Format_description_log_event(3); /* MySQL 4.0 by default */
 
1381
 
 
1382
  if (mysql_bin_log.is_open())
 
1383
  {
 
1384
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
 
1385
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
 
1386
    ha_rows event_count, limit_start, limit_end;
 
1387
    my_off_t pos = cmax((uint64_t)BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
 
1388
    char search_file_name[FN_REFLEN], *name;
 
1389
    const char *log_file_name = lex_mi->log_file_name;
 
1390
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
 
1391
    LOG_INFO linfo;
 
1392
    Log_event* ev;
 
1393
 
 
1394
    unit->set_limit(thd->lex->current_select);
 
1395
    limit_start= unit->offset_limit_cnt;
 
1396
    limit_end= unit->select_limit_cnt;
 
1397
 
 
1398
    name= search_file_name;
 
1399
    if (log_file_name)
 
1400
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
 
1401
    else
 
1402
      name=0;                                   // Find first log
 
1403
 
 
1404
    linfo.index_file_offset = 0;
 
1405
 
 
1406
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
 
1407
    {
 
1408
      errmsg = "Could not find target log";
 
1409
      goto err;
 
1410
    }
 
1411
 
 
1412
    pthread_mutex_lock(&LOCK_thread_count);
 
1413
    thd->current_linfo = &linfo;
 
1414
    pthread_mutex_unlock(&LOCK_thread_count);
 
1415
 
 
1416
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
 
1417
      goto err;
 
1418
 
 
1419
    /*
 
1420
      to account binlog event header size
 
1421
    */
 
1422
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
 
1423
 
 
1424
    pthread_mutex_lock(log_lock);
 
1425
 
 
1426
    /*
 
1427
      open_binlog() sought to position 4.
 
1428
      Read the first event in case it's a Format_description_log_event, to
 
1429
      know the format. If there's no such event, we are 3.23 or 4.x. This
 
1430
      code, like before, can't read 3.23 binlogs.
 
1431
      This code will fail on a mixed relay log (one which has Format_desc then
 
1432
      Rotate then Format_desc).
 
1433
    */
 
1434
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
 
1435
    if (ev)
 
1436
    {
 
1437
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
 
1438
      {
 
1439
        delete description_event;
 
1440
        description_event= (Format_description_log_event*) ev;
 
1441
      }
 
1442
      else
 
1443
        delete ev;
 
1444
    }
 
1445
 
 
1446
    my_b_seek(&log, pos);
 
1447
 
 
1448
    if (!description_event->is_valid())
 
1449
    {
 
1450
      errmsg="Invalid Format_description event; could be out of memory";
 
1451
      goto err;
 
1452
    }
 
1453
 
 
1454
    for (event_count = 0;
 
1455
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
 
1456
                                         description_event)); )
 
1457
    {
 
1458
      if (event_count >= limit_start &&
 
1459
          ev->net_send(protocol, linfo.log_file_name, pos))
 
1460
      {
 
1461
        errmsg = "Net error";
 
1462
        delete ev;
 
1463
        pthread_mutex_unlock(log_lock);
 
1464
        goto err;
 
1465
      }
 
1466
 
 
1467
      pos = my_b_tell(&log);
 
1468
      delete ev;
 
1469
 
 
1470
      if (++event_count >= limit_end)
 
1471
        break;
 
1472
    }
 
1473
 
 
1474
    if (event_count < limit_end && log.error)
 
1475
    {
 
1476
      errmsg = "Wrong offset or I/O error";
 
1477
      pthread_mutex_unlock(log_lock);
 
1478
      goto err;
 
1479
    }
 
1480
 
 
1481
    pthread_mutex_unlock(log_lock);
 
1482
  }
 
1483
 
 
1484
  ret= false;
 
1485
 
 
1486
err:
 
1487
  delete description_event;
 
1488
  if (file >= 0)
 
1489
  {
 
1490
    end_io_cache(&log);
 
1491
    (void) my_close(file, MYF(MY_WME));
 
1492
  }
 
1493
 
 
1494
  if (errmsg)
 
1495
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
 
1496
             "SHOW BINLOG EVENTS", errmsg);
 
1497
  else
 
1498
    my_eof(thd);
 
1499
 
 
1500
  pthread_mutex_lock(&LOCK_thread_count);
 
1501
  thd->current_linfo = 0;
 
1502
  pthread_mutex_unlock(&LOCK_thread_count);
 
1503
  return(ret);
 
1504
}
 
1505
 
 
1506
 
 
1507
bool show_binlog_info(THD* thd)
 
1508
{
 
1509
  Protocol *protocol= thd->protocol;
 
1510
  List<Item> field_list;
 
1511
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
 
1512
  field_list.push_back(new Item_return_int("Position",20,
 
1513
                                           DRIZZLE_TYPE_LONGLONG));
 
1514
  field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
 
1515
  field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
 
1516
 
 
1517
  if (protocol->send_fields(&field_list,
 
1518
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1519
    return(true);
 
1520
  protocol->prepare_for_resend();
 
1521
 
 
1522
  if (mysql_bin_log.is_open())
 
1523
  {
 
1524
    LOG_INFO li;
 
1525
    mysql_bin_log.get_current_log(&li);
 
1526
    int dir_len = dirname_length(li.log_file_name);
 
1527
    protocol->store(li.log_file_name + dir_len, &my_charset_bin);
 
1528
    protocol->store((uint64_t) li.pos);
 
1529
    protocol->store(binlog_filter->get_do_db());
 
1530
    protocol->store(binlog_filter->get_ignore_db());
 
1531
    if (protocol->write())
 
1532
      return(true);
 
1533
  }
 
1534
  my_eof(thd);
 
1535
  return(false);
 
1536
}
 
1537
 
 
1538
 
 
1539
/*
 
1540
  Send a list of all binary logs to client
 
1541
 
 
1542
  SYNOPSIS
 
1543
    show_binlogs()
 
1544
    thd         Thread specific variable
 
1545
 
 
1546
  RETURN VALUES
 
1547
    false OK
 
1548
    true  error
 
1549
*/
 
1550
 
 
1551
bool show_binlogs(THD* thd)
 
1552
{
 
1553
  IO_CACHE *index_file;
 
1554
  LOG_INFO cur;
 
1555
  File file;
 
1556
  char fname[FN_REFLEN];
 
1557
  List<Item> field_list;
 
1558
  uint32_t length;
 
1559
  int cur_dir_len;
 
1560
  Protocol *protocol= thd->protocol;
 
1561
 
 
1562
  if (!mysql_bin_log.is_open())
 
1563
  {
 
1564
    my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0));
 
1565
    return 1;
 
1566
  }
 
1567
 
 
1568
  field_list.push_back(new Item_empty_string("Log_name", 255));
 
1569
  field_list.push_back(new Item_return_int("File_size", 20,
 
1570
                                           DRIZZLE_TYPE_LONGLONG));
 
1571
  if (protocol->send_fields(&field_list,
 
1572
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1573
    return(true);
 
1574
  
 
1575
  pthread_mutex_lock(mysql_bin_log.get_log_lock());
 
1576
  mysql_bin_log.lock_index();
 
1577
  index_file=mysql_bin_log.get_index_file();
 
1578
  
 
1579
  mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
 
1580
  pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
 
1581
  
 
1582
  cur_dir_len= dirname_length(cur.log_file_name);
 
1583
 
 
1584
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
 
1585
 
 
1586
  /* The file ends with EOF or empty line */
 
1587
  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
 
1588
  {
 
1589
    int dir_len;
 
1590
    uint64_t file_length= 0;                   // Length if open fails
 
1591
    fname[--length] = '\0';                     // remove the newline
 
1592
 
 
1593
    protocol->prepare_for_resend();
 
1594
    dir_len= dirname_length(fname);
 
1595
    length-= dir_len;
 
1596
    protocol->store(fname + dir_len, length, &my_charset_bin);
 
1597
 
 
1598
    if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
 
1599
      file_length= cur.pos;  /* The active log, use the active position */
 
1600
    else
 
1601
    {
 
1602
      /* this is an old log, open it and find the size */
 
1603
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
 
1604
                         MYF(0))) >= 0)
 
1605
      {
 
1606
        file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));
 
1607
        my_close(file, MYF(0));
 
1608
      }
 
1609
    }
 
1610
    protocol->store(file_length);
 
1611
    if (protocol->write())
 
1612
      goto err;
 
1613
  }
 
1614
  mysql_bin_log.unlock_index();
 
1615
  my_eof(thd);
 
1616
  return(false);
 
1617
 
 
1618
err:
 
1619
  mysql_bin_log.unlock_index();
 
1620
  return(true);
 
1621
}
 
1622
 
 
1623
/**
 
1624
   Load data's io cache specific hook to be executed
 
1625
   before a chunk of data is being read into the cache's buffer
 
1626
   The fuction instantianates and writes into the binlog
 
1627
   replication events along LOAD DATA processing.
 
1628
   
 
1629
   @param file  pointer to io-cache
 
1630
   @return 0
 
1631
*/
 
1632
int log_loaded_block(IO_CACHE* file)
 
1633
{
 
1634
  LOAD_FILE_INFO *lf_info;
 
1635
  uint32_t block_len;
 
1636
  /* buffer contains position where we started last read */
 
1637
  unsigned char* buffer= (unsigned char*) my_b_get_buffer_start(file);
 
1638
  uint32_t max_event_size= current_thd->variables.max_allowed_packet;
 
1639
  lf_info= (LOAD_FILE_INFO*) file->arg;
 
1640
  if (lf_info->thd->current_stmt_binlog_row_based)
 
1641
    return(0);
 
1642
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
 
1643
      lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
 
1644
    return(0);
 
1645
  
 
1646
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
 
1647
       buffer += cmin(block_len, max_event_size),
 
1648
       block_len -= cmin(block_len, max_event_size))
 
1649
  {
 
1650
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
 
1651
    if (lf_info->wrote_create_file)
 
1652
    {
 
1653
      Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
 
1654
                               cmin(block_len, max_event_size),
 
1655
                               lf_info->log_delayed);
 
1656
      mysql_bin_log.write(&a);
 
1657
    }
 
1658
    else
 
1659
    {
 
1660
      Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
 
1661
                                   buffer,
 
1662
                                   cmin(block_len, max_event_size),
 
1663
                                   lf_info->log_delayed);
 
1664
      mysql_bin_log.write(&b);
 
1665
      lf_info->wrote_create_file= 1;
 
1666
    }
 
1667
  }
 
1668
  return(0);
 
1669
}
 
1670
 
 
1671
/*
 
1672
  Replication System Variables
 
1673
*/
 
1674
 
 
1675
class sys_var_slave_skip_counter :public sys_var
 
1676
{
 
1677
public:
 
1678
  sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
 
1679
    :sys_var(name_arg)
 
1680
  { chain_sys_var(chain); }
 
1681
  bool check(THD *thd, set_var *var);
 
1682
  bool update(THD *thd, set_var *var);
 
1683
  bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
 
1684
  /*
 
1685
    We can't retrieve the value of this, so we don't have to define
 
1686
    type() or value_ptr()
 
1687
  */
 
1688
};
 
1689
 
 
1690
class sys_var_sync_binlog_period :public sys_var_long_ptr
 
1691
{
 
1692
public:
 
1693
  sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
 
1694
                             ulong *value_ptr)
 
1695
    :sys_var_long_ptr(chain, name_arg,value_ptr) {}
 
1696
  bool update(THD *thd, set_var *var);
 
1697
};
 
1698
 
 
1699
static void fix_slave_net_timeout(THD *thd,
 
1700
                                  enum_var_type type __attribute__((unused)))
 
1701
{
 
1702
  pthread_mutex_lock(&LOCK_active_mi);
 
1703
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
 
1704
    push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1705
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
 
1706
                        "The currect value for master_heartbeat_period"
 
1707
                        " exceeds the new value of `slave_net_timeout' sec."
 
1708
                        " A sensible value for the period should be"
 
1709
                        " less than the timeout.");
 
1710
  pthread_mutex_unlock(&LOCK_active_mi);
 
1711
  return;
 
1712
}
 
1713
 
 
1714
static sys_var_chain vars = { NULL, NULL };
 
1715
 
 
1716
static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
 
1717
                                            &relay_log_purge);
 
1718
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
 
1719
                                              &slave_net_timeout,
 
1720
                                              fix_slave_net_timeout);
 
1721
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
 
1722
                                                &slave_trans_retries);
 
1723
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
 
1724
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
 
1725
 
 
1726
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
 
1727
 
 
1728
 
 
1729
static int show_slave_skip_errors(THD *thd __attribute__((unused)),
 
1730
                                  SHOW_VAR *var, char *buff)
 
1731
{
 
1732
  var->type=SHOW_CHAR;
 
1733
  var->value= buff;
 
1734
  if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
 
1735
  {
 
1736
    var->value= const_cast<char *>("OFF");
 
1737
  }
 
1738
  else if (bitmap_is_set_all(&slave_error_mask))
 
1739
  {
 
1740
    var->value= const_cast<char *>("ALL");
 
1741
  }
 
1742
  else
 
1743
  {
 
1744
    /* 10 is enough assuming errors are max 4 digits */
 
1745
    int i;
 
1746
    var->value= buff;
 
1747
    for (i= 1;
 
1748
         i < MAX_SLAVE_ERROR &&
 
1749
         (buff - var->value) < SHOW_VAR_FUNC_BUFF_SIZE;
 
1750
         i++)
 
1751
    {
 
1752
      if (bitmap_is_set(&slave_error_mask, i))
 
1753
      {
 
1754
        buff= int10_to_str(i, buff, 10);
 
1755
        *buff++= ',';
 
1756
      }
 
1757
    }
 
1758
    if (var->value != buff)
 
1759
      buff--;                           // Remove last ','
 
1760
    if (i < MAX_SLAVE_ERROR)
 
1761
      buff= my_stpcpy(buff, "...");  // Couldn't show all errors
 
1762
    *buff=0;
 
1763
  }
 
1764
  return 0;
 
1765
}
 
1766
 
 
1767
static st_show_var_func_container
 
1768
show_slave_skip_errors_cont = { &show_slave_skip_errors };
 
1769
 
 
1770
 
 
1771
static SHOW_VAR fixed_vars[]= {
 
1772
  {"log_slave_updates",       (char*) &opt_log_slave_updates,       SHOW_MY_BOOL},
 
1773
  {"relay_log" , (char*) &opt_relay_logname, SHOW_CHAR_PTR},
 
1774
  {"relay_log_index", (char*) &opt_relaylog_index_name, SHOW_CHAR_PTR},
 
1775
  {"relay_log_info_file", (char*) &relay_log_info_file, SHOW_CHAR_PTR},
 
1776
  {"relay_log_space_limit",   (char*) &relay_log_space_limit,       SHOW_LONGLONG},
 
1777
  {"slave_load_tmpdir",       (char*) &slave_load_tmpdir,           SHOW_CHAR_PTR},
 
1778
  {"slave_skip_errors",       (char*) &show_slave_skip_errors_cont,      SHOW_FUNC},
 
1779
};
 
1780
 
 
1781
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((unused)),
 
1782
                                       set_var *var)
 
1783
{
 
1784
  int result= 0;
 
1785
  pthread_mutex_lock(&LOCK_active_mi);
 
1786
  pthread_mutex_lock(&active_mi->rli.run_lock);
 
1787
  if (active_mi->rli.slave_running)
 
1788
  {
 
1789
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
 
1790
    result=1;
 
1791
  }
 
1792
  pthread_mutex_unlock(&active_mi->rli.run_lock);
 
1793
  pthread_mutex_unlock(&LOCK_active_mi);
 
1794
  var->save_result.ulong_value= (ulong) var->value->val_int();
 
1795
  return result;
 
1796
}
 
1797
 
 
1798
 
 
1799
bool sys_var_slave_skip_counter::update(THD *thd __attribute__((unused)),
 
1800
                                        set_var *var)
 
1801
{
 
1802
  pthread_mutex_lock(&LOCK_active_mi);
 
1803
  pthread_mutex_lock(&active_mi->rli.run_lock);
 
1804
  /*
 
1805
    The following test should normally never be true as we test this
 
1806
    in the check function;  To be safe against multiple
 
1807
    SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
 
1808
  */
 
1809
  if (!active_mi->rli.slave_running)
 
1810
  {
 
1811
    pthread_mutex_lock(&active_mi->rli.data_lock);
 
1812
    active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
 
1813
    pthread_mutex_unlock(&active_mi->rli.data_lock);
 
1814
  }
 
1815
  pthread_mutex_unlock(&active_mi->rli.run_lock);
 
1816
  pthread_mutex_unlock(&LOCK_active_mi);
 
1817
  return 0;
 
1818
}
 
1819
 
 
1820
 
 
1821
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((unused)),
 
1822
                                        set_var *var)
 
1823
{
 
1824
  sync_binlog_period= (uint32_t) var->save_result.uint64_t_value;
 
1825
  return 0;
 
1826
}
 
1827
 
 
1828
int init_replication_sys_vars()
 
1829
{
 
1830
  mysql_append_static_vars(fixed_vars, sizeof(fixed_vars) / sizeof(SHOW_VAR));
 
1831
 
 
1832
  if (mysql_add_sys_var_chain(vars.first, my_long_options))
 
1833
  {
 
1834
    /* should not happen */
 
1835
    fprintf(stderr, "failed to initialize replication system variables");
 
1836
    unireg_abort(1);
 
1837
  }
 
1838
  return 0;
 
1839
}