1
/* Copyright (C) 2000-2006 MySQL AB & Sasha
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include "mysql_priv.h"
17
#ifdef HAVE_REPLICATION
21
#include "log_event.h"
22
#include "rpl_filter.h"
25
int max_binlog_dump_events = 0; // unlimited
26
my_bool opt_sporadic_binlog_dump_fail = 0;
28
static int binlog_dump_count = 0;
32
fake_rotate_event() builds a fake (=which does not exist physically in any
33
binlog) Rotate event, which contains the name of the binlog we are going to
34
send to the slave (because the slave may not know it if it just asked for
35
MASTER_LOG_FILE='', MASTER_LOG_POS=4).
36
< 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
37
After this version we always call it, so that a 3.23.58 slave can rely on
38
it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
39
zeros in the good positions which, by chance, make it possible for the 3.23
40
slave to detect that this event is unexpected) (this is luck which happens
41
because the master and slave disagree on the size of the header of
44
Relying on the event length of the Rotate event instead of these
45
well-placed zeros was not possible as Rotate events have a variable-length
49
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
50
ulonglong position, const char** errmsg)
52
DBUG_ENTER("fake_rotate_event");
53
char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
55
'when' (the timestamp) is set to 0 so that slave could distinguish between
56
real and fake Rotate events (if necessary)
59
header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
61
char* p = log_file_name+dirname_length(log_file_name);
62
uint ident_len = (uint) strlen(p);
63
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
64
int4store(header + SERVER_ID_OFFSET, server_id);
65
int4store(header + EVENT_LEN_OFFSET, event_len);
66
int2store(header + FLAGS_OFFSET, 0);
68
// TODO: check what problems this may cause and fix them
69
int4store(header + LOG_POS_OFFSET, 0);
71
packet->append(header, sizeof(header));
72
int8store(buf+R_POS_OFFSET,position);
73
packet->append(buf, ROTATE_HEADER_LEN);
74
packet->append(p,ident_len);
75
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
77
*errmsg = "failed on my_net_write()";
83
static int send_file(THD *thd)
86
int fd = -1, error = 1;
88
char fname[FN_REFLEN+1];
89
const char *errmsg = 0;
91
unsigned long packet_len;
92
uchar buf[IO_SIZE]; // It's safe to alloc this
93
DBUG_ENTER("send_file");
96
The client might be slow loading the data, give him wait_timeout to do
99
old_timeout= net->read_timeout;
100
my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
103
We need net_flush here because the client will not know it needs to send
104
us the file name until it has processed the load event entry
106
if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
108
errmsg = "while reading file name";
112
// terminate with \0 for fn_format
113
*((char*)net->read_pos + packet_len) = 0;
114
fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
115
// this is needed to make replicate-ignore-db
116
if (!strcmp(fname,"/dev/null"))
119
if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
121
errmsg = "on open of file";
125
while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0)
127
if (my_net_write(net, buf, bytes))
129
errmsg = "while writing data to client";
135
if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
136
(my_net_read(net) == packet_error))
138
errmsg = "while negotiating file transfer close";
144
my_net_set_read_timeout(net, old_timeout);
146
(void) my_close(fd, MYF(0));
149
sql_print_error("Failed in send_file() %s", errmsg);
150
DBUG_PRINT("error", (errmsg));
157
Adjust the position pointer in the binary log file for all running slaves
160
adjust_linfo_offsets()
161
purge_offset Number of bytes removed from start of log index file
164
- This is called when doing a PURGE when we delete lines from the
168
- Before calling this function, we have to ensure that no threads are
169
using any binary log file before purge_offset.
172
- Inform the slave threads that they should sync the position
173
in the binary log file with flush_relay_log_info.
174
Now the sync is done for the next read.
177
void adjust_linfo_offsets(my_off_t purge_offset)
181
pthread_mutex_lock(&LOCK_thread_count);
182
I_List_iterator<THD> it(threads);
187
if ((linfo = tmp->current_linfo))
189
pthread_mutex_lock(&linfo->lock);
191
Index file offset can be less than purge offset only if
192
we just started reading the index file. In that case
193
we have nothing to adjust
195
if (linfo->index_file_offset < purge_offset)
196
linfo->fatal = (linfo->index_file_offset != 0);
198
linfo->index_file_offset -= purge_offset;
199
pthread_mutex_unlock(&linfo->lock);
202
pthread_mutex_unlock(&LOCK_thread_count);
206
bool log_in_use(const char* log_name)
208
int log_name_len = strlen(log_name) + 1;
212
pthread_mutex_lock(&LOCK_thread_count);
213
I_List_iterator<THD> it(threads);
218
if ((linfo = tmp->current_linfo))
220
pthread_mutex_lock(&linfo->lock);
221
result = !bcmp((uchar*) log_name, (uchar*) linfo->log_file_name,
223
pthread_mutex_unlock(&linfo->lock);
229
pthread_mutex_unlock(&LOCK_thread_count);
233
bool purge_error_message(THD* thd, int res)
239
case LOG_INFO_EOF: errmsg= ER_UNKNOWN_TARGET_BINLOG; break;
240
case LOG_INFO_IO: errmsg= ER_IO_ERR_LOG_INDEX_READ; break;
241
case LOG_INFO_INVALID:errmsg= ER_BINLOG_PURGE_PROHIBITED; break;
242
case LOG_INFO_SEEK: errmsg= ER_FSEEK_FAIL; break;
243
case LOG_INFO_MEM: errmsg= ER_OUT_OF_RESOURCES; break;
244
case LOG_INFO_FATAL: errmsg= ER_BINLOG_PURGE_FATAL_ERR; break;
245
case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break;
246
case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break;
247
default: errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break;
252
my_message(errmsg, ER(errmsg), MYF(0));
260
bool purge_master_logs(THD* thd, const char* to_log)
262
char search_file_name[FN_REFLEN];
263
if (!mysql_bin_log.is_open())
269
mysql_bin_log.make_log_name(search_file_name, to_log);
270
return purge_error_message(thd,
271
mysql_bin_log.purge_logs(search_file_name, 0, 1,
276
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
278
if (!mysql_bin_log.is_open())
283
return purge_error_message(thd,
284
mysql_bin_log.purge_logs_before_date(purge_time));
287
int test_for_non_eof_log_read_errors(int error, const char **errmsg)
289
if (error == LOG_READ_EOF)
291
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
294
*errmsg = "bogus data in log event";
296
case LOG_READ_TOO_LARGE:
297
*errmsg = "log event entry exceeded max_allowed_packet; \
298
Increase max_allowed_packet on master";
301
*errmsg = "I/O error reading log event";
304
*errmsg = "memory allocation failed reading log event";
307
*errmsg = "binlog truncated in the middle of event";
310
*errmsg = "unknown error reading log event on the master";
318
An auxiliary function for calling in mysql_binlog_send
319
to initialize the heartbeat timeout in waiting for a binlogged event.
321
@param[in] thd THD to access a user variable
323
@return heartbeat period as an ulonglong number of nanoseconds
324
or zero if heartbeat was not demanded by slave
326
static ulonglong get_heartbeat_period(THD * thd)
329
LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")};
330
user_var_entry *entry=
331
(user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
333
return entry? entry->val_int(&null_value) : 0;
337
Function prepares and sends a replication heartbeat event.
339
@param net net object of THD
340
@param packet buffer to store the heartbeat instance
341
@param event_coordinates binlog file name and position of the last
342
real event master sent from binlog
345
Among three essential pieces of heartbeat data Log_event::when
347
The error to send is serious and should force terminating
350
static int send_heartbeat_event(NET* net, String* packet,
351
const struct event_coordinates *coord)
353
DBUG_ENTER("send_heartbeat_event");
354
char header[LOG_EVENT_HEADER_LEN];
356
'when' (the timestamp) is set to 0 so that slave could distinguish between
357
real and fake Rotate events (if necessary)
359
memset(header, 0, 4); // when
361
header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
363
char* p= coord->file_name + dirname_length(coord->file_name);
365
uint ident_len = strlen(p);
366
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
367
int4store(header + SERVER_ID_OFFSET, server_id);
368
int4store(header + EVENT_LEN_OFFSET, event_len);
369
int2store(header + FLAGS_OFFSET, 0);
371
int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos
373
packet->append(header, sizeof(header));
374
packet->append(p, ident_len); // log_file_name
376
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
381
packet->set("\0", 1, &my_charset_bin);
386
TODO: Clean up loop to only have one call to send_file()
389
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
393
char *log_file_name = linfo.log_file_name;
394
char search_file_name[FN_REFLEN], *name;
397
String* packet = &thd->packet;
399
const char *errmsg = "Unknown error";
400
NET* net = &thd->net;
401
pthread_mutex_t *log_lock;
402
bool binlog_can_be_corrupted= FALSE;
404
int left_events = max_binlog_dump_events;
406
DBUG_ENTER("mysql_binlog_send");
407
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
409
bzero((char*) &log,sizeof(log));
411
heartbeat_period from @master_heartbeat_period user variable
413
ulonglong heartbeat_period= get_heartbeat_period(thd);
414
struct timespec heartbeat_buf;
415
struct event_coordinates coord_buf;
416
struct timespec *heartbeat_ts= NULL;
417
struct event_coordinates *coord= NULL;
418
if (heartbeat_period != LL(0))
420
heartbeat_ts= &heartbeat_buf;
421
set_timespec_nsec(*heartbeat_ts, 0);
423
coord->file_name= log_file_name; // initialization basing on what slave remembers
427
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
429
errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover";
430
my_errno= ER_UNKNOWN_ERROR;
435
if (!mysql_bin_log.is_open())
437
errmsg = "Binary log is not open";
438
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
441
if (!server_id_supplied)
443
errmsg = "Misconfigured master - server id was not set";
444
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
448
name=search_file_name;
450
mysql_bin_log.make_log_name(search_file_name, log_ident);
452
name=0; // Find first log
454
linfo.index_file_offset = 0;
456
if (mysql_bin_log.find_log_pos(&linfo, name, 1))
458
errmsg = "Could not find first log file name in binary log index file";
459
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
463
pthread_mutex_lock(&LOCK_thread_count);
464
thd->current_linfo = &linfo;
465
pthread_mutex_unlock(&LOCK_thread_count);
467
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
469
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
472
if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
474
errmsg= "Client requested master to start replication from \
475
impossible position";
476
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
481
We need to start a packet with something other than 255
482
to distinguish it from error
484
packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
487
Tell the client about the log name with a fake Rotate event;
488
this is needed even if we also send a Format_description_log_event
489
just after, because that event does not contain the binlog's name.
490
Note that as this Rotate event is sent before
491
Format_description_log_event, the slave cannot have any info to
492
understand this event's format, so the header len of
493
Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
494
than other events except FORMAT_DESCRIPTION_EVENT).
495
Before 4.0.14 we called fake_rotate_event below only if (pos ==
496
BIN_LOG_HEADER_SIZE), because if this is false then the slave
497
already knows the binlog's name.
498
Since then, we always call fake_rotate_event; if the slave already knew
499
the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
500
useless but does not harm much. It is nice for 3.23 (>=.58) slaves
501
which test Rotate events to see if the master is 4.0 (then they
502
choose to stop because they can't replicate 4.0); by always calling
503
fake_rotate_event we are sure that 3.23.58 and newer will detect the
504
problem as soon as replication starts (BUG#198).
505
Always calling fake_rotate_event makes sending of normal
506
(=from-binlog) Rotate events a priori unneeded, but it is not so
507
simple: the 2 Rotate events are not equivalent, the normal one is
508
before the Stop event, the fake one is after. If we don't send the
509
normal one, then the Stop event will be interpreted (by existing 4.0
510
slaves) as "the master stopped", which is wrong. So for safety,
511
given that we want minimum modification of 4.0, we send the normal
514
if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
517
This error code is not perfect, as fake_rotate_event() does not
518
read anything from the binlog; if it fails it's because of an
519
error in my_net_write(), fortunately it will say so in errmsg.
521
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
524
packet->set("\0", 1, &my_charset_bin);
526
Adding MAX_LOG_EVENT_HEADER, since a binlog event can become
527
this much larger than the corresponding packet (query) sent
528
from client to master.
530
thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
533
We can set log_lock now, it does not move (it's a member of
534
mysql_bin_log, and it's already inited, and it will be destroyed
537
log_lock = mysql_bin_log.get_log_lock();
538
if (pos > BIN_LOG_HEADER_SIZE)
541
Try to find a Format_description_log_event at the beginning of
544
if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
547
The packet has offsets equal to the normal offsets in a binlog
548
event +1 (the first character is \0).
551
("Looked for a Format_description_log_event, found event type %d",
552
(*packet)[EVENT_TYPE_OFFSET+1]));
553
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
555
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
556
LOG_EVENT_BINLOG_IN_USE_F);
557
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
559
mark this event with "log_pos=0", so the slave
560
should not increment master's binlog position
561
(rli->group_master_log_pos)
563
int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
565
on reconnect the master sends the FD event with `created' as 0
566
to avoid destroying temp tables.
568
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
569
ST_CREATED_OFFSET+1, (ulong) 0);
571
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
573
errmsg = "Failed on my_net_write()";
574
my_errno= ER_UNKNOWN_ERROR;
579
No need to save this event. We are only doing simple reads
580
(no real parsing of the events) so we don't need it. And so
581
we don't need the artificial Format_description_log_event of
588
if (test_for_non_eof_log_read_errors(error, &errmsg))
591
It's EOF, nothing to do, go on reading next events, the
592
Format_description_log_event will be found naturally if it is written.
595
/* reset the packet as we wrote to it in any case */
596
packet->set("\0", 1, &my_charset_bin);
597
} /* end of if (pos > BIN_LOG_HEADER_SIZE); */
600
/* The Format_description_log_event event will be found naturally. */
603
/* seek to the requested position, to start the requested dump */
604
my_b_seek(&log, pos); // Seek will done on next read
606
while (!net->error && net->vio != 0 && !thd->killed)
608
while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
611
if (max_binlog_dump_events && !left_events--)
614
errmsg = "Debugging binlog dump abort";
615
my_errno= ER_UNKNOWN_ERROR;
620
log's filename does not change while it's active
623
coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
625
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
627
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
628
LOG_EVENT_BINLOG_IN_USE_F);
629
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
631
else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
632
binlog_can_be_corrupted= FALSE;
634
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
636
errmsg = "Failed on my_net_write()";
637
my_errno= ER_UNKNOWN_ERROR;
641
DBUG_PRINT("info", ("log event code %d",
642
(*packet)[LOG_EVENT_OFFSET+1] ));
643
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
647
errmsg = "failed in send_file()";
648
my_errno= ER_UNKNOWN_ERROR;
652
packet->set("\0", 1, &my_charset_bin);
656
here we were reading binlog that was not closed properly (as a result
657
of a crash ?). treat any corruption as EOF
659
if (binlog_can_be_corrupted && error != LOG_READ_MEM)
662
TODO: now that we are logging the offset, check to make sure
663
the recorded offset and the actual match.
664
Guilhem 2003-06: this is not true if this master is a slave
665
<4.0.15 running with --log-slave-updates, because then log_pos may
666
be the offset in the-master-of-this-master's binlog.
668
if (test_for_non_eof_log_read_errors(error, &errmsg))
671
if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
672
mysql_bin_log.is_active(log_file_name))
675
Block until there is more data in the log
679
errmsg = "failed on net_flush()";
680
my_errno= ER_UNKNOWN_ERROR;
685
We may have missed the update broadcast from the log
686
that has just happened, let's try to catch it if it did.
687
If we did not miss anything, we just wait for other threads
692
bool read_packet = 0, fatal_error = 0;
695
if (max_binlog_dump_events && !left_events--)
697
errmsg = "Debugging binlog dump abort";
698
my_errno= ER_UNKNOWN_ERROR;
704
No one will update the log while we are reading
705
now, but we'll be quick and just read one record
708
Add a counter that is incremented each time we update the
709
binary log. We can avoid the following read if the counter
710
has not been updated since last read.
713
pthread_mutex_lock(log_lock);
714
switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
716
/* we read successfully, so we'll need to send it to the slave */
717
pthread_mutex_unlock(log_lock);
720
coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
726
DBUG_PRINT("wait",("waiting for data in binary log"));
727
if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
729
pthread_mutex_unlock(log_lock);
734
ulong hb_info_counter= 0;
740
DBUG_ASSERT(heartbeat_ts && heartbeat_period != LL(0));
741
set_timespec_nsec(*heartbeat_ts, heartbeat_period);
743
ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
744
DBUG_ASSERT(ret == 0 || heartbeat_period != LL(0) && coord != NULL);
745
if (ret == ETIMEDOUT || ret == ETIME)
748
if (hb_info_counter < 3)
750
sql_print_information("master sends heartbeat message");
752
if (hb_info_counter == 3)
753
sql_print_information("the rest of heartbeat info skipped ...");
756
if (send_heartbeat_event(net, packet, coord))
758
errmsg = "Failed on my_net_write()";
759
my_errno= ER_UNKNOWN_ERROR;
760
pthread_mutex_unlock(log_lock);
766
DBUG_ASSERT(ret == 0);
767
DBUG_PRINT("wait",("binary log received update"));
769
} while (ret != 0 && coord != NULL && !thd->killed);
770
pthread_mutex_unlock(log_lock);
775
pthread_mutex_unlock(log_lock);
782
thd_proc_info(thd, "Sending binlog event to slave");
783
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
785
errmsg = "Failed on my_net_write()";
786
my_errno= ER_UNKNOWN_ERROR;
790
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
794
errmsg = "failed in send_file()";
795
my_errno= ER_UNKNOWN_ERROR;
799
packet->set("\0", 1, &my_charset_bin);
801
No need to net_flush because we will get to flush later when
802
we hit EOF pretty quick
808
errmsg = "error reading log entry";
809
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
817
bool loop_breaker = 0;
818
/* need this to break out of the for loop from switch */
820
thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
821
switch (mysql_bin_log.find_next_log(&linfo, 1)) {
823
loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
828
errmsg = "could not find next log";
829
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
837
(void) my_close(file, MYF(MY_WME));
840
Call fake_rotate_event() in case the previous log (the one which
841
we have just finished reading) did not contain a Rotate event
842
(for example (I don't know any other example) the previous log
843
was the last one before the master was shutdown & restarted).
844
This way we tell the slave about the new log's name and
845
position. If the binlog is 5.0, the next event we are going to
846
read and send is Format_description_log_event.
848
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
849
fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
852
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
857
packet->append('\0');
859
coord->file_name= log_file_name; // reset to the next
865
(void)my_close(file, MYF(MY_WME));
868
thd_proc_info(thd, "Waiting to finalize termination");
869
pthread_mutex_lock(&LOCK_thread_count);
870
thd->current_linfo = 0;
871
pthread_mutex_unlock(&LOCK_thread_count);
875
thd_proc_info(thd, "Waiting to finalize termination");
878
Exclude iteration through thread list
879
this is needed for purge_logs() - it will iterate through
880
thread list and update thd->current_linfo->index_file_offset
881
this mutex will make sure that it never tries to update our linfo
882
after we return from this stack frame
884
pthread_mutex_lock(&LOCK_thread_count);
885
thd->current_linfo = 0;
886
pthread_mutex_unlock(&LOCK_thread_count);
888
(void) my_close(file, MYF(MY_WME));
890
my_message(my_errno, errmsg, MYF(0));
894
int start_slave(THD* thd , Master_info* mi, bool net_report)
898
DBUG_ENTER("start_slave");
900
lock_slave_threads(mi); // this allows us to cleanly read slave_running
901
// Get a mask of _stopped_ threads
902
init_thread_mask(&thread_mask,mi,1 /* inverse */);
904
Below we will start all stopped threads. But if the user wants to
905
start only one thread, do as if the other thread was running (as we
906
don't want to touch the other thread), so set the bit to 0 for the
909
if (thd->lex->slave_thd_opt)
910
thread_mask&= thd->lex->slave_thd_opt;
911
if (thread_mask) //some threads are stopped, start them
913
if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
915
slave_errno=ER_MASTER_INFO;
916
else if (server_id_supplied && *mi->host)
919
If we will start the SQL thread we will care about UNTIL options. If
920
not and they are specified we will ignore them and warn user
923
if (thread_mask & SLAVE_SQL)
925
pthread_mutex_lock(&mi->rli.data_lock);
927
if (thd->lex->mi.pos)
929
mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
930
mi->rli.until_log_pos= thd->lex->mi.pos;
932
We don't check thd->lex->mi.log_file_name for NULL here
933
since it is checked in sql_yacc.yy
935
strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
936
sizeof(mi->rli.until_log_name)-1);
938
else if (thd->lex->mi.relay_log_pos)
940
mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
941
mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
942
strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
943
sizeof(mi->rli.until_log_name)-1);
946
mi->rli.clear_until_condition();
948
if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
950
/* Preparing members for effective until condition checking */
951
const char *p= fn_ext(mi->rli.until_log_name);
956
mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
958
p_end points to the first invalid character. If it equals
959
to p, no digits were found, error. If it contains '\0' it
960
means conversion went ok.
962
if (p_end==p || *p_end)
963
slave_errno=ER_BAD_SLAVE_UNTIL_COND;
966
slave_errno=ER_BAD_SLAVE_UNTIL_COND;
968
/* mark the cached result of the UNTIL comparison as "undefined" */
969
mi->rli.until_log_names_cmp_result=
970
Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
972
/* Issue a warning when started without --skip-slave-start */
973
if (!opt_skip_slave_start)
974
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
975
ER_MISSING_SKIP_SLAVE,
976
ER(ER_MISSING_SKIP_SLAVE));
979
pthread_mutex_unlock(&mi->rli.data_lock);
981
else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
982
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
983
ER(ER_UNTIL_COND_IGNORED));
986
slave_errno = start_slave_threads(0 /*no mutex */,
987
1 /* wait for start */,
989
master_info_file,relay_log_info_file,
993
slave_errno = ER_BAD_SLAVE;
997
/* no error if all threads are already started, only a warning */
998
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
999
ER(ER_SLAVE_WAS_RUNNING));
1002
unlock_slave_threads(mi);
1007
my_message(slave_errno, ER(slave_errno), MYF(0));
1010
else if (net_report)
1017
int stop_slave(THD* thd, Master_info* mi, bool net_report )
1019
DBUG_ENTER("stop_slave");
1025
thd_proc_info(thd, "Killing slave");
1027
lock_slave_threads(mi);
1028
// Get a mask of _running_ threads
1029
init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
1031
Below we will stop all running threads.
1032
But if the user wants to stop only one thread, do as if the other thread
1033
was stopped (as we don't want to touch the other thread), so set the
1034
bit to 0 for the other thread
1036
if (thd->lex->slave_thd_opt)
1037
thread_mask &= thd->lex->slave_thd_opt;
1041
slave_errno= terminate_slave_threads(mi,thread_mask,
1046
//no error if both threads are already stopped, only a warning
1048
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
1049
ER(ER_SLAVE_WAS_NOT_RUNNING));
1051
unlock_slave_threads(mi);
1052
thd_proc_info(thd, 0);
1057
my_message(slave_errno, ER(slave_errno), MYF(0));
1060
else if (net_report)
1068
Remove all relay logs and start replication from the start
1073
mi Master info for the slave
1081
int reset_slave(THD *thd, Master_info* mi)
1083
struct stat stat_area;
1084
char fname[FN_REFLEN];
1085
int thread_mask= 0, error= 0;
1087
const char* errmsg=0;
1088
DBUG_ENTER("reset_slave");
1090
lock_slave_threads(mi);
1091
init_thread_mask(&thread_mask,mi,0 /* not inverse */);
1092
if (thread_mask) // We refuse if any slave thread is running
1094
sql_errno= ER_SLAVE_MUST_STOP;
1099
ha_reset_slave(thd);
1101
// delete relay logs, clear relay log coordinates
1102
if ((error= purge_relay_logs(&mi->rli, thd,
1107
/* Clear master's log coordinates */
1108
init_master_log_pos(mi);
1110
Reset errors (the idea is that we forget about the
1113
mi->rli.clear_error();
1114
mi->rli.clear_until_condition();
1116
// close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1117
end_master_info(mi);
1118
// and delete these two files
1119
fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
1120
if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1125
// delete relay_log_info_file
1126
fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
1127
if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1134
unlock_slave_threads(mi);
1136
my_error(sql_errno, MYF(0), errmsg);
1142
Kill all Binlog_dump threads which previously talked to the same slave
1143
("same" means with the same server id). Indeed, if the slave stops, if the
1144
Binlog_dump thread is waiting (pthread_cond_wait) for binlog update, then it
1145
will keep existing until a query is written to the binlog. If the master is
1146
idle, then this could last long, and if the slave reconnects, we could have 2
1147
Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
1148
binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
1149
the master kills any existing thread with the slave's server id (if this id is
1150
not zero; it will be true for real slaves, but false for mysqlbinlog when it
1151
sends COM_BINLOG_DUMP to get a remote binlog dump).
1154
kill_zombie_dump_threads()
1155
slave_server_id the slave's server id
1160
void kill_zombie_dump_threads(uint32 slave_server_id)
1162
pthread_mutex_lock(&LOCK_thread_count);
1163
I_List_iterator<THD> it(threads);
1168
if (tmp->command == COM_BINLOG_DUMP &&
1169
tmp->server_id == slave_server_id)
1171
pthread_mutex_lock(&tmp->LOCK_delete); // Lock from delete
1175
pthread_mutex_unlock(&LOCK_thread_count);
1179
Here we do not call kill_one_thread() as
1180
it will be slow because it will iterate through the list
1181
again. We just kill the thread ourselves.
1183
tmp->awake(THD::KILL_QUERY);
1184
pthread_mutex_unlock(&tmp->LOCK_delete);
1189
bool change_master(THD* thd, Master_info* mi)
1192
const char* errmsg= 0;
1193
bool need_relay_log_purge= 1;
1194
DBUG_ENTER("change_master");
1196
lock_slave_threads(mi);
1197
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
1198
if (thread_mask) // We refuse if any slave thread is running
1200
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1201
unlock_slave_threads(mi);
1205
thd_proc_info(thd, "Changing master");
1206
LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1207
// TODO: see if needs re-write
1208
if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
1211
my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1212
unlock_slave_threads(mi);
1217
Data lock not needed since we have already stopped the running threads,
1218
and we hold the run locks, which will keep all threads that
1219
could possibly modify the data structures from running
1223
If the user specified host or port without binlog or position,
1224
reset binlog's name to FIRST and position to 4.
1227
if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1229
mi->master_log_name[0] = 0;
1230
mi->master_log_pos= BIN_LOG_HEADER_SIZE;
1233
if (lex_mi->log_file_name)
1234
strmake(mi->master_log_name, lex_mi->log_file_name,
1235
sizeof(mi->master_log_name)-1);
1238
mi->master_log_pos= lex_mi->pos;
1240
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
1243
strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1245
strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
1246
if (lex_mi->password)
1247
strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
1249
mi->port = lex_mi->port;
1250
if (lex_mi->connect_retry)
1251
mi->connect_retry = lex_mi->connect_retry;
1252
if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1253
mi->heartbeat_period = lex_mi->heartbeat_period;
1255
mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
1256
(slave_net_timeout/2.0));
1257
mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd
1258
if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1259
mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
1261
if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1262
mi->ssl_verify_server_cert=
1263
(lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
1266
strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
1267
if (lex_mi->ssl_capath)
1268
strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
1269
if (lex_mi->ssl_cert)
1270
strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
1271
if (lex_mi->ssl_cipher)
1272
strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
1273
if (lex_mi->ssl_key)
1274
strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
1275
if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
1276
lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
1277
lex_mi->ssl_verify_server_cert )
1278
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
1279
ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1281
if (lex_mi->relay_log_name)
1283
need_relay_log_purge= 0;
1284
strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
1285
sizeof(mi->rli.group_relay_log_name)-1);
1286
strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
1287
sizeof(mi->rli.event_relay_log_name)-1);
1290
if (lex_mi->relay_log_pos)
1292
need_relay_log_purge= 0;
1293
mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
1297
If the user specified neither host nor port nor any log name nor any log
1298
pos, i.e. he specified only user/password/master_connect_retry, he probably
1299
wants replication to resume from where it left off, i.e. from the
1300
coordinates of the **SQL** thread (imagine the case where the I/O is ahead
1301
of the SQL; restarting from the coordinates of the I/O would lose some
1302
events which is probably unwanted when you are just doing minor changes
1303
like changing master_connect_retry).
1304
A side-effect is that if only the I/O thread was started, this thread may
1305
restart from ''/4 after the CHANGE MASTER. That's a minor problem (it is a
1306
much more unlikely situation than the one we are fixing here).
1307
Note: coordinates of the SQL thread must be read here, before the
1308
'if (need_relay_log_purge)' block which resets them.
1310
if (!lex_mi->host && !lex_mi->port &&
1311
!lex_mi->log_file_name && !lex_mi->pos &&
1312
need_relay_log_purge)
1315
Sometimes mi->rli.group_master_log_pos == 0 (it happens when the SQL thread is
1316
not initialized), so we use a max().
1317
What happens to mi->rli.group_master_log_pos during the initialization stages
1318
of replication is not 100% clear, so we guard against problems using
1321
mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
1322
mi->rli.group_master_log_pos);
1323
strmake(mi->master_log_name, mi->rli.group_master_log_name,
1324
sizeof(mi->master_log_name)-1);
1327
Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1330
if (flush_master_info(mi, 0))
1332
my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1333
unlock_slave_threads(mi);
1336
if (need_relay_log_purge)
1339
thd_proc_info(thd, "Purging old relay logs");
1340
if (purge_relay_logs(&mi->rli, thd,
1341
0 /* not only reset, but also reinit */,
1344
my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
1345
unlock_slave_threads(mi);
1353
/* Relay log is already initialized */
1354
if (init_relay_log_pos(&mi->rli,
1355
mi->rli.group_relay_log_name,
1356
mi->rli.group_relay_log_pos,
1360
my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
1361
unlock_slave_threads(mi);
1366
Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block,
1367
so restore them to good values. If we left them to ''/0, that would work;
1368
but that would fail in the case of 2 successive CHANGE MASTER (without a
1369
START SLAVE in between): because first one would set the coords in mi to
1370
the good values of those in rli, then set those in rli to ''/0, then
1371
second CHANGE MASTER would set the coords in mi to those of rli, i.e. to
1372
''/0: we have lost all copies of the original good coordinates.
1373
That's why we always save good coords in rli.
1375
mi->rli.group_master_log_pos= mi->master_log_pos;
1376
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
1377
strmake(mi->rli.group_master_log_name,mi->master_log_name,
1378
sizeof(mi->rli.group_master_log_name)-1);
1380
if (!mi->rli.group_master_log_name[0]) // uninitialized case
1381
mi->rli.group_master_log_pos=0;
1383
pthread_mutex_lock(&mi->rli.data_lock);
1384
mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1385
/* Clear the errors, for a clean start */
1386
mi->rli.clear_error();
1387
mi->rli.clear_until_condition();
1389
If we don't write new coordinates to disk now, then old will remain in
1390
relay-log.info until START SLAVE is issued; but if mysqld is shut down
1391
before START SLAVE, then old will remain in relay-log.info, and will be the
1392
in-memory value at restart (thus causing errors, as the old relay log does
1395
flush_relay_log_info(&mi->rli);
1396
pthread_cond_broadcast(&mi->data_cond);
1397
pthread_mutex_unlock(&mi->rli.data_lock);
1399
unlock_slave_threads(mi);
1400
thd_proc_info(thd, 0);
1405
int reset_master(THD* thd)
1407
if (!mysql_bin_log.is_open())
1409
my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
1410
ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
1413
return mysql_bin_log.reset_logs(thd);
1416
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
1417
const char* log_file_name2, ulonglong log_pos2)
1420
uint log_file_name1_len= strlen(log_file_name1);
1421
uint log_file_name2_len= strlen(log_file_name2);
1423
// We assume that both log names match up to '.'
1424
if (log_file_name1_len == log_file_name2_len)
1426
if ((res= strcmp(log_file_name1, log_file_name2)))
1428
return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
1430
return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
1434
bool mysql_show_binlog_events(THD* thd)
1436
Protocol *protocol= thd->protocol;
1437
List<Item> field_list;
1438
const char *errmsg = 0;
1442
DBUG_ENTER("mysql_show_binlog_events");
1444
Log_event::init_show_field_list(&field_list);
1445
if (protocol->send_fields(&field_list,
1446
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1449
Format_description_log_event *description_event= new
1450
Format_description_log_event(3); /* MySQL 4.0 by default */
1453
Wait for handlers to insert any pending information
1454
into the binlog. For e.g. ndb which updates the binlog asynchronously
1455
this is needed so that the user sees all of their own commands in the binlog
1457
ha_binlog_wait(thd);
1459
if (mysql_bin_log.is_open())
1461
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1462
SELECT_LEX_UNIT *unit= &thd->lex->unit;
1463
ha_rows event_count, limit_start, limit_end;
1464
my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1465
char search_file_name[FN_REFLEN], *name;
1466
const char *log_file_name = lex_mi->log_file_name;
1467
pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1471
unit->set_limit(thd->lex->current_select);
1472
limit_start= unit->offset_limit_cnt;
1473
limit_end= unit->select_limit_cnt;
1475
name= search_file_name;
1477
mysql_bin_log.make_log_name(search_file_name, log_file_name);
1479
name=0; // Find first log
1481
linfo.index_file_offset = 0;
1483
if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1485
errmsg = "Could not find target log";
1489
pthread_mutex_lock(&LOCK_thread_count);
1490
thd->current_linfo = &linfo;
1491
pthread_mutex_unlock(&LOCK_thread_count);
1493
if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
1497
to account for the binlog event header size
1499
thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
1501
pthread_mutex_lock(log_lock);
1504
open_binlog() sought to position 4.
1505
Read the first event in case it's a Format_description_log_event, to
1506
know the format. If there's no such event, we are 3.23 or 4.x. This
1507
code, like before, can't read 3.23 binlogs.
1508
This code will fail on a mixed relay log (one which has Format_desc then
1509
Rotate then Format_desc).
1511
ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
1514
if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
1516
delete description_event;
1517
description_event= (Format_description_log_event*) ev;
1523
my_b_seek(&log, pos);
1525
if (!description_event->is_valid())
1527
errmsg="Invalid Format_description event; could be out of memory";
1531
for (event_count = 0;
1532
(ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
1533
description_event)); )
1535
if (event_count >= limit_start &&
1536
ev->net_send(protocol, linfo.log_file_name, pos))
1538
errmsg = "Net error";
1540
pthread_mutex_unlock(log_lock);
1544
pos = my_b_tell(&log);
1547
if (++event_count >= limit_end)
1551
if (event_count < limit_end && log.error)
1553
errmsg = "Wrong offset or I/O error";
1554
pthread_mutex_unlock(log_lock);
1558
pthread_mutex_unlock(log_lock);
1564
delete description_event;
1568
(void) my_close(file, MYF(MY_WME));
1572
my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
1573
"SHOW BINLOG EVENTS", errmsg);
1577
pthread_mutex_lock(&LOCK_thread_count);
1578
thd->current_linfo = 0;
1579
pthread_mutex_unlock(&LOCK_thread_count);
1584
bool show_binlog_info(THD* thd)
1586
Protocol *protocol= thd->protocol;
1587
DBUG_ENTER("show_binlog_info");
1588
List<Item> field_list;
1589
field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1590
field_list.push_back(new Item_return_int("Position",20,
1591
MYSQL_TYPE_LONGLONG));
1592
field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
1593
field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
1595
if (protocol->send_fields(&field_list,
1596
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1598
protocol->prepare_for_resend();
1600
if (mysql_bin_log.is_open())
1603
mysql_bin_log.get_current_log(&li);
1604
int dir_len = dirname_length(li.log_file_name);
1605
protocol->store(li.log_file_name + dir_len, &my_charset_bin);
1606
protocol->store((ulonglong) li.pos);
1607
protocol->store(binlog_filter->get_do_db());
1608
protocol->store(binlog_filter->get_ignore_db());
1609
if (protocol->write())
1618
Send a list of all binary logs to client
1622
thd Thread specific variable
1629
bool show_binlogs(THD* thd)
1631
IO_CACHE *index_file;
1634
char fname[FN_REFLEN];
1635
List<Item> field_list;
1638
Protocol *protocol= thd->protocol;
1639
DBUG_ENTER("show_binlogs");
1641
if (!mysql_bin_log.is_open())
1643
my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0));
1647
field_list.push_back(new Item_empty_string("Log_name", 255));
1648
field_list.push_back(new Item_return_int("File_size", 20,
1649
MYSQL_TYPE_LONGLONG));
1650
if (protocol->send_fields(&field_list,
1651
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1654
pthread_mutex_lock(mysql_bin_log.get_log_lock());
1655
mysql_bin_log.lock_index();
1656
index_file=mysql_bin_log.get_index_file();
1658
mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
1659
pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
1661
cur_dir_len= dirname_length(cur.log_file_name);
1663
reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
1665
/* The file ends with EOF or empty line */
1666
while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
1669
ulonglong file_length= 0; // Length if open fails
1670
fname[--length] = '\0'; // remove the newline
1672
protocol->prepare_for_resend();
1673
dir_len= dirname_length(fname);
1675
protocol->store(fname + dir_len, length, &my_charset_bin);
1677
if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
1678
file_length= cur.pos; /* The active log, use the active position */
1681
/* this is an old log, open it and find the size */
1682
if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
1685
file_length= (ulonglong) my_seek(file, 0L, MY_SEEK_END, MYF(0));
1686
my_close(file, MYF(0));
1689
protocol->store(file_length);
1690
if (protocol->write())
1693
mysql_bin_log.unlock_index();
1698
mysql_bin_log.unlock_index();
1703
Load data's io cache specific hook to be executed
1704
before a chunk of data is being read into the cache's buffer
1705
The function instantiates and writes into the binlog
1706
replication events during LOAD DATA processing.
1708
@param file pointer to io-cache
1711
int log_loaded_block(IO_CACHE* file)
1713
DBUG_ENTER("log_loaded_block");
1714
LOAD_FILE_INFO *lf_info;
1716
/* buffer contains position where we started last read */
1717
uchar* buffer= (uchar*) my_b_get_buffer_start(file);
1718
uint max_event_size= current_thd->variables.max_allowed_packet;
1719
lf_info= (LOAD_FILE_INFO*) file->arg;
1720
if (lf_info->thd->current_stmt_binlog_row_based)
1722
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1723
lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1726
for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1727
buffer += min(block_len, max_event_size),
1728
block_len -= min(block_len, max_event_size))
1730
lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1731
if (lf_info->wrote_create_file)
1733
Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
1734
min(block_len, max_event_size),
1735
lf_info->log_delayed);
1736
mysql_bin_log.write(&a);
1740
Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1742
min(block_len, max_event_size),
1743
lf_info->log_delayed);
1744
mysql_bin_log.write(&b);
1745
lf_info->wrote_create_file= 1;
1746
DBUG_SYNC_POINT("debug_lock.created_file_event",10);
1753
Replication System Variables
1756
class sys_var_slave_skip_counter :public sys_var
1759
sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
1761
{ chain_sys_var(chain); }
1762
bool check(THD *thd, set_var *var);
1763
bool update(THD *thd, set_var *var);
1764
bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
1766
We can't retrieve the value of this, so we don't have to define
1767
type() or value_ptr()
1771
class sys_var_sync_binlog_period :public sys_var_long_ptr
1774
sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
1776
:sys_var_long_ptr(chain, name_arg,value_ptr) {}
1777
bool update(THD *thd, set_var *var);
1780
static void fix_slave_net_timeout(THD *thd, enum_var_type type)
1782
DBUG_ENTER("fix_slave_net_timeout");
1783
#ifdef HAVE_REPLICATION
1784
pthread_mutex_lock(&LOCK_active_mi);
1785
DBUG_PRINT("info",("slave_net_timeout=%lu mi->heartbeat_period=%.3f",
1787
(active_mi? active_mi->heartbeat_period : 0.0)));
1788
if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1789
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
1790
ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1791
"The currect value for master_heartbeat_period"
1792
" exceeds the new value of `slave_net_timeout' sec."
1793
" A sensible value for the period should be"
1794
" less than the timeout.");
1795
pthread_mutex_unlock(&LOCK_active_mi);
1800
static sys_var_chain vars = { NULL, NULL };
1802
static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
1804
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
1806
fix_slave_net_timeout);
1807
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
1808
&slave_trans_retries);
1809
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
1810
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
1812
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
1815
static SHOW_VAR fixed_vars[]= {
1816
{"log_slave_updates", (char*) &opt_log_slave_updates, SHOW_MY_BOOL},
1817
{"relay_log" , (char*) &opt_relay_logname, SHOW_CHAR_PTR},
1818
{"relay_log_index", (char*) &opt_relaylog_index_name, SHOW_CHAR_PTR},
1819
{"relay_log_info_file", (char*) &relay_log_info_file, SHOW_CHAR_PTR},
1820
{"relay_log_space_limit", (char*) &relay_log_space_limit, SHOW_LONGLONG},
1821
{"slave_load_tmpdir", (char*) &slave_load_tmpdir, SHOW_CHAR_PTR},
1822
{"slave_skip_errors", (char*) &show_slave_skip_errors, SHOW_FUNC},
1825
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff)
1827
var->type=SHOW_CHAR;
1829
if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
1831
var->value= const_cast<char *>("OFF");
1833
else if (bitmap_is_set_all(&slave_error_mask))
1835
var->value= const_cast<char *>("ALL");
1839
/* 10 is enough assuming errors are max 4 digits */
1843
i < MAX_SLAVE_ERROR &&
1844
(buff - var->value) < SHOW_VAR_FUNC_BUFF_SIZE;
1847
if (bitmap_is_set(&slave_error_mask, i))
1849
buff= int10_to_str(i, buff, 10);
1853
if (var->value != buff)
1854
buff--; // Remove last ','
1855
if (i < MAX_SLAVE_ERROR)
1856
buff= strmov(buff, "..."); // Couldn't show all errors
1862
bool sys_var_slave_skip_counter::check(THD *thd, set_var *var)
1865
pthread_mutex_lock(&LOCK_active_mi);
1866
pthread_mutex_lock(&active_mi->rli.run_lock);
1867
if (active_mi->rli.slave_running)
1869
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1872
pthread_mutex_unlock(&active_mi->rli.run_lock);
1873
pthread_mutex_unlock(&LOCK_active_mi);
1874
var->save_result.ulong_value= (ulong) var->value->val_int();
1879
bool sys_var_slave_skip_counter::update(THD *thd, set_var *var)
1881
pthread_mutex_lock(&LOCK_active_mi);
1882
pthread_mutex_lock(&active_mi->rli.run_lock);
1884
The following test should normally never be true as we test this
1885
in the check function; To be safe against multiple
1886
SQL_SLAVE_SKIP_COUNTER requests, we do the check anyway
1888
if (!active_mi->rli.slave_running)
1890
pthread_mutex_lock(&active_mi->rli.data_lock);
1891
active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
1892
pthread_mutex_unlock(&active_mi->rli.data_lock);
1894
pthread_mutex_unlock(&active_mi->rli.run_lock);
1895
pthread_mutex_unlock(&LOCK_active_mi);
1900
bool sys_var_sync_binlog_period::update(THD *thd, set_var *var)
1902
sync_binlog_period= (ulong) var->save_result.ulonglong_value;
1906
int init_replication_sys_vars()
1908
mysql_append_static_vars(fixed_vars, sizeof(fixed_vars) / sizeof(SHOW_VAR));
1910
if (mysql_add_sys_var_chain(vars.first, my_long_options))
1912
/* should not happen */
1913
fprintf(stderr, "failed to initialize replication system variables");
1919
#endif /* HAVE_REPLICATION */