1
/* Copyright (C) 2000-2006 MySQL AB & Sasha
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include <drizzled/server_includes.h>
18
#ifdef HAVE_REPLICATION
21
#include "log_event.h"
22
#include "rpl_filter.h"
23
#include <drizzled/drizzled_error_messages.h>
25
int max_binlog_dump_events = 0; // unlimited
28
fake_rotate_event() builds a fake (=which does not exist physically in any
29
binlog) Rotate event, which contains the name of the binlog we are going to
30
send to the slave (because the slave may not know it if it just asked for
31
MASTER_LOG_FILE='', MASTER_LOG_POS=4).
32
< 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
33
After this version we always call it, so that a 3.23.58 slave can rely on
34
it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
35
zeros in the good positions which, by chance, make it possible for the 3.23
36
slave to detect that this event is unexpected) (this is luck which happens
37
because the master and slave disagree on the size of the header of
40
Relying on the event length of the Rotate event instead of these
41
well-placed zeros was not possible as Rotate events have a variable-length
44
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
45
uint64_t position, const char** errmsg)
47
char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
49
'when' (the timestamp) is set to 0 so that slave could distinguish between
50
real and fake Rotate events (if necessary)
53
header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
55
char* p = log_file_name+dirname_length(log_file_name);
56
uint ident_len = (uint) strlen(p);
57
uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
58
int4store(header + SERVER_ID_OFFSET, server_id);
59
int4store(header + EVENT_LEN_OFFSET, event_len);
60
int2store(header + FLAGS_OFFSET, 0);
62
// TODO: check what problems this may cause and fix them
63
int4store(header + LOG_POS_OFFSET, 0);
65
packet->append(header, sizeof(header));
66
int8store(buf+R_POS_OFFSET,position);
67
packet->append(buf, ROTATE_HEADER_LEN);
68
packet->append(p,ident_len);
69
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
71
*errmsg = "failed on my_net_write()";
77
static int send_file(THD *thd)
80
int fd = -1, error = 1;
82
char fname[FN_REFLEN+1];
83
const char *errmsg = 0;
85
unsigned long packet_len;
86
uchar buf[IO_SIZE]; // It's safe to alloc this
89
The client might be slow loading the data, give him wait_timeout to do
92
old_timeout= net->read_timeout;
93
my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
96
We need net_flush here because the client will not know it needs to send
97
us the file name until it has processed the load event entry
99
if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
101
errmsg = _("Failed in send_file() while reading file name");
105
// terminate with \0 for fn_format
106
*((char*)net->read_pos + packet_len) = 0;
107
fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
108
// this is needed to make replicate-ignore-db
109
if (!strcmp(fname,"/dev/null"))
112
if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
114
errmsg = _("Failed in send_file() on open of file");
118
while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0)
120
if (my_net_write(net, buf, bytes))
122
errmsg = _("Failed in send_file() while writing data to client");
128
if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
129
(my_net_read(net) == packet_error))
131
errmsg = _("Failed in send_file() while negotiating file transfer close");
137
my_net_set_read_timeout(net, old_timeout);
139
(void) my_close(fd, MYF(0));
142
sql_print_error(errmsg);
149
Adjust the position pointer in the binary log file for all running slaves
152
adjust_linfo_offsets()
153
purge_offset Number of bytes removed from start of log index file
156
- This is called when doing a PURGE when we delete lines from the
160
- Before calling this function, we have to ensure that no threads are
161
using any binary log file before purge_offset.a
164
- Inform the slave threads that they should sync the position
165
in the binary log file with flush_relay_log_info.
166
Now they sync is done for next read.
169
void adjust_linfo_offsets(my_off_t purge_offset)
173
pthread_mutex_lock(&LOCK_thread_count);
174
I_List_iterator<THD> it(threads);
179
if ((linfo = tmp->current_linfo))
181
pthread_mutex_lock(&linfo->lock);
183
Index file offset can be less that purge offset only if
184
we just started reading the index file. In that case
185
we have nothing to adjust
187
if (linfo->index_file_offset < purge_offset)
188
linfo->fatal = (linfo->index_file_offset != 0);
190
linfo->index_file_offset -= purge_offset;
191
pthread_mutex_unlock(&linfo->lock);
194
pthread_mutex_unlock(&LOCK_thread_count);
198
bool log_in_use(const char* log_name)
200
int log_name_len = strlen(log_name) + 1;
204
pthread_mutex_lock(&LOCK_thread_count);
205
I_List_iterator<THD> it(threads);
210
if ((linfo = tmp->current_linfo))
212
pthread_mutex_lock(&linfo->lock);
213
result = !memcmp(log_name, linfo->log_file_name, log_name_len);
214
pthread_mutex_unlock(&linfo->lock);
220
pthread_mutex_unlock(&LOCK_thread_count);
224
bool purge_error_message(THD* thd, int res)
230
case LOG_INFO_EOF: errmsg= ER_UNKNOWN_TARGET_BINLOG; break;
231
case LOG_INFO_IO: errmsg= ER_IO_ERR_LOG_INDEX_READ; break;
232
case LOG_INFO_INVALID:errmsg= ER_BINLOG_PURGE_PROHIBITED; break;
233
case LOG_INFO_SEEK: errmsg= ER_FSEEK_FAIL; break;
234
case LOG_INFO_MEM: errmsg= ER_OUT_OF_RESOURCES; break;
235
case LOG_INFO_FATAL: errmsg= ER_BINLOG_PURGE_FATAL_ERR; break;
236
case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break;
237
case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break;
238
default: errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break;
243
my_message(errmsg, ER(errmsg), MYF(0));
251
bool purge_master_logs(THD* thd, const char* to_log)
253
char search_file_name[FN_REFLEN];
254
if (!mysql_bin_log.is_open())
260
mysql_bin_log.make_log_name(search_file_name, to_log);
261
return purge_error_message(thd,
262
mysql_bin_log.purge_logs(search_file_name, 0, 1,
267
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
269
if (!mysql_bin_log.is_open())
274
return purge_error_message(thd,
275
mysql_bin_log.purge_logs_before_date(purge_time));
278
int test_for_non_eof_log_read_errors(int error, const char **errmsg)
280
if (error == LOG_READ_EOF)
282
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
285
*errmsg = "bogus data in log event";
287
case LOG_READ_TOO_LARGE:
288
*errmsg = "log event entry exceeded max_allowed_packet; \
289
Increase max_allowed_packet on master";
292
*errmsg = "I/O error reading log event";
295
*errmsg = "memory allocation failed reading log event";
298
*errmsg = "binlog truncated in the middle of event";
301
*errmsg = "unknown error reading log event on the master";
309
An auxiliary function for calling in mysql_binlog_send
310
to initialize the heartbeat timeout in waiting for a binlogged event.
312
@param[in] thd THD to access a user variable
314
@return heartbeat period an uint64_t of nanoseconds
315
or zero if heartbeat was not demanded by slave
317
static uint64_t get_heartbeat_period(THD * thd)
320
LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")};
321
user_var_entry *entry=
322
(user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
324
return entry? entry->val_int(&null_value) : 0;
328
Function prepares and sends repliation heartbeat event.
330
@param net net object of THD
331
@param packet buffer to store the heartbeat instance
332
@param event_coordinates binlog file name and position of the last
333
real event master sent from binlog
336
Among three essential pieces of heartbeat data Log_event::when
338
The error to send is serious and should force terminating
341
static int send_heartbeat_event(NET* net, String* packet,
342
const struct event_coordinates *coord)
344
char header[LOG_EVENT_HEADER_LEN];
346
'when' (the timestamp) is set to 0 so that slave could distinguish between
347
real and fake Rotate events (if necessary)
349
memset(header, 0, 4); // when
351
header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
353
char* p= coord->file_name + dirname_length(coord->file_name);
355
uint ident_len = strlen(p);
356
uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN;
357
int4store(header + SERVER_ID_OFFSET, server_id);
358
int4store(header + EVENT_LEN_OFFSET, event_len);
359
int2store(header + FLAGS_OFFSET, 0);
361
int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos
363
packet->append(header, sizeof(header));
364
packet->append(p, ident_len); // log_file_name
366
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
371
packet->set("\0", 1, &my_charset_bin);
376
TODO: Clean up loop to only have one call to send_file()
379
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
383
char *log_file_name = linfo.log_file_name;
384
char search_file_name[FN_REFLEN], *name;
387
String* packet = &thd->packet;
389
const char *errmsg = "Unknown error";
390
NET* net = &thd->net;
391
pthread_mutex_t *log_lock;
392
bool binlog_can_be_corrupted= false;
394
memset(&log, 0, sizeof(log));
396
heartbeat_period from @master_heartbeat_period user variable
398
uint64_t heartbeat_period= get_heartbeat_period(thd);
399
struct timespec heartbeat_buf;
400
struct event_coordinates coord_buf;
401
struct timespec *heartbeat_ts= NULL;
402
struct event_coordinates *coord= NULL;
403
if (heartbeat_period != 0LL)
405
heartbeat_ts= &heartbeat_buf;
406
set_timespec_nsec(*heartbeat_ts, 0);
408
coord->file_name= log_file_name; // initialization basing on what slave remembers
412
if (!mysql_bin_log.is_open())
414
errmsg = "Binary log is not open";
415
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
418
if (!server_id_supplied)
420
errmsg = "Misconfigured master - server id was not set";
421
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
425
name=search_file_name;
427
mysql_bin_log.make_log_name(search_file_name, log_ident);
429
name=0; // Find first log
431
linfo.index_file_offset = 0;
433
if (mysql_bin_log.find_log_pos(&linfo, name, 1))
435
errmsg = "Could not find first log file name in binary log index file";
436
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
440
pthread_mutex_lock(&LOCK_thread_count);
441
thd->current_linfo = &linfo;
442
pthread_mutex_unlock(&LOCK_thread_count);
444
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
446
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
449
if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
451
errmsg= "Client requested master to start replication from \
452
impossible position";
453
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
458
We need to start a packet with something other than 255
459
to distinguish it from error
461
packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
464
Tell the client about the log name with a fake Rotate event;
465
this is needed even if we also send a Format_description_log_event
466
just after, because that event does not contain the binlog's name.
467
Note that as this Rotate event is sent before
468
Format_description_log_event, the slave cannot have any info to
469
understand this event's format, so the header len of
470
Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
471
than other events except FORMAT_DESCRIPTION_EVENT).
472
Before 4.0.14 we called fake_rotate_event below only if (pos ==
473
BIN_LOG_HEADER_SIZE), because if this is false then the slave
474
already knows the binlog's name.
475
Since, we always call fake_rotate_event; if the slave already knew
476
the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
477
useless but does not harm much. It is nice for 3.23 (>=.58) slaves
478
which test Rotate events to see if the master is 4.0 (then they
479
choose to stop because they can't replicate 4.0); by always calling
480
fake_rotate_event we are sure that 3.23.58 and newer will detect the
481
problem as soon as replication starts (BUG#198).
482
Always calling fake_rotate_event makes sending of normal
483
(=from-binlog) Rotate events a priori unneeded, but it is not so
484
simple: the 2 Rotate events are not equivalent, the normal one is
485
before the Stop event, the fake one is after. If we don't send the
486
normal one, then the Stop event will be interpreted (by existing 4.0
487
slaves) as "the master stopped", which is wrong. So for safety,
488
given that we want minimum modification of 4.0, we send the normal
491
if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
494
This error code is not perfect, as fake_rotate_event() does not
495
read anything from the binlog; if it fails it's because of an
496
error in my_net_write(), fortunately it will say so in errmsg.
498
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
501
packet->set("\0", 1, &my_charset_bin);
503
Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
504
this larger than the corresponding packet (query) sent
505
from client to master.
507
thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
510
We can set log_lock now, it does not move (it's a member of
511
mysql_bin_log, and it's already inited, and it will be destroyed
514
log_lock = mysql_bin_log.get_log_lock();
515
if (pos > BIN_LOG_HEADER_SIZE)
518
Try to find a Format_description_log_event at the beginning of
521
if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
524
The packet has offsets equal to the normal offsets in a binlog
525
event +1 (the first character is \0).
527
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
529
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
530
LOG_EVENT_BINLOG_IN_USE_F);
531
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
533
mark that this event with "log_pos=0", so the slave
534
should not increment master's binlog position
535
(rli->group_master_log_pos)
537
int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
539
if reconnect master sends FD event with `created' as 0
540
to avoid destroying temp tables.
542
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
543
ST_CREATED_OFFSET+1, (uint32_t) 0);
545
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
547
errmsg = "Failed on my_net_write()";
548
my_errno= ER_UNKNOWN_ERROR;
553
No need to save this event. We are only doing simple reads
554
(no real parsing of the events) so we don't need it. And so
555
we don't need the artificial Format_description_log_event of
562
if (test_for_non_eof_log_read_errors(error, &errmsg))
565
It's EOF, nothing to do, go on reading next events, the
566
Format_description_log_event will be found naturally if it is written.
569
/* reset the packet as we wrote to it in any case */
570
packet->set("\0", 1, &my_charset_bin);
571
} /* end of if (pos > BIN_LOG_HEADER_SIZE); */
574
/* The Format_description_log_event event will be found naturally. */
577
/* seek to the requested position, to start the requested dump */
578
my_b_seek(&log, pos); // Seek will done on next read
580
while (!net->error && net->vio != 0 && !thd->killed)
582
while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
585
log's filename does not change while it's active
588
coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
590
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
592
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
593
LOG_EVENT_BINLOG_IN_USE_F);
594
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
596
else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
597
binlog_can_be_corrupted= false;
599
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
601
errmsg = "Failed on my_net_write()";
602
my_errno= ER_UNKNOWN_ERROR;
606
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
610
errmsg = "failed in send_file()";
611
my_errno= ER_UNKNOWN_ERROR;
615
packet->set("\0", 1, &my_charset_bin);
619
here we were reading binlog that was not closed properly (as a result
620
of a crash ?). treat any corruption as EOF
622
if (binlog_can_be_corrupted && error != LOG_READ_MEM)
625
TODO: now that we are logging the offset, check to make sure
626
the recorded offset and the actual match.
627
Guilhem 2003-06: this is not true if this master is a slave
628
<4.0.15 running with --log-slave-updates, because then log_pos may
629
be the offset in the-master-of-this-master's binlog.
631
if (test_for_non_eof_log_read_errors(error, &errmsg))
634
if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
635
mysql_bin_log.is_active(log_file_name))
638
Block until there is more data in the log
642
errmsg = "failed on net_flush()";
643
my_errno= ER_UNKNOWN_ERROR;
648
We may have missed the update broadcast from the log
649
that has just happened, let's try to catch it if it did.
650
If we did not miss anything, we just wait for other threads
655
bool read_packet = 0, fatal_error = 0;
658
No one will update the log while we are reading
659
now, but we'll be quick and just read one record
662
Add an counter that is incremented for each time we update the
663
binary log. We can avoid the following read if the counter
664
has not been updated since last read.
667
pthread_mutex_lock(log_lock);
668
switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
670
/* we read successfully, so we'll need to send it to the slave */
671
pthread_mutex_unlock(log_lock);
674
coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
680
if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
682
pthread_mutex_unlock(log_lock);
690
assert(heartbeat_ts && heartbeat_period != 0LL);
691
set_timespec_nsec(*heartbeat_ts, heartbeat_period);
693
ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
694
assert(ret == 0 || (heartbeat_period != 0LL && coord != NULL));
695
if (ret == ETIMEDOUT || ret == ETIME)
697
if (send_heartbeat_event(net, packet, coord))
699
errmsg = "Failed on my_net_write()";
700
my_errno= ER_UNKNOWN_ERROR;
701
pthread_mutex_unlock(log_lock);
709
} while (ret != 0 && coord != NULL && !thd->killed);
710
pthread_mutex_unlock(log_lock);
715
pthread_mutex_unlock(log_lock);
722
thd_proc_info(thd, "Sending binlog event to slave");
723
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
725
errmsg = "Failed on my_net_write()";
726
my_errno= ER_UNKNOWN_ERROR;
730
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
734
errmsg = "failed in send_file()";
735
my_errno= ER_UNKNOWN_ERROR;
739
packet->set("\0", 1, &my_charset_bin);
741
No need to net_flush because we will get to flush later when
742
we hit EOF pretty quick
748
errmsg = "error reading log entry";
749
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
757
bool loop_breaker = 0;
758
/* need this to break out of the for loop from switch */
760
thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
761
switch (mysql_bin_log.find_next_log(&linfo, 1)) {
763
loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
768
errmsg = "could not find next log";
769
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
777
(void) my_close(file, MYF(MY_WME));
780
Call fake_rotate_event() in case the previous log (the one which
781
we have just finished reading) did not contain a Rotate event
782
(for example (I don't know any other example) the previous log
783
was the last one before the master was shutdown & restarted).
784
This way we tell the slave about the new log's name and
785
position. If the binlog is 5.0, the next event we are going to
786
read and send is Format_description_log_event.
788
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
789
fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
792
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
797
packet->append('\0');
799
coord->file_name= log_file_name; // reset to the next
805
(void)my_close(file, MYF(MY_WME));
808
thd_proc_info(thd, "Waiting to finalize termination");
809
pthread_mutex_lock(&LOCK_thread_count);
810
thd->current_linfo = 0;
811
pthread_mutex_unlock(&LOCK_thread_count);
815
thd_proc_info(thd, "Waiting to finalize termination");
818
Exclude iteration through thread list
819
this is needed for purge_logs() - it will iterate through
820
thread list and update thd->current_linfo->index_file_offset
821
this mutex will make sure that it never tried to update our linfo
822
after we return from this stack frame
824
pthread_mutex_lock(&LOCK_thread_count);
825
thd->current_linfo = 0;
826
pthread_mutex_unlock(&LOCK_thread_count);
828
(void) my_close(file, MYF(MY_WME));
830
my_message(my_errno, errmsg, MYF(0));
834
int start_slave(THD* thd , Master_info* mi, bool net_report)
839
lock_slave_threads(mi); // this allows us to cleanly read slave_running
840
// Get a mask of _stopped_ threads
841
init_thread_mask(&thread_mask,mi,1 /* inverse */);
843
Below we will start all stopped threads. But if the user wants to
844
start only one thread, do as if the other thread was running (as we
845
don't wan't to touch the other thread), so set the bit to 0 for the
848
if (thd->lex->slave_thd_opt)
849
thread_mask&= thd->lex->slave_thd_opt;
850
if (thread_mask) //some threads are stopped, start them
852
if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
854
slave_errno=ER_MASTER_INFO;
855
else if (server_id_supplied && *mi->host)
858
If we will start SQL thread we will care about UNTIL options If
859
not and they are specified we will ignore them and warn user
862
if (thread_mask & SLAVE_SQL)
864
pthread_mutex_lock(&mi->rli.data_lock);
866
if (thd->lex->mi.pos)
868
mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
869
mi->rli.until_log_pos= thd->lex->mi.pos;
871
We don't check thd->lex->mi.log_file_name for NULL here
872
since it is checked in sql_yacc.yy
874
strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
875
sizeof(mi->rli.until_log_name)-1);
877
else if (thd->lex->mi.relay_log_pos)
879
mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
880
mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
881
strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
882
sizeof(mi->rli.until_log_name)-1);
885
mi->rli.clear_until_condition();
887
if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
889
/* Preparing members for effective until condition checking */
890
const char *p= fn_ext(mi->rli.until_log_name);
895
mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
897
p_end points to the first invalid character. If it equals
898
to p, no digits were found, error. If it contains '\0' it
899
means conversion went ok.
901
if (p_end==p || *p_end)
902
slave_errno=ER_BAD_SLAVE_UNTIL_COND;
905
slave_errno=ER_BAD_SLAVE_UNTIL_COND;
907
/* mark the cached result of the UNTIL comparison as "undefined" */
908
mi->rli.until_log_names_cmp_result=
909
Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
911
/* Issuing warning then started without --skip-slave-start */
912
if (!opt_skip_slave_start)
913
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
914
ER_MISSING_SKIP_SLAVE,
915
ER(ER_MISSING_SKIP_SLAVE));
918
pthread_mutex_unlock(&mi->rli.data_lock);
920
else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
921
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
922
ER(ER_UNTIL_COND_IGNORED));
925
slave_errno = start_slave_threads(0 /*no mutex */,
926
1 /* wait for start */,
928
master_info_file,relay_log_info_file,
932
slave_errno = ER_BAD_SLAVE;
936
/* no error if all threads are already started, only a warning */
937
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
938
ER(ER_SLAVE_WAS_RUNNING));
941
unlock_slave_threads(mi);
946
my_message(slave_errno, ER(slave_errno), MYF(0));
956
int stop_slave(THD* thd, Master_info* mi, bool net_report )
962
thd_proc_info(thd, "Killing slave");
964
lock_slave_threads(mi);
965
// Get a mask of _running_ threads
966
init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
968
Below we will stop all running threads.
969
But if the user wants to stop only one thread, do as if the other thread
970
was stopped (as we don't wan't to touch the other thread), so set the
971
bit to 0 for the other thread
973
if (thd->lex->slave_thd_opt)
974
thread_mask &= thd->lex->slave_thd_opt;
978
slave_errno= terminate_slave_threads(mi,thread_mask,
983
//no error if both threads are already stopped, only a warning
985
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
986
ER(ER_SLAVE_WAS_NOT_RUNNING));
988
unlock_slave_threads(mi);
989
thd_proc_info(thd, 0);
994
my_message(slave_errno, ER(slave_errno), MYF(0));
1005
Remove all relay logs and start replication from the start
1010
mi Master info for the slave
1018
int reset_slave(THD *thd, Master_info* mi)
1020
struct stat stat_area;
1021
char fname[FN_REFLEN];
1022
int thread_mask= 0, error= 0;
1024
const char* errmsg=0;
1026
lock_slave_threads(mi);
1027
init_thread_mask(&thread_mask,mi,0 /* not inverse */);
1028
if (thread_mask) // We refuse if any slave thread is running
1030
sql_errno= ER_SLAVE_MUST_STOP;
1035
ha_reset_slave(thd);
1037
// delete relay logs, clear relay log coordinates
1038
if ((error= purge_relay_logs(&mi->rli, thd,
1043
/* Clear master's log coordinates */
1044
init_master_log_pos(mi);
1046
Reset errors (the idea is that we forget about the
1049
mi->rli.clear_error();
1050
mi->rli.clear_until_condition();
1052
// close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1053
end_master_info(mi);
1054
// and delete these two files
1055
fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
1056
if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1061
// delete relay_log_info_file
1062
fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
1063
if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1070
unlock_slave_threads(mi);
1072
my_error(sql_errno, MYF(0), errmsg);
1078
Kill all Binlog_dump threads which previously talked to the same slave
1079
("same" means with the same server id). Indeed, if the slave stops, if the
1080
Binlog_dump thread is waiting (pthread_cond_wait) for binlog update, then it
1081
will keep existing until a query is written to the binlog. If the master is
1082
idle, then this could last long, and if the slave reconnects, we could have 2
1083
Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
1084
binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
1085
the master kills any existing thread with the slave's server id (if this id is
1086
not zero; it will be true for real slaves, but false for mysqlbinlog when it
1087
sends COM_BINLOG_DUMP to get a remote binlog dump).
1090
kill_zombie_dump_threads()
1091
slave_server_id the slave's server id
1096
void kill_zombie_dump_threads(uint32_t slave_server_id)
1098
pthread_mutex_lock(&LOCK_thread_count);
1099
I_List_iterator<THD> it(threads);
1104
if (tmp->command == COM_BINLOG_DUMP &&
1105
tmp->server_id == slave_server_id)
1107
pthread_mutex_lock(&tmp->LOCK_delete); // Lock from delete
1111
pthread_mutex_unlock(&LOCK_thread_count);
1115
Here we do not call kill_one_thread() as
1116
it will be slow because it will iterate through the list
1117
again. We just to do kill the thread ourselves.
1119
tmp->awake(THD::KILL_QUERY);
1120
pthread_mutex_unlock(&tmp->LOCK_delete);
1125
bool change_master(THD* thd, Master_info* mi)
1128
const char* errmsg= 0;
1129
bool need_relay_log_purge= 1;
1131
lock_slave_threads(mi);
1132
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
1133
if (thread_mask) // We refuse if any slave thread is running
1135
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1136
unlock_slave_threads(mi);
1140
thd_proc_info(thd, "Changing master");
1141
LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1142
// TODO: see if needs re-write
1143
if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
1146
my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1147
unlock_slave_threads(mi);
1152
Data lock not needed since we have already stopped the running threads,
1153
and we have the hold on the run locks which will keep all threads that
1154
could possibly modify the data structures from running
1158
If the user specified host or port without binlog or position,
1159
reset binlog's name to FIRST and position to 4.
1162
if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1164
mi->master_log_name[0] = 0;
1165
mi->master_log_pos= BIN_LOG_HEADER_SIZE;
1168
if (lex_mi->log_file_name)
1169
strmake(mi->master_log_name, lex_mi->log_file_name,
1170
sizeof(mi->master_log_name)-1);
1173
mi->master_log_pos= lex_mi->pos;
1177
strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1179
strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
1180
if (lex_mi->password)
1181
strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
1183
mi->port = lex_mi->port;
1184
if (lex_mi->connect_retry)
1185
mi->connect_retry = lex_mi->connect_retry;
1186
if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1187
mi->heartbeat_period = lex_mi->heartbeat_period;
1189
mi->heartbeat_period= (float) min((double)SLAVE_MAX_HEARTBEAT_PERIOD,
1190
(slave_net_timeout/2.0));
1191
mi->received_heartbeats= 0LL; // counter lives until master is CHANGEd
1192
if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1193
mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
1195
if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1196
mi->ssl_verify_server_cert=
1197
(lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
1200
strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
1201
if (lex_mi->ssl_capath)
1202
strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
1203
if (lex_mi->ssl_cert)
1204
strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
1205
if (lex_mi->ssl_cipher)
1206
strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
1207
if (lex_mi->ssl_key)
1208
strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
1209
if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
1210
lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
1211
lex_mi->ssl_verify_server_cert )
1212
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1213
ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1215
if (lex_mi->relay_log_name)
1217
need_relay_log_purge= 0;
1218
strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
1219
sizeof(mi->rli.group_relay_log_name)-1);
1220
strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
1221
sizeof(mi->rli.event_relay_log_name)-1);
1224
if (lex_mi->relay_log_pos)
1226
need_relay_log_purge= 0;
1227
mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
1231
If user did specify neither host nor port nor any log name nor any log
1232
pos, i.e. he specified only user/password/master_connect_retry, he probably
1233
wants replication to resume from where it had left, i.e. from the
1234
coordinates of the **SQL** thread (imagine the case where the I/O is ahead
1235
of the SQL; restarting from the coordinates of the I/O would lose some
1236
events which is probably unwanted when you are just doing minor changes
1237
like changing master_connect_retry).
1238
A side-effect is that if only the I/O thread was started, this thread may
1239
restart from ''/4 after the CHANGE MASTER. That's a minor problem (it is a
1240
much more unlikely situation than the one we are fixing here).
1241
Note: coordinates of the SQL thread must be read here, before the
1242
'if (need_relay_log_purge)' block which resets them.
1244
if (!lex_mi->host && !lex_mi->port &&
1245
!lex_mi->log_file_name && !lex_mi->pos &&
1246
need_relay_log_purge)
1249
Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1250
not initialized), so we use a max().
1251
What happens to mi->rli.master_log_pos during the initialization stages
1252
of replication is not 100% clear, so we guard against problems using
1255
mi->master_log_pos = ((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1256
? BIN_LOG_HEADER_SIZE
1257
: mi->rli.group_master_log_pos);
1258
strmake(mi->master_log_name, mi->rli.group_master_log_name,
1259
sizeof(mi->master_log_name)-1);
1262
Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1265
if (flush_master_info(mi, 0))
1267
my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1268
unlock_slave_threads(mi);
1271
if (need_relay_log_purge)
1274
thd_proc_info(thd, "Purging old relay logs");
1275
if (purge_relay_logs(&mi->rli, thd,
1276
0 /* not only reset, but also reinit */,
1279
my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
1280
unlock_slave_threads(mi);
1288
/* Relay log is already initialized */
1289
if (init_relay_log_pos(&mi->rli,
1290
mi->rli.group_relay_log_name,
1291
mi->rli.group_relay_log_pos,
1295
my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
1296
unlock_slave_threads(mi);
1301
Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block,
1302
so restore them to good values. If we left them to ''/0, that would work;
1303
but that would fail in the case of 2 successive CHANGE MASTER (without a
1304
START SLAVE in between): because first one would set the coords in mi to
1305
the good values of those in rli, the set those in rli to ''/0, then
1306
second CHANGE MASTER would set the coords in mi to those of rli, i.e. to
1307
''/0: we have lost all copies of the original good coordinates.
1308
That's why we always save good coords in rli.
1310
mi->rli.group_master_log_pos= mi->master_log_pos;
1311
strmake(mi->rli.group_master_log_name,mi->master_log_name,
1312
sizeof(mi->rli.group_master_log_name)-1);
1314
if (!mi->rli.group_master_log_name[0]) // uninitialized case
1315
mi->rli.group_master_log_pos=0;
1317
pthread_mutex_lock(&mi->rli.data_lock);
1318
mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1319
/* Clear the errors, for a clean start */
1320
mi->rli.clear_error();
1321
mi->rli.clear_until_condition();
1323
If we don't write new coordinates to disk now, then old will remain in
1324
relay-log.info until START SLAVE is issued; but if mysqld is shutdown
1325
before START SLAVE, then old will remain in relay-log.info, and will be the
1326
in-memory value at restart (thus causing errors, as the old relay log does
1329
flush_relay_log_info(&mi->rli);
1330
pthread_cond_broadcast(&mi->data_cond);
1331
pthread_mutex_unlock(&mi->rli.data_lock);
1333
unlock_slave_threads(mi);
1334
thd_proc_info(thd, 0);
1339
int reset_master(THD* thd)
1341
if (!mysql_bin_log.is_open())
1343
my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
1344
ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
1347
return mysql_bin_log.reset_logs(thd);
1350
int cmp_master_pos(const char* log_file_name1, uint64_t log_pos1,
1351
const char* log_file_name2, uint64_t log_pos2)
1354
uint log_file_name1_len= strlen(log_file_name1);
1355
uint log_file_name2_len= strlen(log_file_name2);
1357
// We assume that both log names match up to '.'
1358
if (log_file_name1_len == log_file_name2_len)
1360
if ((res= strcmp(log_file_name1, log_file_name2)))
1362
return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
1364
return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
1368
bool mysql_show_binlog_events(THD* thd)
1370
Protocol *protocol= thd->protocol;
1371
List<Item> field_list;
1372
const char *errmsg= 0;
1377
Log_event::init_show_field_list(&field_list);
1378
if (protocol->send_fields(&field_list,
1379
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1382
Format_description_log_event *description_event= new
1383
Format_description_log_event(3); /* MySQL 4.0 by default */
1386
Wait for handlers to insert any pending information
1387
into the binlog. For e.g. ndb which updates the binlog asynchronously
1388
this is needed so that the uses sees all its own commands in the binlog
1390
ha_binlog_wait(thd);
1392
if (mysql_bin_log.is_open())
1394
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1395
SELECT_LEX_UNIT *unit= &thd->lex->unit;
1396
ha_rows event_count, limit_start, limit_end;
1397
my_off_t pos = max((uint64_t)BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1398
char search_file_name[FN_REFLEN], *name;
1399
const char *log_file_name = lex_mi->log_file_name;
1400
pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1404
unit->set_limit(thd->lex->current_select);
1405
limit_start= unit->offset_limit_cnt;
1406
limit_end= unit->select_limit_cnt;
1408
name= search_file_name;
1410
mysql_bin_log.make_log_name(search_file_name, log_file_name);
1412
name=0; // Find first log
1414
linfo.index_file_offset = 0;
1416
if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1418
errmsg = "Could not find target log";
1422
pthread_mutex_lock(&LOCK_thread_count);
1423
thd->current_linfo = &linfo;
1424
pthread_mutex_unlock(&LOCK_thread_count);
1426
if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
1430
to account binlog event header size
1432
thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
1434
pthread_mutex_lock(log_lock);
1437
open_binlog() sought to position 4.
1438
Read the first event in case it's a Format_description_log_event, to
1439
know the format. If there's no such event, we are 3.23 or 4.x. This
1440
code, like before, can't read 3.23 binlogs.
1441
This code will fail on a mixed relay log (one which has Format_desc then
1442
Rotate then Format_desc).
1444
ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
1447
if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
1449
delete description_event;
1450
description_event= (Format_description_log_event*) ev;
1456
my_b_seek(&log, pos);
1458
if (!description_event->is_valid())
1460
errmsg="Invalid Format_description event; could be out of memory";
1464
for (event_count = 0;
1465
(ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
1466
description_event)); )
1468
if (event_count >= limit_start &&
1469
ev->net_send(protocol, linfo.log_file_name, pos))
1471
errmsg = "Net error";
1473
pthread_mutex_unlock(log_lock);
1477
pos = my_b_tell(&log);
1480
if (++event_count >= limit_end)
1484
if (event_count < limit_end && log.error)
1486
errmsg = "Wrong offset or I/O error";
1487
pthread_mutex_unlock(log_lock);
1491
pthread_mutex_unlock(log_lock);
1497
delete description_event;
1501
(void) my_close(file, MYF(MY_WME));
1505
my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
1506
"SHOW BINLOG EVENTS", errmsg);
1510
pthread_mutex_lock(&LOCK_thread_count);
1511
thd->current_linfo = 0;
1512
pthread_mutex_unlock(&LOCK_thread_count);
1517
bool show_binlog_info(THD* thd)
1519
Protocol *protocol= thd->protocol;
1520
List<Item> field_list;
1521
field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1522
field_list.push_back(new Item_return_int("Position",20,
1523
DRIZZLE_TYPE_LONGLONG));
1524
field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
1525
field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
1527
if (protocol->send_fields(&field_list,
1528
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1530
protocol->prepare_for_resend();
1532
if (mysql_bin_log.is_open())
1535
mysql_bin_log.get_current_log(&li);
1536
int dir_len = dirname_length(li.log_file_name);
1537
protocol->store(li.log_file_name + dir_len, &my_charset_bin);
1538
protocol->store((uint64_t) li.pos);
1539
protocol->store(binlog_filter->get_do_db());
1540
protocol->store(binlog_filter->get_ignore_db());
1541
if (protocol->write())
1550
Send a list of all binary logs to client
1554
thd Thread specific variable
1561
bool show_binlogs(THD* thd)
1563
IO_CACHE *index_file;
1566
char fname[FN_REFLEN];
1567
List<Item> field_list;
1570
Protocol *protocol= thd->protocol;
1572
if (!mysql_bin_log.is_open())
1574
my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0));
1578
field_list.push_back(new Item_empty_string("Log_name", 255));
1579
field_list.push_back(new Item_return_int("File_size", 20,
1580
DRIZZLE_TYPE_LONGLONG));
1581
if (protocol->send_fields(&field_list,
1582
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1585
pthread_mutex_lock(mysql_bin_log.get_log_lock());
1586
mysql_bin_log.lock_index();
1587
index_file=mysql_bin_log.get_index_file();
1589
mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
1590
pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
1592
cur_dir_len= dirname_length(cur.log_file_name);
1594
reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
1596
/* The file ends with EOF or empty line */
1597
while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
1600
uint64_t file_length= 0; // Length if open fails
1601
fname[--length] = '\0'; // remove the newline
1603
protocol->prepare_for_resend();
1604
dir_len= dirname_length(fname);
1606
protocol->store(fname + dir_len, length, &my_charset_bin);
1608
if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
1609
file_length= cur.pos; /* The active log, use the active position */
1612
/* this is an old log, open it and find the size */
1613
if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
1616
file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));
1617
my_close(file, MYF(0));
1620
protocol->store(file_length);
1621
if (protocol->write())
1624
mysql_bin_log.unlock_index();
1629
mysql_bin_log.unlock_index();
1634
Load data's io cache specific hook to be executed
1635
before a chunk of data is being read into the cache's buffer
1636
The fuction instantianates and writes into the binlog
1637
replication events along LOAD DATA processing.
1639
@param file pointer to io-cache
1642
int log_loaded_block(IO_CACHE* file)
1644
LOAD_FILE_INFO *lf_info;
1646
/* buffer contains position where we started last read */
1647
uchar* buffer= (uchar*) my_b_get_buffer_start(file);
1648
uint max_event_size= current_thd->variables.max_allowed_packet;
1649
lf_info= (LOAD_FILE_INFO*) file->arg;
1650
if (lf_info->thd->current_stmt_binlog_row_based)
1652
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1653
lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1656
for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1657
buffer += min(block_len, max_event_size),
1658
block_len -= min(block_len, max_event_size))
1660
lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1661
if (lf_info->wrote_create_file)
1663
Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
1664
min(block_len, max_event_size),
1665
lf_info->log_delayed);
1666
mysql_bin_log.write(&a);
1670
Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1672
min(block_len, max_event_size),
1673
lf_info->log_delayed);
1674
mysql_bin_log.write(&b);
1675
lf_info->wrote_create_file= 1;
1682
Replication System Variables
1685
class sys_var_slave_skip_counter :public sys_var
1688
sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
1690
{ chain_sys_var(chain); }
1691
bool check(THD *thd, set_var *var);
1692
bool update(THD *thd, set_var *var);
1693
bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
1695
We can't retrieve the value of this, so we don't have to define
1696
type() or value_ptr()
1700
class sys_var_sync_binlog_period :public sys_var_long_ptr
1703
sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
1705
:sys_var_long_ptr(chain, name_arg,value_ptr) {}
1706
bool update(THD *thd, set_var *var);
1709
static void fix_slave_net_timeout(THD *thd,
1710
enum_var_type type __attribute__((unused)))
1712
#ifdef HAVE_REPLICATION
1713
pthread_mutex_lock(&LOCK_active_mi);
1714
if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1715
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1716
ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1717
"The currect value for master_heartbeat_period"
1718
" exceeds the new value of `slave_net_timeout' sec."
1719
" A sensible value for the period should be"
1720
" less than the timeout.");
1721
pthread_mutex_unlock(&LOCK_active_mi);
1726
static sys_var_chain vars = { NULL, NULL };
1728
static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
1730
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
1732
fix_slave_net_timeout);
1733
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
1734
&slave_trans_retries);
1735
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
1736
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
1738
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
1741
static int show_slave_skip_errors(THD *thd __attribute__((unused)),
1742
SHOW_VAR *var, char *buff)
1744
var->type=SHOW_CHAR;
1746
if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
1748
var->value= const_cast<char *>("OFF");
1750
else if (bitmap_is_set_all(&slave_error_mask))
1752
var->value= const_cast<char *>("ALL");
1756
/* 10 is enough assuming errors are max 4 digits */
1760
i < MAX_SLAVE_ERROR &&
1761
(buff - var->value) < SHOW_VAR_FUNC_BUFF_SIZE;
1764
if (bitmap_is_set(&slave_error_mask, i))
1766
buff= int10_to_str(i, buff, 10);
1770
if (var->value != buff)
1771
buff--; // Remove last ','
1772
if (i < MAX_SLAVE_ERROR)
1773
buff= stpcpy(buff, "..."); // Couldn't show all errors
1779
static st_show_var_func_container
1780
show_slave_skip_errors_cont = { &show_slave_skip_errors };
1783
static SHOW_VAR fixed_vars[]= {
1784
{"log_slave_updates", (char*) &opt_log_slave_updates, SHOW_MY_BOOL},
1785
{"relay_log" , (char*) &opt_relay_logname, SHOW_CHAR_PTR},
1786
{"relay_log_index", (char*) &opt_relaylog_index_name, SHOW_CHAR_PTR},
1787
{"relay_log_info_file", (char*) &relay_log_info_file, SHOW_CHAR_PTR},
1788
{"relay_log_space_limit", (char*) &relay_log_space_limit, SHOW_LONGLONG},
1789
{"slave_load_tmpdir", (char*) &slave_load_tmpdir, SHOW_CHAR_PTR},
1790
{"slave_skip_errors", (char*) &show_slave_skip_errors_cont, SHOW_FUNC},
1793
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((unused)),
1797
pthread_mutex_lock(&LOCK_active_mi);
1798
pthread_mutex_lock(&active_mi->rli.run_lock);
1799
if (active_mi->rli.slave_running)
1801
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1804
pthread_mutex_unlock(&active_mi->rli.run_lock);
1805
pthread_mutex_unlock(&LOCK_active_mi);
1806
var->save_result.ulong_value= (ulong) var->value->val_int();
1811
bool sys_var_slave_skip_counter::update(THD *thd __attribute__((unused)),
1814
pthread_mutex_lock(&LOCK_active_mi);
1815
pthread_mutex_lock(&active_mi->rli.run_lock);
1817
The following test should normally never be true as we test this
1818
in the check function; To be safe against multiple
1819
SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
1821
if (!active_mi->rli.slave_running)
1823
pthread_mutex_lock(&active_mi->rli.data_lock);
1824
active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
1825
pthread_mutex_unlock(&active_mi->rli.data_lock);
1827
pthread_mutex_unlock(&active_mi->rli.run_lock);
1828
pthread_mutex_unlock(&LOCK_active_mi);
1833
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((unused)),
1836
sync_binlog_period= (uint32_t) var->save_result.uint64_t_value;
1840
int init_replication_sys_vars()
1842
mysql_append_static_vars(fixed_vars, sizeof(fixed_vars) / sizeof(SHOW_VAR));
1844
if (mysql_add_sys_var_chain(vars.first, my_long_options))
1846
/* should not happen */
1847
fprintf(stderr, "failed to initialize replication system variables");
1853
#endif /* HAVE_REPLICATION */