1
/* Copyright (C) 2000-2003 DRIZZLE AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
18
@addtogroup Replication
23
@brief Code to run the io thread and the sql thread on the
26
#include <drizzled/server_includes.h>
28
#include <storage/myisam/myisam.h>
29
#include <drizzled/replication/mi.h>
30
#include <drizzled/replication/rli.h>
31
#include <drizzled/replication/replication.h>
32
#include <libdrizzle/libdrizzle.h>
33
#include <mysys/hash.h>
34
#include <mysys/thr_alarm.h>
35
#include <libdrizzle/errmsg.h>
36
#include <mysys/mysys_err.h>
37
#include <drizzled/error.h>
38
#include <drizzled/sql_parse.h>
39
#include <drizzled/gettext.h>
41
#include <drizzled/session.h>
42
#include <drizzled/log_event.h>
43
#include <drizzled/item/empty_string.h>
44
#include <drizzled/item/return_int.h>
46
#if TIME_WITH_SYS_TIME
47
# include <sys/time.h>
51
# include <sys/time.h>
57
#include <drizzled/tztime.h>
59
#include <drizzled/replication/tblmap.h>
61
#define MAX_SLAVE_RETRY_PAUSE 5
62
bool use_slave_mask = 0;
63
MY_BITMAP slave_error_mask;
65
typedef bool (*CHECK_KILLED_FUNC)(Session*,void*);
67
char* slave_load_tmpdir = 0;
68
Master_info *active_mi= 0;
69
bool replicate_same_server_id;
70
uint64_t relay_log_space_limit = 0;
73
When slave thread exits, we need to remember the temporary tables so we
74
can re-use them on slave start.
76
TODO: move the vars below under Master_info
79
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
80
int32_t events_till_abort = -1;
82
enum enum_slave_reconnect_actions
84
SLAVE_RECON_ACT_REG= 0,
85
SLAVE_RECON_ACT_DUMP= 1,
86
SLAVE_RECON_ACT_EVENT= 2,
90
enum enum_slave_reconnect_messages
92
SLAVE_RECON_MSG_WAIT= 0,
93
SLAVE_RECON_MSG_KILLED_WAITING= 1,
94
SLAVE_RECON_MSG_AFTER= 2,
95
SLAVE_RECON_MSG_FAILED= 3,
96
SLAVE_RECON_MSG_COMMAND= 4,
97
SLAVE_RECON_MSG_KILLED_AFTER= 5,
101
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
104
N_("Waiting to reconnect after a failed registration on master"),
105
N_("Slave I/O thread killed while waiting to reconnect after a "
106
"failed registration on master"),
107
N_("Reconnecting after a failed registration on master"),
108
N_("failed registering on master, reconnecting to try again, "
109
"log '%s' at position %s"),
110
"COM_REGISTER_SLAVE",
111
N_("Slave I/O thread killed during or after reconnect")
114
N_("Waiting to reconnect after a failed binlog dump request"),
115
N_("Slave I/O thread killed while retrying master dump"),
116
N_("Reconnecting after a failed binlog dump request"),
117
N_("failed dump request, reconnecting to try again, "
118
"log '%s' at position %s"),
120
N_("Slave I/O thread killed during or after reconnect")
123
N_("Waiting to reconnect after a failed master event read"),
124
N_("Slave I/O thread killed while waiting to reconnect "
125
"after a failed read"),
126
N_("Reconnecting after a failed master event read"),
127
N_("Slave I/O thread: Failed reading log event, "
128
"reconnecting to retry, log '%s' at position %s"),
130
N_("Slave I/O thread killed during or after a "
131
"reconnect done to recover from failed read")
136
typedef enum { SLAVE_Session_IO, SLAVE_Session_SQL} SLAVE_Session_TYPE;
138
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
139
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
140
static bool wait_for_relay_log_space(Relay_log_info* rli);
141
static inline bool io_slave_killed(Session* session,Master_info* mi);
142
static inline bool sql_slave_killed(Session* session,Relay_log_info* rli);
143
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type);
144
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi);
145
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
146
bool suppress_warnings);
147
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
148
bool reconnect, bool suppress_warnings);
149
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
150
void* thread_killed_arg);
151
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
152
static Log_event* next_event(Relay_log_info* rli);
153
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
154
static int32_t terminate_slave_thread(Session *session,
155
pthread_mutex_t* term_lock,
156
pthread_cond_t* term_cond,
157
volatile uint32_t *slave_running,
159
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info);
162
Find out which replication threads are running
166
mask Return value here
167
mi master_info for slave
168
inverse If set, returns which threads are not running
171
Get a bit mask for which threads are running so that we can later restart
175
mask If inverse == 0, running threads
176
If inverse == 1, stopped threads
179
/*
  Build a bit mask describing the replication threads of `mi`.

  The I/O thread state is read from mi->slave_running and the SQL thread
  state from mi->rli.slave_running; the SLAVE_IO and SLAVE_SQL bits are
  accumulated in a local mask.

  NOTE(review): the conditions guarding each statement and the final store
  through `mask` are not visible in this excerpt -- confirm against the
  complete file before relying on this description.
*/
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
181
/* Read both run flags once, before building the mask. */
bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
182
register int32_t tmp_mask=0;
185
tmp_mask |= SLAVE_IO;
187
tmp_mask |= SLAVE_SQL;
189
/* XOR with both bits flips the IO and SQL bits (presumably the `inverse` case). */
tmp_mask^= (SLAVE_IO | SLAVE_SQL);
199
/*
  Acquire both slave run locks of `mi`.

  Locks mi->run_lock first and mi->rli.run_lock second; the matching
  release is unlock_slave_threads(), which unlocks in the opposite order.
*/
void lock_slave_threads(Master_info* mi)
201
//TODO: see if we can do this without dual mutex
202
pthread_mutex_lock(&mi->run_lock);
203
pthread_mutex_lock(&mi->rli.run_lock);
209
unlock_slave_threads()
212
/*
  Release both slave run locks of `mi`.

  Unlocks mi->rli.run_lock first and mi->run_lock second, i.e. the reverse
  of the acquisition order used by lock_slave_threads().
*/
void unlock_slave_threads(Master_info* mi)
214
//TODO: see if we can do this without dual mutex
215
pthread_mutex_unlock(&mi->rli.run_lock);
216
pthread_mutex_unlock(&mi->run_lock);
221
/* Initialize slave structures */
226
This is called when mysqld starts. Before client connections are
227
accepted. However bootstrap may conflict with us if it does START SLAVE.
228
So it's safer to take the lock.
230
pthread_mutex_lock(&LOCK_active_mi);
232
TODO: re-write this to iterate through the list of files for multi-master
234
active_mi= new Master_info;
237
If master_host is not specified, try to read it from the master_info file.
238
If master_host is specified, create the master_info file if it doesn't
243
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate memory for the master info structure"));
247
if (active_mi->init_master_info(master_info_file, relay_log_info_file, (SLAVE_IO | SLAVE_SQL)))
249
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to initialize the master info structure"));
253
/* If server id is not set, start_slave_thread() will say it */
255
if (active_mi->host[0] && !opt_skip_slave_start)
257
if (start_slave_threads(1 /* need mutex */,
258
0 /* no wait for start*/,
262
SLAVE_IO | SLAVE_SQL))
264
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to create slave threads"));
268
pthread_mutex_unlock(&LOCK_active_mi);
272
pthread_mutex_unlock(&LOCK_active_mi);
278
Init function to set up array for errors that should be skipped for slave
281
init_slave_skip_errors()
282
arg List of error numbers to skip, separated with ','
285
Called from get_options() in mysqld.cc on start-up
288
void init_slave_skip_errors(const char* arg)
292
if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
294
fprintf(stderr, "Badly out of memory, please check your system status\n");
298
for (;my_isspace(system_charset_info,*arg);++arg)
300
if (!my_strnncoll(system_charset_info,(unsigned char*)arg,4,(const unsigned char*)"all",4))
302
bitmap_set_all(&slave_error_mask);
308
if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
310
if (err_code < MAX_SLAVE_ERROR)
311
bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
312
while (!my_isdigit(system_charset_info,*p) && *p)
319
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
322
return(0); /* successfully do nothing */
323
int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
324
pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
326
if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
329
if ((error=terminate_slave_thread(mi->io_session,io_lock,
336
if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
338
mi->rli.abort_slave=1;
339
if ((error=terminate_slave_thread(mi->rli.sql_session,sql_lock,
341
&mi->rli.slave_running,
351
Wait for a slave thread to terminate.
353
This function is called after requesting the thread to terminate
354
(by setting @c abort_slave member of @c Relay_log_info or @c
355
Master_info structure to 1). Termination of the thread is
356
controlled with the predicate <code>*slave_running</code>.
358
Function will acquire @c term_lock before waiting on the condition
359
unless @c skip_lock is true in which case the mutex should be owned
360
by the caller of this function and will remain acquired after
361
return from the function.
364
Associated lock to use when waiting for @c term_cond
367
Condition that is signalled when the thread has terminated
370
Pointer to predicate to check for slave thread termination
373
If @c true the lock will not be acquired before waiting on
374
the condition. In this case, it is assumed that the calling
375
function acquires the lock before calling this function.
380
terminate_slave_thread(Session *session,
381
pthread_mutex_t* term_lock,
382
pthread_cond_t* term_cond,
383
volatile uint32_t *slave_running,
389
pthread_mutex_lock(term_lock);
391
safe_mutex_assert_owner(term_lock);
396
pthread_mutex_unlock(term_lock);
397
return(ER_SLAVE_NOT_RUNNING);
399
assert(session != 0);
400
Session_CHECK_SENTRY(session);
403
It is critical to test if the slave is running. Otherwise, we might
404
be referencing freed memory trying to kick it
407
while (*slave_running) // Should always be true
409
pthread_mutex_lock(&session->LOCK_delete);
410
#ifndef DONT_USE_THR_ALARM
412
Error codes from pthread_kill are:
413
EINVAL: invalid signal number (can't happen)
414
ESRCH: thread already killed (can happen, should be ignored)
416
int32_t err= pthread_kill(session->real_id, thr_client_alarm);
417
assert(err != EINVAL);
419
session->awake(Session::NOT_KILLED);
420
pthread_mutex_unlock(&session->LOCK_delete);
423
There is a small chance that slave thread might miss the first
424
alarm. To protect against it, resend the signal until it reacts
426
struct timespec abstime;
427
set_timespec(abstime,2);
428
error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
429
assert(error == ETIMEDOUT || error == 0);
432
assert(*slave_running == 0);
435
pthread_mutex_unlock(term_lock);
440
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
441
pthread_mutex_t *cond_lock,
442
pthread_cond_t *start_cond,
443
volatile uint32_t *slave_running,
444
volatile uint32_t *slave_run_id,
454
pthread_mutex_lock(start_lock);
458
pthread_cond_broadcast(start_cond);
460
pthread_mutex_unlock(start_lock);
461
errmsg_printf(ERRMSG_LVL_ERROR, _("Server id not set, will not start slave"));
462
return(ER_BAD_SLAVE);
468
pthread_cond_broadcast(start_cond);
470
pthread_mutex_unlock(start_lock);
471
return(ER_SLAVE_MUST_STOP);
473
start_id= *slave_run_id;
476
struct sched_param tmp_sched_param;
478
memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
479
tmp_sched_param.sched_priority= CONNECT_PRIOR;
480
(void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
482
if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
485
pthread_mutex_unlock(start_lock);
486
return(ER_SLAVE_THREAD);
488
if (start_cond && cond_lock) // caller has cond_lock
490
Session* session = current_session;
491
while (start_id == *slave_run_id)
493
const char* old_msg = session->enter_cond(start_cond,cond_lock,
494
"Waiting for slave thread to start");
495
pthread_cond_wait(start_cond,cond_lock);
496
session->exit_cond(old_msg);
497
pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
499
return(session->killed_errno());
503
pthread_mutex_unlock(start_lock);
509
start_slave_threads()
512
SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
513
sense to do that for starting a slave--we always care if it actually
514
started the threads that were not previously running
517
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
518
Master_info* mi, const char*, const char*,
521
pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
522
pthread_cond_t* cond_io=0,*cond_sql=0;
525
if (need_slave_mutex)
527
lock_io = &mi->run_lock;
528
lock_sql = &mi->rli.run_lock;
532
cond_io = &mi->start_cond;
533
cond_sql = &mi->rli.start_cond;
534
lock_cond_io = &mi->run_lock;
535
lock_cond_sql = &mi->rli.run_lock;
538
if (thread_mask & SLAVE_IO)
539
error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
541
&mi->slave_running, &mi->slave_run_id,
542
mi, 1); //high priority, to read the most possible
543
if (!error && (thread_mask & SLAVE_SQL))
545
error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
547
&mi->rli.slave_running, &mi->rli.slave_run_id,
550
terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
557
static int32_t end_slave_on_walk(Master_info* mi, unsigned char* /*unused*/)
566
Free all resources used by slave
575
This is called when the server terminates, in close_connections().
576
It terminates slave threads. However, some CHANGE MASTER etc may still be
577
running presently. If a START SLAVE was in progress, the mutex lock below
578
will make us wait until slave threads have started, and START SLAVE
579
returns, then we terminate them here.
581
pthread_mutex_lock(&LOCK_active_mi);
585
TODO: replace the line below with
586
list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
587
once multi-master code is ready.
589
terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
590
active_mi->end_master_info();
594
pthread_mutex_unlock(&LOCK_active_mi);
599
/*
  Tell whether the slave I/O thread has been asked to stop.

  session  Session of the I/O thread; asserted to be mi->io_session.
  mi       Master_info owning the I/O thread; asserted to be running.

  Returns true if any of mi->abort_slave, the global abort_loop, or
  session->killed is set.
*/
static bool io_slave_killed(Session* session, Master_info* mi)
601
assert(mi->io_session == session);
602
assert(mi->slave_running); // tracking buffer overrun
603
return(mi->abort_slave || abort_loop || session->killed);
607
static bool sql_slave_killed(Session* session, Relay_log_info* rli)
609
assert(rli->sql_session == session);
610
assert(rli->slave_running == 1);// tracking buffer overrun
611
if (abort_loop || session->killed || rli->abort_slave)
614
If we are in an unsafe situation (stopping could corrupt replication),
615
we give one minute to the slave SQL thread of grace before really
616
terminating, in the hope that it will be able to read more events and
617
the unsafe situation will soon be left. Note that this one minute starts
618
from the last time anything happened in the slave SQL thread. So it's
619
really one minute of idleness, we don't timeout if the slave SQL thread
622
if (rli->last_event_start_time == 0)
624
if (difftime(time(NULL), rli->last_event_start_time) > 60)
626
rli->report(ERROR_LEVEL, 0,
627
_("SQL thread had to stop in an unsafe situation, in "
628
"the middle of applying updates to a "
629
"non-transactional table without any primary key. "
630
"There is a risk of duplicate updates when the slave "
631
"SQL thread is restarted. Please check your tables' "
632
"contents after restart."));
641
skip_load_data_infile()
644
This is used to tell a 3.23 master to break send_file()
647
/*
  Make the peer skip a LOAD DATA INFILE file transfer.

  Requests the file "/dev/null" over `net`, reads and discards the reply,
  then writes an empty command packet with command byte 0 as the final
  acknowledgement. All three return values are deliberately ignored.
*/
void skip_load_data_infile(NET *net)
649
(void)net_request_file(net, "/dev/null");
650
(void)my_net_read(net); // discard response
651
(void)net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0); // ok
656
/*
  Send a file request for `fname` over `net`.

  Writes a command packet with command byte 251 whose header is the file
  name (strlen(fname) bytes, no terminator) and whose body is empty.
  Returns the result of net_write_command() converted to bool.
  `fname` must be non-NULL because it is passed to strlen().
*/
bool net_request_file(NET* net, const char* fname)
658
return(net_write_command(net, 251, (unsigned char*) fname, strlen(fname),
659
(unsigned char*) "", 0));
663
From other comments and tests in code, it looks like
664
sometimes Query_log_event and Load_log_event can have db == 0
665
(see rewrite_db() above for example)
666
(cases where this happens are unclear; it may be when the master is 3.23).
669
/*
  Return a printable database name.

  Returns `db` unchanged when it is non-NULL and the empty string ""
  otherwise, so the result can always be passed to a "%s" format.
*/
const char *print_slave_db_safe(const char* db)
671
return((db ? db : ""));
674
/*
  Initialise a string variable from the next line of an IO_CACHE.

  var          Destination buffer.
  max_size     Size passed to my_b_gets() as the read limit for `var`.
  f            Cache to read from.
  default_val  Value copied into `var` when nothing could be read; may be
               NULL, in which case this branch is not taken.

  NOTE(review): return statements and some branch conditions are not visible
  in this excerpt, so the return contract is not documented here -- confirm
  against the complete file.
*/
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
675
const char *default_val)
679
if ((length=my_b_gets(f,var, max_size)))
681
/* Points at the last character that was read into var. */
char* last_p = var + length -1;
683
*last_p = 0; // if we stopped on newline, kill it
687
If we truncated a line or stopped on last char, remove all chars
688
up to and including newline.
691
/* Drain the remainder of the current line from the cache. */
while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
695
else if (default_val)
697
/*
  NOTE(review): strncpy() does not write a terminating NUL when default_val
  has max_size-1 or more characters; no explicit terminator is visible in
  this excerpt -- verify that var[max_size-1] is set to 0 somewhere.
*/
strncpy(var, default_val, max_size-1);
704
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
709
if (my_b_gets(f, buf, sizeof(buf)))
714
else if (default_val)
722
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
727
if (my_b_gets(f, buf, sizeof(buf)))
729
if (sscanf(buf, "%f", var) != 1)
734
else if (default_val != 0.0)
742
/*
  Check whether the I/O thread was killed and optionally log a message.

  session  Session of the I/O thread (forwarded to io_slave_killed()).
  mi       Master_info of the I/O thread.
  info     Message to log when the thread was killed; may be NULL.

  When io_slave_killed() reports a kill, `info` is printed at INFO level,
  provided it is non-NULL and global_system_variables.log_warnings is set.

  NOTE(review): the return statements are not visible in this excerpt;
  presumably the function returns whether the thread was killed -- confirm.
*/
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info)
744
if (io_slave_killed(session, mi))
746
if (info && global_system_variables.log_warnings)
747
errmsg_printf(ERRMSG_LVL_INFO, "%s",info);
755
Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
756
relying on the binlog's version. This is not perfect: imagine an upgrade
757
of the master without waiting that all slaves are in sync with the master;
758
then a slave could be fooled about the binlog's format. This is what happens
759
when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
760
slaves are fooled. So we do this only to distinguish between 3.23 and more
761
recent masters (it's too late to change things for 3.23).
768
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
771
String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
772
char err_buff[MAX_SLAVE_ERRMSG];
773
const char* errmsg= 0;
775
DRIZZLE_RES *master_res= 0;
776
DRIZZLE_ROW master_row;
780
Free old description_event_for_queue (that is needed if we are in
783
delete mi->rli.relay_log.description_event_for_queue;
784
mi->rli.relay_log.description_event_for_queue= 0;
786
if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
788
errmsg = _("Master reported unrecognized DRIZZLE version");
789
err_code= ER_SLAVE_FATAL_ERROR;
790
sprintf(err_buff, ER(err_code), errmsg);
791
err_msg.append(err_buff);
796
Note the following switch will bug when we have DRIZZLE branch 30 ;)
798
switch (*drizzle->server_version)
803
errmsg = _("Master reported unrecognized DRIZZLE version");
804
err_code= ER_SLAVE_FATAL_ERROR;
805
sprintf(err_buff, ER(err_code), errmsg);
806
err_msg.append(err_buff);
809
mi->rli.relay_log.description_event_for_queue= new
810
Format_description_log_event(1, drizzle->server_version);
813
mi->rli.relay_log.description_event_for_queue= new
814
Format_description_log_event(3, drizzle->server_version);
818
Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
819
take the early steps (like tests for "is this a 3.23 master") which we
820
have to take before we receive the real master's Format_desc which will
821
override this one. Note that the Format_desc we create below is garbage
822
(it has the format of the *slave*); it's only good to help know if the
823
master is 3.23, 4.0, etc.
825
mi->rli.relay_log.description_event_for_queue= new
826
Format_description_log_event(4, drizzle->server_version);
832
This does not mean that a 5.0 slave will be able to read a 6.0 master; but
833
as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
834
can't read a 6.0 master, this will show up when the slave can't read some
835
events sent by the master, and there will be error messages.
838
if (err_msg.length() != 0)
841
/* as we are here, we tried to allocate the event */
842
if (!mi->rli.relay_log.description_event_for_queue)
844
errmsg= _("default Format_description_log_event");
845
err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
846
sprintf(err_buff, ER(err_code), errmsg);
847
err_msg.append(err_buff);
852
Compare the master and slave's clock. Do not die if master's clock is
853
unavailable (very old master not supporting UNIX_TIMESTAMP()?).
856
if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
857
(master_res= drizzle_store_result(drizzle)) &&
858
(master_row= drizzle_fetch_row(master_res)))
860
mi->clock_diff_with_master=
861
(long) (time(NULL) - strtoul(master_row[0], 0, 10));
863
else if (!check_io_slave_killed(mi->io_session, mi, NULL))
865
mi->clock_diff_with_master= 0; /* The "most sensible" value */
866
errmsg_printf(ERRMSG_LVL_WARN, _("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
867
"do not trust column Seconds_Behind_Master of SHOW "
868
"SLAVE STATUS. Error: %s (%d)"),
869
drizzle_error(drizzle), drizzle_errno(drizzle));
872
drizzle_free_result(master_res);
875
Check that the master's server id and ours are different. Because if they
876
are equal (which can result from a simple copy of master's datadir to slave,
877
thus copying some drizzle.cnf), replication will work but all events will be
879
Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
881
Note: we could have put a @@SERVER_ID in the previous SELECT
882
UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
884
if (!drizzle_real_query(drizzle,
885
STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
886
(master_res= drizzle_store_result(drizzle)))
888
if ((master_row= drizzle_fetch_row(master_res)) &&
889
(::server_id == strtoul(master_row[1], 0, 10)) &&
890
!mi->rli.replicate_same_server_id)
893
_("The slave I/O thread stops because master and slave have equal "
894
"DRIZZLE server ids; these ids must be different "
895
"for replication to work (or "
896
"the --replicate-same-server-id option must be used "
897
"on slave but this does "
898
"not always make sense; please check the manual before using it).");
899
err_code= ER_SLAVE_FATAL_ERROR;
900
sprintf(err_buff, ER(err_code), errmsg);
901
err_msg.append(err_buff);
903
drizzle_free_result(master_res);
909
Check that the master's global character_set_server and ours are the same.
910
Not fatal if query fails (old master?).
911
Note that we don't check for equality of global character_set_client and
912
collation_connection (neither do we prevent their setting in
913
set_var.cc). That's because from what I (Guilhem) have tested, the global
914
values of these 2 are never used (new connections don't use them).
915
We don't test equality of global collation_database either as it is
916
going to be deprecated (made read-only) in 4.1 very soon.
917
The test is only relevant if master < 5.0.3 (we'll test only if it's older
918
than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
919
charset info in each binlog event.
920
We don't do it for 3.23 because masters <3.23.50 hang on
921
SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
922
test only if master is 4.x.
925
/* redundant with rest of code but safer against later additions */
926
if (*drizzle->server_version == '3')
929
if ((*drizzle->server_version == '4') &&
930
!drizzle_real_query(drizzle,
931
STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
932
(master_res= drizzle_store_result(drizzle)))
934
if ((master_row= drizzle_fetch_row(master_res)) &&
935
strcmp(master_row[0], global_system_variables.collation_server->name))
938
_("The slave I/O thread stops because master and slave have"
939
" different values for the COLLATION_SERVER global variable."
940
" The values must be equal for replication to work");
941
err_code= ER_SLAVE_FATAL_ERROR;
942
sprintf(err_buff, ER(err_code), errmsg);
943
err_msg.append(err_buff);
945
drizzle_free_result(master_res);
951
Perform analogous check for time zone. Theoretically we also should
952
perform check here to verify that SYSTEM time zones are the same on
953
slave and master, but we can't rely on value of @@system_time_zone
954
variable (it is time zone abbreviation) since it determined at start
955
time and so could differ for slave and master even if they are really
956
in the same system time zone. So we are omitting this check and just
957
relying on documentation. Also according to Monty there are many users
958
who are using replication between servers in various time zones. Hence
959
such a check would break everything for them. (And now everything will
960
work for them because by default both their master and slave will have
962
This check is only necessary for 4.x masters (and < 5.0.4 masters but
965
if ((*drizzle->server_version == '4') &&
966
!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
967
(master_res= drizzle_store_result(drizzle)))
969
if ((master_row= drizzle_fetch_row(master_res)) &&
970
strcmp(master_row[0],
971
global_system_variables.time_zone->get_name()->ptr()))
974
_("The slave I/O thread stops because master and slave have"
975
" different values for the TIME_ZONE global variable."
976
" The values must be equal for replication to work");
977
err_code= ER_SLAVE_FATAL_ERROR;
978
sprintf(err_buff, ER(err_code), errmsg);
979
err_msg.append(err_buff);
981
drizzle_free_result(master_res);
987
if (mi->heartbeat_period != 0.0)
990
const char query_format[]= "SET @master_heartbeat_period= %s";
991
char query[sizeof(query_format) - 2 + sizeof(llbuf)];
993
the period is an uint64_t of nano-secs.
995
llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
996
sprintf(query, query_format, llbuf);
998
if (drizzle_real_query(drizzle, query, strlen(query))
999
&& !check_io_slave_killed(mi->io_session, mi, NULL))
1001
err_msg.append("The slave I/O thread stops because querying master with '");
1002
err_msg.append(query);
1003
err_msg.append("' failed;");
1004
err_msg.append(" error: ");
1005
err_code= drizzle_errno(drizzle);
1006
err_msg.qs_append(err_code);
1007
err_msg.append(" '");
1008
err_msg.append(drizzle_error(drizzle));
1009
err_msg.append("'");
1010
drizzle_free_result(drizzle_store_result(drizzle));
1013
drizzle_free_result(drizzle_store_result(drizzle));
1017
if (err_msg.length() != 0)
1019
errmsg_printf(ERRMSG_LVL_ERROR, "%s",err_msg.ptr());
1020
assert(err_code != 0);
1021
mi->report(ERROR_LEVEL, err_code, "%s",err_msg.ptr());
1029
/*
  Block the slave I/O thread until relay log space is available.

  rli  Relay log info; rli->mi->io_session is used as the waiting session.

  Waits on rli->log_space_cond (under rli->log_space_lock) for as long as
  rli->log_space_limit is below rli->log_space_total, the I/O thread has
  not been killed, and rli->ignore_log_space_limit is not set.

  Returns the last value obtained from io_slave_killed(); this stays false
  if the space condition was never true, because the kill check is only
  evaluated after the space comparison succeeds.
*/
static bool wait_for_relay_log_space(Relay_log_info* rli)
1031
bool slave_killed=0;
1032
Master_info* mi = rli->mi;
1033
const char *save_proc_info;
1034
Session* session = mi->io_session;
1036
pthread_mutex_lock(&rli->log_space_lock);
1037
/* Publish the wait state on the session; the previous state is restored below. */
save_proc_info= session->enter_cond(&rli->log_space_cond,
1038
&rli->log_space_lock,
1039
_("Waiting for the slave SQL thread "
1040
"to free enough relay log space"));
1041
/* Re-test the whole predicate after every wakeup. */
while (rli->log_space_limit < rli->log_space_total &&
1042
!(slave_killed=io_slave_killed(session,mi)) &&
1043
!rli->ignore_log_space_limit)
1044
pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1045
/* exit_cond() presumably also releases log_space_lock (no explicit unlock here) -- confirm. */
session->exit_cond(save_proc_info);
1046
return(slave_killed);
1051
Builds a Rotate from the ignored events' info and writes it to relay log.
1054
write_ignored_events_info_to_relay_log()
1055
session pointer to I/O thread's session
1059
Slave I/O thread, going to die, must leave a durable trace of the
1060
ignored events' end position for the use of the slave SQL thread, by
1061
calling this function. Only that thread can call it (see assertion).
1063
static void write_ignored_events_info_to_relay_log(Session *session,
1066
Relay_log_info *rli= &mi->rli;
1067
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1069
assert(session == mi->io_session);
1070
pthread_mutex_lock(log_lock);
1071
if (rli->ign_master_log_name_end[0])
1073
Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
1074
0, rli->ign_master_log_pos_end,
1075
Rotate_log_event::DUP_NAME);
1076
rli->ign_master_log_name_end[0]= 0;
1077
/* can unlock before writing as slave SQL session will soon see our Rotate */
1078
pthread_mutex_unlock(log_lock);
1079
if (likely((bool)ev))
1081
ev->server_id= 0; // don't be ignored by slave SQL thread
1082
if (unlikely(rli->relay_log.append(ev)))
1083
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
1084
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
1085
_("failed to write a Rotate event"
1086
" to the relay log, SHOW SLAVE STATUS may be"
1088
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1090
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to flush master info file"));
1094
mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
1095
ER(ER_SLAVE_CREATE_EVENT_FAILURE),
1096
_("Rotate_event (out of memory?),"
1097
" SHOW SLAVE STATUS may be inaccurate"));
1100
pthread_mutex_unlock(log_lock);
1105
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
1106
bool *suppress_warnings)
1108
unsigned char buf[1024], *pos= buf;
1109
uint32_t report_host_len, report_user_len=0, report_password_len=0;
1111
*suppress_warnings= false;
1114
report_host_len= strlen(report_host);
1115
/* 30 is a good safety margin */
1116
if (report_host_len + report_user_len + report_password_len + 30 >
1118
return(0); // safety
1120
int4store(pos, server_id); pos+= 4;
1121
pos= net_store_data(pos, (unsigned char*) report_host, report_host_len);
1122
pos= net_store_data(pos, NULL, report_user_len);
1123
pos= net_store_data(pos, NULL, report_password_len);
1124
int2store(pos, (uint16_t) report_port); pos+= 2;
1125
int4store(pos, 0); pos+= 4;
1126
/* The master will fill in master_id */
1127
int4store(pos, 0); pos+= 4;
1129
if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
1131
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1133
*suppress_warnings= true; // Suppress reconnect warning
1135
else if (!check_io_slave_killed(mi->io_session, mi, NULL))
1138
snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle),
1139
drizzle_errno(drizzle));
1140
mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1141
ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1149
/*
  Send the slave status of one master connection to the client as a
  single-row result set.

  Steps visible in the body:
    1. Build field_list with one Item per output column and send the
       metadata with protocol->send_fields().
    2. Call protocol->prepare_for_resend() and store one value per column,
       in the same order as the metadata.
    3. Write the row with my_net_write() using session->packet.

  Locking:
    - mi->run_lock is held only while reading mi->io_session for the
      Slave_IO_State column.
    - mi->data_lock and then mi->rli.data_lock are taken before the
      remaining columns are stored and are released in the reverse order
      before the row is written.

  Column sources worth noting:
    - Slave_IO_Running is Yes only when mi->slave_running equals
      DRIZZLE_SLAVE_RUN_CONNECT.
    - Last_Errno/Last_Error and Last_SQL_Errno/Last_SQL_Error are both
      read from mi->rli.last_error(); Last_IO_Errno/Last_IO_Error come
      from mi->last_error().
    - Seconds_Behind_Master is computed only when the I/O thread is in
      the connected state and mi->rli.slave_running is set; the value is
      clamped at 0 with cmax() and is 0 when last_master_timestamp is 0.
      protocol->store_null() is presumably the other branch of that
      test - TODO confirm.

  @param session  Client session; provides the protocol, packet and net.
  @param mi       Master info whose state is reported.

  @return NOTE(review): the return statements should be confirmed against
          the full function body; both send_fields() and my_net_write()
          results are tested.
*/
bool show_master_info(Session* session, Master_info* mi)
1151
// TODO: fix this for multi-master
1152
List<Item> field_list;
1153
Protocol *protocol= session->protocol;
1155
/* Column metadata, in the order the values are stored further down. */
field_list.push_back(new Item_empty_string("Slave_IO_State",
1157
field_list.push_back(new Item_empty_string("Master_Host",
1159
field_list.push_back(new Item_empty_string("Master_User",
1161
field_list.push_back(new Item_return_int("Master_Port", 7,
1162
DRIZZLE_TYPE_LONG));
1163
field_list.push_back(new Item_return_int("Connect_Retry", 10,
1164
DRIZZLE_TYPE_LONG));
1165
field_list.push_back(new Item_empty_string("Master_Log_File",
1167
field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
1168
DRIZZLE_TYPE_LONGLONG));
1169
field_list.push_back(new Item_empty_string("Relay_Log_File",
1171
field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
1172
DRIZZLE_TYPE_LONGLONG));
1173
field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
1175
field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
1176
field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1177
field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
1178
field_list.push_back(new Item_empty_string("Last_Error", 20));
1179
field_list.push_back(new Item_return_int("Skip_Counter", 10,
1180
DRIZZLE_TYPE_LONG));
1181
field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
1182
DRIZZLE_TYPE_LONGLONG));
1183
field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
1184
DRIZZLE_TYPE_LONGLONG));
1185
field_list.push_back(new Item_empty_string("Until_Condition", 6));
1186
field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1187
field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1188
DRIZZLE_TYPE_LONGLONG));
1189
field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1190
DRIZZLE_TYPE_LONGLONG));
1191
field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
1192
field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1193
field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
1194
field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
1196
if (protocol->send_fields(&field_list,
1197
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1202
String *packet= &session->packet;
1203
protocol->prepare_for_resend();
1206
slave_running can be accessed without run_lock but not other
1207
non-volotile members like mi->io_session, which is guarded by the mutex.
1209
pthread_mutex_lock(&mi->run_lock);
1210
protocol->store(mi->io_session ? mi->io_session->get_proc_info() : "", &my_charset_bin);
1211
pthread_mutex_unlock(&mi->run_lock);
1213
/* Both data locks stay held until every remaining column is stored. */
pthread_mutex_lock(&mi->data_lock);
1214
pthread_mutex_lock(&mi->rli.data_lock);
1215
protocol->store(mi->getHostname(), &my_charset_bin);
1216
protocol->store(mi->getUsername(), &my_charset_bin);
1217
protocol->store((uint32_t) mi->getPort());
1218
protocol->store(mi->getConnectionRetry());
1219
protocol->store(mi->getLogName(), &my_charset_bin);
1220
protocol->store((uint64_t) mi->getLogPosition());
1221
protocol->store(mi->rli.group_relay_log_name.c_str() +
1222
dirname_length(mi->rli.group_relay_log_name.c_str()),
1224
protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1225
protocol->store(mi->rli.group_master_log_name.c_str(), &my_charset_bin);
1226
protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1227
"Yes" : "No", &my_charset_bin);
1228
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1230
protocol->store(mi->rli.last_error().number);
1231
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1232
protocol->store((uint32_t) mi->rli.slave_skip_counter);
1233
protocol->store((uint64_t) mi->rli.group_master_log_pos);
1234
protocol->store((uint64_t) mi->rli.log_space_total);
1237
mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
1238
( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
1239
"Relay"), &my_charset_bin);
1240
protocol->store(mi->rli.until_log_name, &my_charset_bin);
1241
protocol->store((uint64_t) mi->rli.until_log_pos);
1244
Seconds_Behind_Master: if SQL thread is running and I/O thread is
1245
connected, we can compute it otherwise show NULL (i.e. unknown).
1247
if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
1248
mi->rli.slave_running)
1250
long time_diff= ((long)(time(NULL) - mi->rli.last_master_timestamp)
1251
- mi->clock_diff_with_master);
1253
Apparently on some systems time_diff can be <0. Here are possible
1254
reasons related to MySQL:
1255
- the master is itself a slave of another master whose time is ahead.
1256
- somebody used an explicit SET TIMESTAMP on the master.
1257
Possible reason related to granularity-to-second of time functions
1258
(nothing to do with MySQL), which can explain a value of -1:
1259
assume the master's and slave's time are perfectly synchronized, and
1260
that at slave's connection time, when the master's timestamp is read,
1261
it is at the very end of second 1, and (a very short time later) when
1262
the slave's timestamp is read it is at the very beginning of second
1263
2. Then the recorded value for master is 1 and the recorded value for
1264
slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1265
between timestamp of slave and rli->last_master_timestamp is 0
1266
(i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1267
This confuses users, so we don't go below 0: hence the cmax().
1269
last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1270
special marker to say "consider we have caught up".
1272
protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1273
cmax((long)0, time_diff) : 0));
1277
protocol->store_null();
1281
protocol->store(mi->last_error().number);
1283
protocol->store(mi->last_error().message, &my_charset_bin);
1285
protocol->store(mi->rli.last_error().number);
1287
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1289
pthread_mutex_unlock(&mi->rli.data_lock);
1290
pthread_mutex_unlock(&mi->data_lock);
1292
if (my_net_write(&session->net, (unsigned char*) session->packet.ptr(), packet->length()))
1300
/*
  Adjust the option bits of a slave thread session.

  Visible effects:
    - OPTION_BIG_SELECTS is always turned on in session->options.
    - OPTION_BIN_LOG is turned on when opt_log_slave_updates is set; the
      statement that clears the bit is presumably the other branch of
      that test - TODO confirm.
    - session->variables.completion_type is set to 0.

  @param session  Slave thread session whose options are rewritten.
*/
void set_slave_thread_options(Session* session)
1303
It's nonsense to constrain the slave threads with max_join_size; if a
1304
query succeeded on master, we HAVE to execute it. So set
1305
OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
1306
(and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
1307
SELECT examining more than 4 billion rows would still fail (yes, because
1308
when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1309
only for client threads.
1311
uint64_t options= session->options | OPTION_BIG_SELECTS;
1312
if (opt_log_slave_updates)
1313
options|= OPTION_BIN_LOG;
1315
options&= ~OPTION_BIN_LOG;
1316
session->options= options;
1317
session->variables.completion_type= 0;
1325
/*
  Prepare a freshly created Session for use as a slave I/O or SQL thread.

  Visible steps:
    - mark the session as SYSTEM_THREAD_SLAVE_SQL or SYSTEM_THREAD_SLAVE_IO
      depending on session_type, and skip grant checks;
    - initialise session->net with my_net_init();
    - raise the session max_allowed_packet to the GLOBAL value plus
      MAX_LOG_EVENT_HEADER;
    - set slave_thread, apply set_slave_thread_options(), and advertise
      CLIENT_LOCAL_FILES;
    - allocate a thread id from the global thread_id counter under
      LOCK_thread_count;
    - run init_thr_lock() and session->store_globals();
    - set the initial proc info text, refresh version and time.

  NOTE(review): as written here, both the SLAVE_Session_IO and the
  SLAVE_Session_SQL bits are OR-ed into simulate_error immediately before
  the test on (simulate_error & (1 << session_type)). If those two
  statements are really unconditional, the test is true for either thread
  type and initialisation always takes the failure path. This looks like a
  fault-injection hook that lost its guard - confirm against the full
  function body.

  NOTE(review): the existing comment names MAX_LOG_EVENT_HEADER_LEN while
  the code adds MAX_LOG_EVENT_HEADER - confirm which name is intended.

  @param session       Session to initialise.
  @param session_type  SLAVE_Session_SQL or SLAVE_Session_IO.

  @return NOTE(review): callers treat a non-zero result as failure; the
          exact return statements should be confirmed against the full
          function body.
*/
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type)
1327
int32_t simulate_error= 0;
1328
session->system_thread = (session_type == SLAVE_Session_SQL) ?
1329
SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1330
session->security_ctx.skip_grants();
1331
my_net_init(&session->net, 0);
1333
Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1334
slave threads, since a replication event can become this much larger
1335
than the corresponding packet (query) sent from client to master.
1337
session->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1338
+ MAX_LOG_EVENT_HEADER; /* note, incr over the global not session var */
1339
session->slave_thread = 1;
1340
set_slave_thread_options(session);
1341
session->client_capabilities = CLIENT_LOCAL_FILES;
1342
pthread_mutex_lock(&LOCK_thread_count);
1343
session->thread_id= session->variables.pseudo_thread_id= thread_id++;
1344
pthread_mutex_unlock(&LOCK_thread_count);
1346
simulate_error|= (1 << SLAVE_Session_IO);
1347
simulate_error|= (1 << SLAVE_Session_SQL);
1348
if (init_thr_lock() || session->store_globals() || simulate_error & (1<< session_type))
1355
if (session_type == SLAVE_Session_SQL)
1356
session->set_proc_info("Waiting for the next event in relay log");
1358
session->set_proc_info("Waiting for master update");
1359
session->version=refresh_version;
1360
session->set_time();
1364
/* Returns non zero on error */
1365
/*
  Sleep for up to sec seconds while polling a kill callback.

  The deadline is computed once as time(NULL) + sec. Each loop iteration
  recomputes the remaining nap_time from the current time, arms a thread
  alarm for twice that long (so the alarm only matters if the thread is
  killed, per the comment in the body), ends the alarm, calls
  thread_killed(session, thread_killed_arg), and then re-reads the clock.
  Both time() calls are checked against (time_t)-1.

  NOTE(review): the statement that actually blocks between thr_alarm() and
  thr_end_alarm(), and the return statements, should be confirmed against
  the full function body.

  @param session            Passed through to the kill callback.
  @param sec                Maximum number of seconds to sleep.
  @param thread_killed      Callback polled once per iteration.
  @param thread_killed_arg  Opaque argument handed to the callback.

  @return Non-zero on error, as stated by the comment preceding this
          function.
*/
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1366
void* thread_killed_arg)
1369
thr_alarm_t alarmed;
1371
thr_alarm_init(&alarmed);
1372
time_t start_time, end_time;
1374
start_time= time(NULL);
1375
if (start_time == (time_t)-1)
1377
end_time= start_time+sec;
1379
/* Loop until the deadline has passed; nap_time is the time still left. */
while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1383
The only reason we are asking for alarm is so that
1384
we will be woken up in case of murder, so if we do not get killed,
1385
set the alarm so it goes off after we wake up naturally
1387
thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
1389
thr_end_alarm(&alarmed);
1391
if ((*thread_killed)(session,thread_killed_arg))
1394
/* Refresh the clock so the loop condition sees the elapsed time. */
start_time= time(NULL);
1395
if (start_time == (time_t)-1)
1402
/*
  Ask the master to start streaming its binary log (COM_BINLOG_DUMP).

  Request layout written into buf:
    - bytes 0..3   log position, truncated to 32 bits (see the TODO below)
    - bytes 4..5   binlog_flags (currently always 0)
    - bytes 6..9   server_id
    - bytes 10..   log name, len bytes, no terminator

  The request is sent with simple_command() using a length of len + 10.

  NOTE(review): buf holds FN_REFLEN + 10 bytes and the log name is copied
  with memcpy() using strlen(logname) with no length check in this body;
  confirm that mi->getLogName() can never exceed FN_REFLEN bytes.

  On failure, ER_NET_READ_INTERRUPTED sets *suppress_warnings to true;
  the other visible failure handling logs an error with the errno, the
  error text and a retry interval.

  @param drizzle            Connection the command is sent on.
  @param mi                 Master info supplying log name and position.
  @param suppress_warnings  Out: reset to false on entry, see above.

  @return NOTE(review): the caller treats a non-zero result as failure; the
          return statements should be confirmed against the full body.
*/
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
1403
bool *suppress_warnings)
1405
unsigned char buf[FN_REFLEN + 10];
1407
int32_t binlog_flags = 0; // for now
1408
const char* logname = mi->getLogName();
1410
*suppress_warnings= false;
1412
// TODO if big log files: Change next to int8store()
1413
int4store(buf, (uint32_t) mi->getLogPosition());
1414
int2store(buf + 4, binlog_flags);
1415
int4store(buf + 6, server_id);
1416
len = (uint32_t) strlen(logname);
1417
/* The log name follows the 10-byte fixed header. */
memcpy(buf + 10, logname,len);
1418
if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
1421
Something went wrong, so we will just reconnect and retry later
1422
in the future, we should do a better error analysis, but for
1423
now we just fill up the error log :-)
1425
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1426
*suppress_warnings= true; // Suppress reconnect warning
1428
errmsg_printf(ERRMSG_LVL_ERROR, _("Error on COM_BINLOG_DUMP: %d %s, will retry in %d secs"),
1429
drizzle_errno(drizzle), drizzle_error(drizzle),
1438
Read one event from the master
1442
DRIZZLE DRIZZLE connection
1443
mi Master connection information
1444
suppress_warnings TRUE when a normal net read timeout has caused us to
1445
try a reconnect. We do not want to print anything to
1446
the error log in this case because this is a normal
1447
event in an idle server.
1450
'packet_error' Error
1451
number Length of packet
1454
/*
  Read one packet from the master with cli_safe_read().

  Outcomes visible in the body:
    - When disconnect_slave_event_count is set and the per-master
      events_till_disconnect counter has reached zero, packet_error is
      returned without reading (the counter is decremented by the test).
    - A read result of packet_error, or a length below 1, returns
      packet_error. ER_NET_READ_INTERRUPTED sets *suppress_warnings to
      true; the error message is presumably logged only in the other
      case - TODO confirm.
    - A packet shorter than 8 bytes whose first byte is 254 is treated as
      an end-of-stream packet: it is logged at info level and packet_error
      is returned.

  @param drizzle            Connection to read from.
  @param suppress_warnings  Out: reset to false on entry, see above.

  @return packet_error on any of the conditions above; the success return
          is described by the comment preceding this function as the
          packet length.
*/
static uint32_t read_event(DRIZZLE *drizzle,
1456
bool* suppress_warnings)
1460
*suppress_warnings= false;
1462
my_real_read() will time us out
1463
We check if we were told to die, and if not, try reading again
1465
if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
1466
return(packet_error);
1468
len = cli_safe_read(drizzle);
1469
if (len == packet_error || (int32_t) len < 1)
1471
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1474
We are trying a normal reconnect after a read timeout;
1475
we suppress prints to .err file as long as the reconnect
1476
happens without problems
1478
*suppress_warnings= true;
1481
errmsg_printf(ERRMSG_LVL_ERROR, _("Error reading packet from server: %s ( server_errno=%d)"),
1482
drizzle_error(drizzle), drizzle_errno(drizzle));
1483
return(packet_error);
1486
/* Check if eof packet */
1487
if (len < 8 && drizzle->net.read_pos[0] == 254)
1489
errmsg_printf(ERRMSG_LVL_INFO, _("Slave: received end packet from server, apparent "
1490
"master shutdown: %s"),
1491
drizzle_error(drizzle));
1492
return(packet_error);
1499
/*
  Classify an error code recorded with a replicated event.

  The Session and Relay_log_info parameters are unnamed and therefore
  unused. The listed cases (network read error, network write error,
  query interrupted, server shutdown, aborting connection) share one
  branch of the switch.

  @param expected_error  Error code to classify.

  @return NOTE(review): the value returned for the listed cases and for
          all other codes should be confirmed against the full body.
*/
int32_t check_expected_error(Session*, Relay_log_info const *,
1500
int32_t expected_error)
1502
switch (expected_error) {
1503
case ER_NET_READ_ERROR:
1504
case ER_NET_ERROR_ON_WRITE:
1505
case ER_QUERY_INTERRUPTED:
1506
case ER_SERVER_SHUTDOWN:
1507
case ER_NEW_ABORTING_CONNECTION:
1516
Check if the current error is of temporary nature or not.
1517
Some errors are temporary in nature, such as
1518
ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. Ndb also signals
1519
that the error is temporary by pushing a warning with the error code
1520
ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1522
/*
  Decide whether the error currently recorded in the session is a
  temporary one that justifies retrying the transaction.

  Visible checks, in order:
    1. session->is_fatal_error is tested first.
    2. If the diagnostics area holds an error, the error is cleared and
       ER_LOCK_DEADLOCK is raised in its place with my_error().
    3. If the session has no error at all, the question cannot be
       answered (see the comment in the body about Incident_log_event).
    4. The error number is compared with ER_LOCK_DEADLOCK and
       ER_LOCK_WAIT_TIMEOUT.

  NOTE(review): step 2 appears to be unconditional. If so, every error is
  rewritten to ER_LOCK_DEADLOCK before step 4 and is therefore always
  classified as temporary. This looks like a fault-injection hook that
  lost its guard - confirm against the full function body.

  @param session  Session whose diagnostics area is inspected (and, in
                  step 2, modified).

  @return NOTE(review): the caller treats non-zero as temporary; the
          return statements should be confirmed against the full body.
*/
static int32_t has_temporary_error(Session *session)
1524
if (session->is_fatal_error)
1527
if (session->main_da.is_error())
1529
session->clear_error();
1530
my_error(ER_LOCK_DEADLOCK, MYF(0));
1534
If there is no message in Session, we can't say if it's a temporary
1535
error or not. This is currently the case for Incident_log_event,
1536
which sets no message. Return FALSE.
1538
if (!session->is_error())
1542
Temporary error codes:
1543
currently, InnoDB deadlock detected by InnoDB or lock
1544
wait timeout (innodb_lock_wait_timeout exceeded
1546
if (session->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
1547
session->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
1555
Applies the given event and advances the relay log position.
1557
In essence, this function does:
1560
ev->apply_event(rli);
1561
ev->update_pos(rli);
1564
But it also does some maintenance, such as skipping events if
1565
needed and reporting errors.
1567
If the @c skip flag is set, then it is tested whether the event
1568
should be skipped, by looking at the slave_skip_counter and the
1569
server id. The skip flag should be set when calling this from a
1570
replication thread but not set when executing an explicit BINLOG
1575
@retval 1 Error calling ev->apply_event().
1577
@retval 2 No error calling ev->apply_event(), but error calling
1580
/*
  Apply one event and advance the relay log position.

  Visible steps:
    - copy ev->server_id into the session, refresh the session time and
      clear lex->current_select;
    - stamp ev->when with time(NULL) and check the result against
      (time_t)-1 (the condition guarding this stamp should be confirmed
      against the full body);
    - attach the session to the event;
    - ask ev->shall_skip(rli) for a skip reason; EVENT_SKIP_COUNT
      decrements rli->slave_skip_counter; rli->data_lock is then released
      (this function does not take that lock itself); the event is
      applied only for EVENT_SKIP_NOT;
    - a second apply_event() call is presumably the path taken when
      skipping is not requested by the caller - TODO confirm;
    - call ev->update_pos(rli) and report ER_UNKNOWN_ERROR with the
      current group relay log name and position if it fails.

  @param ev       Event to apply.
  @param session  Session executing the event.
  @param rli      Relay log info; its skip counter and positions may be
                  modified.

  @return The final visible return maps exec_res to 1 (apply failed) or 0.
          The comment preceding this function also documents a value 2
          for an update_pos failure; that return is expected earlier in
          the body - TODO confirm.
*/
int32_t apply_event_and_update_pos(Log_event* ev, Session* session, Relay_log_info* rli,
1583
int32_t exec_res= 0;
1586
Execute the event to change the database and update the binary
1587
log coordinates, but first we set some data that is needed for
1590
The event will be executed unless it is supposed to be skipped.
1592
Queries originating from this server must be skipped. Low-level
1593
events (Format_description_log_event, Rotate_log_event,
1594
Stop_log_event) from this server must also be skipped. But for
1595
those we don't want to modify 'group_master_log_pos', because
1596
these events did not exist on the master.
1597
Format_description_log_event is not completely skipped.
1599
Skip queries specified by the user in 'slave_skip_counter'. We
1600
can't however skip events that has something to do with the log
1603
Filtering on own server id is extremely important, to ignore
1604
execution of events created by the creation/rotation of the relay
1605
log (remember that now the relay log starts with its Format_desc,
1609
session->server_id = ev->server_id; // use the original server id for logging
1610
session->set_time(); // time the query
1611
session->lex->current_select= 0;
1614
ev->when= time(NULL);
1615
if(ev->when == (time_t)-1)
1619
ev->session = session; // because up to this point, ev->session == 0
1623
int32_t reason= ev->shall_skip(rli);
1624
if (reason == Log_event::EVENT_SKIP_COUNT)
1625
--rli->slave_skip_counter;
1626
pthread_mutex_unlock(&rli->data_lock);
1627
if (reason == Log_event::EVENT_SKIP_NOT)
1628
exec_res= ev->apply_event(rli);
1631
exec_res= ev->apply_event(rli);
1635
int32_t error= ev->update_pos(rli);
1637
The update should not fail, so print an error message and
1638
return an error code.
1640
TODO: Replace this with a decent error message when merged
1641
with BUG#24954 (which adds several new error message).
1646
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
1647
_("It was not possible to update the positions"
1648
" of the relay log information: the slave may"
1649
" be in an inconsistent state."
1650
" Stopped in %s position %s"),
1651
rli->group_relay_log_name.c_str(),
1652
llstr(rli->group_relay_log_pos, buf));
1657
return(exec_res ? 1 : 0);
1662
Top-level function for executing the next event from the relay log.
1664
This function reads the event from the relay log, executes it, and
1665
advances the relay log position. It also handles errors, etc.
1667
This function may fail to apply the event for the following reasons:
1669
- The position specified by the UNTIL condition of the START SLAVE
1672
- It was not possible to read the event from the log.
1674
- The slave is killed.
1676
- An error occurred when applying the event, and the event has been
1677
tried slave_trans_retries times. If the event has been retried
1678
fewer times, 0 is returned.
1680
- init_master_info or init_relay_log_pos failed. (These are called
1681
if a failure occurs when applying the event.)
1683
- An error occurred when updating the binlog position.
1685
@retval 0 The event was applied.
1687
@retval 1 The event was not applied.
1689
/*
  Read the next event from the relay log, apply it, and handle retries.

  Flow visible in the body:
    - rli->data_lock is taken up front and next_event(rli) is called with
      it held; the lock is released on each early exit and is handed to
      apply_event_and_update_pos() still locked.
    - If the SQL thread has been killed, the lock is released.
    - If an UNTIL condition is configured and satisfied for the start
      position of the event, an info message is logged, rli->abort_slave
      is set to 1 and the lock is released.
    - Otherwise the event is applied through
      apply_event_and_update_pos(ev, session, rli, true). The event is
      kept alive when it is a FORMAT_DESCRIPTION_EVENT (see the comment in
      the body).
    - When slave_trans_retries is non-zero and the apply failed with a
      temporary error (has_temporary_error()):
        * while rli->trans_retries is below slave_trans_retries, the
          master info and relay log position are re-initialised, the
          transaction is rolled back with end_trans(session, ROLLBACK),
          the thread sleeps for min(trans_retries, MAX_SLAVE_RETRY_PAUSE)
          seconds, and both retry counters are incremented under
          rli->data_lock;
        * otherwise an error is logged saying the retries were in vain.
    - rli->trans_retries is reset to 0 when the apply failed with a
      non-temporary error, or when transactions are in use and the group
      and event relay log positions coincide.
    - The final report of ER_SLAVE_RELAY_LOG_READ_FAILURE, issued after
      releasing the lock, is presumably the path for a missing event -
      TODO confirm.

  @param session  SQL thread session; asserted to equal rli->sql_session.
  @param rli      Relay log info for the SQL thread.

  @return Documented by the comment preceding this function as 0 when the
          event was applied and 1 when it was not; the return statements
          should be confirmed against the full body.
*/
static int32_t exec_relay_log_event(Session* session, Relay_log_info* rli)
1692
We acquire this mutex since we need it for all operations except
1693
event execution. But we will release it in places where we will
1694
wait for something for example inside of next_event().
1696
pthread_mutex_lock(&rli->data_lock);
1698
Log_event * ev = next_event(rli);
1700
assert(rli->sql_session==session);
1702
if (sql_slave_killed(session,rli))
1704
pthread_mutex_unlock(&rli->data_lock);
1713
This tests if the position of the beginning of the current event
1714
hits the UNTIL barrier.
1716
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
1717
rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
1718
rli->group_master_log_pos :
1719
ev->log_pos - ev->data_written))
1722
errmsg_printf(ERRMSG_LVL_INFO, _("Slave SQL thread stopped because it reached its"
1723
" UNTIL position %s"),
1724
llstr(rli->until_pos(), buf));
1726
Setting abort_slave flag because we do not want additional message about
1727
error in query execution to be printed.
1729
rli->abort_slave= 1;
1730
pthread_mutex_unlock(&rli->data_lock);
1734
exec_res= apply_event_and_update_pos(ev, session, rli, true);
1737
Format_description_log_event should not be deleted because it will be
1738
used to read info about the relay log's format; it will be deleted when
1739
the SQL thread does not need it, i.e. when this thread terminates.
1741
if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
1747
update_log_pos failed: this should not happen, so we don't
1753
if (slave_trans_retries)
1755
int32_t temp_err= 0;
1756
if (exec_res && (temp_err= has_temporary_error(session)))
1760
We were in a transaction which has been rolled back because of a
1762
let's seek back to BEGIN log event and retry it all again.
1763
Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
1764
there is no rollback since 5.0.13 (ref: manual).
1765
We have to not only seek but also
1766
a) init_master_info(), to seek back to hot relay log's start for later
1767
(for when we will come back to this hot log after re-processing the
1768
possibly existing old logs where BEGIN is: check_binlog_magic() will
1769
then need the cache to be at position 0 (see comments at beginning of
1770
init_master_info()).
1771
b) init_relay_log_pos(), because the BEGIN may be an older relay log.
1773
if (rli->trans_retries < slave_trans_retries)
1775
if (rli->mi->init_master_info(0, 0, SLAVE_SQL))
1776
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to initialize the master info structure"));
1777
else if (init_relay_log_pos(rli,
1778
rli->group_relay_log_name.c_str(),
1779
rli->group_relay_log_pos,
1781
errmsg_printf(ERRMSG_LVL_ERROR, _("Error initializing relay log position: %s"),
1786
end_trans(session, ROLLBACK);
1787
/* chance for concurrent connection to get more locks */
1788
safe_sleep(session, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1789
(CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1790
pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1791
rli->trans_retries++;
1792
rli->retried_trans++;
1793
pthread_mutex_unlock(&rli->data_lock);
1797
errmsg_printf(ERRMSG_LVL_ERROR, _("Slave SQL thread retried transaction %"PRIu64" time(s) "
1798
"in vain, giving up. Consider raising the value of "
1799
"the slave_transaction_retries variable."),
1800
slave_trans_retries);
1802
else if ((exec_res && !temp_err) ||
1803
(opt_using_transactions &&
1804
rli->group_relay_log_pos == rli->event_relay_log_pos))
1807
Only reset the retry counter if the entire group succeeded
1808
or failed with a non-transient error. On a successful
1809
event, the execution will proceed as usual; in the case of a
1810
non-transient error, the slave will stop with an error.
1812
rli->trans_retries= 0; // restart from fresh
1817
pthread_mutex_unlock(&rli->data_lock);
1818
rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
1819
ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
1820
_("Could not parse relay log event entry. The possible reasons "
1821
"are: the master's binary log is corrupted (you can check this "
1822
"by running 'mysqlbinlog' on the binary log), the slave's "
1823
"relay log is corrupted (you can check this by running "
1824
"'mysqlbinlog' on the relay log), a network problem, or a bug "
1825
"in the master's or slave's DRIZZLE code. If you want to check "
1826
"the master's binary log or slave's relay log, you will be "
1827
"able to know their names by issuing 'SHOW SLAVE STATUS' "
1834
@brief Try to reconnect slave IO thread.
1836
@details Terminates current connection to master, sleeps for
1837
@c mi->connect_retry seconds and initiates new connection with
1838
@c safe_reconnect(). Variable pointed by @c retry_count is increased -
1839
if it exceeds @c master_retry_count then connection is not re-established
1840
and function signals error.
1841
Unless @c suppress_warnings is TRUE, a warning is put in the server error log
1842
when reconnecting. The warning message and messages used to report errors
1843
are taken from @c messages array. In case @c master_retry_count is exceeded,
1844
no messages are added to the log.
1846
@param[in] session Thread context.
1847
@param[in] DRIZZLE DRIZZLE connection.
1848
@param[in] mi Master connection information.
1849
@param[in,out] retry_count Number of attempts to reconnect.
1850
@param[in] suppress_warnings TRUE when a normal net read timeout
1851
has caused the reconnect.
1852
@param[in] messages Messages to print/log, see
1853
reconnect_messages[] array.
1856
@retval 1 There was an error.
1859
/*
  Drop the current master connection, optionally wait, and reconnect.

  Visible steps:
    - mark the I/O thread as not connected and publish the WAIT message
      as proc info;
    - disconnect from the master;
    - post-increment *retry_count; the retry limit check against
      master_retry_count and the safe_sleep() of mi->connect_retry
      appear to run only when the counter was already non-zero before the
      increment - TODO confirm the nesting;
    - stop if the I/O thread was killed while waiting;
    - publish the AFTER message; unless suppress_warnings is set, format
      the FAILED message with IO_RPL_LOG_NAME and the current log position
      and either report it as a warning (when the COMMAND message is
      non-empty) or log it at info level;
    - call safe_reconnect(); if that fails or the thread was killed, log
      the KILLED_AFTER message when log_warnings is enabled.

  The entries of messages are passed through the translation macro before
  use, and the FAILED entry is used as a snprintf() format string, so it
  must expect exactly a string and a string argument - TODO confirm
  against the reconnect_messages table.

  @param session            I/O thread session.
  @param drizzle            Connection to tear down and re-establish.
  @param mi                 Master connection information.
  @param retry_count        In/out: number of reconnect attempts so far.
  @param suppress_warnings  When true, no reconnect warning is produced.
  @param messages           Message table indexed by SLAVE_RECON_MSG_*.

  @return 1 when the retry limit is exceeded (visible return); other
          return paths should be confirmed against the full body.
*/
static int32_t try_to_reconnect(Session *session, DRIZZLE *drizzle, Master_info *mi,
1860
uint32_t *retry_count, bool suppress_warnings,
1861
const char *messages[SLAVE_RECON_MSG_MAX])
1863
mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1864
session->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
1865
drizzle_disconnect(drizzle);
1866
if ((*retry_count)++)
1868
if (*retry_count > master_retry_count)
1869
return 1; // Don't retry forever
1870
safe_sleep(session, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1873
if (check_io_slave_killed(session, mi,
1874
_(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1876
session->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
1877
if (!suppress_warnings)
1879
char buf[256], llbuff[22];
1880
snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1881
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
1883
Raise a warining during registering on master/requesting dump.
1884
Log a message reading event.
1886
if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
1888
mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1889
ER(ER_SLAVE_MASTER_COM_FAILURE),
1890
_(messages[SLAVE_RECON_MSG_COMMAND]), buf);
1894
errmsg_printf(ERRMSG_LVL_INFO, "%s",buf);
1897
if (safe_reconnect(session, drizzle, mi, 1) || io_slave_killed(session, mi))
1899
if (global_system_variables.log_warnings)
1900
errmsg_printf(ERRMSG_LVL_INFO, "%s",_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1907
/* Slave I/O Thread entry point */
1909
/*
  Thread entry point of the slave I/O thread.

  arg is the Master_info of the connection this thread serves.

  Phases visible in the body:
    1. Startup, under mi->run_lock: create the Session, publish it as
       mi->io_session, detach the thread, run init_slave_thread(). On
       init failure start_cond is broadcast and the lock released before
       logging. On success the session is appended to the global thread
       list, slave_running is set, abort_slave is cleared, the lock is
       released and start_cond is broadcast.
    2. Connect: create the client handle with drizzle_create(), connect
       with safe_connect(), and enlarge the packet size limits of both the
       client handle and the session net by MAX_LOG_EVENT_HEADER.
    3. Handshake: mark the thread as connected, check master version and
       clock, and register on the master when the relay log format
       version is above 1, reconnecting through try_to_reconnect() on
       failure.
    4. Outer loop (until killed): request a binlog dump, reconnecting on
       failure.
    5. Inner loop (until killed): read an event; on packet_error inspect
       the client errno (packet too large, fatal master binlog error and
       out-of-memory are logged specifically) or reconnect; on success
       reset retry_count, queue the event to the relay log, and wait for
       relay log space when the configured limit is exceeded.
    6. Shutdown: log the final position, clear query and db under
       LOCK_thread_count, close the master connection, write ignored
       events info to the relay log, then under mi->run_lock free the
       relay log format description, end the session net, close tables,
       clear slave_running, broadcast stop_cond and only then release
       run_lock (the order matters, see the comment near the end).

  NOTE(review): the blocks guarded by the tests on retry_count_reg,
  retry_count_dump and retry_count_event log the text about forcing a
  reconnect and call try_to_reconnect() even though the preceding
  operation succeeded. These look like fault-injection hooks; confirm
  whether they are meant to run in production builds.

  NOTE(review): retry_count is declared without an initialiser here;
  confirm that it is assigned before the first try_to_reconnect() call.

  NOTE(review): the existing comment names MAX_LOG_EVENT_HEADER_LEN while
  the code adds MAX_LOG_EVENT_HEADER - confirm which name is intended.

  @param arg  Master_info pointer, passed as void*.

  @return Always 0 on the visible return path; the value carries no
          meaning (see the trailing comment on the return statement).
*/
pthread_handler_t handle_slave_io(void *arg)
1911
Session *session; // needs to be first for thread_stack
1913
Master_info *mi = (Master_info*)arg;
1914
Relay_log_info *rli= &mi->rli;
1916
uint32_t retry_count;
1917
bool suppress_warnings;
1918
uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
1925
pthread_mutex_lock(&mi->run_lock);
1926
/* Inform waiting threads that slave has started */
1929
mi->events_till_disconnect = disconnect_slave_event_count;
1931
session= new Session;
1932
Session_CHECK_SENTRY(session);
1933
mi->io_session = session;
1935
pthread_detach_this_thread();
1936
session->thread_stack= (char*) &session; // remember where our stack is
1937
if (init_slave_thread(session, SLAVE_Session_IO))
1939
pthread_cond_broadcast(&mi->start_cond);
1940
pthread_mutex_unlock(&mi->run_lock);
1941
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed during slave I/O thread initialization"));
1944
pthread_mutex_lock(&LOCK_thread_count);
1945
threads.append(session);
1946
pthread_mutex_unlock(&LOCK_thread_count);
1947
mi->slave_running = 1;
1948
mi->abort_slave = 0;
1949
pthread_mutex_unlock(&mi->run_lock);
1950
pthread_cond_broadcast(&mi->start_cond);
1952
if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
1954
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
1955
ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
1959
session->set_proc_info("Connecting to master");
1960
// we can get killed during safe_connect
1961
if (!safe_connect(session, drizzle, mi))
1963
errmsg_printf(ERRMSG_LVL_INFO, _("Slave I/O thread: connected to master '%s@%s:%d',"
1964
"replication started in log '%s' at position %s"),
1965
mi->getUsername(), mi->getHostname(), mi->getPort(),
1967
llstr(mi->getLogPosition(), llbuff));
1969
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
1970
thread, since a replication event can become this much larger than
1971
the corresponding packet (query) sent from client to master.
1973
drizzle->net.max_packet_size= session->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
1977
errmsg_printf(ERRMSG_LVL_INFO, _("Slave I/O thread killed while connecting to master"));
1983
// TODO: the assignment below should be under mutex (5.0)
1984
mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
1985
session->slave_net = &drizzle->net;
1986
session->set_proc_info("Checking master version");
1987
if (get_master_version_and_clock(drizzle, mi))
1990
if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
1993
Register ourselves with the master.
1995
session->set_proc_info("Registering slave on master");
1996
if (register_slave_on_master(drizzle, mi, &suppress_warnings))
1998
if (!check_io_slave_killed(session, mi, "Slave I/O thread killed "
1999
"while registering slave on master"))
2001
errmsg_printf(ERRMSG_LVL_ERROR, _("Slave I/O thread couldn't register on master"));
2002
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2003
reconnect_messages[SLAVE_RECON_ACT_REG]))
2010
if (!retry_count_reg)
2013
errmsg_printf(ERRMSG_LVL_INFO, _("Forcing to reconnect slave I/O thread"));
2014
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2015
reconnect_messages[SLAVE_RECON_ACT_REG]))
2021
while (!io_slave_killed(session,mi))
2023
session->set_proc_info("Requesting binlog dump");
2024
if (request_dump(drizzle, mi, &suppress_warnings))
2026
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed on request_dump()"));
2027
if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while \
2028
requesting master dump")) ||
2029
try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2030
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2034
if (!retry_count_dump)
2037
errmsg_printf(ERRMSG_LVL_INFO, _("Forcing to reconnect slave I/O thread"));
2038
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2039
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2044
while (!io_slave_killed(session,mi))
2048
We say "waiting" because read_event() will wait if there's nothing to
2049
read. But if there's something to read, it will not wait. The
2050
important thing is to not confuse users by saying "reading" whereas
2051
we're in fact receiving nothing.
2053
session->set_proc_info(_("Waiting for master to send event"));
2054
event_len= read_event(drizzle, mi, &suppress_warnings);
2055
if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while "
2058
if (!retry_count_event)
2060
retry_count_event++;
2061
errmsg_printf(ERRMSG_LVL_INFO, _("Forcing to reconnect slave I/O thread"));
2062
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2063
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2068
if (event_len == packet_error)
2070
uint32_t drizzle_error_number= drizzle_errno(drizzle);
2071
switch (drizzle_error_number) {
2072
case CR_NET_PACKET_TOO_LARGE:
2073
errmsg_printf(ERRMSG_LVL_ERROR, _("Log entry on master is longer than "
2074
"max_allowed_packet (%u) on "
2075
"slave. If the entry is correct, restart the "
2076
"server with a higher value of "
2077
"max_allowed_packet"),
2078
session->variables.max_allowed_packet);
2080
case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2081
errmsg_printf(ERRMSG_LVL_ERROR, ER(drizzle_error_number), drizzle_error_number,
2082
drizzle_error(drizzle));
2084
case EE_OUTOFMEMORY:
2085
case ER_OUTOFMEMORY:
2086
errmsg_printf(ERRMSG_LVL_ERROR,
2087
_("Stopping slave I/O thread due to out-of-memory error from master"));
2090
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2091
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2094
} // if (event_len == packet_error)
2096
retry_count=0; // ok event, reset retry counter
2097
session->set_proc_info(_("Queuing master event to the relay log"));
2098
if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2104
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to flush master info file"));
2108
See if the relay logs take too much space.
2109
We don't lock mi->rli.log_space_lock here; this dirty read saves time
2110
and does not introduce any problem:
2111
- if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
2112
the clean value is 0), then we are reading only one more event as we
2113
should, and we'll block only at the next event. No big deal.
2114
- if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
2115
the clean value is 1), then we are going into wait_for_relay_log_space()
2116
for no reason, but this function will do a clean read, notice the clean
2117
value and exit immediately.
2119
if (rli->log_space_limit && rli->log_space_limit <
2120
rli->log_space_total &&
2121
!rli->ignore_log_space_limit)
2122
if (wait_for_relay_log_space(rli))
2124
errmsg_printf(ERRMSG_LVL_ERROR, _("Slave I/O thread aborted while waiting for "
2125
"relay log space"));
2133
// print the current replication position
2134
errmsg_printf(ERRMSG_LVL_INFO, _("Slave I/O thread exiting, read up to log '%s', "
2136
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
2137
pthread_mutex_lock(&LOCK_thread_count);
2138
session->query = session->db = 0; // extra safety
2139
session->query_length= session->db_length= 0;
2140
pthread_mutex_unlock(&LOCK_thread_count);
2144
Here we need to clear the active VIO before closing the
2145
connection with the master. The reason is that Session::awake()
2146
might be called from terminate_slave_thread() because somebody
2147
issued a STOP SLAVE. If that happends, the close_active_vio()
2148
can be called in the middle of closing the VIO associated with
2149
the 'mysql' object, causing a crash.
2151
drizzle_close(drizzle);
2154
write_ignored_events_info_to_relay_log(session, mi);
2155
session->set_proc_info(_("Waiting for slave mutex on exit"));
2156
pthread_mutex_lock(&mi->run_lock);
2158
/* Forget the relay log's format */
2159
delete mi->rli.relay_log.description_event_for_queue;
2160
mi->rli.relay_log.description_event_for_queue= 0;
2161
assert(session->net.buff != 0);
2162
net_end(&session->net); // destructor will not free it, because net.vio is 0
2163
close_thread_tables(session);
2164
pthread_mutex_lock(&LOCK_thread_count);
2165
Session_CHECK_SENTRY(session);
2167
pthread_mutex_unlock(&LOCK_thread_count);
2169
mi->slave_running= 0;
2172
Note: the order of the two following calls (first broadcast, then unlock)
2173
is important. Otherwise a killer_thread can execute between the calls and
2174
delete the mi structure leading to a crash! (see BUG#25306 for details)
2176
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
2177
pthread_mutex_unlock(&mi->run_lock);
2180
return(0); // Can't return anything here
2184
/* Slave SQL Thread entry point */
2186
/*
  Entry point of the slave SQL thread.

  arg is the Master_info whose embedded Relay_log_info (rli) this thread
  drives. Visible sequence:
    1. Under rli->run_lock: create the Session, publish it in
       rli->sql_session, increment slave_run_id and set slave_running.
    2. init_slave_thread(); on failure start_cond is broadcast, run_lock is
       released and an error is logged.
    3. Restore temporary tables from rli->save_temporary_tables, register
       the session in `threads`, clear abort_slave, release run_lock and
       broadcast start_cond.
    4. init_relay_log_pos() at group_relay_log_name/group_relay_log_pos.
    5. Run the init_slave command when sys_init_slave is non-empty.
    6. Check whether an UNTIL condition is already satisfied.
    7. Call exec_relay_log_event() until sql_slave_killed() reports a stop;
       when an event fails, the session error and warnings are logged.
    8. Teardown: cleanup_context(), clear slave_running while holding
       run_lock and data_lock, broadcast data_cond, hand the temporary
       tables back to rli, net_end(), then broadcast stop_cond before
       releasing run_lock.

  Returns 0 (the only visible return).

  NOTE(review): error-path jumps, labels and several braces/declarations
  are not visible in this listing; confirm control flow against the full
  file before changing statement order.
*/
pthread_handler_t handle_slave_sql(void *arg)
2188
Session *session; /* needs to be first for thread_stack */
2189
char llbuff[22],llbuff1[22];
2191
Relay_log_info* rli = &((Master_info*)arg)->rli;
2196
assert(rli->inited);
2197
pthread_mutex_lock(&rli->run_lock);
2198
assert(!rli->slave_running);
2200
rli->events_till_abort = abort_slave_event_count;
2202
session = new Session;
2203
session->thread_stack = (char*)&session; // remember where our stack is
2204
rli->sql_session= session;
2206
/* Inform waiting threads that slave has started */
2207
rli->slave_run_id++;
2208
rli->slave_running = 1;
2210
pthread_detach_this_thread();
2211
if (init_slave_thread(session, SLAVE_Session_SQL))
2214
TODO: this is currently broken - slave start and change master
2215
will be stuck if we fail here
2217
pthread_cond_broadcast(&rli->start_cond);
2218
pthread_mutex_unlock(&rli->run_lock);
2219
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed during slave thread initialization"));
2222
session->init_for_queries();
2223
session->temporary_tables = rli->save_temporary_tables; // restore temp tables
2224
pthread_mutex_lock(&LOCK_thread_count);
2225
threads.append(session);
2226
pthread_mutex_unlock(&LOCK_thread_count);
2228
We are going to set slave_running to 1. Assuming slave I/O thread is
2229
alive and connected, this is going to make Seconds_Behind_Master be 0
2230
i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
2231
the moment we start we can think we are caught up, and the next second we
2232
start receiving data so we realize we are not caught up and
2233
Seconds_Behind_Master grows. No big deal.
2235
rli->abort_slave = 0;
2236
pthread_mutex_unlock(&rli->run_lock);
2237
pthread_cond_broadcast(&rli->start_cond);
2240
Reset errors for a clean start (otherwise, if the master is idle, the SQL
2241
thread may execute no Query_log_event, so the error will remain even
2242
though there's no problem anymore). Do not reset the master timestamp
2243
(imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
2244
as we are not sure that we are going to receive a query, we want to
2245
remember the last master timestamp (to say how many seconds behind we are
2247
But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
2251
//tell the I/O thread to take relay_log_space_limit into account from now on
2252
pthread_mutex_lock(&rli->log_space_lock);
2253
rli->ignore_log_space_limit= 0;
2254
pthread_mutex_unlock(&rli->log_space_lock);
2255
rli->trans_retries= 0; // start from "no error"
2257
if (init_relay_log_pos(rli,
2258
rli->group_relay_log_name.c_str(),
2259
rli->group_relay_log_pos,
2260
1 /*need data lock*/, &errmsg,
2261
1 /*look for a description_event*/))
2263
errmsg_printf(ERRMSG_LVL_ERROR, _("Error initializing relay log position: %s"),
2267
Session_CHECK_SENTRY(session);
2268
assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2270
Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2271
correct position when it's called just after my_b_seek() (the questionable
2272
stuff is those "seek is done on next read" comments in the my_b_seek()
2274
The crude reality is that this assertion randomly fails whereas
2275
replication seems to work fine. And there is no easy explanation why it
2276
fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
2277
init_relay_log_pos() called above). Maybe the assertion would be
2278
meaningful if we held rli->data_lock between the my_b_seek() and the
2281
assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2282
assert(rli->sql_session == session);
2284
if (global_system_variables.log_warnings)
2285
errmsg_printf(ERRMSG_LVL_INFO, _("Slave SQL thread initialized, "
2286
"starting replication in log '%s' at "
2287
"position %s, relay log '%s' position: %s"),
2289
llstr(rli->group_master_log_pos,llbuff),
2290
rli->group_relay_log_name.c_str(),
2291
llstr(rli->group_relay_log_pos,llbuff1));
2293
/* execute init_slave variable */
2294
if (sys_init_slave.value_length)
2296
execute_init_command(session, &sys_init_slave, &LOCK_sys_init_slave);
2297
if (session->is_slave_error)
2299
errmsg_printf(ERRMSG_LVL_ERROR, _("Slave SQL thread aborted. "
2300
"Can't execute init_slave query"));
2306
First check until condition - probably there is nothing to execute. We
2307
do not want to wait for next event in this case.
2309
pthread_mutex_lock(&rli->data_lock);
2310
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
2311
rli->is_until_satisfied(rli->group_master_log_pos))
2314
errmsg_printf(ERRMSG_LVL_INFO, _("Slave SQL thread stopped because it reached its"
2315
" UNTIL position %s"), llstr(rli->until_pos(), buf));
2316
pthread_mutex_unlock(&rli->data_lock);
2319
pthread_mutex_unlock(&rli->data_lock);
2321
/* Read queries from the IO/THREAD until this thread is killed */
2323
while (!sql_slave_killed(session,rli))
2325
session->set_proc_info(_("Reading event from the relay log"));
2326
assert(rli->sql_session == session);
2327
Session_CHECK_SENTRY(session);
2328
if (exec_relay_log_event(session,rli))
2330
// do not scare the user if SQL thread was simply killed or stopped
2331
if (!sql_slave_killed(session,rli))
2334
retrieve as much info as possible from the session and, error
2335
codes and warnings and print this to the error log as to
2336
allow the user to locate the error
2338
uint32_t const last_errno= rli->last_error().number;
2340
if (session->is_error())
2342
char const *const errmsg= session->main_da.message();
2344
if (last_errno == 0)
2346
rli->report(ERROR_LEVEL, session->main_da.sql_errno(), "%s", errmsg);
2348
else if (last_errno != session->main_da.sql_errno())
2350
errmsg_printf(ERRMSG_LVL_ERROR, _("Slave (additional info): %s Error_code: %d"),
2351
errmsg, session->main_da.sql_errno());
2355
/* Print any warnings issued */
2356
List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
2359
Added controlled slave thread cancel for replication
2360
of user-defined variables.
2362
bool udf_error = false;
2365
if (err->code == ER_CANT_OPEN_LIBRARY)
2367
errmsg_printf(ERRMSG_LVL_WARN, _("Slave: %s Error_code: %d"),err->msg, err->code);
2370
errmsg_printf(ERRMSG_LVL_ERROR, _("Error loading user-defined library, slave SQL "
2371
"thread aborted. Install the missing library, "
2372
"and restart the slave SQL thread with "
2373
"\"SLAVE START\". We stopped at log '%s' "
2375
RPL_LOG_NAME, llstr(rli->group_master_log_pos,
2378
errmsg_printf(ERRMSG_LVL_ERROR, _("Error running query, slave SQL thread aborted. "
2379
"Fix the problem, and restart "
2380
"the slave SQL thread with \"SLAVE START\". "
2381
"We stopped at log '%s' position %s"),
2383
llstr(rli->group_master_log_pos, llbuff));
2389
/* Thread stopped. Print the current replication position to the log */
2390
errmsg_printf(ERRMSG_LVL_INFO, _("Slave SQL thread exiting, replication stopped in "
2391
"log '%s' at position %s"),
2393
llstr(rli->group_master_log_pos,llbuff));
2398
Some events set some playgrounds, which won't be cleared because thread
2399
stops. Stopping of this thread may not be known to these events ("stop"
2400
request is detected only by the present function, not by events), so we
2401
must "proactively" clear playgrounds:
2403
rli->cleanup_context(session, 1);
2404
pthread_mutex_lock(&LOCK_thread_count);
2406
Some extra safety, which should not be needed (normally, event deletion
2407
should already have done these assignments (each event which sets these
2408
variables is supposed to set them to 0 before terminating)).
2410
session->query= session->db= session->catalog= 0;
2411
session->query_length= session->db_length= 0;
2412
pthread_mutex_unlock(&LOCK_thread_count);
2413
/* Translatable, like the same status string used by the I/O thread on exit */
session->set_proc_info(_("Waiting for slave mutex on exit"));
2414
pthread_mutex_lock(&rli->run_lock);
2415
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
2416
pthread_mutex_lock(&rli->data_lock);
2417
assert(rli->slave_running == 1); // tracking buffer overrun
2418
/* When master_pos_wait() wakes up it will check this and terminate */
2419
rli->slave_running= 0;
2420
/* Forget the relay log's format */
2421
delete rli->relay_log.description_event_for_exec;
2422
rli->relay_log.description_event_for_exec= 0;
2423
/* Wake up master_pos_wait() */
2424
pthread_mutex_unlock(&rli->data_lock);
2425
pthread_cond_broadcast(&rli->data_cond);
2426
rli->ignore_log_space_limit= 0; /* don't need any lock */
2427
rli->save_temporary_tables = session->temporary_tables;
2430
TODO: see if we can do this conditionally in next_event() instead
2431
to avoid unneeded position re-init
2433
session->temporary_tables = 0; // remove temptation from destructor to close them
2434
assert(session->net.buff != 0);
2435
net_end(&session->net); // destructor will not free it, because we are weird
2436
assert(rli->sql_session == session);
2437
Session_CHECK_SENTRY(session);
2438
rli->sql_session= 0;
2439
pthread_mutex_lock(&LOCK_thread_count);
2440
Session_CHECK_SENTRY(session);
2442
pthread_mutex_unlock(&LOCK_thread_count);
2444
Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
2445
is important. Otherwise a killer_thread can execute between the calls and
2446
delete the mi structure leading to a crash! (see BUG#25306 for details)
2448
pthread_cond_broadcast(&rli->stop_cond);
2449
pthread_mutex_unlock(&rli->run_lock); // tell the world we are done
2453
return(0); // Can't return anything here
2458
process_io_create_file()
2461
/*
  Download the file named by a Create_file event from the master and
  write it to the relay log.

  mi   master info; supplies the I/O session, the master connection (NET)
       and the relay log being appended to
  cev  Create_file event; must be valid and have inited_from_old set

  Visible behaviour: a new file id is taken from mi->file_id, the file is
  requested with net_request_file(), and each network block read with
  my_net_read() is appended to the relay log - the first block inside the
  Create_file event itself, later blocks as Append_block events. At end
  of file a zero-length command is written back to the master and an
  Execute_load event is appended (see the in-body note on the empty-file
  case). Every successful append is followed by harvest_bytes_written()
  so that log_space_total stays current.

  NOTE(review): the read loop, the error jumps and the return statements
  are not visible in this listing; confirm them against the full file.
*/
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
2465
/* Set while the Create_file event itself has not yet been appended */
bool cev_not_written;
2466
Session *session = mi->io_session;
2467
NET *net = &mi->drizzle->net;
2469
if (unlikely(!cev->is_valid()))
2472
assert(cev->inited_from_old);
2473
session->file_id = cev->file_id = mi->file_id++;
2474
session->server_id = cev->server_id;
2475
cev_not_written = 1;
2477
if (unlikely(net_request_file(net,cev->fname)))
2479
errmsg_printf(ERRMSG_LVL_ERROR, _("Slave I/O: failed requesting download of '%s'"),
2485
This dummy block is so we could instantiate Append_block_log_event
2486
once and then modify it slightly instead of doing it multiple times
2490
Append_block_log_event aev(session,0,0,0,0);
2494
if (unlikely((num_bytes=my_net_read(net)) == packet_error))
2496
errmsg_printf(ERRMSG_LVL_ERROR, _("Network read error downloading '%s' from master"),
2500
if (unlikely(!num_bytes)) /* eof */
2502
/* 3.23 master wants it */
2503
net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0);
2505
If we wrote Create_file_log_event, then we need to write
2506
Execute_load_log_event. If we did not write Create_file_log_event,
2507
then this is an empty file and we can just do as if the LOAD DATA
2508
INFILE had not existed, i.e. write nothing.
2510
if (unlikely(cev_not_written))
2512
Execute_load_log_event xev(session,0,0);
2513
xev.log_pos = cev->log_pos;
2514
if (unlikely(mi->rli.relay_log.append(&xev)))
2516
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2517
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2518
_("error writing Exec_load event to relay log"));
2521
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2524
if (unlikely(cev_not_written))
2526
/* First data block travels inside the Create_file event */
cev->block = net->read_pos;
2527
cev->block_len = num_bytes;
2528
if (unlikely(mi->rli.relay_log.append(cev)))
2530
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2531
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2532
_("error writing Create_file event to relay log"));
2536
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2540
/* Subsequent data blocks reuse the single Append_block event */
aev.block = net->read_pos;
2541
aev.block_len = num_bytes;
2542
aev.log_pos = cev->log_pos;
2543
if (unlikely(mi->rli.relay_log.append(&aev)))
2545
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2546
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2547
_("error writing Append_block event to relay log"));
2550
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
2561
Start using a new binary log on the master
2565
mi master_info for the slave
2566
rev The rotate log event read from the binary log
2569
Updates the master info with the place in the next binary
2570
log where we should start reading.
2571
Rotate the relay log to avoid mixed-format relay logs.
2574
We assume we already locked mi->data_lock
2578
1 Log event is illegal
2582
/*
  Apply a Rotate event received from the master to the I/O thread state.

  mi   master info; the caller must hold mi->data_lock (asserted below)
  rev  Rotate event read from the master's binary log

  The master log name and position stored in mi are replaced by
  rev->new_log_ident and rev->pos. When disconnect_slave_event_count is
  set, events_till_disconnect is incremented. If the current
  description_event_for_queue has binlog_version >= 4 it is deleted and
  replaced by a fresh format-3 description event. Finally the relay log
  is rotated with rotate_relay_log().

  NOTE(review): the return statements are not visible in this listing;
  the preceding header text says 1 is returned for an illegal event.
*/
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
2584
safe_mutex_assert_owner(&mi->data_lock);
2586
if (unlikely(!rev->is_valid()))
2589
/* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2590
mi->setLogName(rev->new_log_ident.c_str());
2591
mi->setLogPosition(rev->pos);
2593
If we do not do this, we will be getting the first
2594
rotate event forever, so we need to not disconnect after one.
2596
if (disconnect_slave_event_count)
2597
mi->events_till_disconnect++;
2600
If description_event_for_queue is format <4, there is conversion in the
2601
relay log to the slave's format (4). And Rotate can mean upgrade or
2602
nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
2603
no need to reset description_event_for_queue now. And if it's nothing (same
2604
master version as before), no need (still using the slave's format).
2606
if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
2608
delete mi->rli.relay_log.description_event_for_queue;
2609
/* start from format 3 (DRIZZLE 4.0) again */
2610
mi->rli.relay_log.description_event_for_queue= new
2611
Format_description_log_event(3);
2614
Rotate the relay log makes binlog format detection easier (at next slave
2615
start or mysqlbinlog)
2617
rotate_relay_log(mi); /* will take the right mutexes */
2622
Reads a 3.23 event and converts it to the slave's format. This code was
2623
copied from DRIZZLE 4.0.
2625
/*
  Convert one binlog-version-1 (3.23) event to the slave's format and
  queue it in the relay log.

  mi   master info of the I/O thread
  buf  raw event bytes; the remaining parameter(s) are declared on a
       continuation line that is not visible here (the body uses
       event_len as the byte length of buf)

  Visible behaviour:
  - A LOAD_EVENT is first copied into a malloc'ed buffer with one extra
    terminating 0 byte (the stored event length is bumped to match), so
    that the event constructor can read the file name safely.
  - The bytes are parsed with Log_event::read_log_event() using
    description_event_for_queue; a parse failure is logged and the
    temporary buffer is freed.
  - Under mi->data_lock the event's log_pos is set from the current
    master position (3.23 events carry none) and the event is dispatched
    on its type: a Rotate goes to process_io_rotate(), a CREATE_FILE
    (produced from LOAD_EVENT) goes to process_io_create_file(), anything
    else is appended to the relay log unless ignore_event is set.
  - The master position is advanced by inc_pos.

  NOTE(review): case labels, error paths, the definition of inc_pos and
  the return statements are not visible in this listing.
*/
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
2628
const char *errmsg = 0;
2630
bool ignore_event= 0;
2632
Relay_log_info *rli= &mi->rli;
2635
If we get Load event, we need to pass a non-reusable buffer
2636
to read_log_event, so we do a trick
2638
if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
2640
if (unlikely(!(tmp_buf=(char*)malloc(event_len+1))))
2642
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2643
ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
2646
memcpy(tmp_buf,buf,event_len);
2648
Create_file constructor wants a 0 as last char of buffer, this 0 will
2649
serve as the string-termination char for the file's name (which is at the
2651
We must increment event_len, otherwise the event constructor will not see
2652
this end 0, which leads to segfault.
2654
tmp_buf[event_len++]=0;
2655
int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
2656
buf = (const char*)tmp_buf;
2659
This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
2660
send the loaded file, and write it to the relay log in the form of
2661
Append_block/Exec_load (the SQL thread needs the data, as that thread is not
2662
connected to the master).
2664
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2665
mi->rli.relay_log.description_event_for_queue);
2668
errmsg_printf(ERRMSG_LVL_ERROR, _("Read invalid event from master: '%s', "
2669
"master could be corrupt but a more likely cause "
2670
"of this is a bug"),
2672
free((char*) tmp_buf);
2676
pthread_mutex_lock(&mi->data_lock);
2677
ev->log_pos= mi->getLogPosition(); /* 3.23 events don't contain log_pos */
2678
switch (ev->get_type_code()) {
2684
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2687
pthread_mutex_unlock(&mi->data_lock);
2692
case CREATE_FILE_EVENT:
2694
Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
2695
queue_old_event() which is for 3.23 events which don't comprise
2696
CREATE_FILE_EVENT. This is because read_log_event() above has just
2697
transformed LOAD_EVENT into CREATE_FILE_EVENT.
2700
/* We come here when and only when tmp_buf != 0 */
2701
assert(tmp_buf != 0);
2703
ev->log_pos+= inc_pos;
2704
int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2706
mi->incrementLogPosition(inc_pos);
2707
pthread_mutex_unlock(&mi->data_lock);
2708
free((char*)tmp_buf);
2715
if (likely(!ignore_event))
2719
Don't do it for fake Rotate events (see comment in
2720
Log_event::Log_event(const char* buf...) in log_event.cc).
2722
ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
2723
if (unlikely(rli->relay_log.append(ev)))
2726
pthread_mutex_unlock(&mi->data_lock);
2729
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2732
mi->incrementLogPosition(inc_pos);
2733
pthread_mutex_unlock(&mi->data_lock);
2738
Reads a 4.0 event and converts it to the slave's format. This code was copied
2739
from queue_binlog_ver_1_event(), with some affordable simplifications.
2741
/*
  Convert one binlog-version-3 (4.0) event to the slave's format and
  queue it in the relay log.

  mi   master info of the I/O thread
  buf  raw event bytes; the remaining parameter(s) are declared on a
       continuation line that is not visible here (the body uses
       event_len as the byte length of buf)

  The bytes are parsed with Log_event::read_log_event() using
  description_event_for_queue; on failure an error is logged. Under
  mi->data_lock a Rotate event is handed to process_io_rotate(); other
  events are appended to the relay log, log_space_total is refreshed and
  the master position is advanced by inc_pos.

  NOTE(review): case labels, error paths, the definitions of tmp_buf and
  inc_pos, and the return statements are not visible in this listing.
*/
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
2744
const char *errmsg = 0;
2747
Relay_log_info *rli= &mi->rli;
2749
/* read_log_event() will adjust log_pos to be end_log_pos */
2750
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2751
mi->rli.relay_log.description_event_for_queue);
2754
errmsg_printf(ERRMSG_LVL_ERROR, _("Read invalid event from master: '%s', "
2755
"master could be corrupt but a more likely cause of "
2758
free((char*) tmp_buf);
2761
pthread_mutex_lock(&mi->data_lock);
2762
switch (ev->get_type_code()) {
2766
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2769
pthread_mutex_unlock(&mi->data_lock);
2778
if (unlikely(rli->relay_log.append(ev)))
2781
pthread_mutex_unlock(&mi->data_lock);
2784
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2786
mi->incrementLogPosition(inc_pos);
2788
pthread_mutex_unlock(&mi->data_lock);
2795
Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
2796
(exactly, slave's) format. To do the conversion, we create a 5.0 event from
2797
the 3.23/4.0 bytes, then write this event to the relay log.
2800
Test this code before release - it has to be tested on a separate
2801
setup with 3.23 master or 4.0 master
2804
/*
  Dispatch an old-format event to the converter matching the master's
  binlog version.

  The version is read from description_event_for_queue->binlog_version.
  One branch forwards to queue_binlog_ver_1_event(), another to
  queue_binlog_ver_3_event(); any other version (e.g. 2) falls into the
  default branch as unsupported.

  NOTE(review): the case labels and the body of the default branch are
  not visible in this listing.
*/
static int32_t queue_old_event(Master_info *mi, const char *buf,
2807
switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
2810
return(queue_binlog_ver_1_event(mi,buf,event_len));
2812
return(queue_binlog_ver_3_event(mi,buf,event_len));
2813
default: /* unsupported format; eg version 2 */
2821
If the event is 3.23/4.0, passes it to queue_old_event() which will convert
2822
it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
2823
no format conversion, it's pure read/write of bytes.
2824
So a 5.0.0 slave's relay log can contain events in the slave's format or in
2828
/*
  Queue one event received from the master into the relay log.

  mi         master info of the I/O thread
  buf        raw event bytes as received
  event_len  length of buf in bytes

  Visible behaviour:
  - If description_event_for_queue is older than binlog version 4 and the
    event is not a FORMAT_DESCRIPTION_EVENT, the event is delegated to
    queue_old_event().
  - Otherwise, under mi->data_lock, the event is examined by type:
      * a Rotate event is applied with process_io_rotate();
      * a FORMAT_DESCRIPTION_EVENT is parsed and replaces
        description_event_for_queue;
      * a HEARTBEAT_LOG_EVENT is validated, counted in
        mi->received_heartbeats and never written to the relay log.
  - Under the relay log lock, an event whose server id equals ours (and
    replicate_same_server_id is off) is not written; the ignored
    position is recorded in rli->ign_master_log_* instead. Any other
    event is appended with appendv() and the master position advances by
    inc_pos.
  - Failures are reported through mi->report().

  The heartbeat check compares the local log name with the one carried
  by the heartbeat using strcmp(); it must not modify mi, because a
  heartbeat only confirms coordinates that are already known.

  NOTE(review): several case labels, braces, local declarations (error,
  error_msg, llbuf, errmsg) and the return statement are not visible in
  this listing; confirm control flow against the full file.
*/
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
2832
uint32_t inc_pos= 0;
2833
Relay_log_info *rli= &mi->rli;
2834
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
2837
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
2838
buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
2839
return(queue_old_event(mi,buf,event_len));
2841
pthread_mutex_lock(&mi->data_lock);
2843
switch (buf[EVENT_TYPE_OFFSET]) {
2846
We needn't write this event to the relay log. Indeed, it just indicates a
2847
master server shutdown. The only thing this does is cleaning. But
2848
cleaning is already done on a per-master-thread basis (as the master
2849
server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
2850
prepared statements' deletion are TODO only when we binlog prep stmts).
2852
We don't even increment mi->master_log_pos, because we may be just after
2853
a Rotate event. Btw, in a few milliseconds we are going to have a Start
2854
event from the next binlog (unless the master is presently running
2860
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
2861
if (unlikely(process_io_rotate(mi,&rev)))
2863
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2867
Now the I/O thread has just changed its mi->master_log_name, so
2868
incrementing mi->master_log_pos is nonsense.
2873
case FORMAT_DESCRIPTION_EVENT:
2876
Create an event, and save it (when we rotate the relay log, we will have
2877
to write this event again).
2880
We are the only thread which reads/writes description_event_for_queue.
2881
The relay_log struct does not move (though some members of it can
2882
change), so we needn't any lock (no rli->data_lock, no log lock).
2884
Format_description_log_event* tmp;
2886
if (!(tmp= (Format_description_log_event*)
2887
Log_event::read_log_event(buf, event_len, &errmsg,
2888
mi->rli.relay_log.description_event_for_queue)))
2890
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2893
delete mi->rli.relay_log.description_event_for_queue;
2894
mi->rli.relay_log.description_event_for_queue= tmp;
2896
Though this does some conversion to the slave's format, this will
2897
preserve the master's binlog format version, and number of event types.
2900
If the event was not requested by the slave (the slave did not ask for
2901
it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
2903
inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
2907
case HEARTBEAT_LOG_EVENT:
2910
HB (heartbeat) cannot come before RL (Relay)
2913
Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
2916
error= ER_SLAVE_HEARTBEAT_FAILURE;
2917
error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2918
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2919
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2920
error_msg.append(STRING_WITH_LEN(" log_pos "));
2921
llstr(hb.log_pos, llbuf);
2922
error_msg.append(llbuf, strlen(llbuf));
2925
mi->received_heartbeats++;
2927
compare local and event's versions of log_file, log_pos.
2929
Heartbeat is sent only after an event corresponding to the coordinates
2930
the heartbeat carries.
2931
Slave can not have a difference in coordinates except in the only
2932
special case when mi->master_log_name, master_log_pos have never
2933
been updated by Rotate event i.e when slave does not have any history
2934
with the master (and thereafter mi->master_log_pos is NULL).
2936
TODO: handling `when' for SHOW SLAVE STATUS' seconds behind
2938
/* Compare only: a heartbeat must never overwrite the stored log name */
if ((mi->getLogName() != NULL &&
strcmp(mi->getLogName(), hb.get_log_ident()) != 0)
2939
|| mi->getLogPosition() != hb.log_pos)
2941
/* missed events of heartbeat from the past */
2942
error= ER_SLAVE_HEARTBEAT_FAILURE;
2943
error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
2944
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2945
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2946
error_msg.append(STRING_WITH_LEN(" log_pos "));
2947
llstr(hb.log_pos, llbuf);
2948
error_msg.append(llbuf, strlen(llbuf));
2951
goto skip_relay_logging;
2961
If this event is originating from this server, don't queue it.
2962
We don't check this for 3.23 events because it's simpler like this; 3.23
2963
will be filtered anyway by the SQL slave thread which also tests the
2964
server id (we must also keep this test in the SQL thread, in case somebody
2965
upgrades a 4.0 slave which has a not-filtered relay log).
2967
ANY event coming from ourselves can be ignored: it is obvious for queries;
2968
for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
2969
(--log-slave-updates would not log that) unless this slave is also its
2970
direct master (an unsupported, useless setup!).
2973
pthread_mutex_lock(log_lock);
2975
if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
2976
!mi->rli.replicate_same_server_id)
2979
Do not write it to the relay log.
2980
a) We still want to increment mi->master_log_pos, so that we won't
2981
re-read this event from the master if the slave IO thread is now
2982
stopped/restarted (more efficient if the events we are ignoring are big
2984
b) We want to record that we are skipping events, for the information of
2985
the slave SQL thread, otherwise that thread may let
2986
rli->group_relay_log_pos stay too small if the last binlog's event is
2988
But events which were generated by this slave and which do not exist in
2989
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
2992
if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
2993
buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
2994
buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
2996
mi->incrementLogPosition(inc_pos);
2997
memcpy(rli->ign_master_log_name_end, mi->getLogName(), FN_REFLEN);
2998
assert(rli->ign_master_log_name_end[0]);
2999
rli->ign_master_log_pos_end= mi->getLogPosition();
3001
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3005
/* write the event to the relay log */
3006
if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3008
mi->incrementLogPosition(inc_pos);
3009
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3013
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
3015
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
3017
pthread_mutex_unlock(log_lock);
3022
pthread_mutex_unlock(&mi->data_lock);
3024
mi->report(ERROR_LEVEL, error, ER(error),
3025
(error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
3026
_("could not queue event from master") :
3032
/*
  Release the resources held by a Relay_log_info.

  Visible behaviour: if the info file descriptor is open, its IO_CACHE is
  ended and the descriptor closed; the same is done for the current relay
  log descriptor, which is then reset to -1. The relay log itself is
  closed (index included, with a stop event), log_space_total is
  refreshed, and the slave's temporary tables are dropped from memory.

  NOTE(review): some statements between the visible lines are missing
  from this listing; confirm against the full file.
*/
void end_relay_log_info(Relay_log_info* rli)
3036
if (rli->info_fd >= 0)
3038
end_io_cache(&rli->info_file);
3039
(void) my_close(rli->info_fd, MYF(MY_WME));
3042
if (rli->cur_log_fd >= 0)
3044
end_io_cache(&rli->cache_buf);
3045
(void)my_close(rli->cur_log_fd, MYF(MY_WME));
3046
rli->cur_log_fd = -1;
3049
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
3050
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3052
Delete the slave's temporary tables from memory.
3053
In the future there will be other actions than this, to ensure persistance
3054
of slave's temp tables after shutdown.
3056
rli->close_temporary_tables();
3061
Try to connect until successful or slave killed
3065
session Thread handler for slave
3066
DRIZZLE DRIZZLE connection handle
3067
mi Replication handle
3074
/*
  First-time connection to the master: forwards to connect_to_master()
  with reconnect=0 and suppress_warnings=0 and returns its result.
*/
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi)
3076
return(connect_to_master(session, drizzle, mi, 0, 0));
3085
Try to connect until successful or slave killed or we have retried
3086
master_retry_count times
3089
/*
  Connect (or reconnect) to the master, retrying until it succeeds, the
  I/O thread is killed, or master_retry_count attempts have failed.

  session            session of the slave I/O thread
  drizzle            connection handle to (re)establish
  mi                 master info supplying host, user, password, port and
                     the retry interval
  reconnect          true: drizzle_reconnect(); false: drizzle_connect()
  suppress_warnings  true: do not log the "connected" message; reset to
                     false as soon as a new connection error is reported

  Returns slave_was_killed: non-zero when the loop ended without a usable
  connection.

  CLIENT_COMPRESS is OR-ed into the flags so that CLIENT_REMEMBER_OPTIONS
  stays set; the timeouts installed with drizzle_options() below have to
  remain in effect for later attempts of the retry loop.

  NOTE(review): several braces, the llbuff declaration and parts of the
  retry-limit branch are not visible in this listing; confirm control
  flow against the full file.
*/
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
3090
bool reconnect, bool suppress_warnings)
3092
int32_t slave_was_killed;
3093
int32_t last_errno= -2; // impossible error
3094
uint32_t err_count=0;
3097
mi->events_till_disconnect = disconnect_slave_event_count;
3098
uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
3099
if (opt_slave_compressed_protocol)
3100
client_flag|= CLIENT_COMPRESS; /* We will use compression */
3102
drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3103
drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3105
while (!(slave_was_killed = io_slave_killed(session,mi)) &&
3106
(reconnect ? drizzle_reconnect(drizzle) != 0 :
3107
drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
3108
mi->getPort(), 0, client_flag) == 0))
3110
/* Don't repeat last error */
3111
if ((int32_t)drizzle_errno(drizzle) != last_errno)
3113
last_errno=drizzle_errno(drizzle);
3114
suppress_warnings= 0;
3115
mi->report(ERROR_LEVEL, last_errno,
3116
_("error %s to master '%s@%s:%d'"
3117
" - retry-time: %d retries: %u"),
3118
(reconnect ? _("reconnecting") : _("connecting")),
3119
mi->getUsername(), mi->getHostname(), mi->getPort(),
3120
mi->getConnectionRetry(), master_retry_count);
3123
By default we try forever. The reason is that failure will trigger
3124
master election, so if the user did not set master_retry_count we
3125
do not want to have election triggered on the first failure to
3128
if (++err_count == master_retry_count)
3133
safe_sleep(session,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3137
if (!slave_was_killed)
3141
if (!suppress_warnings && global_system_variables.log_warnings)
3142
errmsg_printf(ERRMSG_LVL_INFO, _("Slave: connected to master '%s@%s:%d', "
3143
"replication resumed in log '%s' at "
3144
"position %s"), mi->getUsername(),
3145
mi->getHostname(), mi->getPort(),
3147
llstr(mi->getLogPosition(),llbuff));
3150
drizzle->reconnect= 1;
3151
return(slave_was_killed);
3159
Try to connect until successful or slave killed or we have retried
3160
master_retry_count times
3163
/*
  Re-establish a lost connection to the master: forwards to
  connect_to_master() with reconnect=1, passing suppress_warnings
  through, and returns its result.
*/
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
3164
bool suppress_warnings)
3166
return(connect_to_master(session, drizzle, mi, 1, suppress_warnings));
3171
Store the file and position where the execute-slave thread is in the
3175
flush_relay_log_info()
3176
rli Relay log information
3179
- As this is only called by the slave thread, we don't need to
3180
have a lock on this.
3181
- If there is an active transaction, then we don't update the position
3182
in the relay log. This is to ensure that we re-execute statements
3183
if we die in the middle of a transaction that was rolled back.
3184
- As a transaction never spans binary logs, we don't have to handle the
3185
case where we do a relay-log-rotation in the middle of the transaction.
3186
If this would not be the case, we would have to ensure that we
3187
don't delete the relay log file where the transaction started when
3188
we switch to a new relay log file.
3191
- Change the log file information to a binary format to avoid calling
3199
/*
  Persist the relay log execution position of rli.

  The only statement visible here is the test of rli->no_storage.

  NOTE(review): the rest of the body, including what happens when
  no_storage is set and the return value, is missing from this listing;
  consult the full file.
*/
bool flush_relay_log_info(Relay_log_info* rli)
3203
if (unlikely(rli->no_storage))
3211
Called when we notice that the current "hot" log got rotated under our feet.
3214
/*
  Reopen the relay log named by rli->event_relay_log_name through the
  private cache rli->cache_buf.

  rli     relay log info; on entry cur_log must not already point at
          cache_buf and cur_log_fd must be -1 (both asserted)
  errmsg  receives the error text from open_binlog()

  rli->cur_log is switched to &rli->cache_buf and the file is opened with
  open_binlog(). The read position is then restored to
  rli->event_relay_log_pos, raised to at least BIN_LOG_HEADER_SIZE.

  NOTE(review): the failure branch and the return statements are not
  visible in this listing.
*/
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
3216
assert(rli->cur_log != &rli->cache_buf);
3217
assert(rli->cur_log_fd == -1);
3219
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3220
if ((rli->cur_log_fd=open_binlog(cur_log, rli->event_relay_log_name.c_str(), errmsg)) < 0)
3223
We want to start exactly where we was before:
3224
relay_log_pos Current log pos
3225
pending Number of bytes already processed from the event
3227
rli->event_relay_log_pos= cmax(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3228
my_b_seek(cur_log,rli->event_relay_log_pos);
3233
/**
  Read the next event for the slave SQL thread from the relay log described
  by rli.

  What the visible code does:
  - Asserts that rli->sql_session is non-null and that the caller already
    owns rli->data_lock (safe_mutex_assert_owner).
  - Loops for as long as sql_slave_killed(session, rli) is false.
  - A log is treated as "hot" when rli->cur_log is not rli->cache_buf. For a
    hot log the relay log's lock (log_lock) is taken before reading, and the
    log is reopened through reopen_relay_log() when the relay log's open
    count differs from rli->cur_log_old_open_count.
  - Reads one event with Log_event::read_log_event() using
    relay_log.description_event_for_exec, and records the position after it
    in rli->future_event_relay_log_pos.
  - On EOF of a hot log: either builds a Rotate_log_event from
    rli->ign_master_log_name_end (with server_id 0), or releases
    rli->data_lock, sets rli->ignore_log_space_limit, broadcasts
    rli->log_space_cond, waits in wait_for_update_relay_log(), then
    re-acquires rli->data_lock and restores rli->last_master_timestamp.
  - On EOF of a cold log: closes it, then either purges through
    purge_first_log() (when relay_log_purge is set) or advances with
    find_next_log(), and finally switches to the next log, using the relay
    log's own IO_CACHE when is_active() reports it as the active one or
    open_binlog() otherwise.
  - Failures set errmsg, which is reported with errmsg_printf().

  NOTE(review): in this copy of the file the braces, the declarations of
  `ev` and `hot_log`, and the return/else lines are not visible, so the
  exact branch nesting and the returned values must be confirmed against
  the complete source. Presumably the event read is returned on success
  and 0 on kill or error - verify.

  @param rli  Relay log state of the SQL thread; data_lock must be held.
*/
static Log_event* next_event(Relay_log_info* rli)
3236
IO_CACHE* cur_log = rli->cur_log;
3237
pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3238
const char* errmsg=0;
3239
Session* session = rli->sql_session;
3241
assert(session != 0);
3243
if (abort_slave_event_count && !rli->events_till_abort--)
3247
For most operations we need to protect rli members with data_lock,
3248
so we assume calling function acquired this mutex for us and we will
3249
hold it for the most of the loop below However, we will release it
3250
whenever it is worth the hassle, and in the cases when we go into a
3251
pthread_cond_wait() with the non-data_lock mutex
3253
safe_mutex_assert_owner(&rli->data_lock);
3255
while (!sql_slave_killed(session,rli))
3258
We can have two kinds of log reading:
3260
rli->cur_log points at the IO_CACHE of relay_log, which
3261
is actively being updated by the I/O thread. We need to be careful
3262
in this case and make sure that we are not looking at a stale log that
3263
has already been rotated. If it has been, we reopen the log.
3265
The other case is much simpler:
3266
We just have a read only log that nobody else will be updating.
3269
if ((hot_log = (cur_log != &rli->cache_buf)))
3271
assert(rli->cur_log_fd == -1); // foreign descriptor
3272
pthread_mutex_lock(log_lock);
3275
Reading xxx_file_id is safe because the log will only
3276
be rotated when we hold relay_log.LOCK_log
3278
if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
3280
// The master has switched to a new log file; Reopen the old log file
3281
cur_log=reopen_relay_log(rli, &errmsg);
3282
pthread_mutex_unlock(log_lock);
3283
if (!cur_log) // No more log files
3285
hot_log=0; // Using old binary log
3289
As there is no guarantee that the relay is open (for example, an I/O
3290
error during a write by the slave I/O thread may have closed it), we
3293
if (!my_b_inited(cur_log))
3295
assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3296
assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
3299
Relay log is always in new format - if the master is 3.23, the
3300
I/O thread will convert the format for us.
3301
A problem: the description event may be in a previous relay log. So if
3302
the slave has been shutdown meanwhile, we would have to look in old relay
3303
logs, which may even have been deleted. So we need to write this
3304
description event at the beginning of the relay log.
3305
When the relay log is created when the I/O thread starts, easy: the
3306
master will send the description event and we will queue it.
3307
But if the relay log is created by new_file(): then the solution is:
3308
DRIZZLE_BIN_LOG::open() will write the buffered description event.
3310
if ((ev=Log_event::read_log_event(cur_log,0,
3311
rli->relay_log.description_event_for_exec)))
3314
assert(session==rli->sql_session);
3316
read it while we have a lock, to avoid a mutex lock in
3317
inc_event_relay_log_pos()
3319
rli->future_event_relay_log_pos= my_b_tell(cur_log);
3321
pthread_mutex_unlock(log_lock);
3324
assert(session==rli->sql_session);
3325
if (opt_reckless_slave) // For mysql-test
3327
if (cur_log->error < 0)
3329
errmsg = "slave SQL thread aborted because of I/O error";
3331
pthread_mutex_unlock(log_lock);
3334
if (!cur_log->error) /* EOF */
3337
On a hot log, EOF means that there are no more updates to
3338
process and we must block until I/O thread adds some and
3339
signals us to continue
3344
We say in Seconds_Behind_Master that we have "caught up". Note that
3345
for example if network link is broken but I/O slave thread hasn't
3346
noticed it (slave_net_timeout not elapsed), then we'll say "caught
3347
up" whereas we're not really caught up. Fixing that would require
3348
internally cutting timeout in smaller pieces in network read, no
3349
thanks. Another example: SQL has caught up on I/O, now I/O has read
3350
a new event and is queuing it; the false "0" will exist until SQL
3351
finishes executing the new event; it will be look abnormal only if
3352
the events have old timestamps (then you get "many", 0, "many").
3354
Transient phases like this can be fixed with implemeting
3355
Heartbeat event which provides the slave the status of the
3356
master at time the master does not have any new update to send.
3357
Seconds_Behind_Master would be zero only when master has no
3358
more updates in binlog for slave. The heartbeat can be sent
3359
in a (small) fraction of slave_net_timeout. Until it's done
3360
rli->last_master_timestamp is temporarely (for time of
3361
waiting for the following event) reset whenever EOF is
3364
time_t save_timestamp= rli->last_master_timestamp;
3365
rli->last_master_timestamp= 0;
3367
assert(rli->relay_log.get_open_count() ==
3368
rli->cur_log_old_open_count);
3370
if (rli->ign_master_log_name_end[0])
3372
/* We generate and return a Rotate, to make our positions advance */
3373
ev= new Rotate_log_event(rli->ign_master_log_name_end,
3374
0, rli->ign_master_log_pos_end,
3375
Rotate_log_event::DUP_NAME);
3376
rli->ign_master_log_name_end[0]= 0;
3377
pthread_mutex_unlock(log_lock);
3380
errmsg= "Slave SQL thread failed to create a Rotate event "
3381
"(out of memory?), SHOW SLAVE STATUS may be inaccurate";
3384
ev->server_id= 0; // don't be ignored by slave SQL thread
3389
We can, and should release data_lock while we are waiting for
3390
update. If we do not, show slave status will block
3392
pthread_mutex_unlock(&rli->data_lock);
3396
- the I/O thread has reached log_space_limit
3397
- the SQL thread has read all relay logs, but cannot purge for some
3399
* it has already purged all logs except the current one
3400
* there are other logs than the current one but they're involved in
3401
a transaction that finishes in the current one (or is not finished)
3403
Wake up the possibly waiting I/O thread, and set a boolean asking
3404
the I/O thread to temporarily ignore the log_space_limit
3405
constraint, because we do not want the I/O thread to block because of
3406
space (it's ok if it blocks for any other reason (e.g. because the
3407
master does not send anything). Then the I/O thread stops waiting
3408
and reads more events.
3409
The SQL thread decides when the I/O thread should take log_space_limit
3410
into account again : ignore_log_space_limit is reset to 0
3411
in purge_first_log (when the SQL thread purges the just-read relay
3412
log), and also when the SQL thread starts. We should also reset
3413
ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
3414
fact, no need as RESET SLAVE requires that the slave
3415
be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
3418
pthread_mutex_lock(&rli->log_space_lock);
3419
// prevent the I/O thread from blocking next times
3420
rli->ignore_log_space_limit= 1;
3422
If the I/O thread is blocked, unblock it. Ok to broadcast
3423
after unlock, because the mutex is only destroyed in
3424
~Relay_log_info(), i.e. when rli is destroyed, and rli will
3425
not be destroyed before we exit the present function.
3427
pthread_mutex_unlock(&rli->log_space_lock);
3428
pthread_cond_broadcast(&rli->log_space_cond);
3429
// Note that wait_for_update_relay_log unlocks lock_log !
3430
rli->relay_log.wait_for_update_relay_log(rli->sql_session);
3431
// re-acquire data lock since we released it earlier
3432
pthread_mutex_lock(&rli->data_lock);
3433
rli->last_master_timestamp= save_timestamp;
3437
If the log was not hot, we need to move to the next log in
3438
sequence. The next log could be hot or cold, we deal with both
3439
cases separately after doing some common initialization
3441
end_io_cache(cur_log);
3442
assert(rli->cur_log_fd >= 0);
3443
my_close(rli->cur_log_fd, MYF(MY_WME));
3444
rli->cur_log_fd = -1;
3446
if (relay_log_purge)
3449
purge_first_log will properly set up relay log coordinates in rli.
3450
If the group's coordinates are equal to the event's coordinates
3451
(i.e. the relay log was not rotated in the middle of a group),
3452
we can purge this relay log too.
3453
We do uint64_t and string comparisons, this may be slow but
3454
- purging the last relay log is nice (it can save 1GB of disk), so we
3455
like to detect the case where we can do it, and given this,
3456
- I see no better detection method
3457
- purge_first_log is not called that often
3459
if (rli->relay_log.purge_first_log
3461
rli->group_relay_log_pos == rli->event_relay_log_pos
3462
&& !strcmp(rli->group_relay_log_name.c_str(), rli->event_relay_log_name.c_str())))
3464
errmsg = "Error purging processed logs";
3471
If hot_log is set, then we already have a lock on
3472
LOCK_log. If not, we have to get the lock.
3474
According to Sasha, the only time this code will ever be executed
3475
is if we are recovering from a bug.
3477
if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
3479
errmsg = "error switching to the next log";
3482
rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3483
rli->event_relay_log_name.assign(rli->linfo.log_file_name);
3484
flush_relay_log_info(rli);
3488
Now we want to open this next log. To know if it's a hot log (the one
3489
being written by the I/O thread now) or a cold log, we can use
3490
is_active(); if it is hot, we use the I/O cache; if it's cold we open
3491
the file normally. But if is_active() reports that the log is hot, this
3492
may change between the test and the consequence of the test. So we may
3493
open the I/O cache whereas the log is now cold, which is nonsense.
3494
To guard against this, we need to have LOCK_log.
3497
if (!hot_log) /* if hot_log, we already have this mutex */
3498
pthread_mutex_lock(log_lock);
3499
if (rli->relay_log.is_active(rli->linfo.log_file_name))
3502
if (global_system_variables.log_warnings)
3503
errmsg_printf(ERRMSG_LVL_INFO, _("next log '%s' is currently active"),
3504
rli->linfo.log_file_name);
3506
rli->cur_log= cur_log= rli->relay_log.get_log_file();
3507
rli->cur_log_old_open_count= rli->relay_log.get_open_count();
3508
assert(rli->cur_log_fd == -1);
3511
Read pointer has to be at the start since we are the only
3513
We must keep the LOCK_log to read the 4 first bytes, as this is a hot
3514
log (same as when we call read_log_event() above: for a hot log we
3517
if (check_binlog_magic(cur_log,&errmsg))
3519
if (!hot_log) pthread_mutex_unlock(log_lock);
3522
if (!hot_log) pthread_mutex_unlock(log_lock);
3525
if (!hot_log) pthread_mutex_unlock(log_lock);
3527
if we get here, the log was not hot, so we will have to open it
3528
ourselves. We are sure that the log is still not hot now (a log can get
3529
from hot to cold, but not from cold to hot). No need for LOCK_log.
3532
if (global_system_variables.log_warnings)
3533
errmsg_printf(ERRMSG_LVL_INFO, _("next log '%s' is not active"),
3534
rli->linfo.log_file_name);
3536
// open_binlog() will check the magic header
3537
if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
3544
Read failed with a non-EOF error.
3545
TODO: come up with something better to handle this error
3548
pthread_mutex_unlock(log_lock);
3549
errmsg_printf(ERRMSG_LVL_ERROR, _("Slave SQL thread: I/O error reading "
3550
"event(errno: %d cur_log->error: %d)"),
3551
my_errno,cur_log->error);
3552
// set read position to the beginning of the event
3553
my_b_seek(cur_log,rli->event_relay_log_pos);
3554
/* otherwise, we have had a partial read */
3555
errmsg = _("Aborting slave SQL thread because of partial event read");
3556
break; // To end of function
3559
if (!errmsg && global_system_variables.log_warnings)
3561
errmsg_printf(ERRMSG_LVL_INFO, _("Error reading relay log event: %s"),
3562
_("slave SQL thread was killed"));
3568
errmsg_printf(ERRMSG_LVL_ERROR, _("Error reading relay log event: %s"), errmsg);
3573
Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
3574
because of size is simpler because when we do it we already have all relevant
3575
locks; here we don't, so this function is mainly taking locks).
3576
Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
3580
/**
  Rotate the relay log that belongs to mi->rli.

  Visible steps: lock mi->run_lock, call rli->relay_log.new_file(), fold the
  bytes written into rli->log_space_total through harvest_bytes_written(),
  then unlock mi->run_lock. rli->run_lock is deliberately not taken.

  NOTE(review): the body mentions a test of "inited" before new_file(), but
  the statement itself is not visible in this copy of the file - confirm
  that the guard is present in the complete source.

  @param mi  Master_info whose relay log (mi->rli.relay_log) is rotated.
*/
void rotate_relay_log(Master_info* mi)
3582
Relay_log_info* rli= &mi->rli;
3584
/* We don't lock rli->run_lock. This would lead to deadlocks. */
3585
pthread_mutex_lock(&mi->run_lock);
3588
We need to test inited because otherwise, new_file() will attempt to lock
3589
LOCK_log, which may not be inited (if we're not a slave).
3596
/* If the relay log is closed, new_file() will do nothing. */
3597
rli->relay_log.new_file();
3600
We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
3601
be counted, so imagine a succession of FLUSH LOGS and assume the slave
3602
threads are started:
3603
relay_log_space decreases by the size of the deleted relay log, but does
3604
not increase, so flush-after-flush we may become negative, which is wrong.
3605
Even if this will be corrected as soon as a query is replicated on the
3606
slave (because the I/O thread will then call harvest_bytes_written() which
3607
will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
3608
output in SHOW SLAVE STATUS meanwhile. So we harvest now.
3609
If the log is closed, then this will just harvest the last writes, probably
3610
0 as they probably have been harvested.
3612
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3614
pthread_mutex_unlock(&mi->run_lock);
3620
Detects, based on master's version (as found in the relay log), if master
3622
@param rli Relay_log_info which tells the master's version
3623
@param bug_id Number of the bug as found in bugs.mysql.com
3624
@param report bool report error message, default TRUE
3625
@return true if master has the bug, false if it does not.
3627
/**
  Look up bug_id in a static table of affected master version ranges.

  Each table row holds a bug id, the first version that has the bug
  (introduced_in) and the first version that has the fix (fixed_in), each
  as three bytes. The master's version is taken from
  rli->relay_log.description_event_for_exec->server_version_split. A row
  matches when its bug id equals bug_id and a 3-byte memcmp() ordering gives
  introduced_in <= master version < fixed_in.

  On a match the visible code emits a short message through
  my_printf_error() and a verbose one through rli->report(), naming the
  bug and the fixed_in version.

  NOTE(review): the statements that consult `report` and the return
  statements are not visible in this copy of the file; presumably the
  messages are skipped when report is false - confirm against the complete
  source.
*/
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
3629
struct st_version_range_for_one_bug {
3631
const unsigned char introduced_in[3]; // first version with bug
3632
const unsigned char fixed_in[3]; // first version with fix
3634
static struct st_version_range_for_one_bug versions_for_all_bugs[]=
3636
{24432, { 5, 0, 24 }, { 5, 0, 38 } },
3637
{24432, { 5, 1, 12 }, { 5, 1, 17 } },
3638
{33029, { 5, 0, 0 }, { 5, 0, 58 } },
3639
{33029, { 5, 1, 0 }, { 5, 1, 12 } },
3641
const unsigned char *master_ver=
3642
rli->relay_log.description_event_for_exec->server_version_split;
3644
assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3647
i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3649
const unsigned char *introduced_in= versions_for_all_bugs[i].introduced_in,
3650
*fixed_in= versions_for_all_bugs[i].fixed_in;
3651
if ((versions_for_all_bugs[i].bug_id == bug_id) &&
3652
(memcmp(introduced_in, master_ver, 3) <= 0) &&
3653
(memcmp(fixed_in, master_ver, 3) > 0))
3658
// a short message for SHOW SLAVE STATUS (message length constraints)
3659
my_printf_error(ER_UNKNOWN_ERROR,
3660
_("master may suffer from"
3661
" http://bugs.mysql.com/bug.php?id=%u"
3662
" so slave stops; check error log on slave"
3663
" for more info"), MYF(0), bug_id);
3664
// a verbose message for the error log
3665
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
3666
_("According to the master's version ('%s'),"
3667
" it is probable that master suffers from this bug:"
3668
" http://bugs.mysql.com/bug.php?id=%u"
3669
" and thus replicating the current binary log event"
3670
" may make the slave's data become different from the"
3672
" To take no risk, slave refuses to replicate"
3673
" this event and stops."
3674
" We recommend that all updates be stopped on the"
3675
" master and slave, that the data of both be"
3676
" manually synchronized,"
3677
" that master's binary logs be deleted,"
3678
" that master be upgraded to a version at least"
3679
" equal to '%d.%d.%d'. Then replication can be"
3681
rli->relay_log.description_event_for_exec->server_version,
3683
fixed_in[0], fixed_in[1], fixed_in[2]);
3691
BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
3692
exclusive, if one statement in a SP generated AUTO_INCREMENT value
3693
by the top statement, all statements after it would be considered
3694
generated AUTO_INCREMENT value by the top statement, and an
3695
erroneous INSERT_ID value might be associated with these statements,
3696
which could cause duplicate entry error and stop the slave.
3698
Detect buggy master to work around.
3700
/**
  Ask whether the master feeding this session is affected by bug 33029.

  Only when active_mi is set and session is that Master_info's SQL thread
  session (active_mi->rli.sql_session) is the check delegated to
  rpl_master_has_bug() with bug id 33029 and report=false.

  NOTE(review): the fall-through return for other sessions is not visible
  in this copy of the file; presumably it is false - confirm.
*/
bool rpl_master_erroneous_autoinc(Session *session)
3702
if (active_mi && active_mi->rli.sql_session == session)
3704
Relay_log_info *rli= &active_mi->rli;
3705
return rpl_master_has_bug(rli, 33029, false);
3710
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
3711
template class I_List_iterator<i_string>;
3712
template class I_List_iterator<i_string_pair>;
3716
@} (end of group Replication)