1
/* Copyright (C) 2000-2006 MySQL AB & Sasha
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include <drizzled/server_includes.h>
20
#include "log_event.h"
21
#include "rpl_filter.h"
22
#include <drizzled/drizzled_error_messages.h>
24
int max_binlog_dump_events = 0; // unlimited
27
fake_rotate_event() builds a fake (=which does not exist physically in any
28
binlog) Rotate event, which contains the name of the binlog we are going to
29
send to the slave (because the slave may not know it if it just asked for
30
MASTER_LOG_FILE='', MASTER_LOG_POS=4).
31
< 4.0.14, fake_rotate_event() was called only if the requested pos was 4.
32
After this version we always call it, so that a 3.23.58 slave can rely on
33
it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
34
zeros in the good positions which, by chance, make it possible for the 3.23
35
slave to detect that this event is unexpected) (this is luck which happens
36
because the master and slave disagree on the size of the header of
39
Relying on the event length of the Rotate event instead of these
40
well-placed zeros was not possible as Rotate events have a variable-length
43
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
44
uint64_t position, const char** errmsg)
46
char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
48
'when' (the timestamp) is set to 0 so that slave could distinguish between
49
real and fake Rotate events (if necessary)
52
header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
54
char* p = log_file_name+dirname_length(log_file_name);
55
uint32_t ident_len = (uint32_t) strlen(p);
56
uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
57
int4store(header + SERVER_ID_OFFSET, server_id);
58
int4store(header + EVENT_LEN_OFFSET, event_len);
59
int2store(header + FLAGS_OFFSET, 0);
61
// TODO: check what problems this may cause and fix them
62
int4store(header + LOG_POS_OFFSET, 0);
64
packet->append(header, sizeof(header));
65
int8store(buf+R_POS_OFFSET,position);
66
packet->append(buf, ROTATE_HEADER_LEN);
67
packet->append(p,ident_len);
68
if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
70
*errmsg = "failed on my_net_write()";
76
static int send_file(THD *thd)
79
int fd = -1, error = 1;
81
char fname[FN_REFLEN+1];
82
const char *errmsg = 0;
84
unsigned long packet_len;
85
unsigned char buf[IO_SIZE]; // It's safe to alloc this
88
The client might be slow loading the data, give him wait_timeout to do
91
old_timeout= net->read_timeout;
92
my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
95
We need net_flush here because the client will not know it needs to send
96
us the file name until it has processed the load event entry
98
if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
100
errmsg = _("Failed in send_file() while reading file name");
104
// terminate with \0 for fn_format
105
*((char*)net->read_pos + packet_len) = 0;
106
fn_format(fname, (char*) net->read_pos + 1, "", "", 4);
107
// this is needed to make replicate-ignore-db
108
if (!strcmp(fname,"/dev/null"))
111
if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
113
errmsg = _("Failed in send_file() on open of file");
117
while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0)
119
if (my_net_write(net, buf, bytes))
121
errmsg = _("Failed in send_file() while writing data to client");
127
if (my_net_write(net, (unsigned char*) "", 0) || net_flush(net) ||
128
(my_net_read(net) == packet_error))
130
errmsg = _("Failed in send_file() while negotiating file transfer close");
136
my_net_set_read_timeout(net, old_timeout);
138
(void) my_close(fd, MYF(0));
141
sql_print_error(errmsg);
148
Adjust the position pointer in the binary log file for all running slaves
151
adjust_linfo_offsets()
152
purge_offset Number of bytes removed from start of log index file
155
- This is called when doing a PURGE when we delete lines from the
159
- Before calling this function, we have to ensure that no threads are
160
using any binary log file before purge_offset.a
163
- Inform the slave threads that they should sync the position
164
in the binary log file with flush_relay_log_info.
165
Now they sync is done for next read.
168
void adjust_linfo_offsets(my_off_t purge_offset)
172
pthread_mutex_lock(&LOCK_thread_count);
173
I_List_iterator<THD> it(threads);
178
if ((linfo = tmp->current_linfo))
180
pthread_mutex_lock(&linfo->lock);
182
Index file offset can be less that purge offset only if
183
we just started reading the index file. In that case
184
we have nothing to adjust
186
if (linfo->index_file_offset < purge_offset)
187
linfo->fatal = (linfo->index_file_offset != 0);
189
linfo->index_file_offset -= purge_offset;
190
pthread_mutex_unlock(&linfo->lock);
193
pthread_mutex_unlock(&LOCK_thread_count);
197
bool log_in_use(const char* log_name)
199
int log_name_len = strlen(log_name) + 1;
203
pthread_mutex_lock(&LOCK_thread_count);
204
I_List_iterator<THD> it(threads);
209
if ((linfo = tmp->current_linfo))
211
pthread_mutex_lock(&linfo->lock);
212
result = !memcmp(log_name, linfo->log_file_name, log_name_len);
213
pthread_mutex_unlock(&linfo->lock);
219
pthread_mutex_unlock(&LOCK_thread_count);
223
bool purge_error_message(THD* thd, int res)
229
case LOG_INFO_EOF: errmsg= ER_UNKNOWN_TARGET_BINLOG; break;
230
case LOG_INFO_IO: errmsg= ER_IO_ERR_LOG_INDEX_READ; break;
231
case LOG_INFO_INVALID:errmsg= ER_BINLOG_PURGE_PROHIBITED; break;
232
case LOG_INFO_SEEK: errmsg= ER_FSEEK_FAIL; break;
233
case LOG_INFO_MEM: errmsg= ER_OUT_OF_RESOURCES; break;
234
case LOG_INFO_FATAL: errmsg= ER_BINLOG_PURGE_FATAL_ERR; break;
235
case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break;
236
case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break;
237
default: errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break;
242
my_message(errmsg, ER(errmsg), MYF(0));
250
bool purge_master_logs(THD* thd, const char* to_log)
252
char search_file_name[FN_REFLEN];
253
if (!mysql_bin_log.is_open())
259
mysql_bin_log.make_log_name(search_file_name, to_log);
260
return purge_error_message(thd,
261
mysql_bin_log.purge_logs(search_file_name, 0, 1,
266
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
268
if (!mysql_bin_log.is_open())
273
return purge_error_message(thd,
274
mysql_bin_log.purge_logs_before_date(purge_time));
277
int test_for_non_eof_log_read_errors(int error, const char **errmsg)
279
if (error == LOG_READ_EOF)
281
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
284
*errmsg = "bogus data in log event";
286
case LOG_READ_TOO_LARGE:
287
*errmsg = "log event entry exceeded max_allowed_packet; \
288
Increase max_allowed_packet on master";
291
*errmsg = "I/O error reading log event";
294
*errmsg = "memory allocation failed reading log event";
297
*errmsg = "binlog truncated in the middle of event";
300
*errmsg = "unknown error reading log event on the master";
308
An auxiliary function for calling in mysql_binlog_send
309
to initialize the heartbeat timeout in waiting for a binlogged event.
311
@param[in] thd THD to access a user variable
313
@return heartbeat period an uint64_t of nanoseconds
314
or zero if heartbeat was not demanded by slave
316
static uint64_t get_heartbeat_period(THD * thd)
319
LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")};
320
user_var_entry *entry=
321
(user_var_entry*) hash_search(&thd->user_vars, (unsigned char*) name.str,
323
return entry? entry->val_int(&null_value) : 0;
327
Function prepares and sends repliation heartbeat event.
329
@param net net object of THD
330
@param packet buffer to store the heartbeat instance
331
@param event_coordinates binlog file name and position of the last
332
real event master sent from binlog
335
Among three essential pieces of heartbeat data Log_event::when
337
The error to send is serious and should force terminating
340
static int send_heartbeat_event(NET* net, String* packet,
341
const struct event_coordinates *coord)
343
char header[LOG_EVENT_HEADER_LEN];
345
'when' (the timestamp) is set to 0 so that slave could distinguish between
346
real and fake Rotate events (if necessary)
348
memset(header, 0, 4); // when
350
header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
352
char* p= coord->file_name + dirname_length(coord->file_name);
354
uint32_t ident_len = strlen(p);
355
uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN;
356
int4store(header + SERVER_ID_OFFSET, server_id);
357
int4store(header + EVENT_LEN_OFFSET, event_len);
358
int2store(header + FLAGS_OFFSET, 0);
360
int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos
362
packet->append(header, sizeof(header));
363
packet->append(p, ident_len); // log_file_name
365
if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) ||
370
packet->set("\0", 1, &my_charset_bin);
375
TODO: Clean up loop to only have one call to send_file()
378
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
382
char *log_file_name = linfo.log_file_name;
383
char search_file_name[FN_REFLEN], *name;
386
String* packet = &thd->packet;
388
const char *errmsg = "Unknown error";
389
NET* net = &thd->net;
390
pthread_mutex_t *log_lock;
391
bool binlog_can_be_corrupted= false;
393
memset(&log, 0, sizeof(log));
395
heartbeat_period from @master_heartbeat_period user variable
397
uint64_t heartbeat_period= get_heartbeat_period(thd);
398
struct timespec heartbeat_buf;
399
struct event_coordinates coord_buf;
400
struct timespec *heartbeat_ts= NULL;
401
struct event_coordinates *coord= NULL;
402
if (heartbeat_period != 0L)
404
heartbeat_ts= &heartbeat_buf;
405
set_timespec_nsec(*heartbeat_ts, 0);
407
coord->file_name= log_file_name; // initialization basing on what slave remembers
411
if (!mysql_bin_log.is_open())
413
errmsg = "Binary log is not open";
414
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
417
if (!server_id_supplied)
419
errmsg = "Misconfigured master - server id was not set";
420
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
424
name=search_file_name;
426
mysql_bin_log.make_log_name(search_file_name, log_ident);
428
name=0; // Find first log
430
linfo.index_file_offset = 0;
432
if (mysql_bin_log.find_log_pos(&linfo, name, 1))
434
errmsg = "Could not find first log file name in binary log index file";
435
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
439
pthread_mutex_lock(&LOCK_thread_count);
440
thd->current_linfo = &linfo;
441
pthread_mutex_unlock(&LOCK_thread_count);
443
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
445
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
448
if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
450
errmsg= "Client requested master to start replication from \
451
impossible position";
452
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
457
We need to start a packet with something other than 255
458
to distinguish it from error
460
packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
463
Tell the client about the log name with a fake Rotate event;
464
this is needed even if we also send a Format_description_log_event
465
just after, because that event does not contain the binlog's name.
466
Note that as this Rotate event is sent before
467
Format_description_log_event, the slave cannot have any info to
468
understand this event's format, so the header len of
469
Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
470
than other events except FORMAT_DESCRIPTION_EVENT).
471
Before 4.0.14 we called fake_rotate_event below only if (pos ==
472
BIN_LOG_HEADER_SIZE), because if this is false then the slave
473
already knows the binlog's name.
474
Since, we always call fake_rotate_event; if the slave already knew
475
the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
476
useless but does not harm much. It is nice for 3.23 (>=.58) slaves
477
which test Rotate events to see if the master is 4.0 (then they
478
choose to stop because they can't replicate 4.0); by always calling
479
fake_rotate_event we are sure that 3.23.58 and newer will detect the
480
problem as soon as replication starts (BUG#198).
481
Always calling fake_rotate_event makes sending of normal
482
(=from-binlog) Rotate events a priori unneeded, but it is not so
483
simple: the 2 Rotate events are not equivalent, the normal one is
484
before the Stop event, the fake one is after. If we don't send the
485
normal one, then the Stop event will be interpreted (by existing 4.0
486
slaves) as "the master stopped", which is wrong. So for safety,
487
given that we want minimum modification of 4.0, we send the normal
490
if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
493
This error code is not perfect, as fake_rotate_event() does not
494
read anything from the binlog; if it fails it's because of an
495
error in my_net_write(), fortunately it will say so in errmsg.
497
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
500
packet->set("\0", 1, &my_charset_bin);
502
Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
503
this larger than the corresponding packet (query) sent
504
from client to master.
506
thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
509
We can set log_lock now, it does not move (it's a member of
510
mysql_bin_log, and it's already inited, and it will be destroyed
513
log_lock = mysql_bin_log.get_log_lock();
514
if (pos > BIN_LOG_HEADER_SIZE)
517
Try to find a Format_description_log_event at the beginning of
520
if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
523
The packet has offsets equal to the normal offsets in a binlog
524
event +1 (the first character is \0).
526
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
528
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
529
LOG_EVENT_BINLOG_IN_USE_F);
530
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
532
mark that this event with "log_pos=0", so the slave
533
should not increment master's binlog position
534
(rli->group_master_log_pos)
536
int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
538
if reconnect master sends FD event with `created' as 0
539
to avoid destroying temp tables.
541
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
542
ST_CREATED_OFFSET+1, (uint32_t) 0);
544
if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
546
errmsg = "Failed on my_net_write()";
547
my_errno= ER_UNKNOWN_ERROR;
552
No need to save this event. We are only doing simple reads
553
(no real parsing of the events) so we don't need it. And so
554
we don't need the artificial Format_description_log_event of
561
if (test_for_non_eof_log_read_errors(error, &errmsg))
564
It's EOF, nothing to do, go on reading next events, the
565
Format_description_log_event will be found naturally if it is written.
568
/* reset the packet as we wrote to it in any case */
569
packet->set("\0", 1, &my_charset_bin);
570
} /* end of if (pos > BIN_LOG_HEADER_SIZE); */
573
/* The Format_description_log_event event will be found naturally. */
576
/* seek to the requested position, to start the requested dump */
577
my_b_seek(&log, pos); // Seek will done on next read
579
while (!net->error && net->vio != 0 && !thd->killed)
581
while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
584
log's filename does not change while it's active
587
coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
589
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
591
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
592
LOG_EVENT_BINLOG_IN_USE_F);
593
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
595
else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
596
binlog_can_be_corrupted= false;
598
if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
600
errmsg = "Failed on my_net_write()";
601
my_errno= ER_UNKNOWN_ERROR;
605
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
609
errmsg = "failed in send_file()";
610
my_errno= ER_UNKNOWN_ERROR;
614
packet->set("\0", 1, &my_charset_bin);
618
here we were reading binlog that was not closed properly (as a result
619
of a crash ?). treat any corruption as EOF
621
if (binlog_can_be_corrupted && error != LOG_READ_MEM)
624
TODO: now that we are logging the offset, check to make sure
625
the recorded offset and the actual match.
626
Guilhem 2003-06: this is not true if this master is a slave
627
<4.0.15 running with --log-slave-updates, because then log_pos may
628
be the offset in the-master-of-this-master's binlog.
630
if (test_for_non_eof_log_read_errors(error, &errmsg))
633
if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
634
mysql_bin_log.is_active(log_file_name))
637
Block until there is more data in the log
641
errmsg = "failed on net_flush()";
642
my_errno= ER_UNKNOWN_ERROR;
647
We may have missed the update broadcast from the log
648
that has just happened, let's try to catch it if it did.
649
If we did not miss anything, we just wait for other threads
654
bool read_packet = 0, fatal_error = 0;
657
No one will update the log while we are reading
658
now, but we'll be quick and just read one record
661
Add an counter that is incremented for each time we update the
662
binary log. We can avoid the following read if the counter
663
has not been updated since last read.
666
pthread_mutex_lock(log_lock);
667
switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
669
/* we read successfully, so we'll need to send it to the slave */
670
pthread_mutex_unlock(log_lock);
673
coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
679
if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
681
pthread_mutex_unlock(log_lock);
689
assert(heartbeat_ts && heartbeat_period != 0L);
690
set_timespec_nsec(*heartbeat_ts, heartbeat_period);
692
ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
693
assert(ret == 0 || (heartbeat_period != 0L && coord != NULL));
694
if (ret == ETIMEDOUT || ret == ETIME)
696
if (send_heartbeat_event(net, packet, coord))
698
errmsg = "Failed on my_net_write()";
699
my_errno= ER_UNKNOWN_ERROR;
700
pthread_mutex_unlock(log_lock);
708
} while (ret != 0 && coord != NULL && !thd->killed);
709
pthread_mutex_unlock(log_lock);
714
pthread_mutex_unlock(log_lock);
721
thd_proc_info(thd, "Sending binlog event to slave");
722
if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) )
724
errmsg = "Failed on my_net_write()";
725
my_errno= ER_UNKNOWN_ERROR;
729
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
733
errmsg = "failed in send_file()";
734
my_errno= ER_UNKNOWN_ERROR;
738
packet->set("\0", 1, &my_charset_bin);
740
No need to net_flush because we will get to flush later when
741
we hit EOF pretty quick
747
errmsg = "error reading log entry";
748
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
756
bool loop_breaker = 0;
757
/* need this to break out of the for loop from switch */
759
thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
760
switch (mysql_bin_log.find_next_log(&linfo, 1)) {
762
loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
767
errmsg = "could not find next log";
768
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
776
(void) my_close(file, MYF(MY_WME));
779
Call fake_rotate_event() in case the previous log (the one which
780
we have just finished reading) did not contain a Rotate event
781
(for example (I don't know any other example) the previous log
782
was the last one before the master was shutdown & restarted).
783
This way we tell the slave about the new log's name and
784
position. If the binlog is 5.0, the next event we are going to
785
read and send is Format_description_log_event.
787
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
788
fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
791
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
796
packet->append('\0');
798
coord->file_name= log_file_name; // reset to the next
804
(void)my_close(file, MYF(MY_WME));
807
thd_proc_info(thd, "Waiting to finalize termination");
808
pthread_mutex_lock(&LOCK_thread_count);
809
thd->current_linfo = 0;
810
pthread_mutex_unlock(&LOCK_thread_count);
814
thd_proc_info(thd, "Waiting to finalize termination");
817
Exclude iteration through thread list
818
this is needed for purge_logs() - it will iterate through
819
thread list and update thd->current_linfo->index_file_offset
820
this mutex will make sure that it never tried to update our linfo
821
after we return from this stack frame
823
pthread_mutex_lock(&LOCK_thread_count);
824
thd->current_linfo = 0;
825
pthread_mutex_unlock(&LOCK_thread_count);
827
(void) my_close(file, MYF(MY_WME));
829
my_message(my_errno, errmsg, MYF(0));
833
int start_slave(THD* thd , Master_info* mi, bool net_report)
838
lock_slave_threads(mi); // this allows us to cleanly read slave_running
839
// Get a mask of _stopped_ threads
840
init_thread_mask(&thread_mask,mi,1 /* inverse */);
842
Below we will start all stopped threads. But if the user wants to
843
start only one thread, do as if the other thread was running (as we
844
don't wan't to touch the other thread), so set the bit to 0 for the
847
if (thd->lex->slave_thd_opt)
848
thread_mask&= thd->lex->slave_thd_opt;
849
if (thread_mask) //some threads are stopped, start them
851
if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
853
slave_errno=ER_MASTER_INFO;
854
else if (server_id_supplied && *mi->host)
857
If we will start SQL thread we will care about UNTIL options If
858
not and they are specified we will ignore them and warn user
861
if (thread_mask & SLAVE_SQL)
863
pthread_mutex_lock(&mi->rli.data_lock);
865
if (thd->lex->mi.pos)
867
mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
868
mi->rli.until_log_pos= thd->lex->mi.pos;
870
We don't check thd->lex->mi.log_file_name for NULL here
871
since it is checked in sql_yacc.yy
873
strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
874
sizeof(mi->rli.until_log_name)-1);
876
else if (thd->lex->mi.relay_log_pos)
878
mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
879
mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
880
strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
881
sizeof(mi->rli.until_log_name)-1);
884
mi->rli.clear_until_condition();
886
if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
888
/* Preparing members for effective until condition checking */
889
const char *p= fn_ext(mi->rli.until_log_name);
894
mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
896
p_end points to the first invalid character. If it equals
897
to p, no digits were found, error. If it contains '\0' it
898
means conversion went ok.
900
if (p_end==p || *p_end)
901
slave_errno=ER_BAD_SLAVE_UNTIL_COND;
904
slave_errno=ER_BAD_SLAVE_UNTIL_COND;
906
/* mark the cached result of the UNTIL comparison as "undefined" */
907
mi->rli.until_log_names_cmp_result=
908
Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
910
/* Issuing warning then started without --skip-slave-start */
911
if (!opt_skip_slave_start)
912
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
913
ER_MISSING_SKIP_SLAVE,
914
ER(ER_MISSING_SKIP_SLAVE));
917
pthread_mutex_unlock(&mi->rli.data_lock);
919
else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
920
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
921
ER(ER_UNTIL_COND_IGNORED));
924
slave_errno = start_slave_threads(0 /*no mutex */,
925
1 /* wait for start */,
927
master_info_file,relay_log_info_file,
931
slave_errno = ER_BAD_SLAVE;
935
/* no error if all threads are already started, only a warning */
936
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
937
ER(ER_SLAVE_WAS_RUNNING));
940
unlock_slave_threads(mi);
945
my_message(slave_errno, ER(slave_errno), MYF(0));
955
int stop_slave(THD* thd, Master_info* mi, bool net_report )
961
thd_proc_info(thd, "Killing slave");
963
lock_slave_threads(mi);
964
// Get a mask of _running_ threads
965
init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
967
Below we will stop all running threads.
968
But if the user wants to stop only one thread, do as if the other thread
969
was stopped (as we don't wan't to touch the other thread), so set the
970
bit to 0 for the other thread
972
if (thd->lex->slave_thd_opt)
973
thread_mask &= thd->lex->slave_thd_opt;
977
slave_errno= terminate_slave_threads(mi,thread_mask,
982
//no error if both threads are already stopped, only a warning
984
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
985
ER(ER_SLAVE_WAS_NOT_RUNNING));
987
unlock_slave_threads(mi);
988
thd_proc_info(thd, 0);
993
my_message(slave_errno, ER(slave_errno), MYF(0));
1004
Remove all relay logs and start replication from the start
1009
mi Master info for the slave
1017
int reset_slave(THD *thd, Master_info* mi)
1019
struct stat stat_area;
1020
char fname[FN_REFLEN];
1021
int thread_mask= 0, error= 0;
1022
uint32_t sql_errno=0;
1023
const char* errmsg=0;
1025
lock_slave_threads(mi);
1026
init_thread_mask(&thread_mask,mi,0 /* not inverse */);
1027
if (thread_mask) // We refuse if any slave thread is running
1029
sql_errno= ER_SLAVE_MUST_STOP;
1034
// delete relay logs, clear relay log coordinates
1035
if ((error= purge_relay_logs(&mi->rli, thd,
1040
/* Clear master's log coordinates */
1041
init_master_log_pos(mi);
1043
Reset errors (the idea is that we forget about the
1046
mi->rli.clear_error();
1047
mi->rli.clear_until_condition();
1049
// close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1050
end_master_info(mi);
1051
// and delete these two files
1052
fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
1053
if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1058
// delete relay_log_info_file
1059
fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
1060
if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1067
unlock_slave_threads(mi);
1069
my_error(sql_errno, MYF(0), errmsg);
1075
Kill all Binlog_dump threads which previously talked to the same slave
1076
("same" means with the same server id). Indeed, if the slave stops, if the
1077
Binlog_dump thread is waiting (pthread_cond_wait) for binlog update, then it
1078
will keep existing until a query is written to the binlog. If the master is
1079
idle, then this could last long, and if the slave reconnects, we could have 2
1080
Binlog_dump threads in SHOW PROCESSLIST, until a query is written to the
1081
binlog. To avoid this, when the slave reconnects and sends COM_BINLOG_DUMP,
1082
the master kills any existing thread with the slave's server id (if this id is
1083
not zero; it will be true for real slaves, but false for mysqlbinlog when it
1084
sends COM_BINLOG_DUMP to get a remote binlog dump).
1087
kill_zombie_dump_threads()
1088
slave_server_id the slave's server id
1093
void kill_zombie_dump_threads(uint32_t slave_server_id)
1095
pthread_mutex_lock(&LOCK_thread_count);
1096
I_List_iterator<THD> it(threads);
1101
if (tmp->command == COM_BINLOG_DUMP &&
1102
tmp->server_id == slave_server_id)
1104
pthread_mutex_lock(&tmp->LOCK_delete); // Lock from delete
1108
pthread_mutex_unlock(&LOCK_thread_count);
1112
Here we do not call kill_one_thread() as
1113
it will be slow because it will iterate through the list
1114
again. We just to do kill the thread ourselves.
1116
tmp->awake(THD::KILL_QUERY);
1117
pthread_mutex_unlock(&tmp->LOCK_delete);
1122
bool change_master(THD* thd, Master_info* mi)
1125
const char* errmsg= 0;
1126
bool need_relay_log_purge= 1;
1128
lock_slave_threads(mi);
1129
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
1130
if (thread_mask) // We refuse if any slave thread is running
1132
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1133
unlock_slave_threads(mi);
1137
thd_proc_info(thd, "Changing master");
1138
LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1139
// TODO: see if needs re-write
1140
if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
1143
my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1144
unlock_slave_threads(mi);
1149
Data lock not needed since we have already stopped the running threads,
1150
and we have the hold on the run locks which will keep all threads that
1151
could possibly modify the data structures from running
1155
If the user specified host or port without binlog or position,
1156
reset binlog's name to FIRST and position to 4.
1159
if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1161
mi->master_log_name[0] = 0;
1162
mi->master_log_pos= BIN_LOG_HEADER_SIZE;
1165
if (lex_mi->log_file_name)
1166
strmake(mi->master_log_name, lex_mi->log_file_name,
1167
sizeof(mi->master_log_name)-1);
1170
mi->master_log_pos= lex_mi->pos;
1174
strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1176
strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
1177
if (lex_mi->password)
1178
strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
1180
mi->port = lex_mi->port;
1181
if (lex_mi->connect_retry)
1182
mi->connect_retry = lex_mi->connect_retry;
1183
if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1184
mi->heartbeat_period = lex_mi->heartbeat_period;
1186
mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
1187
(slave_net_timeout/2.0));
1188
mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
1189
if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1190
mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
1192
if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1193
mi->ssl_verify_server_cert=
1194
(lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
1197
strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
1198
if (lex_mi->ssl_capath)
1199
strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
1200
if (lex_mi->ssl_cert)
1201
strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
1202
if (lex_mi->ssl_cipher)
1203
strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
1204
if (lex_mi->ssl_key)
1205
strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
1206
if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
1207
lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
1208
lex_mi->ssl_verify_server_cert )
1209
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1210
ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1212
if (lex_mi->relay_log_name)
1214
need_relay_log_purge= 0;
1215
strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
1216
sizeof(mi->rli.group_relay_log_name)-1);
1217
strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
1218
sizeof(mi->rli.event_relay_log_name)-1);
1221
if (lex_mi->relay_log_pos)
1223
need_relay_log_purge= 0;
1224
mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
1228
If user did specify neither host nor port nor any log name nor any log
1229
pos, i.e. he specified only user/password/master_connect_retry, he probably
1230
wants replication to resume from where it had left, i.e. from the
1231
coordinates of the **SQL** thread (imagine the case where the I/O is ahead
1232
of the SQL; restarting from the coordinates of the I/O would lose some
1233
events which is probably unwanted when you are just doing minor changes
1234
like changing master_connect_retry).
1235
A side-effect is that if only the I/O thread was started, this thread may
1236
restart from ''/4 after the CHANGE MASTER. That's a minor problem (it is a
1237
much more unlikely situation than the one we are fixing here).
1238
Note: coordinates of the SQL thread must be read here, before the
1239
'if (need_relay_log_purge)' block which resets them.
1241
if (!lex_mi->host && !lex_mi->port &&
1242
!lex_mi->log_file_name && !lex_mi->pos &&
1243
need_relay_log_purge)
1246
Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1247
not initialized), so we use a cmax().
1248
What happens to mi->rli.master_log_pos during the initialization stages
1249
of replication is not 100% clear, so we guard against problems using
1252
mi->master_log_pos = ((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1253
? BIN_LOG_HEADER_SIZE
1254
: mi->rli.group_master_log_pos);
1255
strmake(mi->master_log_name, mi->rli.group_master_log_name,
1256
sizeof(mi->master_log_name)-1);
1259
Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1262
if (flush_master_info(mi, 0))
1264
my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1265
unlock_slave_threads(mi);
1268
if (need_relay_log_purge)
1271
thd_proc_info(thd, "Purging old relay logs");
1272
if (purge_relay_logs(&mi->rli, thd,
1273
0 /* not only reset, but also reinit */,
1276
my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
1277
unlock_slave_threads(mi);
1285
/* Relay log is already initialized */
1286
if (init_relay_log_pos(&mi->rli,
1287
mi->rli.group_relay_log_name,
1288
mi->rli.group_relay_log_pos,
1292
my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
1293
unlock_slave_threads(mi);
1298
Coordinates in rli were spoilt by the 'if (need_relay_log_purge)' block,
1299
so restore them to good values. If we left them to ''/0, that would work;
1300
but that would fail in the case of 2 successive CHANGE MASTER (without a
1301
START SLAVE in between): because first one would set the coords in mi to
1302
the good values of those in rli, the set those in rli to ''/0, then
1303
second CHANGE MASTER would set the coords in mi to those of rli, i.e. to
1304
''/0: we have lost all copies of the original good coordinates.
1305
That's why we always save good coords in rli.
1307
mi->rli.group_master_log_pos= mi->master_log_pos;
1308
strmake(mi->rli.group_master_log_name,mi->master_log_name,
1309
sizeof(mi->rli.group_master_log_name)-1);
1311
if (!mi->rli.group_master_log_name[0]) // uninitialized case
1312
mi->rli.group_master_log_pos=0;
1314
pthread_mutex_lock(&mi->rli.data_lock);
1315
mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1316
/* Clear the errors, for a clean start */
1317
mi->rli.clear_error();
1318
mi->rli.clear_until_condition();
1320
If we don't write new coordinates to disk now, then old will remain in
1321
relay-log.info until START SLAVE is issued; but if mysqld is shutdown
1322
before START SLAVE, then old will remain in relay-log.info, and will be the
1323
in-memory value at restart (thus causing errors, as the old relay log does
1326
flush_relay_log_info(&mi->rli);
1327
pthread_cond_broadcast(&mi->data_cond);
1328
pthread_mutex_unlock(&mi->rli.data_lock);
1330
unlock_slave_threads(mi);
1331
thd_proc_info(thd, 0);
1336
int reset_master(THD* thd)
1338
if (!mysql_bin_log.is_open())
1340
my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
1341
ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
1344
return mysql_bin_log.reset_logs(thd);
1347
int cmp_master_pos(const char* log_file_name1, uint64_t log_pos1,
1348
const char* log_file_name2, uint64_t log_pos2)
1351
uint32_t log_file_name1_len= strlen(log_file_name1);
1352
uint32_t log_file_name2_len= strlen(log_file_name2);
1354
// We assume that both log names match up to '.'
1355
if (log_file_name1_len == log_file_name2_len)
1357
if ((res= strcmp(log_file_name1, log_file_name2)))
1359
return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
1361
return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
1365
bool mysql_show_binlog_events(THD* thd)
1367
Protocol *protocol= thd->protocol;
1368
List<Item> field_list;
1369
const char *errmsg= 0;
1374
Log_event::init_show_field_list(&field_list);
1375
if (protocol->send_fields(&field_list,
1376
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1379
Format_description_log_event *description_event= new
1380
Format_description_log_event(3); /* MySQL 4.0 by default */
1382
if (mysql_bin_log.is_open())
1384
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1385
SELECT_LEX_UNIT *unit= &thd->lex->unit;
1386
ha_rows event_count, limit_start, limit_end;
1387
my_off_t pos = cmax((uint64_t)BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1388
char search_file_name[FN_REFLEN], *name;
1389
const char *log_file_name = lex_mi->log_file_name;
1390
pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1394
unit->set_limit(thd->lex->current_select);
1395
limit_start= unit->offset_limit_cnt;
1396
limit_end= unit->select_limit_cnt;
1398
name= search_file_name;
1400
mysql_bin_log.make_log_name(search_file_name, log_file_name);
1402
name=0; // Find first log
1404
linfo.index_file_offset = 0;
1406
if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1408
errmsg = "Could not find target log";
1412
pthread_mutex_lock(&LOCK_thread_count);
1413
thd->current_linfo = &linfo;
1414
pthread_mutex_unlock(&LOCK_thread_count);
1416
if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
1420
to account binlog event header size
1422
thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
1424
pthread_mutex_lock(log_lock);
1427
open_binlog() sought to position 4.
1428
Read the first event in case it's a Format_description_log_event, to
1429
know the format. If there's no such event, we are 3.23 or 4.x. This
1430
code, like before, can't read 3.23 binlogs.
1431
This code will fail on a mixed relay log (one which has Format_desc then
1432
Rotate then Format_desc).
1434
ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
1437
if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
1439
delete description_event;
1440
description_event= (Format_description_log_event*) ev;
1446
my_b_seek(&log, pos);
1448
if (!description_event->is_valid())
1450
errmsg="Invalid Format_description event; could be out of memory";
1454
for (event_count = 0;
1455
(ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
1456
description_event)); )
1458
if (event_count >= limit_start &&
1459
ev->net_send(protocol, linfo.log_file_name, pos))
1461
errmsg = "Net error";
1463
pthread_mutex_unlock(log_lock);
1467
pos = my_b_tell(&log);
1470
if (++event_count >= limit_end)
1474
if (event_count < limit_end && log.error)
1476
errmsg = "Wrong offset or I/O error";
1477
pthread_mutex_unlock(log_lock);
1481
pthread_mutex_unlock(log_lock);
1487
delete description_event;
1491
(void) my_close(file, MYF(MY_WME));
1495
my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
1496
"SHOW BINLOG EVENTS", errmsg);
1500
pthread_mutex_lock(&LOCK_thread_count);
1501
thd->current_linfo = 0;
1502
pthread_mutex_unlock(&LOCK_thread_count);
1507
bool show_binlog_info(THD* thd)
1509
Protocol *protocol= thd->protocol;
1510
List<Item> field_list;
1511
field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1512
field_list.push_back(new Item_return_int("Position",20,
1513
DRIZZLE_TYPE_LONGLONG));
1514
field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
1515
field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
1517
if (protocol->send_fields(&field_list,
1518
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1520
protocol->prepare_for_resend();
1522
if (mysql_bin_log.is_open())
1525
mysql_bin_log.get_current_log(&li);
1526
int dir_len = dirname_length(li.log_file_name);
1527
protocol->store(li.log_file_name + dir_len, &my_charset_bin);
1528
protocol->store((uint64_t) li.pos);
1529
protocol->store(binlog_filter->get_do_db());
1530
protocol->store(binlog_filter->get_ignore_db());
1531
if (protocol->write())
1540
Send a list of all binary logs to client
1544
thd Thread specific variable
1551
bool show_binlogs(THD* thd)
1553
IO_CACHE *index_file;
1556
char fname[FN_REFLEN];
1557
List<Item> field_list;
1560
Protocol *protocol= thd->protocol;
1562
if (!mysql_bin_log.is_open())
1564
my_message(ER_NO_BINARY_LOGGING, ER(ER_NO_BINARY_LOGGING), MYF(0));
1568
field_list.push_back(new Item_empty_string("Log_name", 255));
1569
field_list.push_back(new Item_return_int("File_size", 20,
1570
DRIZZLE_TYPE_LONGLONG));
1571
if (protocol->send_fields(&field_list,
1572
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1575
pthread_mutex_lock(mysql_bin_log.get_log_lock());
1576
mysql_bin_log.lock_index();
1577
index_file=mysql_bin_log.get_index_file();
1579
mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
1580
pthread_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
1582
cur_dir_len= dirname_length(cur.log_file_name);
1584
reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
1586
/* The file ends with EOF or empty line */
1587
while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
1590
uint64_t file_length= 0; // Length if open fails
1591
fname[--length] = '\0'; // remove the newline
1593
protocol->prepare_for_resend();
1594
dir_len= dirname_length(fname);
1596
protocol->store(fname + dir_len, length, &my_charset_bin);
1598
if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
1599
file_length= cur.pos; /* The active log, use the active position */
1602
/* this is an old log, open it and find the size */
1603
if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
1606
file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));
1607
my_close(file, MYF(0));
1610
protocol->store(file_length);
1611
if (protocol->write())
1614
mysql_bin_log.unlock_index();
1619
mysql_bin_log.unlock_index();
1624
Load data's io cache specific hook to be executed
1625
before a chunk of data is being read into the cache's buffer
1626
The fuction instantianates and writes into the binlog
1627
replication events along LOAD DATA processing.
1629
@param file pointer to io-cache
1632
int log_loaded_block(IO_CACHE* file)
1634
LOAD_FILE_INFO *lf_info;
1636
/* buffer contains position where we started last read */
1637
unsigned char* buffer= (unsigned char*) my_b_get_buffer_start(file);
1638
uint32_t max_event_size= current_thd->variables.max_allowed_packet;
1639
lf_info= (LOAD_FILE_INFO*) file->arg;
1640
if (lf_info->thd->current_stmt_binlog_row_based)
1642
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1643
lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1646
for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1647
buffer += cmin(block_len, max_event_size),
1648
block_len -= cmin(block_len, max_event_size))
1650
lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1651
if (lf_info->wrote_create_file)
1653
Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
1654
cmin(block_len, max_event_size),
1655
lf_info->log_delayed);
1656
mysql_bin_log.write(&a);
1660
Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1662
cmin(block_len, max_event_size),
1663
lf_info->log_delayed);
1664
mysql_bin_log.write(&b);
1665
lf_info->wrote_create_file= 1;
1672
Replication System Variables
1675
class sys_var_slave_skip_counter :public sys_var
1678
sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
1680
{ chain_sys_var(chain); }
1681
bool check(THD *thd, set_var *var);
1682
bool update(THD *thd, set_var *var);
1683
bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
1685
We can't retrieve the value of this, so we don't have to define
1686
type() or value_ptr()
1690
class sys_var_sync_binlog_period :public sys_var_long_ptr
1693
sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
1695
:sys_var_long_ptr(chain, name_arg,value_ptr) {}
1696
bool update(THD *thd, set_var *var);
1699
static void fix_slave_net_timeout(THD *thd,
1700
enum_var_type type __attribute__((unused)))
1702
pthread_mutex_lock(&LOCK_active_mi);
1703
if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1704
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1705
ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1706
"The currect value for master_heartbeat_period"
1707
" exceeds the new value of `slave_net_timeout' sec."
1708
" A sensible value for the period should be"
1709
" less than the timeout.");
1710
pthread_mutex_unlock(&LOCK_active_mi);
1714
static sys_var_chain vars = { NULL, NULL };
1716
static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
1718
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
1720
fix_slave_net_timeout);
1721
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
1722
&slave_trans_retries);
1723
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
1724
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
1726
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
1729
static int show_slave_skip_errors(THD *thd __attribute__((unused)),
1730
SHOW_VAR *var, char *buff)
1732
var->type=SHOW_CHAR;
1734
if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
1736
var->value= const_cast<char *>("OFF");
1738
else if (bitmap_is_set_all(&slave_error_mask))
1740
var->value= const_cast<char *>("ALL");
1744
/* 10 is enough assuming errors are max 4 digits */
1748
i < MAX_SLAVE_ERROR &&
1749
(buff - var->value) < SHOW_VAR_FUNC_BUFF_SIZE;
1752
if (bitmap_is_set(&slave_error_mask, i))
1754
buff= int10_to_str(i, buff, 10);
1758
if (var->value != buff)
1759
buff--; // Remove last ','
1760
if (i < MAX_SLAVE_ERROR)
1761
buff= my_stpcpy(buff, "..."); // Couldn't show all errors
1767
static st_show_var_func_container
1768
show_slave_skip_errors_cont = { &show_slave_skip_errors };
1771
static SHOW_VAR fixed_vars[]= {
1772
{"log_slave_updates", (char*) &opt_log_slave_updates, SHOW_MY_BOOL},
1773
{"relay_log" , (char*) &opt_relay_logname, SHOW_CHAR_PTR},
1774
{"relay_log_index", (char*) &opt_relaylog_index_name, SHOW_CHAR_PTR},
1775
{"relay_log_info_file", (char*) &relay_log_info_file, SHOW_CHAR_PTR},
1776
{"relay_log_space_limit", (char*) &relay_log_space_limit, SHOW_LONGLONG},
1777
{"slave_load_tmpdir", (char*) &slave_load_tmpdir, SHOW_CHAR_PTR},
1778
{"slave_skip_errors", (char*) &show_slave_skip_errors_cont, SHOW_FUNC},
1781
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((unused)),
1785
pthread_mutex_lock(&LOCK_active_mi);
1786
pthread_mutex_lock(&active_mi->rli.run_lock);
1787
if (active_mi->rli.slave_running)
1789
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1792
pthread_mutex_unlock(&active_mi->rli.run_lock);
1793
pthread_mutex_unlock(&LOCK_active_mi);
1794
var->save_result.ulong_value= (ulong) var->value->val_int();
1799
bool sys_var_slave_skip_counter::update(THD *thd __attribute__((unused)),
1802
pthread_mutex_lock(&LOCK_active_mi);
1803
pthread_mutex_lock(&active_mi->rli.run_lock);
1805
The following test should normally never be true as we test this
1806
in the check function; To be safe against multiple
1807
SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
1809
if (!active_mi->rli.slave_running)
1811
pthread_mutex_lock(&active_mi->rli.data_lock);
1812
active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
1813
pthread_mutex_unlock(&active_mi->rli.data_lock);
1815
pthread_mutex_unlock(&active_mi->rli.run_lock);
1816
pthread_mutex_unlock(&LOCK_active_mi);
1821
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((unused)),
1824
sync_binlog_period= (uint32_t) var->save_result.uint64_t_value;
1828
int init_replication_sys_vars()
1830
mysql_append_static_vars(fixed_vars, sizeof(fixed_vars) / sizeof(SHOW_VAR));
1832
if (mysql_add_sys_var_chain(vars.first, my_long_options))
1834
/* should not happen */
1835
fprintf(stderr, "failed to initialize replication system variables");