1
/* Copyright (C) 2000-2003 DRIZZLE AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
18
@addtogroup Replication
23
@brief Code to run the io thread and the sql thread on the
26
#include <drizzled/server_includes.h>
28
#include <storage/myisam/myisam.h>
29
#include <drizzled/replication/mi.h>
30
#include <drizzled/replication/rli.h>
31
#include <drizzled/replication/replication.h>
32
#include <libdrizzle/libdrizzle.h>
33
#include <mysys/hash.h>
34
#include <mysys/thr_alarm.h>
35
#include <libdrizzle/errmsg.h>
36
#include <mysys/mysys_err.h>
37
#include <drizzled/error.h>
38
#include <drizzled/sql_parse.h>
39
#include <drizzled/gettext.h>
41
#include <drizzled/session.h>
42
#include <drizzled/log_event.h>
43
#include <drizzled/item/empty_string.h>
44
#include <drizzled/item/return_int.h>
46
#if TIME_WITH_SYS_TIME
47
# include <sys/time.h>
51
# include <sys/time.h>
57
#include <drizzled/tztime.h>
59
#include <drizzled/replication/tblmap.h>
61
#define MAX_SLAVE_RETRY_PAUSE 5
62
bool use_slave_mask = 0;
63
MY_BITMAP slave_error_mask;
65
typedef bool (*CHECK_KILLED_FUNC)(Session*,void*);
67
char* slave_load_tmpdir = 0;
68
Master_info *active_mi= 0;
69
bool replicate_same_server_id;
70
uint64_t relay_log_space_limit = 0;
73
When slave thread exits, we need to remember the temporary tables so we
74
can re-use them on slave start.
76
TODO: move the vars below under Master_info
79
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
80
int32_t events_till_abort = -1;
82
enum enum_slave_reconnect_actions
84
SLAVE_RECON_ACT_REG= 0,
85
SLAVE_RECON_ACT_DUMP= 1,
86
SLAVE_RECON_ACT_EVENT= 2,
90
enum enum_slave_reconnect_messages
92
SLAVE_RECON_MSG_WAIT= 0,
93
SLAVE_RECON_MSG_KILLED_WAITING= 1,
94
SLAVE_RECON_MSG_AFTER= 2,
95
SLAVE_RECON_MSG_FAILED= 3,
96
SLAVE_RECON_MSG_COMMAND= 4,
97
SLAVE_RECON_MSG_KILLED_AFTER= 5,
101
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
104
N_("Waiting to reconnect after a failed registration on master"),
105
N_("Slave I/O thread killed while waiting to reconnect after a "
106
"failed registration on master"),
107
N_("Reconnecting after a failed registration on master"),
108
N_("failed registering on master, reconnecting to try again, "
109
"log '%s' at position %s"),
110
"COM_REGISTER_SLAVE",
111
N_("Slave I/O thread killed during or after reconnect")
114
N_("Waiting to reconnect after a failed binlog dump request"),
115
N_("Slave I/O thread killed while retrying master dump"),
116
N_("Reconnecting after a failed binlog dump request"),
117
N_("failed dump request, reconnecting to try again, "
118
"log '%s' at position %s"),
120
N_("Slave I/O thread killed during or after reconnect")
123
N_("Waiting to reconnect after a failed master event read"),
124
N_("Slave I/O thread killed while waiting to reconnect "
125
"after a failed read"),
126
N_("Reconnecting after a failed master event read"),
127
N_("Slave I/O thread: Failed reading log event, "
128
"reconnecting to retry, log '%s' at position %s"),
130
N_("Slave I/O thread killed during or after a "
131
"reconnect done to recover from failed read")
136
typedef enum { SLAVE_Session_IO, SLAVE_Session_SQL} SLAVE_Session_TYPE;
138
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
139
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
140
static bool wait_for_relay_log_space(Relay_log_info* rli);
141
static inline bool io_slave_killed(Session* session,Master_info* mi);
142
static inline bool sql_slave_killed(Session* session,Relay_log_info* rli);
143
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type);
144
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi);
145
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
146
bool suppress_warnings);
147
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
148
bool reconnect, bool suppress_warnings);
149
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
150
void* thread_killed_arg);
151
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
152
static Log_event* next_event(Relay_log_info* rli);
153
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
154
static int32_t terminate_slave_thread(Session *session,
155
pthread_mutex_t* term_lock,
156
pthread_cond_t* term_cond,
157
volatile uint32_t *slave_running,
159
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info);
162
Find out which replication threads are running
166
mask Return value here
167
mi master_info for slave
168
inverse If set, returns which threads are not running
171
Get a bit mask for which threads are running so that we can later restart
175
mask If inverse == 0, running threads
176
If inverse == 1, stopped threads
179
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
181
bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
182
register int32_t tmp_mask=0;
185
tmp_mask |= SLAVE_IO;
187
tmp_mask |= SLAVE_SQL;
189
tmp_mask^= (SLAVE_IO | SLAVE_SQL);
199
void lock_slave_threads(Master_info* mi)
201
//TODO: see if we can do this without dual mutex
202
pthread_mutex_lock(&mi->run_lock);
203
pthread_mutex_lock(&mi->rli.run_lock);
209
unlock_slave_threads()
212
void unlock_slave_threads(Master_info* mi)
214
//TODO: see if we can do this without dual mutex
215
pthread_mutex_unlock(&mi->rli.run_lock);
216
pthread_mutex_unlock(&mi->run_lock);
221
/* Initialize slave structures */
226
This is called when mysqld starts. Before client connections are
227
accepted. However bootstrap may conflict with us if it does START SLAVE.
228
So it's safer to take the lock.
230
pthread_mutex_lock(&LOCK_active_mi);
232
TODO: re-write this to iterate through the list of files
235
active_mi= new Master_info;
238
If master_host is not specified, try to read it from the master_info file.
239
If master_host is specified, create the master_info file if it doesn't
244
sql_print_error(_("Failed to allocate memory for the master info structure"));
248
if (active_mi->init_master_info(master_info_file, relay_log_info_file, (SLAVE_IO | SLAVE_SQL)))
250
sql_print_error(_("Failed to initialize the master info structure"));
254
/* If server id is not set, start_slave_thread() will say it */
256
if (active_mi->host[0] && !opt_skip_slave_start)
258
if (start_slave_threads(1 /* need mutex */,
259
0 /* no wait for start*/,
263
SLAVE_IO | SLAVE_SQL))
265
sql_print_error(_("Failed to create slave threads"));
269
pthread_mutex_unlock(&LOCK_active_mi);
273
pthread_mutex_unlock(&LOCK_active_mi);
279
Init function to set up array for errors that should be skipped for slave
282
init_slave_skip_errors()
283
arg List of error numbers to skip, separated with ','
286
Called from get_options() in mysqld.cc on start-up
289
void init_slave_skip_errors(const char* arg)
293
if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
295
fprintf(stderr, "Badly out of memory, please check your system status\n");
299
for (;my_isspace(system_charset_info,*arg);++arg)
301
if (!my_strnncoll(system_charset_info,(unsigned char*)arg,4,(const unsigned char*)"all",4))
303
bitmap_set_all(&slave_error_mask);
309
if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
311
if (err_code < MAX_SLAVE_ERROR)
312
bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
313
while (!my_isdigit(system_charset_info,*p) && *p)
320
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
323
return(0); /* successfully do nothing */
324
int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
325
pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
327
if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
330
if ((error=terminate_slave_thread(mi->io_session,io_lock,
337
if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
339
mi->rli.abort_slave=1;
340
if ((error=terminate_slave_thread(mi->rli.sql_session,sql_lock,
342
&mi->rli.slave_running,
352
Wait for a slave thread to terminate.
354
This function is called after requesting the thread to terminate
355
(by setting @c abort_slave member of @c Relay_log_info or @c
356
Master_info structure to 1). Termination of the thread is
357
controlled with the predicate <code>*slave_running</code>.
359
Function will acquire @c term_lock before waiting on the condition
360
unless @c skip_lock is true in which case the mutex should be owned
361
by the caller of this function and will remain acquired after
362
return from the function.
365
Associated lock to use when waiting for @c term_cond
368
Condition that is signalled when the thread has terminated
371
Pointer to predicate to check for slave thread termination
374
If @c true the lock will not be acquired before waiting on
375
the condition. In this case, it is assumed that the calling
376
function acquires the lock before calling this function.
381
terminate_slave_thread(Session *session,
382
pthread_mutex_t* term_lock,
383
pthread_cond_t* term_cond,
384
volatile uint32_t *slave_running,
390
pthread_mutex_lock(term_lock);
392
safe_mutex_assert_owner(term_lock);
397
pthread_mutex_unlock(term_lock);
398
return(ER_SLAVE_NOT_RUNNING);
400
assert(session != 0);
401
Session_CHECK_SENTRY(session);
404
It is critical to test if the slave is running. Otherwise, we might
405
be referencing freed memory trying to kick it
408
while (*slave_running) // Should always be true
410
pthread_mutex_lock(&session->LOCK_delete);
411
#ifndef DONT_USE_THR_ALARM
413
Error codes from pthread_kill are:
414
EINVAL: invalid signal number (can't happen)
415
ESRCH: thread already killed (can happen, should be ignored)
417
int32_t err= pthread_kill(session->real_id, thr_client_alarm);
418
assert(err != EINVAL);
420
session->awake(Session::NOT_KILLED);
421
pthread_mutex_unlock(&session->LOCK_delete);
424
There is a small chance that slave thread might miss the first
425
alarm. To protect against it, resend the signal until it reacts
427
struct timespec abstime;
428
set_timespec(abstime,2);
429
error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
430
assert(error == ETIMEDOUT || error == 0);
433
assert(*slave_running == 0);
436
pthread_mutex_unlock(term_lock);
441
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
442
pthread_mutex_t *cond_lock,
443
pthread_cond_t *start_cond,
444
volatile uint32_t *slave_running,
445
volatile uint32_t *slave_run_id,
455
pthread_mutex_lock(start_lock);
459
pthread_cond_broadcast(start_cond);
461
pthread_mutex_unlock(start_lock);
462
sql_print_error(_("Server id not set, will not start slave"));
463
return(ER_BAD_SLAVE);
469
pthread_cond_broadcast(start_cond);
471
pthread_mutex_unlock(start_lock);
472
return(ER_SLAVE_MUST_STOP);
474
start_id= *slave_run_id;
477
struct sched_param tmp_sched_param;
479
memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
480
tmp_sched_param.sched_priority= CONNECT_PRIOR;
481
(void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
483
if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
486
pthread_mutex_unlock(start_lock);
487
return(ER_SLAVE_THREAD);
489
if (start_cond && cond_lock) // caller has cond_lock
491
Session* session = current_session;
492
while (start_id == *slave_run_id)
494
const char* old_msg = session->enter_cond(start_cond,cond_lock,
495
"Waiting for slave thread to start");
496
pthread_cond_wait(start_cond,cond_lock);
497
session->exit_cond(old_msg);
498
pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
500
return(session->killed_errno());
504
pthread_mutex_unlock(start_lock);
510
start_slave_threads()
513
SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
514
sense to do that for starting a slave--we always care if it actually
515
started the threads that were not previously running
518
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
519
Master_info* mi, const char*, const char*,
522
pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
523
pthread_cond_t* cond_io=0,*cond_sql=0;
526
if (need_slave_mutex)
528
lock_io = &mi->run_lock;
529
lock_sql = &mi->rli.run_lock;
533
cond_io = &mi->start_cond;
534
cond_sql = &mi->rli.start_cond;
535
lock_cond_io = &mi->run_lock;
536
lock_cond_sql = &mi->rli.run_lock;
539
if (thread_mask & SLAVE_IO)
540
error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
542
&mi->slave_running, &mi->slave_run_id,
543
mi, 1); //high priority, to read the most possible
544
if (!error && (thread_mask & SLAVE_SQL))
546
error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
548
&mi->rli.slave_running, &mi->rli.slave_run_id,
551
terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
558
static int32_t end_slave_on_walk(Master_info* mi, unsigned char* /*unused*/)
567
Free all resources used by slave
576
This is called when the server terminates, in close_connections().
577
It terminates slave threads. However, some CHANGE MASTER etc may still be
578
running presently. If a START SLAVE was in progress, the mutex lock below
579
will make us wait until slave threads have started, and START SLAVE
580
returns, then we terminate them here.
582
pthread_mutex_lock(&LOCK_active_mi);
586
TODO: replace the line below with
587
list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
588
once multi-master code is ready.
590
terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
591
active_mi->end_master_info();
595
pthread_mutex_unlock(&LOCK_active_mi);
600
static bool io_slave_killed(Session* session, Master_info* mi)
602
assert(mi->io_session == session);
603
assert(mi->slave_running); // tracking buffer overrun
604
return(mi->abort_slave || abort_loop || session->killed);
608
static bool sql_slave_killed(Session* session, Relay_log_info* rli)
610
assert(rli->sql_session == session);
611
assert(rli->slave_running == 1);// tracking buffer overrun
612
if (abort_loop || session->killed || rli->abort_slave)
615
If we are in an unsafe situation (stopping could corrupt replication),
616
we give one minute to the slave SQL thread of grace before really
617
terminating, in the hope that it will be able to read more events and
618
the unsafe situation will soon be left. Note that this one minute starts
619
from the last time anything happened in the slave SQL thread. So it's
620
really one minute of idleness, we don't timeout if the slave SQL thread
623
if (rli->last_event_start_time == 0)
625
if (difftime(time(NULL), rli->last_event_start_time) > 60)
627
rli->report(ERROR_LEVEL, 0,
628
_("SQL thread had to stop in an unsafe situation, in "
629
"the middle of applying updates to a "
630
"non-transactional table without any primary key. "
631
"There is a risk of duplicate updates when the slave "
632
"SQL thread is restarted. Please check your tables' "
633
"contents after restart."));
642
skip_load_data_infile()
645
This is used to tell a 3.23 master to break send_file()
648
void skip_load_data_infile(NET *net)
650
(void)net_request_file(net, "/dev/null");
651
(void)my_net_read(net); // discard response
652
(void)net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0); // ok
657
bool net_request_file(NET* net, const char* fname)
659
return(net_write_command(net, 251, (unsigned char*) fname, strlen(fname),
660
(unsigned char*) "", 0));
664
From other comments and tests in code, it looks like
665
sometimes Query_log_event and Load_log_event can have db == 0
666
(see rewrite_db() above for example)
667
(cases where this happens are unclear; it may be when the master is 3.23).
670
const char *print_slave_db_safe(const char* db)
672
return((db ? db : ""));
675
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
676
const char *default_val)
680
if ((length=my_b_gets(f,var, max_size)))
682
char* last_p = var + length -1;
684
*last_p = 0; // if we stopped on newline, kill it
688
If we truncated a line or stopped on last char, remove all chars
689
up to and including newline.
692
while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
696
else if (default_val)
698
strncpy(var, default_val, max_size-1);
705
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
710
if (my_b_gets(f, buf, sizeof(buf)))
715
else if (default_val)
723
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
728
if (my_b_gets(f, buf, sizeof(buf)))
730
if (sscanf(buf, "%f", var) != 1)
735
else if (default_val != 0.0)
743
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info)
745
if (io_slave_killed(session, mi))
747
if (info && global_system_variables.log_warnings)
748
sql_print_information("%s",info);
756
Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
757
relying on the binlog's version. This is not perfect: imagine an upgrade
758
of the master without waiting that all slaves are in sync with the master;
759
then a slave could be fooled about the binlog's format. This is what happens
760
when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
761
slaves are fooled. So we do this only to distinguish between 3.23 and more
762
recent masters (it's too late to change things for 3.23).
769
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
772
String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
773
char err_buff[MAX_SLAVE_ERRMSG];
774
const char* errmsg= 0;
776
DRIZZLE_RES *master_res= 0;
777
DRIZZLE_ROW master_row;
781
Free old description_event_for_queue (that is needed if we are in
784
delete mi->rli.relay_log.description_event_for_queue;
785
mi->rli.relay_log.description_event_for_queue= 0;
787
if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
789
errmsg = _("Master reported unrecognized DRIZZLE version");
790
err_code= ER_SLAVE_FATAL_ERROR;
791
sprintf(err_buff, ER(err_code), errmsg);
792
err_msg.append(err_buff);
797
Note the following switch will bug when we have DRIZZLE branch 30 ;)
799
switch (*drizzle->server_version)
804
errmsg = _("Master reported unrecognized DRIZZLE version");
805
err_code= ER_SLAVE_FATAL_ERROR;
806
sprintf(err_buff, ER(err_code), errmsg);
807
err_msg.append(err_buff);
810
mi->rli.relay_log.description_event_for_queue= new
811
Format_description_log_event(1, drizzle->server_version);
814
mi->rli.relay_log.description_event_for_queue= new
815
Format_description_log_event(3, drizzle->server_version);
819
Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
820
take the early steps (like tests for "is this a 3.23 master") which we
821
have to take before we receive the real master's Format_desc which will
822
override this one. Note that the Format_desc we create below is garbage
823
(it has the format of the *slave*); it's only good to help know if the
824
master is 3.23, 4.0, etc.
826
mi->rli.relay_log.description_event_for_queue= new
827
Format_description_log_event(4, drizzle->server_version);
833
This does not mean that a 5.0 slave will be able to read a 6.0 master; but
834
as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
835
can't read a 6.0 master, this will show up when the slave can't read some
836
events sent by the master, and there will be error messages.
839
if (err_msg.length() != 0)
842
/* as we are here, we tried to allocate the event */
843
if (!mi->rli.relay_log.description_event_for_queue)
845
errmsg= _("default Format_description_log_event");
846
err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
847
sprintf(err_buff, ER(err_code), errmsg);
848
err_msg.append(err_buff);
853
Compare the master and slave's clock. Do not die if master's clock is
854
unavailable (very old master not supporting UNIX_TIMESTAMP()?).
857
if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
858
(master_res= drizzle_store_result(drizzle)) &&
859
(master_row= drizzle_fetch_row(master_res)))
861
mi->clock_diff_with_master=
862
(long) (time(NULL) - strtoul(master_row[0], 0, 10));
864
else if (!check_io_slave_killed(mi->io_session, mi, NULL))
866
mi->clock_diff_with_master= 0; /* The "most sensible" value */
867
sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
868
"do not trust column Seconds_Behind_Master of SHOW "
869
"SLAVE STATUS. Error: %s (%d)"),
870
drizzle_error(drizzle), drizzle_errno(drizzle));
873
drizzle_free_result(master_res);
876
Check that the master's server id and ours are different. Because if they
877
are equal (which can result from a simple copy of master's datadir to slave,
878
thus copying some drizzle.cnf), replication will work but all events will be
880
Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
882
Note: we could have put a @@SERVER_ID in the previous SELECT
883
UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
885
if (!drizzle_real_query(drizzle,
886
STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
887
(master_res= drizzle_store_result(drizzle)))
889
if ((master_row= drizzle_fetch_row(master_res)) &&
890
(::server_id == strtoul(master_row[1], 0, 10)) &&
891
!mi->rli.replicate_same_server_id)
894
_("The slave I/O thread stops because master and slave have equal "
895
"DRIZZLE server ids; these ids must be different "
896
"for replication to work (or "
897
"the --replicate-same-server-id option must be used "
898
"on slave but this does "
899
"not always make sense; please check the manual before using it).");
900
err_code= ER_SLAVE_FATAL_ERROR;
901
sprintf(err_buff, ER(err_code), errmsg);
902
err_msg.append(err_buff);
904
drizzle_free_result(master_res);
910
Check that the master's global character_set_server and ours are the same.
911
Not fatal if query fails (old master?).
912
Note that we don't check for equality of global character_set_client and
913
collation_connection (neither do we prevent their setting in
914
set_var.cc). That's because from what I (Guilhem) have tested, the global
915
values of these 2 are never used (new connections don't use them).
916
We don't test equality of global collation_database either as it is
917
going to be deprecated (made read-only) in 4.1 very soon.
918
The test is only relevant if master < 5.0.3 (we'll test only if it's older
919
than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
920
charset info in each binlog event.
921
We don't do it for 3.23 because masters <3.23.50 hang on
922
SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
923
test only if master is 4.x.
926
/* redundant with rest of code but safer against later additions */
927
if (*drizzle->server_version == '3')
930
if ((*drizzle->server_version == '4') &&
931
!drizzle_real_query(drizzle,
932
STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
933
(master_res= drizzle_store_result(drizzle)))
935
if ((master_row= drizzle_fetch_row(master_res)) &&
936
strcmp(master_row[0], global_system_variables.collation_server->name))
939
_("The slave I/O thread stops because master and slave have"
940
" different values for the COLLATION_SERVER global variable."
941
" The values must be equal for replication to work");
942
err_code= ER_SLAVE_FATAL_ERROR;
943
sprintf(err_buff, ER(err_code), errmsg);
944
err_msg.append(err_buff);
946
drizzle_free_result(master_res);
952
Perform analogous check for time zone. Theoretically we also should
953
perform check here to verify that SYSTEM time zones are the same on
954
slave and master, but we can't rely on value of @@system_time_zone
955
variable (it is a time zone abbreviation) since it is determined at start
956
time and so could differ for slave and master even if they are really
957
in the same system time zone. So we are omitting this check and just
958
relying on documentation. Also according to Monty there are many users
959
who are using replication between servers in various time zones. Hence
960
such a check would break everything for them. (And now everything will
961
work for them because by default both their master and slave will have
963
This check is only necessary for 4.x masters (and < 5.0.4 masters but
966
if ((*drizzle->server_version == '4') &&
967
!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
968
(master_res= drizzle_store_result(drizzle)))
970
if ((master_row= drizzle_fetch_row(master_res)) &&
971
strcmp(master_row[0],
972
global_system_variables.time_zone->get_name()->ptr()))
975
_("The slave I/O thread stops because master and slave have"
976
" different values for the TIME_ZONE global variable."
977
" The values must be equal for replication to work");
978
err_code= ER_SLAVE_FATAL_ERROR;
979
sprintf(err_buff, ER(err_code), errmsg);
980
err_msg.append(err_buff);
982
drizzle_free_result(master_res);
988
if (mi->heartbeat_period != 0.0)
991
const char query_format[]= "SET @master_heartbeat_period= %s";
992
char query[sizeof(query_format) - 2 + sizeof(llbuf)];
994
the period is an uint64_t of nano-secs.
996
llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
997
sprintf(query, query_format, llbuf);
999
if (drizzle_real_query(drizzle, query, strlen(query))
1000
&& !check_io_slave_killed(mi->io_session, mi, NULL))
1002
err_msg.append("The slave I/O thread stops because querying master with '");
1003
err_msg.append(query);
1004
err_msg.append("' failed;");
1005
err_msg.append(" error: ");
1006
err_code= drizzle_errno(drizzle);
1007
err_msg.qs_append(err_code);
1008
err_msg.append(" '");
1009
err_msg.append(drizzle_error(drizzle));
1010
err_msg.append("'");
1011
drizzle_free_result(drizzle_store_result(drizzle));
1014
drizzle_free_result(drizzle_store_result(drizzle));
1018
if (err_msg.length() != 0)
1020
sql_print_error("%s",err_msg.ptr());
1021
assert(err_code != 0);
1022
mi->report(ERROR_LEVEL, err_code, "%s",err_msg.ptr());
1030
static bool wait_for_relay_log_space(Relay_log_info* rli)
1032
bool slave_killed=0;
1033
Master_info* mi = rli->mi;
1034
const char *save_proc_info;
1035
Session* session = mi->io_session;
1037
pthread_mutex_lock(&rli->log_space_lock);
1038
save_proc_info= session->enter_cond(&rli->log_space_cond,
1039
&rli->log_space_lock,
1040
_("Waiting for the slave SQL thread "
1041
"to free enough relay log space"));
1042
while (rli->log_space_limit < rli->log_space_total &&
1043
!(slave_killed=io_slave_killed(session,mi)) &&
1044
!rli->ignore_log_space_limit)
1045
pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1046
session->exit_cond(save_proc_info);
1047
return(slave_killed);
1052
Builds a Rotate from the ignored events' info and writes it to relay log.
1055
write_ignored_events_info_to_relay_log()
1056
session pointer to I/O thread's session
1060
Slave I/O thread, going to die, must leave a durable trace of the
1061
ignored events' end position for the use of the slave SQL thread, by
1062
calling this function. Only that thread can call it (see assertion).
1064
static void write_ignored_events_info_to_relay_log(Session *session,
1067
Relay_log_info *rli= &mi->rli;
1068
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1070
assert(session == mi->io_session);
1071
pthread_mutex_lock(log_lock);
1072
if (rli->ign_master_log_name_end[0])
1074
Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
1075
0, rli->ign_master_log_pos_end,
1076
Rotate_log_event::DUP_NAME);
1077
rli->ign_master_log_name_end[0]= 0;
1078
/* can unlock before writing as slave SQL session will soon see our Rotate */
1079
pthread_mutex_unlock(log_lock);
1080
if (likely((bool)ev))
1082
ev->server_id= 0; // don't be ignored by slave SQL thread
1083
if (unlikely(rli->relay_log.append(ev)))
1084
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
1085
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
1086
_("failed to write a Rotate event"
1087
" to the relay log, SHOW SLAVE STATUS may be"
1089
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1091
sql_print_error(_("Failed to flush master info file"));
1095
mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
1096
ER(ER_SLAVE_CREATE_EVENT_FAILURE),
1097
_("Rotate_event (out of memory?),"
1098
" SHOW SLAVE STATUS may be inaccurate"));
1101
pthread_mutex_unlock(log_lock);
1106
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
1107
bool *suppress_warnings)
1109
unsigned char buf[1024], *pos= buf;
1110
uint32_t report_host_len, report_user_len=0, report_password_len=0;
1112
*suppress_warnings= false;
1115
report_host_len= strlen(report_host);
1116
/* 30 is a good safety margin */
1117
if (report_host_len + report_user_len + report_password_len + 30 >
1119
return(0); // safety
1121
int4store(pos, server_id); pos+= 4;
1122
pos= net_store_data(pos, (unsigned char*) report_host, report_host_len);
1123
pos= net_store_data(pos, NULL, report_user_len);
1124
pos= net_store_data(pos, NULL, report_password_len);
1125
int2store(pos, (uint16_t) report_port); pos+= 2;
1126
int4store(pos, 0); pos+= 4;
1127
/* The master will fill in master_id */
1128
int4store(pos, 0); pos+= 4;
1130
if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
1132
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1134
*suppress_warnings= true; // Suppress reconnect warning
1136
else if (!check_io_slave_killed(mi->io_session, mi, NULL))
1139
snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle),
1140
drizzle_errno(drizzle));
1141
mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1142
ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1150
/*
  Send the slave status result set for one Master_info to the client.

  The function first builds and sends the column metadata (one Item per
  status column), then fills a single row in the same column order and
  writes it to the session's network connection.

  Locking, as visible below: mi->run_lock is held only while reading
  mi->io_session; mi->data_lock and mi->rli.data_lock are both held while
  the remaining columns are read.

  NOTE(review): the return statements are not visible in this view;
  confirm the meaning of the bool result against the full file.
*/
bool show_master_info(Session* session, Master_info* mi)
1152
// TODO: fix this for multi-master
1153
List<Item> field_list;
1154
Protocol *protocol= session->protocol;
1156
/*
  Column metadata.  The order here must match the order of the
  protocol->store() calls further down.
*/
field_list.push_back(new Item_empty_string("Slave_IO_State",
1158
field_list.push_back(new Item_empty_string("Master_Host",
1160
field_list.push_back(new Item_empty_string("Master_User",
1162
field_list.push_back(new Item_return_int("Master_Port", 7,
1163
DRIZZLE_TYPE_LONG));
1164
field_list.push_back(new Item_return_int("Connect_Retry", 10,
1165
DRIZZLE_TYPE_LONG));
1166
field_list.push_back(new Item_empty_string("Master_Log_File",
1168
field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
1169
DRIZZLE_TYPE_LONGLONG));
1170
field_list.push_back(new Item_empty_string("Relay_Log_File",
1172
field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
1173
DRIZZLE_TYPE_LONGLONG));
1174
field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
1176
field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
1177
field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1178
field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
1179
field_list.push_back(new Item_empty_string("Last_Error", 20));
1180
field_list.push_back(new Item_return_int("Skip_Counter", 10,
1181
DRIZZLE_TYPE_LONG));
1182
field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
1183
DRIZZLE_TYPE_LONGLONG));
1184
field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
1185
DRIZZLE_TYPE_LONGLONG));
1186
field_list.push_back(new Item_empty_string("Until_Condition", 6));
1187
field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1188
field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1189
DRIZZLE_TYPE_LONGLONG));
1190
field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1191
DRIZZLE_TYPE_LONGLONG));
1192
field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
1193
field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1194
field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
1195
field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
1197
if (protocol->send_fields(&field_list,
1198
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1203
/* From here on the single data row is assembled in session->packet. */
String *packet= &session->packet;
1204
protocol->prepare_for_resend();
1207
slave_running can be accessed without run_lock but not other
1208
non-volotile members like mi->io_session, which is guarded by the mutex.
1210
pthread_mutex_lock(&mi->run_lock);
1211
/* Slave_IO_State: empty string when there is no I/O session. */
protocol->store(mi->io_session ? mi->io_session->get_proc_info() : "", &my_charset_bin);
1212
pthread_mutex_unlock(&mi->run_lock);
1214
/* Lock order used here: mi->data_lock first, then mi->rli.data_lock. */
pthread_mutex_lock(&mi->data_lock);
1215
pthread_mutex_lock(&mi->rli.data_lock);
1216
protocol->store(mi->getHostname(), &my_charset_bin);
1217
protocol->store(mi->getUsername(), &my_charset_bin);
1218
protocol->store((uint32_t) mi->getPort());
1219
protocol->store(mi->getConnectionRetry());
1220
protocol->store(mi->getLogName(), &my_charset_bin);
1221
protocol->store((uint64_t) mi->getLogPosition());
1222
/* Relay_Log_File: the directory part of the name is skipped. */
protocol->store(mi->rli.group_relay_log_name.c_str() +
1223
dirname_length(mi->rli.group_relay_log_name.c_str()),
1225
protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1226
protocol->store(mi->rli.group_master_log_name.c_str(), &my_charset_bin);
1227
/* Slave_IO_Running is "Yes" only in the connected state. */
protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1228
"Yes" : "No", &my_charset_bin);
1229
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1231
/* Last_Errno / Last_Error are taken from the relay log info (SQL side). */
protocol->store(mi->rli.last_error().number);
1232
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1233
protocol->store((uint32_t) mi->rli.slave_skip_counter);
1234
protocol->store((uint64_t) mi->rli.group_master_log_pos);
1235
protocol->store((uint64_t) mi->rli.log_space_total);
1238
/* Until_Condition is rendered as "None", "Master" or "Relay". */
mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
1239
( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
1240
"Relay"), &my_charset_bin);
1241
protocol->store(mi->rli.until_log_name, &my_charset_bin);
1242
protocol->store((uint64_t) mi->rli.until_log_pos);
1245
Seconds_Behind_Master: if SQL thread is running and I/O thread is
1246
connected, we can compute it otherwise show NULL (i.e. unknown).
1248
if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
1249
mi->rli.slave_running)
1251
/*
  Local wall-clock time minus the timestamp of the last executed master
  event, corrected by the clock difference recorded for the master.
*/
long time_diff= ((long)(time(NULL) - mi->rli.last_master_timestamp)
1252
- mi->clock_diff_with_master);
1254
Apparently on some systems time_diff can be <0. Here are possible
1255
reasons related to MySQL:
1256
- the master is itself a slave of another master whose time is ahead.
1257
- somebody used an explicit SET TIMESTAMP on the master.
1258
Possible reason related to granularity-to-second of time functions
1259
(nothing to do with MySQL), which can explain a value of -1:
1260
assume the master's and slave's time are perfectly synchronized, and
1261
that at slave's connection time, when the master's timestamp is read,
1262
it is at the very end of second 1, and (a very short time later) when
1263
the slave's timestamp is read it is at the very beginning of second
1264
2. Then the recorded value for master is 1 and the recorded value for
1265
slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1266
between timestamp of slave and rli->last_master_timestamp is 0
1267
(i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1268
This confuses users, so we don't go below 0: hence the cmax().
1270
last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1271
special marker to say "consider we have caught up".
1273
protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1274
cmax((long)0, time_diff) : 0));
1278
protocol->store_null();
1282
/* Last_IO_Errno / Last_IO_Error come from the Master_info itself. */
protocol->store(mi->last_error().number);
1284
protocol->store(mi->last_error().message, &my_charset_bin);
1286
/*
  Last_SQL_Errno / Last_SQL_Error repeat the values already sent as
  Last_Errno / Last_Error (both read mi->rli.last_error()).
*/
protocol->store(mi->rli.last_error().number);
1288
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1290
/* Release in the reverse order of acquisition. */
pthread_mutex_unlock(&mi->rli.data_lock);
1291
pthread_mutex_unlock(&mi->data_lock);
1293
/* Ship the finished row to the client. */
if (my_net_write(&session->net, (unsigned char*) session->packet.ptr(), packet->length()))
1301
/**
  Set the option flags that every slave thread has to run with.

  OPTION_BIG_SELECTS is always switched on, OPTION_BIN_LOG follows the
  opt_log_slave_updates setting, and completion_type is reset to 0.

  @param session  Slave thread session whose options are adjusted.
*/
void set_slave_thread_options(Session* session)
{
  /*
    It's nonsense to constrain the slave threads with max_join_size; if a
    query succeeded on master, we HAVE to execute it. So set
    OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
    (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
    SELECT examining more than 4 billion rows would still fail (yes, because
    when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
    only for client threads).
  */
  uint64_t slave_flags= session->options;
  slave_flags|= OPTION_BIG_SELECTS;

  /* Write to the binary log only when slave updates are to be logged. */
  if (!opt_log_slave_updates)
    slave_flags&= ~OPTION_BIN_LOG;
  else
    slave_flags|= OPTION_BIN_LOG;

  session->options= slave_flags;
  session->variables.completion_type= 0;
}
1326
/*
  Prepare a freshly created Session for use as a slave I/O or SQL thread.

  Visible steps: mark the session as the matching system thread type, skip
  grant checks, initialise its NET structure, raise max_allowed_packet by
  MAX_LOG_EVENT_HEADER over the global value, apply the common slave
  thread options, allocate a thread id under LOCK_thread_count, run
  init_thr_lock()/store_globals(), and finally set the initial proc_info
  text, the refresh version and the session time.

  NOTE(review): the body of the failure branch and the return statements
  are not visible in this view; confirm the return contract against the
  full file.
*/
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type)
1328
int32_t simulate_error= 0;
1329
session->system_thread = (session_type == SLAVE_Session_SQL) ?
1330
SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1331
session->security_ctx.skip_grants();
1332
my_net_init(&session->net, 0);
1334
Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1335
slave threads, since a replication event can become this much larger
1336
than the corresponding packet (query) sent from client to master.
1338
session->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1339
+ MAX_LOG_EVENT_HEADER; /* note, incr over the global not session var */
1340
session->slave_thread = 1;
1341
set_slave_thread_options(session);
1342
session->client_capabilities = CLIENT_LOCAL_FILES;
1343
/* thread_id is a shared counter, hence the lock around the increment. */
pthread_mutex_lock(&LOCK_thread_count);
1344
session->thread_id= session->variables.pseudo_thread_id= thread_id++;
1345
pthread_mutex_unlock(&LOCK_thread_count);
1347
/*
  NOTE(review): both simulate_error bits are set here with no visible
  guard, and the test below checks exactly the bit for session_type.  As
  written that makes the condition true for both thread types, i.e.
  initialisation always takes the failure branch.  This looks like
  error-injection code that was meant to be conditional -- confirm
  against the full file.
*/
simulate_error|= (1 << SLAVE_Session_IO);
1348
simulate_error|= (1 << SLAVE_Session_SQL);
1349
if (init_thr_lock() || session->store_globals() || simulate_error & (1<< session_type))
1356
/* Initial state text depends on which slave thread this is. */
if (session_type == SLAVE_Session_SQL)
1357
session->set_proc_info("Waiting for the next event in relay log");
1359
session->set_proc_info("Waiting for master update");
1360
session->version=refresh_version;
1361
session->set_time();
1365
/* Returns non zero on error */
1366
/*
  Wait for up to sec seconds while staying responsive to a kill request.

  The remaining time is recomputed from time(NULL) on every iteration, and
  the thread_killed callback is invoked with (session, thread_killed_arg)
  after each wake-up.  An alarm of twice the remaining time is armed around
  each nap and cancelled afterwards.

  NOTE(review): the declarations of nap_time and alarm_buff, the sleep
  call itself and the return statements are not visible in this view.
*/
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1367
void* thread_killed_arg)
1370
thr_alarm_t alarmed;
1372
thr_alarm_init(&alarmed);
1373
time_t start_time, end_time;
1375
start_time= time(NULL);
1376
/* time() signals failure with (time_t)-1. */
if (start_time == (time_t)-1)
1378
end_time= start_time+sec;
1380
/* Loop until the deadline has passed; nap_time is what is left. */
while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1384
The only reason we are asking for alarm is so that
1385
we will be woken up in case of murder, so if we do not get killed,
1386
set the alarm so it goes off after we wake up naturally
1388
thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
1390
thr_end_alarm(&alarmed);
1392
if ((*thread_killed)(session,thread_killed_arg))
1395
/* Refresh the clock so the loop condition sees the time slept. */
start_time= time(NULL);
1396
if (start_time == (time_t)-1)
1403
/*
  Ask the master to start streaming its binary log (COM_BINLOG_DUMP).

  Packet layout as built below: 4 bytes log position, 2 bytes flags,
  4 bytes server_id, followed by the log file name without a terminator;
  the total length sent is len + 10.

  *suppress_warnings is reset to false on entry and set to true only when
  the command fails with ER_NET_READ_INTERRUPTED; other failures are
  written to the error log.

  NOTE(review): the declaration of len and the return statements are not
  visible in this view.
*/
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
1404
bool *suppress_warnings)
1406
unsigned char buf[FN_REFLEN + 10];
1408
int32_t binlog_flags = 0; // for now
1409
const char* logname = mi->getLogName();
1411
*suppress_warnings= false;
1413
// TODO if big log files: Change next to int8store()
1414
/* The position is narrowed to 32 bits by this cast (see TODO above). */
int4store(buf, (uint32_t) mi->getLogPosition());
1415
int2store(buf + 4, binlog_flags);
1416
int4store(buf + 6, server_id);
1417
len = (uint32_t) strlen(logname);
1418
/*
  NOTE(review): no length check is visible before this copy; it relies on
  the log name being at most FN_REFLEN bytes -- confirm that invariant.
*/
memcpy(buf + 10, logname,len);
1419
if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
1422
Something went wrong, so we will just reconnect and retry later
1423
in the future, we should do a better error analysis, but for
1424
now we just fill up the error log :-)
1426
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1427
*suppress_warnings= true; // Suppress reconnect warning
1429
sql_print_error(_("Error on COM_BINLOG_DUMP: %d %s, will retry in %d secs"),
1430
drizzle_errno(drizzle), drizzle_error(drizzle),
1439
Read one event from the master
1443
DRIZZLE DRIZZLE connection
1444
mi Master connection information
1445
suppress_warnings TRUE when a normal net read timeout has caused us to
1446
try a reconnect. We do not want to print anything to
1447
the error log in this case because this is a normal
1448
event in an idle server.
1451
'packet_error' Error
1452
number Length of packet
1455
/*
  Read one packet from the master connection.

  Returns packet_error when the test hook for forced disconnects fires,
  when cli_safe_read() fails or yields a length below 1, or when the
  packet looks like an end-of-stream marker (shorter than 8 bytes with a
  first byte of 254).  *suppress_warnings is reset to false on entry and
  set to true only for ER_NET_READ_INTERRUPTED.

  NOTE(review): the middle parameter of the signature, the declaration of
  len and the final return are not visible in this view.
*/
static uint32_t read_event(DRIZZLE *drizzle,
1457
bool* suppress_warnings)
1461
*suppress_warnings= false;
1463
my_real_read() will time us out
1464
We check if we were told to die, and if not, try reading again
1466
/*
  Countdown used to force a disconnect after a configured number of
  events; it is only active when disconnect_slave_event_count is non-zero.
*/
if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
1467
return(packet_error);
1469
len = cli_safe_read(drizzle);
1470
if (len == packet_error || (int32_t) len < 1)
1472
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1475
We are trying a normal reconnect after a read timeout;
1476
we suppress prints to .err file as long as the reconnect
1477
happens without problems
1479
*suppress_warnings= true;
1482
sql_print_error(_("Error reading packet from server: %s ( server_errno=%d)"),
1483
drizzle_error(drizzle), drizzle_errno(drizzle));
1484
return(packet_error);
1487
/* Check if eof packet */
1488
if (len < 8 && drizzle->net.read_pos[0] == 254)
1490
sql_print_information(_("Slave: received end packet from server, apparent "
1491
"master shutdown: %s"),
1492
drizzle_error(drizzle));
1493
return(packet_error);
1500
int32_t check_expected_error(Session*, Relay_log_info const *,
1501
int32_t expected_error)
1503
switch (expected_error) {
1504
case ER_NET_READ_ERROR:
1505
case ER_NET_ERROR_ON_WRITE:
1506
case ER_QUERY_INTERRUPTED:
1507
case ER_SERVER_SHUTDOWN:
1508
case ER_NEW_ABORTING_CONNECTION:
1517
Check if the current error is of temporary nature or not.
1518
Some errors are temporary in nature, such as
1519
ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. Ndb also signals
1520
that the error is temporary by pushing a warning with the error code
1521
ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1523
/**
  Decide whether the error currently recorded in the session is a
  temporary one, i.e. one for which retrying the transaction makes sense.

  The previous version cleared whatever error was recorded and raised
  ER_LOCK_DEADLOCK in its place before classifying it.  That made every
  non-fatal error look temporary and threw away the real error.  The
  recorded error is now inspected as it is and never modified.

  @param session  Slave SQL thread session to inspect.

  @retval 1  The recorded error is ER_LOCK_DEADLOCK or
             ER_LOCK_WAIT_TIMEOUT.
  @retval 0  Fatal error, no error recorded, or any other error code.
*/
static int32_t has_temporary_error(Session *session)
{
  /* A fatal error is never worth a retry. */
  if (session->is_fatal_error)
    return(0);

  /*
    If there is no message in Session, we can't say if it's a temporary
    error or not. This is currently the case for Incident_log_event,
    which sets no message. Return FALSE.
  */
  if (!session->is_error())
    return(0);

  /*
    Temporary error codes:
    currently, InnoDB deadlock detected by InnoDB or lock
    wait timeout (innodb_lock_wait_timeout exceeded)
  */
  if (session->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
      session->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
    return(1);

  return(0);
}
1556
Applies the given event and advances the relay log position.
1558
In essence, this function does:
1561
ev->apply_event(rli);
1562
ev->update_pos(rli);
1565
But it also does some maintenance, such as skipping events if
1566
needed and reporting errors.
1568
If the @c skip flag is set, then it is tested whether the event
1569
should be skipped, by looking at the slave_skip_counter and the
1570
server id. The skip flag should be set when calling this from a
1571
replication thread but not set when executing an explicit BINLOG
1576
@retval 1 Error calling ev->apply_event().
1578
@retval 2 No error calling ev->apply_event(), but error calling
1581
/*
  Apply one replicated event and then advance the relay log position.

  Before execution the session is primed with the event's server id, a
  fresh timestamp and a cleared current_select, and the event is stamped
  with the current time and bound to the session.  ev->shall_skip() then
  decides whether the event runs; a skip caused by the skip counter
  decrements rli->slave_skip_counter.  Afterwards ev->update_pos() moves
  the position and a failure there is reported through rli->report().

  NOTE(review): the last parameter of the signature, the branch structure
  around the two apply_event() calls and the intermediate return
  statements are not visible in this view.
*/
int32_t apply_event_and_update_pos(Log_event* ev, Session* session, Relay_log_info* rli,
1584
int32_t exec_res= 0;
1587
Execute the event to change the database and update the binary
1588
log coordinates, but first we set some data that is needed for
1591
The event will be executed unless it is supposed to be skipped.
1593
Queries originating from this server must be skipped. Low-level
1594
events (Format_description_log_event, Rotate_log_event,
1595
Stop_log_event) from this server must also be skipped. But for
1596
those we don't want to modify 'group_master_log_pos', because
1597
these events did not exist on the master.
1598
Format_description_log_event is not completely skipped.
1600
Skip queries specified by the user in 'slave_skip_counter'. We
1601
can't however skip events that has something to do with the log
1604
Filtering on own server id is extremely important, to ignore
1605
execution of events created by the creation/rotation of the relay
1606
log (remember that now the relay log starts with its Format_desc,
1610
session->server_id = ev->server_id; // use the original server id for logging
1611
session->set_time(); // time the query
1612
session->lex->current_select= 0;
1615
ev->when= time(NULL);
1616
/* time() signals failure with (time_t)-1. */
if(ev->when == (time_t)-1)
1620
ev->session = session; // because up to this point, ev->session == 0
1624
int32_t reason= ev->shall_skip(rli);
1625
/* Only a skip charged to the user's counter consumes one unit of it. */
if (reason == Log_event::EVENT_SKIP_COUNT)
1626
--rli->slave_skip_counter;
1627
/*
  NOTE(review): the matching lock of rli->data_lock is not visible in this
  function; presumably the caller holds it on entry -- confirm.
*/
pthread_mutex_unlock(&rli->data_lock);
1628
if (reason == Log_event::EVENT_SKIP_NOT)
1629
exec_res= ev->apply_event(rli);
1632
/*
  NOTE(review): presumably the branch taken when skip checking is
  disabled, where the event is always applied -- confirm.
*/
exec_res= ev->apply_event(rli);
1636
int32_t error= ev->update_pos(rli);
1638
The update should not fail, so print an error message and
1639
return an error code.
1641
TODO: Replace this with a decent error message when merged
1642
with BUG#24954 (which adds several new error message).
1647
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
1648
_("It was not possible to update the positions"
1649
" of the relay log information: the slave may"
1650
" be in an inconsistent state."
1651
" Stopped in %s position %s"),
1652
rli->group_relay_log_name.c_str(),
1653
llstr(rli->group_relay_log_pos, buf));
1658
/* Collapse any non-zero apply result to 1. */
return(exec_res ? 1 : 0);
1663
Top-level function for executing the next event from the relay log.
1665
This function reads the event from the relay log, executes it, and
1666
advances the relay log position. It also handles errors, etc.
1668
This function may fail to apply the event for the following reasons:
1670
- The position specified by the UNTIL condition of the START SLAVE
1673
- It was not possible to read the event from the log.
1675
- The slave is killed.
1677
- An error occurred when applying the event, and the event has been
1678
tried slave_trans_retries times. If the event has been retried
1679
fewer times, 0 is returned.
1681
- init_master_info or init_relay_log_pos failed. (These are called
1682
if a failure occurs when applying the event.)
1684
- An error occurred when updating the binlog position.
1686
@retval 0 The event was applied.
1688
@retval 1 The event was not applied.
1690
/*
  Fetch the next event from the relay log, execute it and handle retries.

  Visible flow: take rli->data_lock, obtain the event with next_event(),
  bail out if the SQL thread was killed, stop if the UNTIL position has
  been reached, then hand the event to apply_event_and_update_pos().  When
  slave_trans_retries is non-zero and the failure is judged temporary by
  has_temporary_error(), the master info and the relay log position are
  re-initialised, the transaction is rolled back, the thread sleeps and the
  retry counters are bumped.  When no event could be obtained, a relay log
  read failure is reported.

  NOTE(review): braces, several branch heads and the return statements are
  not visible in this view; the description above follows the visible
  statements only.
*/
static int32_t exec_relay_log_event(Session* session, Relay_log_info* rli)
1693
We acquire this mutex since we need it for all operations except
1694
event execution. But we will release it in places where we will
1695
wait for something for example inside of next_event().
1697
pthread_mutex_lock(&rli->data_lock);
1699
Log_event * ev = next_event(rli);
1701
assert(rli->sql_session==session);
1703
if (sql_slave_killed(session,rli))
1705
pthread_mutex_unlock(&rli->data_lock);
1714
This tests if the position of the beginning of the current event
1715
hits the UNTIL barrier.
1717
/*
  Inside a group, or for events without a master position, the group
  position is used; otherwise the start of the current event
  (end position minus bytes written).
*/
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
1718
rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
1719
rli->group_master_log_pos :
1720
ev->log_pos - ev->data_written))
1723
sql_print_information(_("Slave SQL thread stopped because it reached its"
1724
" UNTIL position %s"),
1725
llstr(rli->until_pos(), buf));
1727
Setting abort_slave flag because we do not want additional message about
1728
error in query execution to be printed.
1730
rli->abort_slave= 1;
1731
pthread_mutex_unlock(&rli->data_lock);
1735
/* The final argument enables the skip checks for this event. */
exec_res= apply_event_and_update_pos(ev, session, rli, true);
1738
Format_description_log_event should not be deleted because it will be
1739
used to read info about the relay log's format; it will be deleted when
1740
the SQL thread does not need it, i.e. when this thread terminates.
1742
if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
1748
update_log_pos failed: this should not happen, so we don't
1754
/* Retry handling is active only when slave_trans_retries is non-zero. */
if (slave_trans_retries)
1756
int32_t temp_err= 0;
1757
if (exec_res && (temp_err= has_temporary_error(session)))
1761
We were in a transaction which has been rolled back because of a
1763
let's seek back to BEGIN log event and retry it all again.
1764
Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
1765
there is no rollback since 5.0.13 (ref: manual).
1766
We have to not only seek but also
1767
a) init_master_info(), to seek back to hot relay log's start for later
1768
(for when we will come back to this hot log after re-processing the
1769
possibly existing old logs where BEGIN is: check_binlog_magic() will
1770
then need the cache to be at position 0 (see comments at beginning of
1771
init_master_info()).
1772
b) init_relay_log_pos(), because the BEGIN may be an older relay log.
1774
if (rli->trans_retries < slave_trans_retries)
1776
if (rli->mi->init_master_info(0, 0, SLAVE_SQL))
1777
sql_print_error(_("Failed to initialize the master info structure"));
1778
else if (init_relay_log_pos(rli,
1779
rli->group_relay_log_name.c_str(),
1780
rli->group_relay_log_pos,
1782
sql_print_error(_("Error initializing relay log position: %s"),
1787
end_trans(session, ROLLBACK);
1788
/* chance for concurrent connection to get more locks */
1789
/*
  Back-off grows with the number of retries so far, capped at
  MAX_SLAVE_RETRY_PAUSE.
*/
safe_sleep(session, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1790
(CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1791
pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1792
rli->trans_retries++;
1793
rli->retried_trans++;
1794
pthread_mutex_unlock(&rli->data_lock);
1798
/*
  NOTE(review): the format below uses PRIu64; confirm that
  slave_trans_retries is a 64-bit unsigned value.
*/
sql_print_error(_("Slave SQL thread retried transaction %"PRIu64" time(s) "
1799
"in vain, giving up. Consider raising the value of "
1800
"the slave_transaction_retries variable."),
1801
slave_trans_retries);
1803
else if ((exec_res && !temp_err) ||
1804
(opt_using_transactions &&
1805
rli->group_relay_log_pos == rli->event_relay_log_pos))
1808
Only reset the retry counter if the entire group succeeded
1809
or failed with a non-transient error. On a successful
1810
event, the execution will proceed as usual; in the case of a
1811
non-transient error, the slave will stop with an error.
1813
rli->trans_retries= 0; // restart from fresh
1818
/*
  NOTE(review): presumably the path taken when next_event() returned no
  event -- confirm against the full file.
*/
pthread_mutex_unlock(&rli->data_lock);
1819
rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
1820
ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
1821
_("Could not parse relay log event entry. The possible reasons "
1822
"are: the master's binary log is corrupted (you can check this "
1823
"by running 'mysqlbinlog' on the binary log), the slave's "
1824
"relay log is corrupted (you can check this by running "
1825
"'mysqlbinlog' on the relay log), a network problem, or a bug "
1826
"in the master's or slave's DRIZZLE code. If you want to check "
1827
"the master's binary log or slave's relay log, you will be "
1828
"able to know their names by issuing 'SHOW SLAVE STATUS' "
1835
@brief Try to reconnect slave IO thread.
1837
@details Terminates current connection to master, sleeps for
1838
@c mi->connect_retry seconds and initiates new connection with
1839
@c safe_reconnect(). Variable pointed by @c retry_count is increased -
1840
if it exceeds @c master_retry_count then connection is not re-established
1841
and function signals error.
1842
Unless @c suppress_warnings is TRUE, a warning is put in the server error log
1843
when reconnecting. The warning message and messages used to report errors
1844
are taken from @c messages array. In case @c master_retry_count is exceeded,
1845
no messages are added to the log.
1847
@param[in] session Thread context.
1848
@param[in] DRIZZLE DRIZZLE connection.
1849
@param[in] mi Master connection information.
1850
@param[in,out] retry_count Number of attempts to reconnect.
1851
@param[in] suppress_warnings TRUE when a normal net read timeout
1852
has caused the reconnect.
1853
@param[in] messages Messages to print/log, see
1854
reconnect_messages[] array.
1857
@retval 1 There was an error.
1860
/*
  Drop the current master connection and try to establish a new one.

  Visible steps: mark the I/O thread as not connected, publish the
  "waiting" state text, disconnect, count the attempt and give up with 1
  once *retry_count exceeds master_retry_count, sleep for
  mi->connect_retry via safe_sleep(), check for a kill request, optionally
  log or report the reconnect, and finally call safe_reconnect().

  All user-visible texts are taken from the messages array and passed
  through _() before use.

  NOTE(review): braces and most return statements are not visible in this
  view, so the exact scope of the retry-count check and of the sleep is an
  assumption.
*/
static int32_t try_to_reconnect(Session *session, DRIZZLE *drizzle, Master_info *mi,
1861
uint32_t *retry_count, bool suppress_warnings,
1862
const char *messages[SLAVE_RECON_MSG_MAX])
1864
mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1865
session->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
1866
drizzle_disconnect(drizzle);
1867
/*
  Post-increment: the condition sees the count before this attempt, so it
  is false on the very first attempt (count 0).
*/
if ((*retry_count)++)
1869
if (*retry_count > master_retry_count)
1870
return 1; // Don't retry forever
1871
safe_sleep(session, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1874
if (check_io_slave_killed(session, mi,
1875
_(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1877
session->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
1878
if (!suppress_warnings)
1880
char buf[256], llbuff[22];
1881
/*
  The SLAVE_RECON_MSG_FAILED text is used as the format string; it is
  given the log name and the log position rendered as text.
*/
snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1882
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
1884
Raise a warining during registering on master/requesting dump.
1885
Log a message reading event.
1887
/* A non-empty command name selects the warning report below. */
if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
1889
mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1890
ER(ER_SLAVE_MASTER_COM_FAILURE),
1891
_(messages[SLAVE_RECON_MSG_COMMAND]), buf);
1895
sql_print_information("%s",buf);
1898
if (safe_reconnect(session, drizzle, mi, 1) || io_slave_killed(session, mi))
1900
if (global_system_variables.log_warnings)
1901
sql_print_information("%s",_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1908
/* Slave I/O Thread entry point */
1910
/*
  Thread function of the slave I/O thread.

  arg is the Master_info this thread serves.  Visible phases:
    1. Create and initialise a Session, publish it as mi->io_session, add
       it to the global thread list and wake up waiters on mi->start_cond.
    2. Create the client handle and connect to the master.
    3. Check the master version and register with the master, reconnecting
       through try_to_reconnect() on failure.
    4. Request the binlog dump, then loop reading events and queuing them
       to the relay log; on read errors either stop or reconnect depending
       on the error code.
    5. After each queued event, wait if the relay logs exceed the
       configured space limit.
    6. On exit, log the position reached, close the connection, release
       per-thread resources and signal mi->stop_cond while holding
       mi->run_lock.

  NOTE(review): labels, gotos, braces and several declarations (for
  example drizzle, llbuff and event_len) are not visible in this view; the
  phase description follows the visible statements only.
*/
pthread_handler_t handle_slave_io(void *arg)
1912
Session *session; // needs to be first for thread_stack
1914
Master_info *mi = (Master_info*)arg;
1915
Relay_log_info *rli= &mi->rli;
1917
/*
  NOTE(review): retry_count has no initialiser here; its first assignment
  is presumably on a line that is missing from this view -- confirm.
*/
uint32_t retry_count;
1918
bool suppress_warnings;
1919
uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
1926
pthread_mutex_lock(&mi->run_lock);
1927
/* Inform waiting threads that slave has started */
1930
mi->events_till_disconnect = disconnect_slave_event_count;
1932
session= new Session;
1933
Session_CHECK_SENTRY(session);
1934
mi->io_session = session;
1936
pthread_detach_this_thread();
1937
session->thread_stack= (char*) &session; // remember where our stack is
1938
if (init_slave_thread(session, SLAVE_Session_IO))
1940
/* Wake up the starter even on failure so that it does not wait forever. */
pthread_cond_broadcast(&mi->start_cond);
1941
pthread_mutex_unlock(&mi->run_lock);
1942
sql_print_error(_("Failed during slave I/O thread initialization"));
1945
pthread_mutex_lock(&LOCK_thread_count);
1946
threads.append(session);
1947
pthread_mutex_unlock(&LOCK_thread_count);
1948
mi->slave_running = 1;
1949
mi->abort_slave = 0;
1950
pthread_mutex_unlock(&mi->run_lock);
1951
pthread_cond_broadcast(&mi->start_cond);
1953
if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
1955
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
1956
ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
1960
session->set_proc_info("Connecting to master");
1961
// we can get killed during safe_connect
1962
if (!safe_connect(session, drizzle, mi))
1964
sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
1965
"replication started in log '%s' at position %s"),
1966
mi->getUsername(), mi->getHostname(), mi->getPort(),
1968
llstr(mi->getLogPosition(), llbuff));
1970
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
1971
thread, since a replication event can become this much larger than
1972
the corresponding packet (query) sent from client to master.
1974
/* Both the session's and the client handle's limits are raised. */
drizzle->net.max_packet_size= session->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
1978
sql_print_information(_("Slave I/O thread killed while connecting to master"));
1984
// TODO: the assignment below should be under mutex (5.0)
1985
mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
1986
session->slave_net = &drizzle->net;
1987
session->set_proc_info("Checking master version");
1988
if (get_master_version_and_clock(drizzle, mi))
1991
/* Registration is attempted only for binlog versions above 1. */
if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
1994
Register ourselves with the master.
1996
session->set_proc_info("Registering slave on master");
1997
if (register_slave_on_master(drizzle, mi, &suppress_warnings))
1999
if (!check_io_slave_killed(session, mi, "Slave I/O thread killed "
2000
"while registering slave on master"))
2002
sql_print_error(_("Slave I/O thread couldn't register on master"));
2003
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2004
reconnect_messages[SLAVE_RECON_ACT_REG]))
2011
if (!retry_count_reg)
2014
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2015
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2016
reconnect_messages[SLAVE_RECON_ACT_REG]))
2022
/* Outer loop: one iteration per binlog dump request. */
while (!io_slave_killed(session,mi))
2024
session->set_proc_info("Requesting binlog dump");
2025
if (request_dump(drizzle, mi, &suppress_warnings))
2027
sql_print_error(_("Failed on request_dump()"));
2028
if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while \
2029
requesting master dump")) ||
2030
try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2031
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2035
if (!retry_count_dump)
2038
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2039
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2040
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2045
/* Inner loop: one iteration per event received from the master. */
while (!io_slave_killed(session,mi))
2049
We say "waiting" because read_event() will wait if there's nothing to
2050
read. But if there's something to read, it will not wait. The
2051
important thing is to not confuse users by saying "reading" whereas
2052
we're in fact receiving nothing.
2054
session->set_proc_info(_("Waiting for master to send event"));
2055
event_len= read_event(drizzle, mi, &suppress_warnings);
2056
if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while "
2059
if (!retry_count_event)
2061
retry_count_event++;
2062
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2063
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2064
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2069
if (event_len == packet_error)
2071
/* Decide per error code whether to stop or to reconnect. */
uint32_t drizzle_error_number= drizzle_errno(drizzle);
2072
switch (drizzle_error_number) {
2073
case CR_NET_PACKET_TOO_LARGE:
2074
sql_print_error(_("Log entry on master is longer than "
2075
"max_allowed_packet (%u) on "
2076
"slave. If the entry is correct, restart the "
2077
"server with a higher value of "
2078
"max_allowed_packet"),
2079
session->variables.max_allowed_packet);
2081
case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2082
sql_print_error(ER(drizzle_error_number), drizzle_error_number,
2083
drizzle_error(drizzle));
2085
case EE_OUTOFMEMORY:
2086
case ER_OUTOFMEMORY:
2088
_("Stopping slave I/O thread due to out-of-memory error from master"));
2091
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2092
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2095
} // if (event_len == packet_error)
2097
retry_count=0; // ok event, reset retry counter
2098
session->set_proc_info(_("Queuing master event to the relay log"));
2099
/* The event payload starts one byte into the received packet. */
if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2105
sql_print_error(_("Failed to flush master info file"));
2109
See if the relay logs take too much space.
2110
We don't lock mi->rli.log_space_lock here; this dirty read saves time
2111
and does not introduce any problem:
2112
- if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
2113
the clean value is 0), then we are reading only one more event as we
2114
should, and we'll block only at the next event. No big deal.
2115
- if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
2116
the clean value is 1), then we are going into wait_for_relay_log_space()
2117
for no reason, but this function will do a clean read, notice the clean
2118
value and exit immediately.
2120
if (rli->log_space_limit && rli->log_space_limit <
2121
rli->log_space_total &&
2122
!rli->ignore_log_space_limit)
2123
if (wait_for_relay_log_space(rli))
2125
sql_print_error(_("Slave I/O thread aborted while waiting for "
2126
"relay log space"));
2134
// print the current replication position
2135
sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2137
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
2138
pthread_mutex_lock(&LOCK_thread_count);
2139
session->query = session->db = 0; // extra safety
2140
session->query_length= session->db_length= 0;
2141
pthread_mutex_unlock(&LOCK_thread_count);
2145
Here we need to clear the active VIO before closing the
2146
connection with the master. The reason is that Session::awake()
2147
might be called from terminate_slave_thread() because somebody
2148
issued a STOP SLAVE. If that happends, the close_active_vio()
2149
can be called in the middle of closing the VIO associated with
2150
the 'mysql' object, causing a crash.
2152
drizzle_close(drizzle);
2155
write_ignored_events_info_to_relay_log(session, mi);
2156
session->set_proc_info(_("Waiting for slave mutex on exit"));
2157
pthread_mutex_lock(&mi->run_lock);
2159
/* Forget the relay log's format */
2160
delete mi->rli.relay_log.description_event_for_queue;
2161
mi->rli.relay_log.description_event_for_queue= 0;
2162
assert(session->net.buff != 0);
2163
net_end(&session->net); // destructor will not free it, because net.vio is 0
2164
close_thread_tables(session);
2165
pthread_mutex_lock(&LOCK_thread_count);
2166
Session_CHECK_SENTRY(session);
2168
pthread_mutex_unlock(&LOCK_thread_count);
2170
mi->slave_running= 0;
2173
Note: the order of the two following calls (first broadcast, then unlock)
2174
is important. Otherwise a killer_thread can execute between the calls and
2175
delete the mi structure leading to a crash! (see BUG#25306 for details)
2177
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
2178
pthread_mutex_unlock(&mi->run_lock);
2181
return(0); // Can't return anything here
2185
/* Slave SQL Thread entry point */
2187
pthread_handler_t handle_slave_sql(void *arg)
2189
Session *session; /* needs to be first for thread_stack */
2190
char llbuff[22],llbuff1[22];
2192
Relay_log_info* rli = &((Master_info*)arg)->rli;
2197
assert(rli->inited);
2198
pthread_mutex_lock(&rli->run_lock);
2199
assert(!rli->slave_running);
2201
rli->events_till_abort = abort_slave_event_count;
2203
session = new Session;
2204
session->thread_stack = (char*)&session; // remember where our stack is
2205
rli->sql_session= session;
2207
/* Inform waiting threads that slave has started */
2208
rli->slave_run_id++;
2209
rli->slave_running = 1;
2211
pthread_detach_this_thread();
2212
if (init_slave_thread(session, SLAVE_Session_SQL))
2215
TODO: this is currently broken - slave start and change master
2216
will be stuck if we fail here
2218
pthread_cond_broadcast(&rli->start_cond);
2219
pthread_mutex_unlock(&rli->run_lock);
2220
sql_print_error(_("Failed during slave thread initialization"));
2223
session->init_for_queries();
2224
session->temporary_tables = rli->save_temporary_tables; // restore temp tables
2225
pthread_mutex_lock(&LOCK_thread_count);
2226
threads.append(session);
2227
pthread_mutex_unlock(&LOCK_thread_count);
2229
We are going to set slave_running to 1. Assuming slave I/O thread is
2230
alive and connected, this is going to make Seconds_Behind_Master be 0
2231
i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
2232
the moment we start we can think we are caught up, and the next second we
2233
start receiving data so we realize we are not caught up and
2234
Seconds_Behind_Master grows. No big deal.
2236
rli->abort_slave = 0;
2237
pthread_mutex_unlock(&rli->run_lock);
2238
pthread_cond_broadcast(&rli->start_cond);
2241
Reset errors for a clean start (otherwise, if the master is idle, the SQL
2242
thread may execute no Query_log_event, so the error will remain even
2243
though there's no problem anymore). Do not reset the master timestamp
2244
(imagine the slave has caught everything, then STOP SLAVE and START SLAVE:
2245
as we are not sure that we are going to receive a query, we want to
2246
remember the last master timestamp (to say how many seconds behind we are
2248
But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
2252
//tell the I/O thread to take relay_log_space_limit into account from now on
2253
pthread_mutex_lock(&rli->log_space_lock);
2254
rli->ignore_log_space_limit= 0;
2255
pthread_mutex_unlock(&rli->log_space_lock);
2256
rli->trans_retries= 0; // start from "no error"
2258
if (init_relay_log_pos(rli,
2259
rli->group_relay_log_name.c_str(),
2260
rli->group_relay_log_pos,
2261
1 /*need data lock*/, &errmsg,
2262
1 /*look for a description_event*/))
2264
sql_print_error(_("Error initializing relay log position: %s"),
2268
Session_CHECK_SENTRY(session);
2269
assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2271
Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2272
correct position when it's called just after my_b_seek() (the questionable
2273
stuff is those "seek is done on next read" comments in the my_b_seek()
2275
The crude reality is that this assertion randomly fails whereas
2276
replication seems to work fine. And there is no easy explanation why it
2277
fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
2278
init_relay_log_pos() called above). Maybe the assertion would be
2279
meaningful if we held rli->data_lock between the my_b_seek() and the
2282
assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2283
assert(rli->sql_session == session);
2285
if (global_system_variables.log_warnings)
2286
sql_print_information(_("Slave SQL thread initialized, "
2287
"starting replication in log '%s' at "
2288
"position %s, relay log '%s' position: %s"),
2290
llstr(rli->group_master_log_pos,llbuff),
2291
rli->group_relay_log_name.c_str(),
2292
llstr(rli->group_relay_log_pos,llbuff1));
2294
/* execute init_slave variable */
2295
if (sys_init_slave.value_length)
2297
execute_init_command(session, &sys_init_slave, &LOCK_sys_init_slave);
2298
if (session->is_slave_error)
2300
sql_print_error(_("Slave SQL thread aborted. "
2301
"Can't execute init_slave query"));
2307
First check until condition - probably there is nothing to execute. We
2308
do not want to wait for next event in this case.
2310
pthread_mutex_lock(&rli->data_lock);
2311
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
2312
rli->is_until_satisfied(rli->group_master_log_pos))
2315
sql_print_information(_("Slave SQL thread stopped because it reached its"
2316
" UNTIL position %s"), llstr(rli->until_pos(), buf));
2317
pthread_mutex_unlock(&rli->data_lock);
2320
pthread_mutex_unlock(&rli->data_lock);
2322
/* Read queries from the IO/THREAD until this thread is killed */
2324
while (!sql_slave_killed(session,rli))
2326
session->set_proc_info(_("Reading event from the relay log"));
2327
assert(rli->sql_session == session);
2328
Session_CHECK_SENTRY(session);
2329
if (exec_relay_log_event(session,rli))
2331
// do not scare the user if SQL thread was simply killed or stopped
2332
if (!sql_slave_killed(session,rli))
2335
retrieve as much info as possible from the session and, error
2336
codes and warnings and print this to the error log so as to
2337
allow the user to locate the error
2339
uint32_t const last_errno= rli->last_error().number;
2341
if (session->is_error())
2343
char const *const errmsg= session->main_da.message();
2345
if (last_errno == 0)
2347
rli->report(ERROR_LEVEL, session->main_da.sql_errno(), "%s", errmsg);
2349
else if (last_errno != session->main_da.sql_errno())
2351
sql_print_error(_("Slave (additional info): %s Error_code: %d"),
2352
errmsg, session->main_da.sql_errno());
2356
/* Print any warnings issued */
2357
List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
2360
Added controlled slave thread cancel for replication
2361
of user-defined variables.
2363
bool udf_error = false;
2366
if (err->code == ER_CANT_OPEN_LIBRARY)
2368
sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
2371
sql_print_error(_("Error loading user-defined library, slave SQL "
2372
"thread aborted. Install the missing library, "
2373
"and restart the slave SQL thread with "
2374
"\"SLAVE START\". We stopped at log '%s' "
2376
RPL_LOG_NAME, llstr(rli->group_master_log_pos,
2379
sql_print_error(_("Error running query, slave SQL thread aborted. "
2380
"Fix the problem, and restart "
2381
"the slave SQL thread with \"SLAVE START\". "
2382
"We stopped at log '%s' position %s"),
2384
llstr(rli->group_master_log_pos, llbuff));
2390
/* Thread stopped. Print the current replication position to the log */
2391
sql_print_information(_("Slave SQL thread exiting, replication stopped in "
2392
"log '%s' at position %s"),
2394
llstr(rli->group_master_log_pos,llbuff));
2399
Some events set some playgrounds, which won't be cleared because thread
2400
stops. Stopping of this thread may not be known to these events ("stop"
2401
request is detected only by the present function, not by events), so we
2402
must "proactively" clear playgrounds:
2404
rli->cleanup_context(session, 1);
2405
pthread_mutex_lock(&LOCK_thread_count);
2407
Some extra safety, which should not be needed (normally, event deletion
2408
should already have done these assignments (each event which sets these
2409
variables is supposed to set them to 0 before terminating)).
2411
session->query= session->db= session->catalog= 0;
2412
session->query_length= session->db_length= 0;
2413
pthread_mutex_unlock(&LOCK_thread_count);
2414
session->set_proc_info("Waiting for slave mutex on exit");
2415
pthread_mutex_lock(&rli->run_lock);
2416
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
2417
pthread_mutex_lock(&rli->data_lock);
2418
assert(rli->slave_running == 1); // tracking buffer overrun
2419
/* When master_pos_wait() wakes up it will check this and terminate */
2420
rli->slave_running= 0;
2421
/* Forget the relay log's format */
2422
delete rli->relay_log.description_event_for_exec;
2423
rli->relay_log.description_event_for_exec= 0;
2424
/* Wake up master_pos_wait() */
2425
pthread_mutex_unlock(&rli->data_lock);
2426
pthread_cond_broadcast(&rli->data_cond);
2427
rli->ignore_log_space_limit= 0; /* don't need any lock */
2428
rli->save_temporary_tables = session->temporary_tables;
2431
TODO: see if we can do this conditionally in next_event() instead
2432
to avoid unneeded position re-init
2434
session->temporary_tables = 0; // remove tempation from destructor to close them
2435
assert(session->net.buff != 0);
2436
net_end(&session->net); // destructor will not free it, because we are weird
2437
assert(rli->sql_session == session);
2438
Session_CHECK_SENTRY(session);
2439
rli->sql_session= 0;
2440
pthread_mutex_lock(&LOCK_thread_count);
2441
Session_CHECK_SENTRY(session);
2443
pthread_mutex_unlock(&LOCK_thread_count);
2445
Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
2446
is important. Otherwise a killer_thread can execute between the calls and
2447
delete the mi structure leading to a crash! (see BUG#25306 for details)
2449
pthread_cond_broadcast(&rli->stop_cond);
2450
pthread_mutex_unlock(&rli->run_lock); // tell the world we are done
2454
return(0); // Can't return anything here
2459
process_io_create_file()
2462
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
2466
bool cev_not_written;
2467
Session *session = mi->io_session;
2468
NET *net = &mi->drizzle->net;
2470
if (unlikely(!cev->is_valid()))
2473
assert(cev->inited_from_old);
2474
session->file_id = cev->file_id = mi->file_id++;
2475
session->server_id = cev->server_id;
2476
cev_not_written = 1;
2478
if (unlikely(net_request_file(net,cev->fname)))
2480
sql_print_error(_("Slave I/O: failed requesting download of '%s'"),
2486
This dummy block is so we could instantiate Append_block_log_event
2487
once and then modify it slightly instead of doing it multiple times
2491
Append_block_log_event aev(session,0,0,0,0);
2495
if (unlikely((num_bytes=my_net_read(net)) == packet_error))
2497
sql_print_error(_("Network read error downloading '%s' from master"),
2501
if (unlikely(!num_bytes)) /* eof */
2503
/* 3.23 master wants it */
2504
net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0);
2506
If we wrote Create_file_log_event, then we need to write
2507
Execute_load_log_event. If we did not write Create_file_log_event,
2508
then this is an empty file and we can just do as if the LOAD DATA
2509
INFILE had not existed, i.e. write nothing.
2511
if (unlikely(cev_not_written))
2513
Execute_load_log_event xev(session,0,0);
2514
xev.log_pos = cev->log_pos;
2515
if (unlikely(mi->rli.relay_log.append(&xev)))
2517
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2518
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2519
_("error writing Exec_load event to relay log"));
2522
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2525
if (unlikely(cev_not_written))
2527
cev->block = net->read_pos;
2528
cev->block_len = num_bytes;
2529
if (unlikely(mi->rli.relay_log.append(cev)))
2531
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2532
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2533
_("error writing Create_file event to relay log"));
2537
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2541
aev.block = net->read_pos;
2542
aev.block_len = num_bytes;
2543
aev.log_pos = cev->log_pos;
2544
if (unlikely(mi->rli.relay_log.append(&aev)))
2546
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2547
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2548
_("error writing Append_block event to relay log"));
2551
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
2562
Start using a new binary log on the master
2566
mi master_info for the slave
2567
rev The rotate log event read from the binary log
2570
Updates the master info with the place in the next binary
2571
log where we should start reading.
2572
Rotate the relay log to avoid mixed-format relay logs.
2575
We assume we already locked mi->data_lock
2579
1 Log event is illegal
2583
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
2585
safe_mutex_assert_owner(&mi->data_lock);
2587
if (unlikely(!rev->is_valid()))
2590
/* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2591
mi->setLogName(rev->new_log_ident.c_str());
2592
mi->setLogPosition(rev->pos);
2594
If we do not do this, we will be getting the first
2595
rotate event forever, so we need to not disconnect after one.
2597
if (disconnect_slave_event_count)
2598
mi->events_till_disconnect++;
2601
If description_event_for_queue is format <4, there is conversion in the
2602
relay log to the slave's format (4). And Rotate can mean upgrade or
2603
nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
2604
no need to reset description_event_for_queue now. And if it's nothing (same
2605
master version as before), no need (still using the slave's format).
2607
if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
2609
delete mi->rli.relay_log.description_event_for_queue;
2610
/* start from format 3 (DRIZZLE 4.0) again */
2611
mi->rli.relay_log.description_event_for_queue= new
2612
Format_description_log_event(3);
2615
Rotating the relay log makes binlog format detection easier (at next slave
2616
start or mysqlbinlog)
2618
rotate_relay_log(mi); /* will take the right mutexes */
2623
Reads a 3.23 event and converts it to the slave's format. This code was
2624
copied from DRIZZLE 4.0.
2626
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
2629
const char *errmsg = 0;
2631
bool ignore_event= 0;
2633
Relay_log_info *rli= &mi->rli;
2636
If we get Load event, we need to pass a non-reusable buffer
2637
to read_log_event, so we do a trick
2639
if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
2641
if (unlikely(!(tmp_buf=(char*)malloc(event_len+1))))
2643
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2644
ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
2647
memcpy(tmp_buf,buf,event_len);
2649
Create_file constructor wants a 0 as last char of buffer, this 0 will
2650
serve as the string-termination char for the file's name (which is at the
2652
We must increment event_len, otherwise the event constructor will not see
2653
this end 0, which leads to segfault.
2655
tmp_buf[event_len++]=0;
2656
int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
2657
buf = (const char*)tmp_buf;
2660
This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
2661
send the loaded file, and write it to the relay log in the form of
2662
Append_block/Exec_load (the SQL thread needs the data, as that thread is not
2663
connected to the master).
2665
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2666
mi->rli.relay_log.description_event_for_queue);
2669
sql_print_error(_("Read invalid event from master: '%s', "
2670
"master could be corrupt but a more likely cause "
2671
"of this is a bug"),
2673
free((char*) tmp_buf);
2677
pthread_mutex_lock(&mi->data_lock);
2678
ev->log_pos= mi->getLogPosition(); /* 3.23 events don't contain log_pos */
2679
switch (ev->get_type_code()) {
2685
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2688
pthread_mutex_unlock(&mi->data_lock);
2693
case CREATE_FILE_EVENT:
2695
Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
2696
queue_old_event() which is for 3.23 events which don't comprise
2697
CREATE_FILE_EVENT. This is because read_log_event() above has just
2698
transformed LOAD_EVENT into CREATE_FILE_EVENT.
2701
/* We come here when and only when tmp_buf != 0 */
2702
assert(tmp_buf != 0);
2704
ev->log_pos+= inc_pos;
2705
int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2707
mi->incrementLogPosition(inc_pos);
2708
pthread_mutex_unlock(&mi->data_lock);
2709
free((char*)tmp_buf);
2716
if (likely(!ignore_event))
2720
Don't do it for fake Rotate events (see comment in
2721
Log_event::Log_event(const char* buf...) in log_event.cc).
2723
ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
2724
if (unlikely(rli->relay_log.append(ev)))
2727
pthread_mutex_unlock(&mi->data_lock);
2730
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2733
mi->incrementLogPosition(inc_pos);
2734
pthread_mutex_unlock(&mi->data_lock);
2739
Reads a 4.0 event and converts it to the slave's format. This code was copied
2740
from queue_binlog_ver_1_event(), with some affordable simplifications.
2742
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
2745
const char *errmsg = 0;
2748
Relay_log_info *rli= &mi->rli;
2750
/* read_log_event() will adjust log_pos to be end_log_pos */
2751
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2752
mi->rli.relay_log.description_event_for_queue);
2755
sql_print_error(_("Read invalid event from master: '%s', "
2756
"master could be corrupt but a more likely cause of "
2759
free((char*) tmp_buf);
2762
pthread_mutex_lock(&mi->data_lock);
2763
switch (ev->get_type_code()) {
2767
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2770
pthread_mutex_unlock(&mi->data_lock);
2779
if (unlikely(rli->relay_log.append(ev)))
2782
pthread_mutex_unlock(&mi->data_lock);
2785
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2787
mi->incrementLogPosition(inc_pos);
2789
pthread_mutex_unlock(&mi->data_lock);
2796
Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
2797
(exactly, slave's) format. To do the conversion, we create a 5.0 event from
2798
the 3.23/4.0 bytes, then write this event to the relay log.
2801
Test this code before release - it has to be tested on a separate
2802
setup with 3.23 master or 4.0 master
2805
static int32_t queue_old_event(Master_info *mi, const char *buf,
2808
switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
2811
return(queue_binlog_ver_1_event(mi,buf,event_len));
2813
return(queue_binlog_ver_3_event(mi,buf,event_len));
2814
default: /* unsupported format; eg version 2 */
2822
If the event is 3.23/4.0, passes it to queue_old_event() which will convert
2823
it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
2824
no format conversion, it's pure read/write of bytes.
2825
So a 5.0.0 slave's relay log can contain events in the slave's format or in
2829
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
2833
uint32_t inc_pos= 0;
2834
Relay_log_info *rli= &mi->rli;
2835
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
2838
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
2839
buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
2840
return(queue_old_event(mi,buf,event_len));
2842
pthread_mutex_lock(&mi->data_lock);
2844
switch (buf[EVENT_TYPE_OFFSET]) {
2847
We needn't write this event to the relay log. Indeed, it just indicates a
2848
master server shutdown. The only thing this does is cleaning. But
2849
cleaning is already done on a per-master-thread basis (as the master
2850
server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
2851
prepared statements' deletion are TODO only when we binlog prep stmts).
2853
We don't even increment mi->master_log_pos, because we may be just after
2854
a Rotate event. Btw, in a few milliseconds we are going to have a Start
2855
event from the next binlog (unless the master is presently running
2861
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
2862
if (unlikely(process_io_rotate(mi,&rev)))
2864
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2868
Now the I/O thread has just changed its mi->master_log_name, so
2869
incrementing mi->master_log_pos is nonsense.
2874
case FORMAT_DESCRIPTION_EVENT:
2877
Create an event, and save it (when we rotate the relay log, we will have
2878
to write this event again).
2881
We are the only thread which reads/writes description_event_for_queue.
2882
The relay_log struct does not move (though some members of it can
2883
change), so we needn't any lock (no rli->data_lock, no log lock).
2885
Format_description_log_event* tmp;
2887
if (!(tmp= (Format_description_log_event*)
2888
Log_event::read_log_event(buf, event_len, &errmsg,
2889
mi->rli.relay_log.description_event_for_queue)))
2891
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2894
delete mi->rli.relay_log.description_event_for_queue;
2895
mi->rli.relay_log.description_event_for_queue= tmp;
2897
Though this does some conversion to the slave's format, this will
2898
preserve the master's binlog format version, and number of event types.
2901
If the event was not requested by the slave (the slave did not ask for
2902
it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
2904
inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
2908
case HEARTBEAT_LOG_EVENT:
2911
HB (heartbeat) cannot come before RL (Relay)
2914
Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
2917
error= ER_SLAVE_HEARTBEAT_FAILURE;
2918
error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2919
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2920
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2921
error_msg.append(STRING_WITH_LEN(" log_pos "));
2922
llstr(hb.log_pos, llbuf);
2923
error_msg.append(llbuf, strlen(llbuf));
2926
mi->received_heartbeats++;
2928
compare local and event's versions of log_file, log_pos.
2930
Heartbeat is sent only after an event corresponding to the coordinates
2931
the heartbeat carries.
2932
Slave can not have a difference in coordinates except in the only
2933
special case when mi->master_log_name, master_log_pos have never
2934
been updated by Rotate event, i.e. when slave does not have any history
2935
with the master (and thereafter mi->master_log_pos is NULL).
2937
TODO: handling `when' for SHOW SLAVE STATUS' seconds behind
2939
if ((mi->setLogName(hb.get_log_ident()) && mi->getLogName() != NULL)
2940
|| mi->getLogPosition() != hb.log_pos)
2942
/* missed events of heartbeat from the past */
2943
error= ER_SLAVE_HEARTBEAT_FAILURE;
2944
error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
2945
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2946
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2947
error_msg.append(STRING_WITH_LEN(" log_pos "));
2948
llstr(hb.log_pos, llbuf);
2949
error_msg.append(llbuf, strlen(llbuf));
2952
goto skip_relay_logging;
2962
If this event is originating from this server, don't queue it.
2963
We don't check this for 3.23 events because it's simpler like this; 3.23
2964
will be filtered anyway by the SQL slave thread which also tests the
2965
server id (we must also keep this test in the SQL thread, in case somebody
2966
upgrades a 4.0 slave which has a not-filtered relay log).
2968
ANY event coming from ourselves can be ignored: it is obvious for queries;
2969
for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
2970
(--log-slave-updates would not log that) unless this slave is also its
2971
direct master (an unsupported, useless setup!).
2974
pthread_mutex_lock(log_lock);
2976
if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
2977
!mi->rli.replicate_same_server_id)
2980
Do not write it to the relay log.
2981
a) We still want to increment mi->master_log_pos, so that we won't
2982
re-read this event from the master if the slave IO thread is now
2983
stopped/restarted (more efficient if the events we are ignoring are big
2985
b) We want to record that we are skipping events, for the information of
2986
the slave SQL thread, otherwise that thread may let
2987
rli->group_relay_log_pos stay too small if the last binlog's event is
2989
But events which were generated by this slave and which do not exist in
2990
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
2993
if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
2994
buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
2995
buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
2997
mi->incrementLogPosition(inc_pos);
2998
memcpy(rli->ign_master_log_name_end, mi->getLogName(), FN_REFLEN);
2999
assert(rli->ign_master_log_name_end[0]);
3000
rli->ign_master_log_pos_end= mi->getLogPosition();
3002
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3006
/* write the event to the relay log */
3007
if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3009
mi->incrementLogPosition(inc_pos);
3010
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3014
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
3016
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
3018
pthread_mutex_unlock(log_lock);
3023
pthread_mutex_unlock(&mi->data_lock);
3025
mi->report(ERROR_LEVEL, error, ER(error),
3026
(error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
3027
_("could not queue event from master") :
3033
void end_relay_log_info(Relay_log_info* rli)
3037
if (rli->info_fd >= 0)
3039
end_io_cache(&rli->info_file);
3040
(void) my_close(rli->info_fd, MYF(MY_WME));
3043
if (rli->cur_log_fd >= 0)
3045
end_io_cache(&rli->cache_buf);
3046
(void)my_close(rli->cur_log_fd, MYF(MY_WME));
3047
rli->cur_log_fd = -1;
3050
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
3051
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3053
Delete the slave's temporary tables from memory.
3054
In the future there will be other actions than this, to ensure persistence
3055
of slave's temp tables after shutdown.
3057
rli->close_temporary_tables();
3062
Try to connect until successful or slave killed
3066
session Thread handler for slave
3067
DRIZZLE DRIZZLE connection handle
3068
mi Replication handle
3075
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi)
3077
return(connect_to_master(session, drizzle, mi, 0, 0));
3086
Try to connect until successful or slave killed or we have retried
3087
master_retry_count times
3090
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
3091
bool reconnect, bool suppress_warnings)
3093
int32_t slave_was_killed;
3094
int32_t last_errno= -2; // impossible error
3095
uint32_t err_count=0;
3098
mi->events_till_disconnect = disconnect_slave_event_count;
3099
uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
3100
if (opt_slave_compressed_protocol)
3101
client_flag=CLIENT_COMPRESS; /* We will use compression */
3103
drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3104
drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3106
while (!(slave_was_killed = io_slave_killed(session,mi)) &&
3107
(reconnect ? drizzle_reconnect(drizzle) != 0 :
3108
drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
3109
mi->getPort(), 0, client_flag) == 0))
3111
/* Don't repeat last error */
3112
if ((int32_t)drizzle_errno(drizzle) != last_errno)
3114
last_errno=drizzle_errno(drizzle);
3115
suppress_warnings= 0;
3116
mi->report(ERROR_LEVEL, last_errno,
3117
_("error %s to master '%s@%s:%d'"
3118
" - retry-time: %d retries: %u"),
3119
(reconnect ? _("reconnecting") : _("connecting")),
3120
mi->getUsername(), mi->getHostname(), mi->getPort(),
3121
mi->getConnectionRetry(), master_retry_count);
3124
By default we try forever. The reason is that failure will trigger
3125
master election, so if the user did not set master_retry_count we
3126
do not want to have election triggered on the first failure to
3129
if (++err_count == master_retry_count)
3134
safe_sleep(session,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3138
if (!slave_was_killed)
3142
if (!suppress_warnings && global_system_variables.log_warnings)
3143
sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3144
"replication resumed in log '%s' at "
3145
"position %s"), mi->getUsername(),
3146
mi->getHostname(), mi->getPort(),
3148
llstr(mi->getLogPosition(),llbuff));
3151
drizzle->reconnect= 1;
3152
return(slave_was_killed);
3160
Try to connect until successful or slave killed or we have retried
3161
master_retry_count times
3164
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
3165
bool suppress_warnings)
3167
return(connect_to_master(session, drizzle, mi, 1, suppress_warnings));
3172
Store the file and position where the execute-slave thread is in the
3176
flush_relay_log_info()
3177
rli Relay log information
3180
- As this is only called by the slave thread, we don't need to
3181
have a lock on this.
3182
- If there is an active transaction, then we don't update the position
3183
in the relay log. This is to ensure that we re-execute statements
3184
if we die in the middle of a transaction that was rolled back.
3185
- As a transaction never spans binary logs, we don't have to handle the
3186
case where we do a relay-log-rotation in the middle of the transaction.
3187
If this would not be the case, we would have to ensure that we
3188
don't delete the relay log file where the transaction started when
3189
we switch to a new relay log file.
3192
- Change the log file information to a binary format to avoid calling
3200
bool flush_relay_log_info(Relay_log_info* rli)
3204
if (unlikely(rli->no_storage))
3212
Called when we notice that the current "hot" log got rotated under our feet.
3215
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
3217
assert(rli->cur_log != &rli->cache_buf);
3218
assert(rli->cur_log_fd == -1);
3220
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3221
if ((rli->cur_log_fd=open_binlog(cur_log, rli->event_relay_log_name.c_str(), errmsg)) < 0)
3224
We want to start exactly where we were before:
3225
relay_log_pos Current log pos
3226
pending Number of bytes already processed from the event
3228
rli->event_relay_log_pos= cmax(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3229
my_b_seek(cur_log,rli->event_relay_log_pos);
3234
static Log_event* next_event(Relay_log_info* rli)
3237
IO_CACHE* cur_log = rli->cur_log;
3238
pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3239
const char* errmsg=0;
3240
Session* session = rli->sql_session;
3242
assert(session != 0);
3244
if (abort_slave_event_count && !rli->events_till_abort--)
3248
For most operations we need to protect rli members with data_lock,
3249
so we assume calling function acquired this mutex for us and we will
3250
hold it for most of the loop below. However, we will release it
3251
whenever it is worth the hassle, and in the cases when we go into a
3252
pthread_cond_wait() with the non-data_lock mutex
3254
safe_mutex_assert_owner(&rli->data_lock);
3256
while (!sql_slave_killed(session,rli))
3259
We can have two kinds of log reading:
3261
rli->cur_log points at the IO_CACHE of relay_log, which
3262
is actively being updated by the I/O thread. We need to be careful
3263
in this case and make sure that we are not looking at a stale log that
3264
has already been rotated. If it has been, we reopen the log.
3266
The other case is much simpler:
3267
We just have a read only log that nobody else will be updating.
3270
if ((hot_log = (cur_log != &rli->cache_buf)))
3272
assert(rli->cur_log_fd == -1); // foreign descriptor
3273
pthread_mutex_lock(log_lock);
3276
Reading xxx_file_id is safe because the log will only
3277
be rotated when we hold relay_log.LOCK_log
3279
if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
3281
// The master has switched to a new log file; Reopen the old log file
3282
cur_log=reopen_relay_log(rli, &errmsg);
3283
pthread_mutex_unlock(log_lock);
3284
if (!cur_log) // No more log files
3286
hot_log=0; // Using old binary log
3290
As there is no guarantee that the relay is open (for example, an I/O
3291
error during a write by the slave I/O thread may have closed it), we
3294
if (!my_b_inited(cur_log))
3296
assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3297
assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
3300
Relay log is always in new format - if the master is 3.23, the
3301
I/O thread will convert the format for us.
3302
A problem: the description event may be in a previous relay log. So if
3303
the slave has been shutdown meanwhile, we would have to look in old relay
3304
logs, which may even have been deleted. So we need to write this
3305
description event at the beginning of the relay log.
3306
When the relay log is created when the I/O thread starts, easy: the
3307
master will send the description event and we will queue it.
3308
But if the relay log is created by new_file(): then the solution is:
3309
DRIZZLE_BIN_LOG::open() will write the buffered description event.
3311
if ((ev=Log_event::read_log_event(cur_log,0,
3312
rli->relay_log.description_event_for_exec)))
3315
assert(session==rli->sql_session);
3317
read it while we have a lock, to avoid a mutex lock in
3318
inc_event_relay_log_pos()
3320
rli->future_event_relay_log_pos= my_b_tell(cur_log);
3322
pthread_mutex_unlock(log_lock);
3325
assert(session==rli->sql_session);
3326
if (opt_reckless_slave) // For mysql-test
3328
if (cur_log->error < 0)
3330
errmsg = "slave SQL thread aborted because of I/O error";
3332
pthread_mutex_unlock(log_lock);
3335
if (!cur_log->error) /* EOF */
3338
On a hot log, EOF means that there are no more updates to
3339
process and we must block until I/O thread adds some and
3340
signals us to continue
3345
We say in Seconds_Behind_Master that we have "caught up". Note that
3346
for example if network link is broken but I/O slave thread hasn't
3347
noticed it (slave_net_timeout not elapsed), then we'll say "caught
3348
up" whereas we're not really caught up. Fixing that would require
3349
internally cutting timeout in smaller pieces in network read, no
3350
thanks. Another example: SQL has caught up on I/O, now I/O has read
3351
a new event and is queuing it; the false "0" will exist until SQL
3352
finishes executing the new event; it will be look abnormal only if
3353
the events have old timestamps (then you get "many", 0, "many").
3355
Transient phases like this can be fixed with implemeting
3356
Heartbeat event which provides the slave the status of the
3357
master at time the master does not have any new update to send.
3358
Seconds_Behind_Master would be zero only when master has no
3359
more updates in binlog for slave. The heartbeat can be sent
3360
in a (small) fraction of slave_net_timeout. Until it's done
3361
rli->last_master_timestamp is temporarely (for time of
3362
waiting for the following event) reset whenever EOF is
3365
time_t save_timestamp= rli->last_master_timestamp;
3366
rli->last_master_timestamp= 0;
3368
assert(rli->relay_log.get_open_count() ==
3369
rli->cur_log_old_open_count);
3371
if (rli->ign_master_log_name_end[0])
3373
/* We generate and return a Rotate, to make our positions advance */
3374
ev= new Rotate_log_event(rli->ign_master_log_name_end,
3375
0, rli->ign_master_log_pos_end,
3376
Rotate_log_event::DUP_NAME);
3377
rli->ign_master_log_name_end[0]= 0;
3378
pthread_mutex_unlock(log_lock);
3381
errmsg= "Slave SQL thread failed to create a Rotate event "
3382
"(out of memory?), SHOW SLAVE STATUS may be inaccurate";
3385
ev->server_id= 0; // don't be ignored by slave SQL thread
3390
We can, and should release data_lock while we are waiting for
3391
update. If we do not, show slave status will block
3393
pthread_mutex_unlock(&rli->data_lock);
3397
- the I/O thread has reached log_space_limit
3398
- the SQL thread has read all relay logs, but cannot purge for some
3400
* it has already purged all logs except the current one
3401
* there are other logs than the current one but they're involved in
3402
a transaction that finishes in the current one (or is not finished)
3404
Wake up the possibly waiting I/O thread, and set a boolean asking
3405
the I/O thread to temporarily ignore the log_space_limit
3406
constraint, because we do not want the I/O thread to block because of
3407
space (it's ok if it blocks for any other reason (e.g. because the
3408
master does not send anything). Then the I/O thread stops waiting
3409
and reads more events.
3410
The SQL thread decides when the I/O thread should take log_space_limit
3411
into account again : ignore_log_space_limit is reset to 0
3412
in purge_first_log (when the SQL thread purges the just-read relay
3413
log), and also when the SQL thread starts. We should also reset
3414
ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
3415
fact, no need as RESET SLAVE requires that the slave
3416
be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
3419
pthread_mutex_lock(&rli->log_space_lock);
3420
// prevent the I/O thread from blocking next times
3421
rli->ignore_log_space_limit= 1;
3423
If the I/O thread is blocked, unblock it. Ok to broadcast
3424
after unlock, because the mutex is only destroyed in
3425
~Relay_log_info(), i.e. when rli is destroyed, and rli will
3426
not be destroyed before we exit the present function.
3428
pthread_mutex_unlock(&rli->log_space_lock);
3429
pthread_cond_broadcast(&rli->log_space_cond);
3430
// Note that wait_for_update_relay_log unlocks lock_log !
3431
rli->relay_log.wait_for_update_relay_log(rli->sql_session);
3432
// re-acquire data lock since we released it earlier
3433
pthread_mutex_lock(&rli->data_lock);
3434
rli->last_master_timestamp= save_timestamp;
3438
If the log was not hot, we need to move to the next log in
3439
sequence. The next log could be hot or cold, we deal with both
3440
cases separately after doing some common initialization
3442
end_io_cache(cur_log);
3443
assert(rli->cur_log_fd >= 0);
3444
my_close(rli->cur_log_fd, MYF(MY_WME));
3445
rli->cur_log_fd = -1;
3447
if (relay_log_purge)
3450
purge_first_log will properly set up relay log coordinates in rli.
3451
If the group's coordinates are equal to the event's coordinates
3452
(i.e. the relay log was not rotated in the middle of a group),
3453
we can purge this relay log too.
3454
We do uint64_t and string comparisons, this may be slow but
3455
- purging the last relay log is nice (it can save 1GB of disk), so we
3456
like to detect the case where we can do it, and given this,
3457
- I see no better detection method
3458
- purge_first_log is not called that often
3460
if (rli->relay_log.purge_first_log
3462
rli->group_relay_log_pos == rli->event_relay_log_pos
3463
&& !strcmp(rli->group_relay_log_name.c_str(), rli->event_relay_log_name.c_str())))
3465
errmsg = "Error purging processed logs";
3472
If hot_log is set, then we already have a lock on
3473
LOCK_log. If not, we have to get the lock.
3475
According to Sasha, the only time this code will ever be executed
3476
is if we are recovering from a bug.
3478
if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
3480
errmsg = "error switching to the next log";
3483
rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3484
rli->event_relay_log_name.assign(rli->linfo.log_file_name);
3485
flush_relay_log_info(rli);
3489
Now we want to open this next log. To know if it's a hot log (the one
3490
being written by the I/O thread now) or a cold log, we can use
3491
is_active(); if it is hot, we use the I/O cache; if it's cold we open
3492
the file normally. But if is_active() reports that the log is hot, this
3493
may change between the test and the consequence of the test. So we may
3494
open the I/O cache whereas the log is now cold, which is nonsense.
3495
To guard against this, we need to have LOCK_log.
3498
if (!hot_log) /* if hot_log, we already have this mutex */
3499
pthread_mutex_lock(log_lock);
3500
if (rli->relay_log.is_active(rli->linfo.log_file_name))
3503
if (global_system_variables.log_warnings)
3504
sql_print_information(_("next log '%s' is currently active"),
3505
rli->linfo.log_file_name);
3507
rli->cur_log= cur_log= rli->relay_log.get_log_file();
3508
rli->cur_log_old_open_count= rli->relay_log.get_open_count();
3509
assert(rli->cur_log_fd == -1);
3512
Read pointer has to be at the start since we are the only
3514
We must keep the LOCK_log to read the 4 first bytes, as this is a hot
3515
log (same as when we call read_log_event() above: for a hot log we
3518
if (check_binlog_magic(cur_log,&errmsg))
3520
if (!hot_log) pthread_mutex_unlock(log_lock);
3523
if (!hot_log) pthread_mutex_unlock(log_lock);
3526
if (!hot_log) pthread_mutex_unlock(log_lock);
3528
if we get here, the log was not hot, so we will have to open it
3529
ourselves. We are sure that the log is still not hot now (a log can get
3530
from hot to cold, but not from cold to hot). No need for LOCK_log.
3533
if (global_system_variables.log_warnings)
3534
sql_print_information(_("next log '%s' is not active"),
3535
rli->linfo.log_file_name);
3537
// open_binlog() will check the magic header
3538
if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
3545
Read failed with a non-EOF error.
3546
TODO: come up with something better to handle this error
3549
pthread_mutex_unlock(log_lock);
3550
sql_print_error(_("Slave SQL thread: I/O error reading "
3551
"event(errno: %d cur_log->error: %d)"),
3552
my_errno,cur_log->error);
3553
// set read position to the beginning of the event
3554
my_b_seek(cur_log,rli->event_relay_log_pos);
3555
/* otherwise, we have had a partial read */
3556
errmsg = _("Aborting slave SQL thread because of partial event read");
3557
break; // To end of function
3560
if (!errmsg && global_system_variables.log_warnings)
3562
sql_print_information(_("Error reading relay log event: %s"),
3563
_("slave SQL thread was killed"));
3569
sql_print_error(_("Error reading relay log event: %s"), errmsg);
3574
Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
3575
because of size is simpler because when we do it we already have all relevant
3576
locks; here we don't, so this function is mainly taking locks).
3577
Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
3581
/*
  Rotate the relay log that belongs to the given master connection.

  Holds mi->run_lock while it asks the relay log to switch to a new file
  (relay_log.new_file()) and then folds the bytes written so far into
  rli->log_space_total via harvest_bytes_written(); the lock is released
  before returning.

  @param mi  Master_info whose embedded relay log info (mi->rli) is rotated.
*/
void rotate_relay_log(Master_info* mi)
3583
Relay_log_info* rli= &mi->rli;
3585
/* We don't lock rli->run_lock. This would lead to deadlocks. */
3586
// Serialize on the Master_info's run_lock instead; it is released at the
// end of this function.
pthread_mutex_lock(&mi->run_lock);
3589
We need to test inited because otherwise, new_file() will attempt to lock
3590
LOCK_log, which may not be inited (if we're not a slave).
3597
/* If the relay log is closed, new_file() will do nothing. */
3598
rli->relay_log.new_file();
3601
We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
3602
be counted, so imagine a succession of FLUSH LOGS and assume the slave
3603
threads are started:
3604
relay_log_space decreases by the size of the deleted relay log, but does
3605
not increase, so flush-after-flush we may become negative, which is wrong.
3606
Even if this will be corrected as soon as a query is replicated on the
3607
slave (because the I/O thread will then call harvest_bytes_written() which
3608
will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
3609
output in SHOW SLAVE STATUS meanwhile. So we harvest now.
3610
If the log is closed, then this will just harvest the last writes, probably
3611
0 as they probably have been harvested.
3613
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3615
pthread_mutex_unlock(&mi->run_lock);
3621
Detects, based on master's version (as found in the relay log), if master
3623
@param rli Relay_log_info which tells the master's version
3624
@param bug_id Number of the bug as found in bugs.mysql.com
3625
@param report bool report error message, default true
3626
@return true if master has the bug, false if it does not.
3628
/*
  Decide whether the master is affected by a known bug, using its version.

  The master's version is taken from
  rli->relay_log.description_event_for_exec->server_version_split (3 bytes)
  and compared bytewise against a static table of affected ranges: an entry
  matches when its bug_id equals the requested one and
  introduced_in <= version < fixed_in. On a match a short error is raised
  with my_printf_error() and a verbose one is recorded with rli->report().

  NOTE(review): the `report` argument presumably gates those two messages --
  confirm where it is tested.
*/
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
3630
struct st_version_range_for_one_bug {
3632
const unsigned char introduced_in[3]; // first version with bug
3633
const unsigned char fixed_in[3]; // first version with fix
3635
// One row per affected release series: { bug id, introduced_in, fixed_in }.
static struct st_version_range_for_one_bug versions_for_all_bugs[]=
3637
{24432, { 5, 0, 24 }, { 5, 0, 38 } },
3638
{24432, { 5, 1, 12 }, { 5, 1, 17 } },
3639
{33029, { 5, 0, 0 }, { 5, 0, 58 } },
3640
{33029, { 5, 1, 0 }, { 5, 1, 12 } },
3642
const unsigned char *master_ver=
3643
rli->relay_log.description_event_for_exec->server_version_split;
3645
// The 3-byte memcmp() calls below rely on server_version_split being
// exactly three bytes long.
assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3648
i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3650
const unsigned char *introduced_in= versions_for_all_bugs[i].introduced_in,
3651
*fixed_in= versions_for_all_bugs[i].fixed_in;
3652
// Affected range is half-open: introduced_in inclusive, fixed_in exclusive.
if ((versions_for_all_bugs[i].bug_id == bug_id) &&
3653
(memcmp(introduced_in, master_ver, 3) <= 0) &&
3654
(memcmp(fixed_in, master_ver, 3) > 0))
3659
// a short message for SHOW SLAVE STATUS (message length constraints)
3660
my_printf_error(ER_UNKNOWN_ERROR,
3661
_("master may suffer from"
3662
" http://bugs.mysql.com/bug.php?id=%u"
3663
" so slave stops; check error log on slave"
3664
" for more info"), MYF(0), bug_id);
3665
// a verbose message for the error log
3666
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
3667
_("According to the master's version ('%s'),"
3668
" it is probable that master suffers from this bug:"
3669
" http://bugs.mysql.com/bug.php?id=%u"
3670
" and thus replicating the current binary log event"
3671
" may make the slave's data become different from the"
3673
" To take no risk, slave refuses to replicate"
3674
" this event and stops."
3675
" We recommend that all updates be stopped on the"
3676
" master and slave, that the data of both be"
3677
" manually synchronized,"
3678
" that master's binary logs be deleted,"
3679
" that master be upgraded to a version at least"
3680
" equal to '%d.%d.%d'. Then replication can be"
3682
rli->relay_log.description_event_for_exec->server_version,
3684
fixed_in[0], fixed_in[1], fixed_in[2]);
3692
BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
3693
exclusive, if one statement in a SP generated AUTO_INCREMENT value
3694
by the top statement, all statements after it would be considered
3695
generated AUTO_INCREMENT value by the top statement, and an
3696
erroneous INSERT_ID value might be associated with these statements,
3697
which could cause duplicate entry error and stop the slave.
3699
Detect buggy master to work around.
3701
/*
  Tell whether the master feeding this session is affected by BUG#33029.

  Only applies when `session` is the SQL thread session of active_mi; in
  that case the answer is delegated to rpl_master_has_bug() for bug 33029,
  with its `report` argument set to false.

  @param session  Session to check against active_mi->rli.sql_session.
  @return         Result of rpl_master_has_bug(rli, 33029, false) for the
                  slave SQL session. NOTE(review): presumably false for any
                  other session or when active_mi is NULL -- confirm.
*/
bool rpl_master_erroneous_autoinc(Session *session)
3703
if (active_mi && active_mi->rli.sql_session == session)
3705
Relay_log_info *rli= &active_mi->rli;
3706
return rpl_master_has_bug(rli, 33029, false);
3711
/*
  Explicit instantiations of I_List_iterator for i_string and i_string_pair,
  compiled only when the build defines HAVE_EXPLICIT_TEMPLATE_INSTANTIATION.
*/
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
3712
template class I_List_iterator<i_string>;
3713
template class I_List_iterator<i_string_pair>;
3717
@} (end of group Replication)