~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_repl.cc

  • Committer: Brian Aker
  • Date: 2010-01-27 18:58:12 UTC
  • Revision ID: brian@gaz-20100127185812-n62n0vwetnx8jrjy
Remove dead code.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2000-2006 MySQL AB & Sasha
2
 
 
3
 
   This program is free software; you can redistribute it and/or modify
4
 
   it under the terms of the GNU General Public License as published by
5
 
   the Free Software Foundation; version 2 of the License.
6
 
 
7
 
   This program is distributed in the hope that it will be useful,
8
 
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 
   GNU General Public License for more details.
11
 
 
12
 
   You should have received a copy of the GNU General Public License
13
 
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
 
 
16
 
#include <drizzled/server_includes.h>
17
 
 
18
 
#ifdef HAVE_REPLICATION
19
 
#include "rpl_mi.h"
20
 
#include "sql_repl.h"
21
 
#include "log_event.h"
22
 
#include "rpl_filter.h"
23
 
#include <drizzled/drizzled_error_messages.h>
24
 
 
25
 
int max_binlog_dump_events = 0; // unlimited
26
 
 
27
 
/*
28
 
    fake_rotate_event() builds a fake (=which does not exist physically in any
29
 
    binlog) Rotate event, which contains the name of the binlog we are going to
30
 
    send to the slave (because the slave may not know it if it just asked for
31
 
    MASTER_LOG_FILE='', MASTER_LOG_POS=4).
32
 
    < 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
33
 
    After this version we always call it, so that a 3.23.58 slave can rely on
34
 
    it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
35
 
    zeros in the good positions which, by chance, make it possible for the 3.23
36
 
    slave to detect that this event is unexpected) (this is luck which happens
37
 
    because the master and slave disagree on the size of the header of
38
 
    Log_event).
39
 
 
40
 
    Relying on the event length of the Rotate event instead of these
41
 
    well-placed zeros was not possible as Rotate events have a variable-length
42
 
    part.
43
 
*/
44
 
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
45
 
                             uint64_t position, const char** errmsg)
46
 
{
47
 
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
48
 
  /*
49
 
    'when' (the timestamp) is set to 0 so that slave could distinguish between
50
 
    real and fake Rotate events (if necessary)
51
 
  */
52
 
  memset(header, 0, 4);
53
 
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
54
 
 
55
 
  char* p = log_file_name+dirname_length(log_file_name);
56
 
  uint ident_len = (uint) strlen(p);
57
 
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
58
 
  int4store(header + SERVER_ID_OFFSET, server_id);
59
 
  int4store(header + EVENT_LEN_OFFSET, event_len);
60
 
  int2store(header + FLAGS_OFFSET, 0);
61
 
 
62
 
  // TODO: check what problems this may cause and fix them
63
 
  int4store(header + LOG_POS_OFFSET, 0);
64
 
 
65
 
  packet->append(header, sizeof(header));
66
 
  int8store(buf+R_POS_OFFSET,position);
67
 
  packet->append(buf, ROTATE_HEADER_LEN);
68
 
  packet->append(p,ident_len);
69
 
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
70
 
  {
71
 
    *errmsg = "failed on my_net_write()";
72
 
    return(-1);
73
 
  }
74
 
  return(0);
75
 
}
76
 
 
77
 
static int send_file(THD *thd)
78
 
{
79
 
  NET* net = &thd->net;
80
 
  int fd = -1, error = 1;
81
 
  size_t bytes;
82
 
  char fname[FN_REFLEN+1];
83
 
  const char *errmsg = 0;
84
 
  int old_timeout;
85
 
  unsigned long packet_len;
86
 
  uchar buf[IO_SIZE];                           // It's safe to alloc this
87
 
 
88
 
  /*
89
 
    The client might be slow loading the data, give him wait_timeout to do
90
 
    the job
91
 
  */
92
 
  old_timeout= net->read_timeout;
93
 
  my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
94
 
 
95
 
  /*
96
 
    We need net_flush here because the client will not know it needs to send
97
 
    us the file name until it has processed the load event entry
98
 
  */
99
 
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
100
 
  {
101
 
    errmsg = _("Failed in send_file() while reading file name");
102
 
    goto err;
103
 
  }
104
 
 
105
 
  // terminate with \0 for fn_format
106
 
  *((char*)net->read_pos +  packet_len) = 0;
107
 
  fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
108
 
  // this is needed to make replicate-ignore-db
109
 
  if (!strcmp(fname,"/dev/null"))
110
 
    goto end;
111
 
 
112
 
  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
113
 
  {
114
 
    errmsg = _("Failed in send_file() on open of file");
115
 
    goto err;
116
 
  }
117
 
 
118
 
  while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0)
119
 
  {
120
 
    if (my_net_write(net, buf, bytes))
121
 
    {
122
 
      errmsg = _("Failed in send_file() while writing data to client");
123
 
      goto err;
124
 
    }
125
 
  }
126
 
 
127
 
 end:
128
 
  if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
129
 
      (my_net_read(net) == packet_error))
130
 
  {
131
 
    errmsg = _("Failed in send_file() while negotiating file transfer close");
132
 
    goto err;
133
 
  }
134
 
  error = 0;
135
 
 
136
 
 err:
137
 
  my_net_set_read_timeout(net, old_timeout);
138
 
  if (fd >= 0)
139
 
    (void) my_close(fd, MYF(0));
140
 
  if (errmsg)
141
 
  {
142
 
    sql_print_error(errmsg);
143
 
  }
144
 
  return(error);
145
 
}
146
 
 
147
 
 
148
 
/*
149
 
  Adjust the position pointer in the binary log file for all running slaves
150
 
 
151
 
  SYNOPSIS
152
 
    adjust_linfo_offsets()
153
 
    purge_offset        Number of bytes removed from start of log index file
154
 
 
155
 
  NOTES
156
 
    - This is called when doing a PURGE when we delete lines from the
157
 
      index log file
158
 
 
159
 
  REQUIREMENTS
160
 
    - Before calling this function, we have to ensure that no threads are
161
 
      using any binary log file before purge_offset.a
162
 
 
163
 
  TODO
164
 
    - Inform the slave threads that they should sync the position
165
 
      in the binary log file with flush_relay_log_info.
166
 
      Now they sync is done for next read.
167
 
*/
168
 
 
169
 
void adjust_linfo_offsets(my_off_t purge_offset)
170
 
{
171
 
  THD *tmp;
172
 
 
173
 
  pthread_mutex_lock(&LOCK_thread_count);
174
 
  I_List_iterator<THD> it(threads);
175
 
 
176
 
  while ((tmp=it++))
177
 
  {
178
 
    LOG_INFO* linfo;
179
 
    if ((linfo = tmp->current_linfo))
180
 
    {
181
 
      pthread_mutex_lock(&linfo->lock);
182
 
      /*
183
 
        Index file offset can be less that purge offset only if
184
 
        we just started reading the index file. In that case
185
 
        we have nothing to adjust
186
 
      */
187
 
      if (linfo->index_file_offset < purge_offset)
188
 
        linfo->fatal = (linfo->index_file_offset != 0);
189
 
      else
190
 
        linfo->index_file_offset -= purge_offset;
191
 
      pthread_mutex_unlock(&linfo->lock);
192
 
    }
193
 
  }
194
 
  pthread_mutex_unlock(&LOCK_thread_count);
195
 
}
196
 
 
197
 
 
198
 
bool log_in_use(const char* log_name)
199
 
{
200
 
  int log_name_len = strlen(log_name) + 1;
201
 
  THD *tmp;
202
 
  bool result = 0;
203
 
 
204
 
  pthread_mutex_lock(&LOCK_thread_count);
205
 
  I_List_iterator<THD> it(threads);
206
 
 
207
 
  while ((tmp=it++))
208
 
  {
209
 
    LOG_INFO* linfo;
210
 
    if ((linfo = tmp->current_linfo))
211
 
    {
212
 
      pthread_mutex_lock(&linfo->lock);
213
 
      result = !memcmp(log_name, linfo->log_file_name, log_name_len);
214
 
      pthread_mutex_unlock(&linfo->lock);
215
 
      if (result)
216
 
        break;
217
 
    }
218
 
  }
219
 
 
220
 
  pthread_mutex_unlock(&LOCK_thread_count);
221
 
  return result;
222
 
}
223
 
 
224
 
bool purge_error_message(THD* thd, int res)
225
 
{
226
 
  uint errmsg= 0;
227
 
 
228
 
  switch (res)  {
229
 
  case 0: break;
230
 
  case LOG_INFO_EOF:    errmsg= ER_UNKNOWN_TARGET_BINLOG; break;
231
 
  case LOG_INFO_IO:     errmsg= ER_IO_ERR_LOG_INDEX_READ; break;
232
 
  case LOG_INFO_INVALID:errmsg= ER_BINLOG_PURGE_PROHIBITED; break;
233
 
  case LOG_INFO_SEEK:   errmsg= ER_FSEEK_FAIL; break;
234
 
  case LOG_INFO_MEM:    errmsg= ER_OUT_OF_RESOURCES; break;
235
 
  case LOG_INFO_FATAL:  errmsg= ER_BINLOG_PURGE_FATAL_ERR; break;
236
 
  case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break;
237
 
  case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break;
238
 
  default:              errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break;
239
 
  }
240
 
 
241
 
  if (errmsg)
242
 
  {
243
 
    my_message(errmsg, ER(errmsg), MYF(0));
244
 
    return true;
245
 
  }
246
 
  my_ok(thd);
247
 
  return false;
248
 
}
249
 
 
250
 
 
251
 
bool purge_master_logs(THD* thd, const char* to_log)
252
 
{
253
 
  char search_file_name[FN_REFLEN];
254
 
  if (!mysql_bin_log.is_open())
255
 
  {
256
 
    my_ok(thd);
257
 
    return false;
258
 
  }
259
 
 
260
 
  mysql_bin_log.make_log_name(search_file_name, to_log);
261
 
  return purge_error_message(thd,
262
 
                             mysql_bin_log.purge_logs(search_file_name, 0, 1,
263
 
                                                      1, NULL));
264
 
}
265
 
 
266
 
 
267
 
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
268
 
{
269
 
  if (!mysql_bin_log.is_open())
270
 
  {
271
 
    my_ok(thd);
272
 
    return 0;
273
 
  }
274
 
  return purge_error_message(thd,
275
 
                             mysql_bin_log.purge_logs_before_date(purge_time));
276
 
}
277
 
 
278
 
int test_for_non_eof_log_read_errors(int error, const char **errmsg)
279
 
{
280
 
  if (error == LOG_READ_EOF)
281
 
    return 0;
282
 
  my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
283
 
  switch (error) {
284
 
  case LOG_READ_BOGUS:
285
 
    *errmsg = "bogus data in log event";
286
 
    break;
287
 
  case LOG_READ_TOO_LARGE:
288
 
    *errmsg = "log event entry exceeded max_allowed_packet; \
289
 
Increase max_allowed_packet on master";
290
 
    break;
291
 
  case LOG_READ_IO:
292
 
    *errmsg = "I/O error reading log event";
293
 
    break;
294
 
  case LOG_READ_MEM:
295
 
    *errmsg = "memory allocation failed reading log event";
296
 
    break;
297
 
  case LOG_READ_TRUNC:
298
 
    *errmsg = "binlog truncated in the middle of event";
299
 
    break;
300
 
  default:
301
 
    *errmsg = "unknown error reading log event on the master";
302
 
    break;
303
 
  }
304
 
  return error;
305
 
}
306
 
 
307
 
 
308
 
/**
309
 
  An auxiliary function for calling in mysql_binlog_send
310
 
  to initialize the heartbeat timeout in waiting for a binlogged event.
311
 
 
312
 
  @param[in]    thd  THD to access a user variable
313
 
 
314
 
  @return        heartbeat period an uint64_t of nanoseconds
315
 
                 or zero if heartbeat was not demanded by slave
316
 
*/ 
317
 
static uint64_t get_heartbeat_period(THD * thd)
318
 
{
319
 
  bool null_value;
320
 
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
321
 
  user_var_entry *entry= 
322
 
    (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
323
 
                                  name.length);
324
 
  return entry? entry->val_int(&null_value) : 0;
325
 
}
326
 
 
327
 
/*
328
 
  Function prepares and sends repliation heartbeat event.
329
 
 
330
 
  @param net                net object of THD
331
 
  @param packet             buffer to store the heartbeat instance
332
 
  @param event_coordinates  binlog file name and position of the last
333
 
                            real event master sent from binlog
334
 
 
335
 
  @note 
336
 
    Among three essential pieces of heartbeat data Log_event::when
337
 
    is computed locally.
338
 
    The  error to send is serious and should force terminating
339
 
    the dump thread.
340
 
*/
341
 
static int send_heartbeat_event(NET* net, String* packet,
342
 
                                const struct event_coordinates *coord)
343
 
{
344
 
  char header[LOG_EVENT_HEADER_LEN];
345
 
  /*
346
 
    'when' (the timestamp) is set to 0 so that slave could distinguish between
347
 
    real and fake Rotate events (if necessary)
348
 
  */
349
 
  memset(header, 0, 4);  // when
350
 
 
351
 
  header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
352
 
 
353
 
  char* p= coord->file_name + dirname_length(coord->file_name);
354
 
 
355
 
  uint ident_len = strlen(p);
356
 
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN;
357
 
  int4store(header + SERVER_ID_OFFSET, server_id);
358
 
  int4store(header + EVENT_LEN_OFFSET, event_len);
359
 
  int2store(header + FLAGS_OFFSET, 0);
360
 
 
361
 
  int4store(header + LOG_POS_OFFSET, coord->pos);  // log_pos
362
 
 
363
 
  packet->append(header, sizeof(header));
364
 
  packet->append(p, ident_len);             // log_file_name
365
 
 
366
 
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
367
 
      net_flush(net))
368
 
  {
369
 
    return(-1);
370
 
  }
371
 
  packet->set("\0", 1, &my_charset_bin);
372
 
  return(0);
373
 
}
374
 
 
375
 
/*
376
 
  TODO: Clean up loop to only have one call to send_file()
377
 
*/
378
 
 
379
 
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
380
 
                       ushort flags)
381
 
{
382
 
  LOG_INFO linfo;
383
 
  char *log_file_name = linfo.log_file_name;
384
 
  char search_file_name[FN_REFLEN], *name;
385
 
  IO_CACHE log;
386
 
  File file = -1;
387
 
  String* packet = &thd->packet;
388
 
  int error;
389
 
  const char *errmsg = "Unknown error";
390
 
  NET* net = &thd->net;
391
 
  pthread_mutex_t *log_lock;
392
 
  bool binlog_can_be_corrupted= false;
393
 
 
394
 
  memset(&log, 0, sizeof(log));
395
 
  /* 
396
 
     heartbeat_period from @master_heartbeat_period user variable
397
 
  */
398
 
  uint64_t heartbeat_period= get_heartbeat_period(thd);
399
 
  struct timespec heartbeat_buf;
400
 
  struct event_coordinates coord_buf;
401
 
  struct timespec *heartbeat_ts= NULL;
402
 
  struct event_coordinates *coord= NULL;
403
 
  if (heartbeat_period != 0LL)
404
 
  {
405
 
    heartbeat_ts= &heartbeat_buf;
406
 
    set_timespec_nsec(*heartbeat_ts, 0);
407
 
    coord= &coord_buf;
408
 
    coord->file_name= log_file_name; // initialization basing on what slave remembers
409
 
    coord->pos= pos;
410
 
  }
411
 
 
412
 
  if (!mysql_bin_log.is_open())
413
 
  {
414
 
    errmsg = "Binary log is not open";
415
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
416
 
    goto err;
417
 
  }
418
 
  if (!server_id_supplied)
419
 
  {
420
 
    errmsg = "Misconfigured master - server id was not set";
421
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
422
 
    goto err;
423
 
  }
424
 
 
425
 
  name=search_file_name;
426
 
  if (log_ident[0])
427
 
    mysql_bin_log.make_log_name(search_file_name, log_ident);
428
 
  else
429
 
    name=0;                                     // Find first log
430
 
 
431
 
  linfo.index_file_offset = 0;
432
 
 
433
 
  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
434
 
  {
435
 
    errmsg = "Could not find first log file name in binary log index file";
436
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
437
 
    goto err;
438
 
  }
439
 
 
440
 
  pthread_mutex_lock(&LOCK_thread_count);
441
 
  thd->current_linfo = &linfo;
442
 
  pthread_mutex_unlock(&LOCK_thread_count);
443
 
 
444
 
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
445
 
  {
446
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
447
 
    goto err;
448
 
  }
449
 
  if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
450
 
  {
451
 
    errmsg= "Client requested master to start replication from \
452
 
impossible position";
453
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
454
 
    goto err;
455
 
  }
456
 
 
457
 
  /*
458
 
    We need to start a packet with something other than 255
459
 
    to distinguish it from error
460
 
  */
461
 
  packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
462
 
 
463
 
  /*
464
 
    Tell the client about the log name with a fake Rotate event;
465
 
    this is needed even if we also send a Format_description_log_event
466
 
    just after, because that event does not contain the binlog's name.
467
 
    Note that as this Rotate event is sent before
468
 
    Format_description_log_event, the slave cannot have any info to
469
 
    understand this event's format, so the header len of
470
 
    Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
471
 
    than other events except FORMAT_DESCRIPTION_EVENT).
472
 
    Before 4.0.14 we called fake_rotate_event below only if (pos ==
473
 
    BIN_LOG_HEADER_SIZE), because if this is false then the slave
474
 
    already knows the binlog's name.
475
 
    Since, we always call fake_rotate_event; if the slave already knew
476
 
    the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
477
 
    useless but does not harm much. It is nice for 3.23 (>=.58) slaves
478
 
    which test Rotate events to see if the master is 4.0 (then they
479
 
    choose to stop because they can't replicate 4.0); by always calling
480
 
    fake_rotate_event we are sure that 3.23.58 and newer will detect the
481
 
    problem as soon as replication starts (BUG#198).
482
 
    Always calling fake_rotate_event makes sending of normal
483
 
    (=from-binlog) Rotate events a priori unneeded, but it is not so
484
 
    simple: the 2 Rotate events are not equivalent, the normal one is
485
 
    before the Stop event, the fake one is after. If we don't send the
486
 
    normal one, then the Stop event will be interpreted (by existing 4.0
487
 
    slaves) as "the master stopped", which is wrong. So for safety,
488
 
    given that we want minimum modification of 4.0, we send the normal
489
 
    and fake Rotates.
490
 
  */
491
 
  if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
492
 
  {
493
 
    /*
494
 
       This error code is not perfect, as fake_rotate_event() does not
495
 
       read anything from the binlog; if it fails it's because of an
496
 
       error in my_net_write(), fortunately it will say so in errmsg.
497
 
    */
498
 
    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
499
 
    goto err;
500
 
  }
501
 
  packet->set("\0", 1, &my_charset_bin);
502
 
  /*
503
 
    Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
504
 
    this larger than the corresponding packet (query) sent 
505
 
    from client to master.
506
 
  */
507
 
  thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
508
 
 
509
 
  /*
510
 
    We can set log_lock now, it does not move (it's a member of
511
 
    mysql_bin_log, and it's already inited, and it will be destroyed
512
 
    only at shutdown).
513
 
  */
514
 
  log_lock = mysql_bin_log.get_log_lock();
515
 
  if (pos > BIN_LOG_HEADER_SIZE)
516
 
  {
517
 
     /*
518
 
       Try to find a Format_description_log_event at the beginning of
519
 
       the binlog
520
 
     */
521
 
     if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
522
 
     {
523
 
       /*
524
 
         The packet has offsets equal to the normal offsets in a binlog
525
 
         event +1 (the first character is \0).
526
 
       */
527
 
       if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
528
 
       {
529
 
         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
530
 
                                       LOG_EVENT_BINLOG_IN_USE_F);
531
 
         (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
532
 
         /*
533
 
           mark that this event with "log_pos=0", so the slave
534
 
           should not increment master's binlog position
535
 
           (rli->group_master_log_pos)
536
 
         */
537
 
         int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
538
 
         /*
539
 
           if reconnect master sends FD event with `created' as 0
540
 
           to avoid destroying temp tables.
541
 
          */
542
 
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
543
 
                   ST_CREATED_OFFSET+1, (uint32_t) 0);
544
 
         /* send it */
545
 
         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
546
 
         {
547
 
           errmsg = "Failed on my_net_write()";
548
 
           my_errno= ER_UNKNOWN_ERROR;
549
 
           goto err;
550
 
         }
551
 
 
552
 
         /*
553
 
           No need to save this event. We are only doing simple reads
554
 
           (no real parsing of the events) so we don't need it. And so
555
 
           we don't need the artificial Format_description_log_event of
556
 
           3.23&4.x.
557
 
         */
558
 
       }
559
 
     }
560
 
     else
561
 
     {
562
 
       if (test_for_non_eof_log_read_errors(error, &errmsg))
563
 
         goto err;
564
 
       /*
565
 
         It's EOF, nothing to do, go on reading next events, the
566
 
         Format_description_log_event will be found naturally if it is written.
567
 
       */
568
 
     }
569
 
     /* reset the packet as we wrote to it in any case */
570
 
     packet->set("\0", 1, &my_charset_bin);
571
 
  } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
572
 
  else
573
 
  {
574
 
    /* The Format_description_log_event event will be found naturally. */
575
 
  }
576
 
 
577
 
  /* seek to the requested position, to start the requested dump */
578
 
  my_b_seek(&log, pos);                 // Seek will done on next read
579
 
 
580
 
  while (!net->error && net->vio != 0 && !thd->killed)
581
 
  {
582
 
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
583
 
    {
584
 
      /*
585
 
        log's filename does not change while it's active
586
 
      */
587
 
      if (coord)
588
 
        coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
589
 
 
590
 
      if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
591
 
      {
592
 
        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
593
 
                                      LOG_EVENT_BINLOG_IN_USE_F);
594
 
        (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
595
 
      }
596
 
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
597
 
        binlog_can_be_corrupted= false;
598
 
 
599
 
      if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
600
 
      {
601
 
        errmsg = "Failed on my_net_write()";
602
 
        my_errno= ER_UNKNOWN_ERROR;
603
 
        goto err;
604
 
      }
605
 
 
606
 
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
607
 
      {
608
 
        if (send_file(thd))
609
 
        {
610
 
          errmsg = "failed in send_file()";
611
 
          my_errno= ER_UNKNOWN_ERROR;
612
 
          goto err;
613
 
        }
614
 
      }
615
 
      packet->set("\0", 1, &my_charset_bin);
616
 
    }
617
 
 
618
 
    /*
619
 
      here we were reading binlog that was not closed properly (as a result
620
 
      of a crash ?). treat any corruption as EOF
621
 
    */
622
 
    if (binlog_can_be_corrupted && error != LOG_READ_MEM)
623
 
      error=LOG_READ_EOF;
624
 
    /*
625
 
      TODO: now that we are logging the offset, check to make sure
626
 
      the recorded offset and the actual match.
627
 
      Guilhem 2003-06: this is not true if this master is a slave
628
 
      <4.0.15 running with --log-slave-updates, because then log_pos may
629
 
      be the offset in the-master-of-this-master's binlog.
630
 
    */
631
 
    if (test_for_non_eof_log_read_errors(error, &errmsg))
632
 
      goto err;
633
 
 
634
 
    if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
635
 
        mysql_bin_log.is_active(log_file_name))
636
 
    {
637
 
      /*
638
 
        Block until there is more data in the log
639
 
      */
640
 
      if (net_flush(net))
641
 
      {
642
 
        errmsg = "failed on net_flush()";
643
 
        my_errno= ER_UNKNOWN_ERROR;
644
 
        goto err;
645
 
      }
646
 
 
647
 
      /*
648
 
        We may have missed the update broadcast from the log
649
 
        that has just happened, let's try to catch it if it did.
650
 
        If we did not miss anything, we just wait for other threads
651
 
        to signal us.
652
 
      */
653
 
      {
654
 
        log.error=0;
655
 
        bool read_packet = 0, fatal_error = 0;
656
 
 
657
 
        /*
658
 
          No one will update the log while we are reading
659
 
          now, but we'll be quick and just read one record
660
 
 
661
 
          TODO:
662
 
          Add an counter that is incremented for each time we update the
663
 
          binary log.  We can avoid the following read if the counter
664
 
          has not been updated since last read.
665
 
        */
666
 
 
667
 
        pthread_mutex_lock(log_lock);
668
 
        switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
669
 
        case 0:
670
 
          /* we read successfully, so we'll need to send it to the slave */
671
 
          pthread_mutex_unlock(log_lock);
672
 
          read_packet = 1;
673
 
          if (coord)
674
 
            coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
675
 
          break;
676
 
 
677
 
        case LOG_READ_EOF:
678
 
        {
679
 
          int ret;
680
 
          if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
681
 
          {
682
 
            pthread_mutex_unlock(log_lock);
683
 
            goto end;
684
 
          }
685
 
 
686
 
          do 
687
 
          {
688
 
            if (coord)
689
 
            {
690
 
              assert(heartbeat_ts && heartbeat_period != 0LL);
691
 
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
692
 
            }
693
 
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
694
 
            assert(ret == 0 || (heartbeat_period != 0LL && coord != NULL));
695
 
            if (ret == ETIMEDOUT || ret == ETIME)
696
 
            {
697
 
              if (send_heartbeat_event(net, packet, coord))
698
 
              {
699
 
                errmsg = "Failed on my_net_write()";
700
 
                my_errno= ER_UNKNOWN_ERROR;
701
 
                pthread_mutex_unlock(log_lock);
702
 
                goto err;
703
 
              }
704
 
            }
705
 
            else
706
 
            {
707
 
              assert(ret == 0);
708
 
            }
709
 
          } while (ret != 0 && coord != NULL && !thd->killed);
710
 
          pthread_mutex_unlock(log_lock);
711
 
        }    
712
 
        break;
713
 
            
714
 
        default:
715
 
          pthread_mutex_unlock(log_lock);
716
 
          fatal_error = 1;
717
 
          break;
718
 
        }
719
 
 
720
 
        if (read_packet)
721
 
        {
722
 
          thd_proc_info(thd, "Sending binlog event to slave");
723
 
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
724
 
          {
725
 
            errmsg = "Failed on my_net_write()";
726
 
            my_errno= ER_UNKNOWN_ERROR;
727
 
            goto err;
728
 
          }
729
 
 
730
 
          if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
731
 
          {
732
 
            if (send_file(thd))
733
 
            {
734
 
              errmsg = "failed in send_file()";
735
 
              my_errno= ER_UNKNOWN_ERROR;
736
 
              goto err;
737
 
            }
738
 
          }
739
 
          packet->set("\0", 1, &my_charset_bin);
740
 
          /*
741
 
            No need to net_flush because we will get to flush later when
742
 
            we hit EOF pretty quick
743
 
          */
744
 
        }
745
 
 
746
 
        if (fatal_error)
747
 
        {
748
 
          errmsg = "error reading log entry";
749
 
          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
750
 
          goto err;
751
 
        }
752
 
        log.error=0;
753
 
      }
754
 
    }
755
 
    else
756
 
    {
757
 
      bool loop_breaker = 0;
758
 
      /* need this to break out of the for loop from switch */
759
 
 
760
 
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
761
 
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
762
 
      case LOG_INFO_EOF:
763
 
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
764
 
        break;
765
 
      case 0:
766
 
        break;
767
 
      default:
768
 
        errmsg = "could not find next log";
769
 
        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
770
 
        goto err;
771
 
      }
772
 
 
773
 
      if (loop_breaker)
774
 
        break;
775
 
 
776
 
      end_io_cache(&log);
777
 
      (void) my_close(file, MYF(MY_WME));
778
 
 
779
 
      /*
780
 
        Call fake_rotate_event() in case the previous log (the one which
781
 
        we have just finished reading) did not contain a Rotate event
782
 
        (for example (I don't know any other example) the previous log
783
 
        was the last one before the master was shutdown & restarted).
784
 
        This way we tell the slave about the new log's name and
785
 
        position.  If the binlog is 5.0, the next event we are going to
786
 
        read and send is Format_description_log_event.
787
 
      */
788
 
      if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
789
 
          fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
790
 
                            &errmsg))
791
 
      {
792
 
        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
793
 
        goto err;
794
 
      }
795
 
 
796
 
      packet->length(0);
797
 
      packet->append('\0');
798
 
      if (coord)
799
 
        coord->file_name= log_file_name; // reset to the next
800
 
    }
801
 
  }
802
 
 
803
 
end:
804
 
  end_io_cache(&log);
805
 
  (void)my_close(file, MYF(MY_WME));
806
 
 
807
 
  my_eof(thd);
808
 
  thd_proc_info(thd, "Waiting to finalize termination");
809
 
  pthread_mutex_lock(&LOCK_thread_count);
810
 
  thd->current_linfo = 0;
811
 
  pthread_mutex_unlock(&LOCK_thread_count);
812
 
  return;
813
 
 
814
 
err:
815
 
  thd_proc_info(thd, "Waiting to finalize termination");
816
 
  end_io_cache(&log);
817
 
  /*
818
 
    Exclude  iteration through thread list
819
 
    this is needed for purge_logs() - it will iterate through
820
 
    thread list and update thd->current_linfo->index_file_offset
821
 
    this mutex will make sure that it never tried to update our linfo
822
 
    after we return from this stack frame
823
 
  */
824
 
  pthread_mutex_lock(&LOCK_thread_count);
825
 
  thd->current_linfo = 0;
826
 
  pthread_mutex_unlock(&LOCK_thread_count);
827
 
  if (file >= 0)
828
 
    (void) my_close(file, MYF(MY_WME));
829
 
 
830
 
  my_message(my_errno, errmsg, MYF(0));
831
 
  return;
832
 
}
833
 
 
834
 
int start_slave(THD* thd , Master_info* mi,  bool net_report)
835
 
{
836
 
  int slave_errno= 0;
837
 
  int thread_mask;
838
 
 
839
 
  lock_slave_threads(mi);  // this allows us to cleanly read slave_running
840
 
  // Get a mask of _stopped_ threads
841
 
  init_thread_mask(&thread_mask,mi,1 /* inverse */);
842
 
  /*
843
 
    Below we will start all stopped threads.  But if the user wants to
844
 
    start only one thread, do as if the other thread was running (as we
845
 
    don't wan't to touch the other thread), so set the bit to 0 for the
846
 
    other thread
847
 
  */
848
 
  if (thd->lex->slave_thd_opt)
849
 
    thread_mask&= thd->lex->slave_thd_opt;
850
 
  if (thread_mask) //some threads are stopped, start them
851
 
  {
852
 
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
853
 
                         thread_mask))
854
 
      slave_errno=ER_MASTER_INFO;
855
 
    else if (server_id_supplied && *mi->host)
856
 
    {
857
 
      /*
858
 
        If we will start SQL thread we will care about UNTIL options If
859
 
        not and they are specified we will ignore them and warn user
860
 
        about this fact.
861
 
      */
862
 
      if (thread_mask & SLAVE_SQL)
863
 
      {
864
 
        pthread_mutex_lock(&mi->rli.data_lock);
865
 
 
866
 
        if (thd->lex->mi.pos)
867
 
        {
868
 
          mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
869
 
          mi->rli.until_log_pos= thd->lex->mi.pos;
870
 
          /*
871
 
             We don't check thd->lex->mi.log_file_name for NULL here
872
 
             since it is checked in sql_yacc.yy
873
 
          */
874
 
          strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
875
 
                  sizeof(mi->rli.until_log_name)-1);
876
 
        }
877
 
        else if (thd->lex->mi.relay_log_pos)
878
 
        {
879
 
          mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
880
 
          mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
881
 
          strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
882
 
                  sizeof(mi->rli.until_log_name)-1);
883
 
        }
884
 
        else
885
 
          mi->rli.clear_until_condition();
886
 
 
887
 
        if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
888
 
        {
889
 
          /* Preparing members for effective until condition checking */
890
 
          const char *p= fn_ext(mi->rli.until_log_name);
891
 
          char *p_end;
892
 
          if (*p)
893
 
          {
894
 
            //p points to '.'
895
 
            mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
896
 
            /*
897
 
              p_end points to the first invalid character. If it equals
898
 
              to p, no digits were found, error. If it contains '\0' it
899
 
              means  conversion went ok.
900
 
            */
901
 
            if (p_end==p || *p_end)
902
 
              slave_errno=ER_BAD_SLAVE_UNTIL_COND;
903
 
          }
904
 
          else
905
 
            slave_errno=ER_BAD_SLAVE_UNTIL_COND;
906
 
 
907
 
          /* mark the cached result of the UNTIL comparison as "undefined" */
908
 
          mi->rli.until_log_names_cmp_result=
909
 
            Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
910
 
 
911
 
          /* Issuing warning then started without --skip-slave-start */
912
 
          if (!opt_skip_slave_start)
913
 
            push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
914
 
                         ER_MISSING_SKIP_SLAVE,
915
 
                         ER(ER_MISSING_SKIP_SLAVE));
916
 
        }
917
 
 
918
 
        pthread_mutex_unlock(&mi->rli.data_lock);
919
 
      }
920
 
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
921
 
        push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
922
 
                     ER(ER_UNTIL_COND_IGNORED));
923
 
 
924
 
      if (!slave_errno)
925
 
        slave_errno = start_slave_threads(0 /*no mutex */,
926
 
                                        1 /* wait for start */,
927
 
                                        mi,
928
 
                                        master_info_file,relay_log_info_file,
929
 
                                        thread_mask);
930
 
    }
931
 
    else
932
 
      slave_errno = ER_BAD_SLAVE;
933
 
  }
934
 
  else
935
 
  {
936
 
    /* no error if all threads are already started, only a warning */
937
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
938
 
                 ER(ER_SLAVE_WAS_RUNNING));
939
 
  }
940
 
 
941
 
  unlock_slave_threads(mi);
942
 
 
943
 
  if (slave_errno)
944
 
  {
945
 
    if (net_report)
946
 
      my_message(slave_errno, ER(slave_errno), MYF(0));
947
 
    return(1);
948
 
  }
949
 
  else if (net_report)
950
 
    my_ok(thd);
951
 
 
952
 
  return(0);
953
 
}
954
 
 
955
 
 
956
 
int stop_slave(THD* thd, Master_info* mi, bool net_report )
957
 
{
958
 
  int slave_errno;
959
 
  if (!thd)
960
 
    thd = current_thd;
961
 
 
962
 
  thd_proc_info(thd, "Killing slave");
963
 
  int thread_mask;
964
 
  lock_slave_threads(mi);
965
 
  // Get a mask of _running_ threads
966
 
  init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
967
 
  /*
968
 
    Below we will stop all running threads.
969
 
    But if the user wants to stop only one thread, do as if the other thread
970
 
    was stopped (as we don't wan't to touch the other thread), so set the
971
 
    bit to 0 for the other thread
972
 
  */
973
 
  if (thd->lex->slave_thd_opt)
974
 
    thread_mask &= thd->lex->slave_thd_opt;
975
 
 
976
 
  if (thread_mask)
977
 
  {
978
 
    slave_errno= terminate_slave_threads(mi,thread_mask,
979
 
                                         1 /*skip lock */);
980
 
  }
981
 
  else
982
 
  {
983
 
    //no error if both threads are already stopped, only a warning
984
 
    slave_errno= 0;
985
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
986
 
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
987
 
  }
988
 
  unlock_slave_threads(mi);
989
 
  thd_proc_info(thd, 0);
990
 
 
991
 
  if (slave_errno)
992
 
  {
993
 
    if (net_report)
994
 
      my_message(slave_errno, ER(slave_errno), MYF(0));
995
 
    return(1);
996
 
  }
997
 
  else if (net_report)
998
 
    my_ok(thd);
999
 
 
1000
 
  return(0);
1001
 
}
1002
 
 
1003
 
 
1004
 
/*
1005
 
  Remove all relay logs and start replication from the start
1006
 
 
1007
 
  SYNOPSIS
1008
 
    reset_slave()
1009
 
    thd                 Thread handler
1010
 
    mi                  Master info for the slave
1011
 
 
1012
 
  RETURN
1013
 
    0   ok
1014
 
    1   error
1015
 
*/
1016
 
 
1017
 
 
1018
 
int reset_slave(THD *thd, Master_info* mi)
1019
 
{
1020
 
  struct stat stat_area;
1021
 
  char fname[FN_REFLEN];
1022
 
  int thread_mask= 0, error= 0;
1023
 
  uint sql_errno=0;
1024
 
  const char* errmsg=0;
1025
 
 
1026
 
  lock_slave_threads(mi);
1027
 
  init_thread_mask(&thread_mask,mi,0 /* not inverse */);
1028
 
  if (thread_mask) // We refuse if any slave thread is running
1029
 
  {
1030
 
    sql_errno= ER_SLAVE_MUST_STOP;
1031
 
    error=1;
1032
 
    goto err;
1033
 
  }
1034
 
 
1035
 
  ha_reset_slave(thd);
1036
 
 
1037
 
  // delete relay logs, clear relay log coordinates
1038
 
  if ((error= purge_relay_logs(&mi->rli, thd,
1039
 
                               1 /* just reset */,
1040
 
                               &errmsg)))
1041
 
    goto err;
1042
 
 
1043
 
  /* Clear master's log coordinates */
1044
 
  init_master_log_pos(mi);
1045
 
  /*
1046
 
     Reset errors (the idea is that we forget about the
1047
 
     old master).
1048
 
  */
1049
 
  mi->rli.clear_error();
1050
 
  mi->rli.clear_until_condition();
1051
 
 
1052
 
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1053
 
  end_master_info(mi);
1054
 
  // and delete these two files
1055
 
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
1056
 
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1057
 
  {
1058
 
    error=1;
1059
 
    goto err;
1060
 
  }
1061
 
  // delete relay_log_info_file
1062
 
  fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
1063
 
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1064
 
  {
1065
 
    error=1;
1066
 
    goto err;
1067
 
  }
1068
 
 
1069
 
err:
1070
 
  unlock_slave_threads(mi);
1071
 
  if (error)
1072
 
    my_error(sql_errno, MYF(0), errmsg);
1073
 
  return(error);
1074
 
}
1075
 
 
1076
 
/*
1077
 
 
1078
 
  Kill all Binlog_dump threads which previously talked to the same slave
1079
 
  ("same" means with the same server id). Indeed, if the slave stops, if the
1080
 
  Binlog_dump thread is waiting (pthread_cond_wait) for binlog update, then it
1081
 
  will keep existing until a query is written to the binlog. If the master is
1082
 
  idle, then this could last long, and if the slave reconnects, we could have 2
1083
 
  Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
1084
 
  binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
1085
 
  the master kills any existing thread with the slave's server id (if this id is
1086
 
  not zero; it will be true for real slaves, but false for mysqlbinlog when it
1087
 
  sends COM_BINLOG_DUMP to get a remote binlog dump).
1088
 
 
1089
 
  SYNOPSIS
1090
 
    kill_zombie_dump_threads()
1091
 
    slave_server_id     the slave's server id
1092
 
 
1093
 
*/
1094
 
 
1095
 
 
1096
 
void kill_zombie_dump_threads(uint32_t slave_server_id)
1097
 
{
1098
 
  pthread_mutex_lock(&LOCK_thread_count);
1099
 
  I_List_iterator<THD> it(threads);
1100
 
  THD *tmp;
1101
 
 
1102
 
  while ((tmp=it++))
1103
 
  {
1104
 
    if (tmp->command == COM_BINLOG_DUMP &&
1105
 
       tmp->server_id == slave_server_id)
1106
 
    {
1107
 
      pthread_mutex_lock(&tmp->LOCK_delete);    // Lock from delete
1108
 
      break;
1109
 
    }
1110
 
  }
1111
 
  pthread_mutex_unlock(&LOCK_thread_count);
1112
 
  if (tmp)
1113
 
  {
1114
 
    /*
1115
 
      Here we do not call kill_one_thread() as
1116
 
      it will be slow because it will iterate through the list
1117
 
      again. We just to do kill the thread ourselves.
1118
 
    */
1119
 
    tmp->awake(THD::KILL_QUERY);
1120
 
    pthread_mutex_unlock(&tmp->LOCK_delete);
1121
 
  }
1122
 
}
1123
 
 
1124
 
 
1125
 
bool change_master(THD* thd, Master_info* mi)
1126
 
{
1127
 
  int thread_mask;
1128
 
  const char* errmsg= 0;
1129
 
  bool need_relay_log_purge= 1;
1130
 
 
1131
 
  lock_slave_threads(mi);
1132
 
  init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
1133
 
  if (thread_mask) // We refuse if any slave thread is running
1134
 
  {
1135
 
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1136
 
    unlock_slave_threads(mi);
1137
 
    return(true);
1138
 
  }
1139
 
 
1140
 
  thd_proc_info(thd, "Changing master");
1141
 
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1142
 
  // TODO: see if needs re-write
1143
 
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
1144
 
                       thread_mask))
1145
 
  {
1146
 
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1147
 
    unlock_slave_threads(mi);
1148
 
    return(true);
1149
 
  }
1150
 
 
1151
 
  /*
1152
 
    Data lock not needed since we have already stopped the running threads,
1153
 
    and we have the hold on the run locks which will keep all threads that
1154
 
    could possibly modify the data structures from running
1155
 
  */
1156
 
 
1157
 
  /*
1158
 
    If the user specified host or port without binlog or position,
1159
 
    reset binlog's name to FIRST and position to 4.
1160
 
  */
1161
 
 
1162
 
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1163
 
  {
1164
 
    mi->master_log_name[0] = 0;
1165
 
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
1166
 
  }
1167
 
 
1168
 
  if (lex_mi->log_file_name)
1169
 
    strmake(mi->master_log_name, lex_mi->log_file_name,
1170
 
            sizeof(mi->master_log_name)-1);
1171
 
  if (lex_mi->pos)
1172
 
  {
1173
 
    mi->master_log_pos= lex_mi->pos;
1174
 
  }
1175
 
 
1176
 
  if (lex_mi->host)
1177
 
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1178
 
  if (lex_mi->user)
1179
 
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
1180
 
  if (lex_mi->password)
1181
 
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
1182
 
  if (lex_mi->port)
1183
 
    mi->port = lex_mi->port;
1184
 
  if (lex_mi->connect_retry)
1185
 
    mi->connect_retry = lex_mi->connect_retry;
1186
 
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1187
 
    mi->heartbeat_period = lex_mi->heartbeat_period;
1188
 
  else
1189
 
    mi->heartbeat_period= (float) min((double)SLAVE_MAX_HEARTBEAT_PERIOD,
1190
 
                                      (slave_net_timeout/2.0));
1191
 
  mi->received_heartbeats= 0LL; // counter lives until master is CHANGEd
1192
 
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1193
 
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
1194
 
 
1195
 
  if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1196
 
    mi->ssl_verify_server_cert=
1197
 
      (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
1198
 
 
1199
 
  if (lex_mi->ssl_ca)
1200
 
    strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
1201
 
  if (lex_mi->ssl_capath)
1202
 
    strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
1203
 
  if (lex_mi->ssl_cert)
1204
 
    strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
1205
 
  if (lex_mi->ssl_cipher)
1206
 
    strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
1207
 
  if (lex_mi->ssl_key)
1208
 
    strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
1209
 
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
1210
 
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
1211
 
      lex_mi->ssl_verify_server_cert )
1212
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1213
 
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1214
 
 
1215
 
  if (lex_mi->relay_log_name)
1216
 
  {
1217
 
    need_relay_log_purge= 0;
1218
 
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
1219
 
            sizeof(mi->rli.group_relay_log_name)-1);
1220
 
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
1221
 
            sizeof(mi->rli.event_relay_log_name)-1);
1222
 
  }
1223
 
 
1224
 
  if (lex_mi->relay_log_pos)
1225
 
  {
1226
 
    need_relay_log_purge= 0;
1227
 
    mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
1228
 
  }
1229
 
 
1230
 
  /*
1231
 
    If user did specify neither host nor port nor any log name nor any log
1232
 
    pos, i.e. he specified only user/password/master_connect_retry, he probably
1233
 
    wants replication to resume from where it had left, i.e. from the
1234
 
    coordinates of the **SQL** thread (imagine the case where the I/O is ahead
1235
 
    of the SQL; restarting from the coordinates of the I/O would lose some
1236
 
    events which is probably unwanted when you are just doing minor changes
1237
 
    like changing master_connect_retry).
1238
 
    A side-effect is that if only the I/O thread was started, this thread may
1239
 
    restart from ''/4 after the CHANGE MASTER. That's a minor problem (it is a
1240
 
    much more unlikely situation than the one we are fixing here).
1241
 
    Note: coordinates of the SQL thread must be read here, before the
1242
 
    'if (need_relay_log_purge)' block which resets them.
1243
 
  */
1244
 
  if (!lex_mi->host && !lex_mi->port &&
1245
 
      !lex_mi->log_file_name && !lex_mi->pos &&
1246
 
      need_relay_log_purge)
1247
 
   {
1248
 
     /*
1249
 
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1250
 
       not initialized), so we use a max().
1251
 
       What happens to mi->rli.master_log_pos during the initialization stages
1252
 
       of replication is not 100% clear, so we guard against problems using
1253
 
       max().
1254
 
      */
1255
 
     mi->master_log_pos = ((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1256
 
                           ? BIN_LOG_HEADER_SIZE
1257
 
                           : mi->rli.group_master_log_pos);
1258
 
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
1259
 
             sizeof(mi->master_log_name)-1);
1260
 
  }
1261
 
  /*
1262
 
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1263
 
    a slave before).
1264
 
  */
1265
 
  if (flush_master_info(mi, 0))
1266
 
  {
1267
 
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1268
 
    unlock_slave_threads(mi);
1269
 
    return(true);
1270
 
  }
1271
 
  if (need_relay_log_purge)
1272
 
  {
1273
 
    relay_log_purge= 1;
1274
 
    thd_proc_info(thd, "Purging old relay logs");
1275
 
    if (purge_relay_logs(&mi->rli, thd,
1276
 
                         0 /* not only reset, but also reinit */,
1277
 
                         &errmsg))
1278
 
    {
1279
 
      my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
1280
 
      unlock_slave_threads(mi);
1281
 
      return(true);
1282
 
    }
1283
 
  }
1284
 
  else
1285
 
  {
1286
 
    const char* msg;
1287
 
    relay_log_purge= 0;
1288
 
    /* Relay log is already initialized */
1289
 
    if (init_relay_log_pos(&mi->rli,
1290
 
                           mi->rli.group_relay_log_name,
1291
 
                           mi->rli.group_relay_log_pos,
1292
 
                           0 /*no data lock*/,
1293
 
                           &msg, 0))
1294
 
    {
1295
 
      my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
1296
 
      unlock_slave_threads(mi);
1297
 
      return(true);
1298
 
    }
1299
 
  }
1300
 
  /*
1301
 
    Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block,
1302
 
    so restore them to good values. If we left them to ''/0, that would work;
1303
 
    but that would fail in the case of 2 successive CHANGE MASTER (without a
1304
 
    START SLAVE in between): because first one would set the coords in mi to
1305
 
    the good values of those in rli, the set those in rli to ''/0, then
1306
 
    second CHANGE MASTER would set the coords in mi to those of rli, i.e. to
1307
 
    ''/0: we have lost all copies of the original good coordinates.
1308
 
    That's why we always save good coords in rli.
1309
 
  */
1310
 
  mi->rli.group_master_log_pos= mi->master_log_pos;
1311
 
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
1312
 
          sizeof(mi->rli.group_master_log_name)-1);
1313
 
 
1314
 
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
1315
 
    mi->rli.group_master_log_pos=0;
1316
 
 
1317
 
  pthread_mutex_lock(&mi->rli.data_lock);
1318
 
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1319
 
  /* Clear the errors, for a clean start */
1320
 
  mi->rli.clear_error();
1321
 
  mi->rli.clear_until_condition();
1322
 
  /*
1323
 
    If we don't write new coordinates to disk now, then old will remain in
1324
 
    relay-log.info until START SLAVE is issued; but if mysqld is shutdown
1325
 
    before START SLAVE, then old will remain in relay-log.info, and will be the
1326
 
    in-memory value at restart (thus causing errors, as the old relay log does
1327
 
    not exist anymore).
1328
 
  */
1329
 
  flush_relay_log_info(&mi->rli);
1330
 
  pthread_cond_broadcast(&mi->data_cond);
1331
 
  pthread_mutex_unlock(&mi->rli.data_lock);
1332
 
 
1333
 
  unlock_slave_threads(mi);
1334
 
  thd_proc_info(thd, 0);
1335
 
  my_ok(thd);
1336
 
  return(false);
1337
 
}
1338
 
 
1339
 
int reset_master(THD* thd)
1340
 
{
1341
 
  if (!mysql_bin_log.is_open())
1342
 
  {
1343
 
    my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
1344
 
               ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
1345
 
    return 1;
1346
 
  }
1347
 
  return mysql_bin_log.reset_logs(thd);
1348
 
}
1349
 
 
1350
 
int cmp_master_pos(const char* log_file_name1, uint64_t log_pos1,
1351
 
                   const char* log_file_name2, uint64_t log_pos2)
1352
 
{
1353
 
  int res;
1354
 
  uint log_file_name1_len=  strlen(log_file_name1);
1355
 
  uint log_file_name2_len=  strlen(log_file_name2);
1356
 
 
1357
 
  //  We assume that both log names match up to '.'
1358
 
  if (log_file_name1_len == log_file_name2_len)
1359
 
  {
1360
 
    if ((res= strcmp(log_file_name1, log_file_name2)))
1361
 
      return res;
1362
 
    return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
1363
 
  }
1364
 
  return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
1365
 
}
1366
 
 
1367
 
 
1368
 
bool mysql_show_binlog_events(THD* thd)
1369
 
{
1370
 
  Protocol *protocol= thd->protocol;
1371
 
  List<Item> field_list;
1372
 
  const char *errmsg= 0;
1373
 
  bool ret= true;
1374
 
  IO_CACHE log;
1375
 
  File file= -1;
1376
 
 
1377
 
  Log_event::init_show_field_list(&field_list);
1378
 
  if (protocol->send_fields(&field_list,
1379
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1380
 
    return(true);
1381
 
 
1382
 
  Format_description_log_event *description_event= new
1383
 
    Format_description_log_event(3); /* MySQL 4.0 by default */
1384
 
 
1385
 
  /*
1386
 
    Wait for handlers to insert any pending information
1387
 
    into the binlog.  For e.g. ndb which updates the binlog asynchronously
1388
 
    this is needed so that the uses sees all its own commands in the binlog
1389
 
  */
1390
 
  ha_binlog_wait(thd);
1391
 
 
1392
 
  if (mysql_bin_log.is_open())
1393
 
  {
1394
 
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1395
 
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
1396
 
    ha_rows event_count, limit_start, limit_end;
1397
 
    my_off_t pos = max((uint64_t)BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1398
 
    char search_file_name[FN_REFLEN], *name;
1399
 
    const char *log_file_name = lex_mi->log_file_name;
1400
 
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1401
 
    LOG_INFO linfo;
1402
 
    Log_event* ev;
1403
 
 
1404
 
    unit->set_limit(thd->lex->current_select);
1405
 
    limit_start= unit->offset_limit_cnt;
1406
 
    limit_end= unit->select_limit_cnt;
1407
 
 
1408
 
    name= search_file_name;
1409
 
    if (log_file_name)
1410
 
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
1411
 
    else
1412
 
      name=0;                                   // Find first log
1413
 
 
1414
 
    linfo.index_file_offset = 0;
1415
 
 
1416
 
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1417
 
    {
1418
 
      errmsg = "Could not find target log";
1419
 
      goto err;
1420
 
    }
1421
 
 
1422
 
    pthread_mutex_lock(&LOCK_thread_count);
1423
 
    thd->current_linfo = &linfo;
1424
 
    pthread_mutex_unlock(&LOCK_thread_count);
1425
 
 
1426
 
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
1427
 
      goto err;
1428
 
 
1429
 
    /*
1430
 
      to account binlog event header size
1431
 
    */
1432
 
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
1433
 
 
1434
 
    pthread_mutex_lock(log_lock);
1435
 
 
1436
 
    /*
1437
 
      open_binlog() sought to position 4.
1438
 
      Read the first event in case it's a Format_description_log_event, to
1439
 
      know the format. If there's no such event, we are 3.23 or 4.x. This
1440
 
      code, like before, can't read 3.23 binlogs.
1441
 
      This code will fail on a mixed relay log (one which has Format_desc then
1442
 
      Rotate then Format_desc).
1443
 
    */
1444
 
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
1445
 
    if (ev)
1446
 
    {
1447
 
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
1448
 
      {
1449
 
        delete description_event;
1450
 
        description_event= (Format_description_log_event*) ev;
1451
 
      }
1452
 
      else
1453
 
        delete ev;
1454
 
    }
1455
 
 
1456
 
    my_b_seek(&log, pos);
1457
 
 
1458
 
    if (!description_event->is_valid())
1459
 
    {
1460
 
      errmsg="Invalid Format_description event; could be out of memory";
1461
 
      goto err;
1462
 
    }
1463
 
 
1464
 
    for (event_count = 0;
1465
 
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
1466
 
                                         description_event)); )
1467
 
    {
1468
 
      if (event_count >= limit_start &&
1469
 
          ev->net_send(protocol, linfo.log_file_name, pos))
1470
 
      {
1471
 
        errmsg = "Net error";
1472
 
        delete ev;
1473
 
        pthread_mutex_unlock(log_lock);
1474
 
        goto err;
1475
 
      }
1476
 
 
1477
 
      pos = my_b_tell(&log);
1478
 
      delete ev;
1479
 
 
1480
 
      if (++event_count >= limit_end)
1481
 
        break;
1482
 
    }
1483
 
 
1484
 
    if (event_count < limit_end && log.error)
1485
 
    {
1486
 
      errmsg = "Wrong offset or I/O error";
1487
 
      pthread_mutex_unlock(log_lock);
1488
 
      goto err;
1489
 
    }
1490
 
 
1491
 
    pthread_mutex_unlock(log_lock);
1492
 
  }
1493
 
 
1494
 
  ret= false;
1495
 
 
1496
 
err:
1497
 
  delete description_event;
1498
 
  if (file >= 0)
1499
 
  {
1500
 
    end_io_cache(&log);
1501
 
    (void) my_close(file, MYF(MY_WME));
1502
 
  }
1503
 
 
1504
 
  if (errmsg)
1505
 
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
1506
 
             "SHOW BINLOG EVENTS", errmsg);
1507
 
  else
1508
 
    my_eof(thd);
1509
 
 
1510
 
  pthread_mutex_lock(&LOCK_thread_count);
1511
 
  thd->current_linfo = 0;
1512
 
  pthread_mutex_unlock(&LOCK_thread_count);
1513
 
  return(ret);
1514
 
}
1515
 
 
1516
 
 
1517
 
bool show_binlog_info(THD* thd)
1518
 
{
1519
 
  Protocol *protocol= thd->protocol;
1520
 
  List<Item> field_list;
1521
 
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1522
 
  field_list.push_back(new Item_return_int("Position",20,
1523
 
                                           DRIZZLE_TYPE_LONGLONG));
1524
 
  field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
1525
 
  field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
1526
 
 
1527
 
  if (protocol->send_fields(&field_list,
1528
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1529
 
    return(true);
1530
 
  protocol->prepare_for_resend();
1531
 
 
1532
 
  if (mysql_bin_log.is_open())
1533
 
  {
1534
 
    LOG_INFO li;
1535
 
    mysql_bin_log.get_current_log(&li);
1536
 
    int dir_len = dirname_length(li.log_file_name);
1537
 
    protocol->store(li.log_file_name + dir_len, &my_charset_bin);
1538
 
    protocol->store((uint64_t) li.pos);
1539
 
    protocol->store(binlog_filter->get_do_db());
1540
 
    protocol->store(binlog_filter->get_ignore_db());
1541
 
    if (protocol->write())
1542
 
      return(true);
1543
 
  }
1544
 
  my_eof(thd);
1545
 
  return(false);
1546
 
}
1547
 
 
1548
 
 
1549
 
/*
1550
 
  Send a list of all binary logs to client
1551
 
 
1552
 
  SYNOPSIS
1553
 
    show_binlogs()
1554
 
    thd         Thread specific variable
1555
 
 
1556
 
  RETURN VALUES
1557
 
    false OK
1558
 
    true  error
1559
 
*/
1560
 
 
1561
 
bool show_binlogs(THD* thd)
1562
 
{
1563
 
  IO_CACHE *index_file;
1564
 
  LOG_INFO cur;
1565
 
  File file;
1566
 
  char fname[FN_REFLEN];
1567
 
  List<Item> field_list;
1568
 
  uint length;
1569
 
  int cur_dir_len;
1570
 
  Protocol *protocol= thd->protocol;
1571
 
 
1572
 
  if (!mysql_bin_log.is_open())
1573
 
  {
1574
 
    my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0));
1575
 
    return 1;
1576
 
  }
1577
 
 
1578
 
  field_list.push_back(new Item_empty_string("Log_name", 255));
1579
 
  field_list.push_back(new Item_return_int("File_size", 20,
1580
 
                                           DRIZZLE_TYPE_LONGLONG));
1581
 
  if (protocol->send_fields(&field_list,
1582
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1583
 
    return(true);
1584
 
  
1585
 
  pthread_mutex_lock(mysql_bin_log.get_log_lock());
1586
 
  mysql_bin_log.lock_index();
1587
 
  index_file=mysql_bin_log.get_index_file();
1588
 
  
1589
 
  mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
1590
 
  pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
1591
 
  
1592
 
  cur_dir_len= dirname_length(cur.log_file_name);
1593
 
 
1594
 
  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
1595
 
 
1596
 
  /* The file ends with EOF or empty line */
1597
 
  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
1598
 
  {
1599
 
    int dir_len;
1600
 
    uint64_t file_length= 0;                   // Length if open fails
1601
 
    fname[--length] = '\0';                     // remove the newline
1602
 
 
1603
 
    protocol->prepare_for_resend();
1604
 
    dir_len= dirname_length(fname);
1605
 
    length-= dir_len;
1606
 
    protocol->store(fname + dir_len, length, &my_charset_bin);
1607
 
 
1608
 
    if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
1609
 
      file_length= cur.pos;  /* The active log, use the active position */
1610
 
    else
1611
 
    {
1612
 
      /* this is an old log, open it and find the size */
1613
 
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
1614
 
                         MYF(0))) >= 0)
1615
 
      {
1616
 
        file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));
1617
 
        my_close(file, MYF(0));
1618
 
      }
1619
 
    }
1620
 
    protocol->store(file_length);
1621
 
    if (protocol->write())
1622
 
      goto err;
1623
 
  }
1624
 
  mysql_bin_log.unlock_index();
1625
 
  my_eof(thd);
1626
 
  return(false);
1627
 
 
1628
 
err:
1629
 
  mysql_bin_log.unlock_index();
1630
 
  return(true);
1631
 
}
1632
 
 
1633
 
/**
1634
 
   Load data's io cache specific hook to be executed
1635
 
   before a chunk of data is being read into the cache's buffer
1636
 
   The fuction instantianates and writes into the binlog
1637
 
   replication events along LOAD DATA processing.
1638
 
   
1639
 
   @param file  pointer to io-cache
1640
 
   @return 0
1641
 
*/
1642
 
int log_loaded_block(IO_CACHE* file)
1643
 
{
1644
 
  LOAD_FILE_INFO *lf_info;
1645
 
  uint block_len;
1646
 
  /* buffer contains position where we started last read */
1647
 
  uchar* buffer= (uchar*) my_b_get_buffer_start(file);
1648
 
  uint max_event_size= current_thd->variables.max_allowed_packet;
1649
 
  lf_info= (LOAD_FILE_INFO*) file->arg;
1650
 
  if (lf_info->thd->current_stmt_binlog_row_based)
1651
 
    return(0);
1652
 
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1653
 
      lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1654
 
    return(0);
1655
 
  
1656
 
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1657
 
       buffer += min(block_len, max_event_size),
1658
 
       block_len -= min(block_len, max_event_size))
1659
 
  {
1660
 
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1661
 
    if (lf_info->wrote_create_file)
1662
 
    {
1663
 
      Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
1664
 
                               min(block_len, max_event_size),
1665
 
                               lf_info->log_delayed);
1666
 
      mysql_bin_log.write(&a);
1667
 
    }
1668
 
    else
1669
 
    {
1670
 
      Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1671
 
                                   buffer,
1672
 
                                   min(block_len, max_event_size),
1673
 
                                   lf_info->log_delayed);
1674
 
      mysql_bin_log.write(&b);
1675
 
      lf_info->wrote_create_file= 1;
1676
 
    }
1677
 
  }
1678
 
  return(0);
1679
 
}
1680
 
 
1681
 
/*
1682
 
  Replication System Variables
1683
 
*/
1684
 
 
1685
 
class sys_var_slave_skip_counter :public sys_var
1686
 
{
1687
 
public:
1688
 
  sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
1689
 
    :sys_var(name_arg)
1690
 
  { chain_sys_var(chain); }
1691
 
  bool check(THD *thd, set_var *var);
1692
 
  bool update(THD *thd, set_var *var);
1693
 
  bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
1694
 
  /*
1695
 
    We can't retrieve the value of this, so we don't have to define
1696
 
    type() or value_ptr()
1697
 
  */
1698
 
};
1699
 
 
1700
 
class sys_var_sync_binlog_period :public sys_var_long_ptr
1701
 
{
1702
 
public:
1703
 
  sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
1704
 
                             ulong *value_ptr)
1705
 
    :sys_var_long_ptr(chain, name_arg,value_ptr) {}
1706
 
  bool update(THD *thd, set_var *var);
1707
 
};
1708
 
 
1709
 
static void fix_slave_net_timeout(THD *thd,
1710
 
                                  enum_var_type type __attribute__((unused)))
1711
 
{
1712
 
#ifdef HAVE_REPLICATION
1713
 
  pthread_mutex_lock(&LOCK_active_mi);
1714
 
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1715
 
    push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1716
 
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1717
 
                        "The currect value for master_heartbeat_period"
1718
 
                        " exceeds the new value of `slave_net_timeout' sec."
1719
 
                        " A sensible value for the period should be"
1720
 
                        " less than the timeout.");
1721
 
  pthread_mutex_unlock(&LOCK_active_mi);
1722
 
#endif
1723
 
  return;
1724
 
}
1725
 
 
1726
 
static sys_var_chain vars = { NULL, NULL };
1727
 
 
1728
 
static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
1729
 
                                            &relay_log_purge);
1730
 
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
1731
 
                                              &slave_net_timeout,
1732
 
                                              fix_slave_net_timeout);
1733
 
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
1734
 
                                                &slave_trans_retries);
1735
 
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
1736
 
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
1737
 
 
1738
 
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
1739
 
 
1740
 
 
1741
 
static int show_slave_skip_errors(THD *thd __attribute__((unused)),
1742
 
                                  SHOW_VAR *var, char *buff)
1743
 
{
1744
 
  var->type=SHOW_CHAR;
1745
 
  var->value= buff;
1746
 
  if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
1747
 
  {
1748
 
    var->value= const_cast<char *>("OFF");
1749
 
  }
1750
 
  else if (bitmap_is_set_all(&slave_error_mask))
1751
 
  {
1752
 
    var->value= const_cast<char *>("ALL");
1753
 
  }
1754
 
  else
1755
 
  {
1756
 
    /* 10 is enough assuming errors are max 4 digits */
1757
 
    int i;
1758
 
    var->value= buff;
1759
 
    for (i= 1;
1760
 
         i < MAX_SLAVE_ERROR &&
1761
 
         (buff - var->value) < SHOW_VAR_FUNC_BUFF_SIZE;
1762
 
         i++)
1763
 
    {
1764
 
      if (bitmap_is_set(&slave_error_mask, i))
1765
 
      {
1766
 
        buff= int10_to_str(i, buff, 10);
1767
 
        *buff++= ',';
1768
 
      }
1769
 
    }
1770
 
    if (var->value != buff)
1771
 
      buff--;                           // Remove last ','
1772
 
    if (i < MAX_SLAVE_ERROR)
1773
 
      buff= stpcpy(buff, "...");  // Couldn't show all errors
1774
 
    *buff=0;
1775
 
  }
1776
 
  return 0;
1777
 
}
1778
 
 
1779
 
static st_show_var_func_container
1780
 
show_slave_skip_errors_cont = { &show_slave_skip_errors };
1781
 
 
1782
 
 
1783
 
static SHOW_VAR fixed_vars[]= {
1784
 
  {"log_slave_updates",       (char*) &opt_log_slave_updates,       SHOW_MY_BOOL},
1785
 
  {"relay_log" , (char*) &opt_relay_logname, SHOW_CHAR_PTR},
1786
 
  {"relay_log_index", (char*) &opt_relaylog_index_name, SHOW_CHAR_PTR},
1787
 
  {"relay_log_info_file", (char*) &relay_log_info_file, SHOW_CHAR_PTR},
1788
 
  {"relay_log_space_limit",   (char*) &relay_log_space_limit,       SHOW_LONGLONG},
1789
 
  {"slave_load_tmpdir",       (char*) &slave_load_tmpdir,           SHOW_CHAR_PTR},
1790
 
  {"slave_skip_errors",       (char*) &show_slave_skip_errors_cont,      SHOW_FUNC},
1791
 
};
1792
 
 
1793
 
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((unused)),
1794
 
                                       set_var *var)
1795
 
{
1796
 
  int result= 0;
1797
 
  pthread_mutex_lock(&LOCK_active_mi);
1798
 
  pthread_mutex_lock(&active_mi->rli.run_lock);
1799
 
  if (active_mi->rli.slave_running)
1800
 
  {
1801
 
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1802
 
    result=1;
1803
 
  }
1804
 
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1805
 
  pthread_mutex_unlock(&LOCK_active_mi);
1806
 
  var->save_result.ulong_value= (ulong) var->value->val_int();
1807
 
  return result;
1808
 
}
1809
 
 
1810
 
 
1811
 
bool sys_var_slave_skip_counter::update(THD *thd __attribute__((unused)),
1812
 
                                        set_var *var)
1813
 
{
1814
 
  pthread_mutex_lock(&LOCK_active_mi);
1815
 
  pthread_mutex_lock(&active_mi->rli.run_lock);
1816
 
  /*
1817
 
    The following test should normally never be true as we test this
1818
 
    in the check function;  To be safe against multiple
1819
 
    SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
1820
 
  */
1821
 
  if (!active_mi->rli.slave_running)
1822
 
  {
1823
 
    pthread_mutex_lock(&active_mi->rli.data_lock);
1824
 
    active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
1825
 
    pthread_mutex_unlock(&active_mi->rli.data_lock);
1826
 
  }
1827
 
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1828
 
  pthread_mutex_unlock(&LOCK_active_mi);
1829
 
  return 0;
1830
 
}
1831
 
 
1832
 
 
1833
 
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((unused)),
1834
 
                                        set_var *var)
1835
 
{
1836
 
  sync_binlog_period= (uint32_t) var->save_result.uint64_t_value;
1837
 
  return 0;
1838
 
}
1839
 
 
1840
 
int init_replication_sys_vars()
1841
 
{
1842
 
  mysql_append_static_vars(fixed_vars, sizeof(fixed_vars) / sizeof(SHOW_VAR));
1843
 
 
1844
 
  if (mysql_add_sys_var_chain(vars.first, my_long_options))
1845
 
  {
1846
 
    /* should not happen */
1847
 
    fprintf(stderr, "failed to initialize replication system variables");
1848
 
    unireg_abort(1);
1849
 
  }
1850
 
  return 0;
1851
 
}
1852
 
 
1853
 
#endif /* HAVE_REPLICATION */