1
/* Copyright (C) 2000-2003 DRIZZLE AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
18
@addtogroup Replication
23
@brief Code to run the io thread and the sql thread on the
26
#include <drizzled/server_includes.h>
28
#include <storage/myisam/myisam.h>
29
#include <drizzled/rpl_mi.h>
30
#include <drizzled/rpl_rli.h>
31
#include <drizzled/sql_repl.h>
32
#include <drizzled/rpl_filter.h>
33
#include <mysys/thr_alarm.h>
34
#include <libdrizzle/errmsg.h>
35
#include <mysys/mysys_err.h>
36
#include <drizzled/error.h>
37
#include <drizzled/sql_parse.h>
38
#include <drizzled/gettext.h>
41
#if TIME_WITH_SYS_TIME
42
# include <sys/time.h>
46
# include <sys/time.h>
52
#include <drizzled/tztime.h>
54
#include "rpl_tblmap.h"
56
#define MAX_SLAVE_RETRY_PAUSE 5
57
bool use_slave_mask = 0;
58
MY_BITMAP slave_error_mask;
60
typedef bool (*CHECK_KILLED_FUNC)(Session*,void*);
62
char* slave_load_tmpdir = 0;
63
Master_info *active_mi= 0;
64
bool replicate_same_server_id;
65
uint64_t relay_log_space_limit = 0;
68
When slave thread exits, we need to remember the temporary tables so we
69
can re-use them on slave start.
71
TODO: move the vars below under Master_info
74
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
75
int32_t events_till_abort = -1;
77
enum enum_slave_reconnect_actions
79
SLAVE_RECON_ACT_REG= 0,
80
SLAVE_RECON_ACT_DUMP= 1,
81
SLAVE_RECON_ACT_EVENT= 2,
85
enum enum_slave_reconnect_messages
87
SLAVE_RECON_MSG_WAIT= 0,
88
SLAVE_RECON_MSG_KILLED_WAITING= 1,
89
SLAVE_RECON_MSG_AFTER= 2,
90
SLAVE_RECON_MSG_FAILED= 3,
91
SLAVE_RECON_MSG_COMMAND= 4,
92
SLAVE_RECON_MSG_KILLED_AFTER= 5,
96
/*
  Status and log messages used by the slave I/O thread around reconnects.
  First index: enum_slave_reconnect_actions (registration, binlog dump,
  event read). Second index: enum_slave_reconnect_messages (WAIT,
  KILLED_WAITING, AFTER, FAILED, COMMAND, KILLED_AFTER).
  Strings wrapped in N_() are marked for translation; the COMMAND entry is
  a plain protocol command name.
  NOTE(review): several message ids contain typos ("waitnig", "postion").
  They are runtime strings and translation keys, so correcting them is a
  behavior change that must be coordinated with the message catalogs.
*/
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
99
N_("Waiting to reconnect after a failed registration on master"),
100
N_("Slave I/O thread killed while waitnig to reconnect after a "
101
"failed registration on master"),
102
N_("Reconnecting after a failed registration on master"),
103
N_("failed registering on master, reconnecting to try again, "
104
"log '%s' at postion %s"),
105
"COM_REGISTER_SLAVE",
106
N_("Slave I/O thread killed during or after reconnect")
109
N_("Waiting to reconnect after a failed binlog dump request"),
110
N_("Slave I/O thread killed while retrying master dump"),
111
N_("Reconnecting after a failed binlog dump request"),
112
N_("failed dump request, reconnecting to try again, "
113
"log '%s' at postion %s"),
115
N_("Slave I/O thread killed during or after reconnect")
118
N_("Waiting to reconnect after a failed master event read"),
119
N_("Slave I/O thread killed while waiting to reconnect "
120
"after a failed read"),
121
N_("Reconnecting after a failed master event read"),
122
N_("Slave I/O thread: Failed reading log event, "
123
"reconnecting to retry, log '%s' at postion %s"),
125
N_("Slave I/O thread killed during or after a "
126
"reconnect done to recover from failed read")
131
typedef enum { SLAVE_Session_IO, SLAVE_Session_SQL} SLAVE_Session_TYPE;
133
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
134
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
135
static bool wait_for_relay_log_space(Relay_log_info* rli);
136
static inline bool io_slave_killed(Session* session,Master_info* mi);
137
static inline bool sql_slave_killed(Session* session,Relay_log_info* rli);
138
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type);
139
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi);
140
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
141
bool suppress_warnings);
142
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
143
bool reconnect, bool suppress_warnings);
144
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
145
void* thread_killed_arg);
146
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
147
static Log_event* next_event(Relay_log_info* rli);
148
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
149
static int32_t terminate_slave_thread(Session *session,
150
pthread_mutex_t* term_lock,
151
pthread_cond_t* term_cond,
152
volatile uint32_t *slave_running,
154
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info);
157
Find out which replication threads are running
161
mask Return value here
162
mi master_info for slave
163
inverse If set, returns which threads are not running
166
Get a bit mask for which threads are running so that we can later restart
170
mask If inverse == 0, running threads
171
If inverse == 1, stopped threads
174
/*
  Build a bit mask of SLAVE_IO / SLAVE_SQL describing the slave threads of
  mi. The running state of the I/O thread is read from mi->slave_running and
  that of the SQL thread from mi->rli.slave_running.
*/
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
176
/* Sample both running flags once. */
bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
177
register int32_t tmp_mask=0;
180
tmp_mask |= SLAVE_IO;
182
tmp_mask |= SLAVE_SQL;
184
/* Flipping both bits turns "running threads" into "stopped threads". */
tmp_mask^= (SLAVE_IO | SLAVE_SQL);
194
/*
  Acquire both run locks of mi: mi->run_lock (I/O thread) first, then
  mi->rli.run_lock (SQL thread). unlock_slave_threads() releases them in
  the opposite order.
*/
void lock_slave_threads(Master_info* mi)
196
//TODO: see if we can do this without dual mutex
197
pthread_mutex_lock(&mi->run_lock);
198
pthread_mutex_lock(&mi->rli.run_lock);
204
unlock_slave_threads()
207
/*
  Release the two run locks taken by lock_slave_threads(), in reverse
  acquisition order: mi->rli.run_lock first, then mi->run_lock.
*/
void unlock_slave_threads(Master_info* mi)
209
//TODO: see if we can do this without dual mutex
210
pthread_mutex_unlock(&mi->rli.run_lock);
211
pthread_mutex_unlock(&mi->run_lock);
216
/* Initialize slave structures */
221
This is called when mysqld starts. Before client connections are
222
accepted. However bootstrap may conflict with us if it does START SLAVE.
223
So it's safer to take the lock.
225
pthread_mutex_lock(&LOCK_active_mi);
227
TODO: re-write this to iterate through the list of files
230
active_mi= new Master_info;
233
If master_host is not specified, try to read it from the master_info file.
234
If master_host is specified, create the master_info file if it doesn't
239
sql_print_error(_("Failed to allocate memory for the master info structure"));
243
if (active_mi->init_master_info(master_info_file, relay_log_info_file, (SLAVE_IO | SLAVE_SQL)))
245
sql_print_error(_("Failed to initialize the master info structure"));
249
/* If server id is not set, start_slave_thread() will say it */
251
if (active_mi->host[0] && !opt_skip_slave_start)
253
if (start_slave_threads(1 /* need mutex */,
254
0 /* no wait for start*/,
258
SLAVE_IO | SLAVE_SQL))
260
sql_print_error(_("Failed to create slave threads"));
264
pthread_mutex_unlock(&LOCK_active_mi);
268
pthread_mutex_unlock(&LOCK_active_mi);
274
Init function to set up array for errors that should be skipped for slave
277
init_slave_skip_errors()
278
arg List of error numbers to skip, separated with ','
281
Called from get_options() in mysqld.cc on start-up
284
/*
  Fill the global bitmap slave_error_mask from a textual list of error
  numbers. The bitmap is created with MAX_SLAVE_ERROR bits; the literal
  "all" sets every bit, otherwise each parsed number below MAX_SLAVE_ERROR
  sets the corresponding bit.
*/
void init_slave_skip_errors(const char* arg)
288
if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
290
fprintf(stderr, "Badly out of memory, please check your system status\n");
294
/* Skip leading whitespace in the argument. */
for (;my_isspace(system_charset_info,*arg);++arg)
296
/* The compared length of 4 includes the NUL terminator of "all". */
if (!my_strnncoll(system_charset_info,(unsigned char*)arg,4,(const unsigned char*)"all",4))
298
bitmap_set_all(&slave_error_mask);
304
/* Parse one base-10 number in the range [0, LONG_MAX]. */
if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
306
/* Numbers that do not fit in the bitmap are silently ignored. */
if (err_code < MAX_SLAVE_ERROR)
307
bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
308
/* Advance to the next digit (or the end of the string). */
while (!my_isdigit(system_charset_info,*p) && *p)
315
/*
  Stop the slave threads of mi selected by thread_mask.
  The I/O thread is handled when SLAVE_IO or SLAVE_FORCE_ALL is set, the
  SQL thread when SLAVE_SQL or SLAVE_FORCE_ALL is set. Each one is stopped
  through terminate_slave_thread() using its own run lock.
  Returns 0 when there is nothing to do.
  NOTE(review): presumably returns the terminate_slave_thread() error
  otherwise, possibly ignored under SLAVE_FORCE_ALL - confirm.
*/
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
318
return(0); /* successfully do nothing */
319
int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
320
/* Run locks: rli.run_lock guards the SQL thread, run_lock the I/O thread. */
pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
322
if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
325
if ((error=terminate_slave_thread(mi->io_session,io_lock,
332
if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
334
/* Ask the SQL thread to stop before waiting for it. */
mi->rli.abort_slave=1;
335
if ((error=terminate_slave_thread(mi->rli.sql_session,sql_lock,
337
&mi->rli.slave_running,
347
Wait for a slave thread to terminate.
349
This function is called after requesting the thread to terminate
350
(by setting @c abort_slave member of @c Relay_log_info or @c
351
Master_info structure to 1). Termination of the thread is
352
controlled with the predicate <code>*slave_running</code>.
354
Function will acquire @c term_lock before waiting on the condition
355
unless @c skip_lock is true in which case the mutex should be owned
356
by the caller of this function and will remain acquired after
357
return from the function.
360
Associated lock to use when waiting for @c term_cond
363
Condition that is signalled when the thread has terminated
366
Pointer to predicate to check for slave thread termination
369
If @c true the lock will not be acquired before waiting on
370
the condition. In this case, it is assumed that the calling
371
function acquires the lock before calling this function.
376
terminate_slave_thread(Session *session,
377
pthread_mutex_t* term_lock,
378
pthread_cond_t* term_cond,
379
volatile uint32_t *slave_running,
385
pthread_mutex_lock(term_lock);
387
safe_mutex_assert_owner(term_lock);
392
pthread_mutex_unlock(term_lock);
393
return(ER_SLAVE_NOT_RUNNING);
395
assert(session != 0);
396
Session_CHECK_SENTRY(session);
399
It is critical to test if the slave is running. Otherwise, we might
400
be referencing freed memory trying to kick it
403
while (*slave_running) // Should always be true
405
pthread_mutex_lock(&session->LOCK_delete);
406
#ifndef DONT_USE_THR_ALARM
408
Error codes from pthread_kill are:
409
EINVAL: invalid signal number (can't happen)
410
ESRCH: thread already killed (can happen, should be ignored)
412
int32_t err= pthread_kill(session->real_id, thr_client_alarm);
413
assert(err != EINVAL);
415
session->awake(Session::NOT_KILLED);
416
pthread_mutex_unlock(&session->LOCK_delete);
419
There is a small chance that slave thread might miss the first
420
alarm. To protect against it, resend the signal until it reacts
422
struct timespec abstime;
423
set_timespec(abstime,2);
424
error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
425
assert(error == ETIMEDOUT || error == 0);
428
assert(*slave_running == 0);
431
pthread_mutex_unlock(term_lock);
436
/*
  Create one slave thread running h_func with mi as its argument, and
  optionally wait until it has started.

  Error returns seen in this function:
    ER_BAD_SLAVE        logged as "Server id not set, will not start slave"
    ER_SLAVE_MUST_STOP  NOTE(review): presumably the thread is already
                        running - confirm
    ER_SLAVE_THREAD     pthread_create() failed
  start_lock is released before each of these returns.

  When both start_cond and cond_lock are supplied, the caller waits on
  start_cond until *slave_run_id differs from the value sampled before the
  thread was created.
*/
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
437
pthread_mutex_t *cond_lock,
438
pthread_cond_t *start_cond,
439
volatile uint32_t *slave_running,
440
volatile uint32_t *slave_run_id,
450
pthread_mutex_lock(start_lock);
454
pthread_cond_broadcast(start_cond);
456
pthread_mutex_unlock(start_lock);
457
sql_print_error(_("Server id not set, will not start slave"));
458
return(ER_BAD_SLAVE);
464
pthread_cond_broadcast(start_cond);
466
pthread_mutex_unlock(start_lock);
467
return(ER_SLAVE_MUST_STOP);
469
/* Remember the current run id; a change means the new thread is up. */
start_id= *slave_run_id;
472
struct sched_param tmp_sched_param;
474
memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
475
tmp_sched_param.sched_priority= CONNECT_PRIOR;
476
/* Best effort: the result of setting the priority is ignored. */
(void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
478
if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
481
pthread_mutex_unlock(start_lock);
482
return(ER_SLAVE_THREAD);
484
if (start_cond && cond_lock) // caller has cond_lock
486
Session* session = current_session;
487
while (start_id == *slave_run_id)
489
const char* old_msg = session->enter_cond(start_cond,cond_lock,
490
"Waiting for slave thread to start");
491
pthread_cond_wait(start_cond,cond_lock);
492
session->exit_cond(old_msg);
493
pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
495
/* NOTE(review): presumably taken only when the waiting session was killed - confirm. */
return(session->killed_errno());
499
pthread_mutex_unlock(start_lock);
505
start_slave_threads()
508
SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
509
sense to do that for starting a slave--we always care if it actually
510
started the threads that were not previously running
513
/*
  Start the slave threads selected by thread_mask: the I/O thread
  (handle_slave_io) for SLAVE_IO, then, if that did not fail, the SQL
  thread (handle_slave_sql) for SLAVE_SQL. Both are started through
  start_slave_thread(). The two file name parameters are unused.
*/
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
515
const char* master_info_fname __attribute__((unused)),
516
const char* slave_info_fname __attribute__((unused)),
519
/* Null locks/conditions tell start_slave_thread() not to lock or wait. */
pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
520
pthread_cond_t* cond_io=0,*cond_sql=0;
523
if (need_slave_mutex)
525
lock_io = &mi->run_lock;
526
lock_sql = &mi->rli.run_lock;
530
/* NOTE(review): presumably set only when wait_for_start is true - confirm. */
cond_io = &mi->start_cond;
531
cond_sql = &mi->rli.start_cond;
532
lock_cond_io = &mi->run_lock;
533
lock_cond_sql = &mi->rli.run_lock;
536
if (thread_mask & SLAVE_IO)
537
error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
539
&mi->slave_running, &mi->slave_run_id,
540
mi, 1); //high priority, to read the most possible
541
if (!error && (thread_mask & SLAVE_SQL))
543
error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
545
&mi->rli.slave_running, &mi->rli.slave_run_id,
548
/* Roll back: stop the I/O thread again if it was part of this request. */
terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
555
static int32_t end_slave_on_walk(Master_info* mi, unsigned char* /*unused*/)
564
Free all resources used by slave
573
This is called when the server terminates, in close_connections().
574
It terminates slave threads. However, some CHANGE MASTER etc may still be
575
running presently. If a START SLAVE was in progress, the mutex lock below
576
will make us wait until slave threads have started, and START SLAVE
577
returns, then we terminate them here.
579
pthread_mutex_lock(&LOCK_active_mi);
583
TODO: replace the line below with
584
list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
585
once multi-master code is ready.
587
terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
588
active_mi->end_master_info();
592
pthread_mutex_unlock(&LOCK_active_mi);
597
/*
  Tell whether the slave I/O thread has been asked to stop.
  session must be the I/O thread's own session (mi->io_session), and the
  thread must be marked as running.
  Returns true when mi->abort_slave is set, the global abort_loop is set,
  or the session has been killed.
*/
static bool io_slave_killed(Session* session, Master_info* mi)
599
assert(mi->io_session == session);
600
assert(mi->slave_running); // tracking buffer overrun
601
return(mi->abort_slave || abort_loop || session->killed);
605
/*
  Tell whether the slave SQL thread has been asked to stop.
  session must be the SQL thread's own session (rli->sql_session) and
  rli->slave_running must be 1.
  A stop request is abort_loop, session->killed or rli->abort_slave. The
  code below additionally looks at rli->last_event_start_time and reports
  an error once more than 60 seconds have passed since that time.
*/
static bool sql_slave_killed(Session* session, Relay_log_info* rli)
607
assert(rli->sql_session == session);
608
assert(rli->slave_running == 1);// tracking buffer overrun
609
if (abort_loop || session->killed || rli->abort_slave)
612
If we are in an unsafe situation (stopping could corrupt replication),
613
we give one minute to the slave SQL thread of grace before really
614
terminating, in the hope that it will be able to read more events and
615
the unsafe situation will soon be left. Note that this one minute starts
616
from the last time anything happened in the slave SQL thread. So it's
617
really one minute of idleness, we don't timeout if the slave SQL thread
620
if (rli->last_event_start_time == 0)
622
/* Grace period of 60 seconds, measured from last_event_start_time. */
if (difftime(time(0), rli->last_event_start_time) > 60)
624
rli->report(ERROR_LEVEL, 0,
625
_("SQL thread had to stop in an unsafe situation, in "
626
"the middle of applying updates to a "
627
"non-transactional table without any primary key. "
628
"There is a risk of duplicate updates when the slave "
629
"SQL thread is restarted. Please check your tables' "
630
"contents after restart."));
639
skip_load_data_infile()
642
This is used to tell a 3.23 master to break send_file()
645
/*
  Make the peer on net finish a pending file transfer without sending any
  real data: request the file "/dev/null", read and discard the reply, then
  write a command 0 with empty payloads as acknowledgement.
  The results of all three network calls are deliberately ignored.
*/
void skip_load_data_infile(NET *net)
647
(void)net_request_file(net, "/dev/null");
648
(void)my_net_read(net); // discard response
649
(void)net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0); // ok
654
/*
  Write command byte 251 to net with fname (strlen(fname) bytes, no
  terminator) as its first payload and an empty second payload.
  Returns the result of net_write_command().
  fname must not be NULL because it is passed to strlen().
*/
bool net_request_file(NET* net, const char* fname)
656
return(net_write_command(net, 251, (unsigned char*) fname, strlen(fname),
657
(unsigned char*) "", 0));
661
From other comments and tests in code, it looks like
662
sometimes Query_log_event and Load_log_event can have db == 0
663
(see rewrite_db() above for example)
664
(cases where this happens are unclear; it may be when the master is 3.23).
667
/*
  Return db itself, or an empty string when db is NULL, so the result can
  always be printed as a string.
*/
const char *print_slave_db_safe(const char* db)
669
return((db ? db : ""));
672
/*
  Read one line from the IO_CACHE f into var (at most max_size bytes as
  passed to my_b_gets()). When nothing could be read and default_val is
  non-NULL, default_val is copied into var instead, limited to
  max_size-1 characters.
*/
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
673
const char *default_val)
677
if ((length=my_b_gets(f,var, max_size)))
679
/* Points at the last character that was read. */
char* last_p = var + length -1;
681
*last_p = 0; // if we stopped on newline, kill it
685
If we truncated a line or stopped on last char, remove all chars
686
up to and including newline.
689
while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
693
else if (default_val)
695
strmake(var, default_val, max_size-1);
702
/*
  Read one line from the IO_CACHE f and use it to initialize *var; fall
  back to default_val when no line could be read.
  NOTE(review): the fallback is only taken for a non-zero default_val, so a
  default of 0 is indistinguishable from "no default" - confirm this is
  intended by the callers.
*/
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
707
if (my_b_gets(f, buf, sizeof(buf)))
712
else if (default_val)
720
/*
  Read one line from the IO_CACHE f and parse it into *var with
  sscanf("%f"); a line that does not yield exactly one conversion is
  handled separately. Falls back to default_val when no line could be read.
  NOTE(review): the fallback is only taken when default_val != 0.0, so a
  default of 0.0 is indistinguishable from "no default" - confirm this is
  intended by the callers.
*/
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
725
if (my_b_gets(f, buf, sizeof(buf)))
727
if (sscanf(buf, "%f", var) != 1)
732
else if (default_val != 0.0)
740
/*
  Check io_slave_killed() for the I/O thread and, when it reports a stop
  request, log info as an informational message. The message is only
  written when info is non-NULL and the log_warnings system variable is
  non-zero.
  NOTE(review): callers test "!check_io_slave_killed(...)" as "not killed",
  so this presumably returns true when the thread was killed - confirm.
*/
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info)
742
if (io_slave_killed(session, mi))
744
if (info && global_system_variables.log_warnings)
745
sql_print_information("%s",info);
753
Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
754
relying on the binlog's version. This is not perfect: imagine an upgrade
755
of the master without waiting that all slaves are in sync with the master;
756
then a slave could be fooled about the binlog's format. This is what happens
757
when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
758
slaves are fooled. So we do this only to distinguish between 3.23 and more
759
recent masters (it's too late to change things for 3.23).
766
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
769
String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
770
char err_buff[MAX_SLAVE_ERRMSG];
771
const char* errmsg= 0;
773
DRIZZLE_RES *master_res= 0;
774
DRIZZLE_ROW master_row;
778
Free old description_event_for_queue (that is needed if we are in
781
delete mi->rli.relay_log.description_event_for_queue;
782
mi->rli.relay_log.description_event_for_queue= 0;
784
if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
786
errmsg = _("Master reported unrecognized DRIZZLE version");
787
err_code= ER_SLAVE_FATAL_ERROR;
788
sprintf(err_buff, ER(err_code), errmsg);
789
err_msg.append(err_buff);
794
Note the following switch will bug when we have DRIZZLE branch 30 ;)
796
switch (*drizzle->server_version)
801
errmsg = _("Master reported unrecognized DRIZZLE version");
802
err_code= ER_SLAVE_FATAL_ERROR;
803
sprintf(err_buff, ER(err_code), errmsg);
804
err_msg.append(err_buff);
807
mi->rli.relay_log.description_event_for_queue= new
808
Format_description_log_event(1, drizzle->server_version);
811
mi->rli.relay_log.description_event_for_queue= new
812
Format_description_log_event(3, drizzle->server_version);
816
Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
817
take the early steps (like tests for "is this a 3.23 master") which we
818
have to take before we receive the real master's Format_desc which will
819
override this one. Note that the Format_desc we create below is garbage
820
(it has the format of the *slave*); it's only good to help know if the
821
master is 3.23, 4.0, etc.
823
mi->rli.relay_log.description_event_for_queue= new
824
Format_description_log_event(4, drizzle->server_version);
830
This does not mean that a 5.0 slave will be able to read a 6.0 master; but
831
as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
832
can't read a 6.0 master, this will show up when the slave can't read some
833
events sent by the master, and there will be error messages.
836
if (err_msg.length() != 0)
839
/* as we are here, we tried to allocate the event */
840
if (!mi->rli.relay_log.description_event_for_queue)
842
errmsg= _("default Format_description_log_event");
843
err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
844
sprintf(err_buff, ER(err_code), errmsg);
845
err_msg.append(err_buff);
850
Compare the master and slave's clock. Do not die if master's clock is
851
unavailable (very old master not supporting UNIX_TIMESTAMP()?).
854
if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
855
(master_res= drizzle_store_result(drizzle)) &&
856
(master_row= drizzle_fetch_row(master_res)))
858
mi->clock_diff_with_master=
859
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
861
else if (!check_io_slave_killed(mi->io_session, mi, NULL))
863
mi->clock_diff_with_master= 0; /* The "most sensible" value */
864
sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
865
"do not trust column Seconds_Behind_Master of SHOW "
866
"SLAVE STATUS. Error: %s (%d)"),
867
drizzle_error(drizzle), drizzle_errno(drizzle));
870
drizzle_free_result(master_res);
873
Check that the master's server id and ours are different. Because if they
874
are equal (which can result from a simple copy of master's datadir to slave,
875
thus copying some drizzle.cnf), replication will work but all events will be
877
Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
879
Note: we could have put a @@SERVER_ID in the previous SELECT
880
UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
882
if (!drizzle_real_query(drizzle,
883
STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
884
(master_res= drizzle_store_result(drizzle)))
886
if ((master_row= drizzle_fetch_row(master_res)) &&
887
(::server_id == strtoul(master_row[1], 0, 10)) &&
888
!mi->rli.replicate_same_server_id)
891
_("The slave I/O thread stops because master and slave have equal "
892
"DRIZZLE server ids; these ids must be different "
893
"for replication to work (or "
894
"the --replicate-same-server-id option must be used "
895
"on slave but this does"
896
"not always make sense; please check the manual before using it).");
897
err_code= ER_SLAVE_FATAL_ERROR;
898
sprintf(err_buff, ER(err_code), errmsg);
899
err_msg.append(err_buff);
901
drizzle_free_result(master_res);
907
Check that the master's global character_set_server and ours are the same.
908
Not fatal if query fails (old master?).
909
Note that we don't check for equality of global character_set_client and
910
collation_connection (neither do we prevent their setting in
911
set_var.cc). That's because from what I (Guilhem) have tested, the global
912
values of these 2 are never used (new connections don't use them).
913
We don't test equality of global collation_database either as it is
914
going to be deprecated (made read-only) in 4.1 very soon.
915
The test is only relevant if master < 5.0.3 (we'll test only if it's older
916
than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
917
charset info in each binlog event.
918
We don't do it for 3.23 because masters <3.23.50 hang on
919
SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
920
test only if master is 4.x.
923
/* redundant with rest of code but safer against later additions */
924
if (*drizzle->server_version == '3')
927
if ((*drizzle->server_version == '4') &&
928
!drizzle_real_query(drizzle,
929
STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
930
(master_res= drizzle_store_result(drizzle)))
932
if ((master_row= drizzle_fetch_row(master_res)) &&
933
strcmp(master_row[0], global_system_variables.collation_server->name))
936
_("The slave I/O thread stops because master and slave have"
937
" different values for the COLLATION_SERVER global variable."
938
" The values must be equal for replication to work");
939
err_code= ER_SLAVE_FATAL_ERROR;
940
sprintf(err_buff, ER(err_code), errmsg);
941
err_msg.append(err_buff);
943
drizzle_free_result(master_res);
949
Perform analogous check for time zone. Theoretically we also should
950
perform check here to verify that SYSTEM time zones are the same on
951
slave and master, but we can't rely on value of @@system_time_zone
952
variable (it is a time zone abbreviation) since it is determined at start
953
time and so could differ for slave and master even if they are really
954
in the same system time zone. So we are omitting this check and just
955
relying on documentation. Also according to Monty there are many users
956
who are using replication between servers in various time zones. Hence
957
such a check would break everything for them. (And now everything will
958
work for them because by default both their master and slave will have
960
This check is only necessary for 4.x masters (and < 5.0.4 masters but
963
if ((*drizzle->server_version == '4') &&
964
!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
965
(master_res= drizzle_store_result(drizzle)))
967
if ((master_row= drizzle_fetch_row(master_res)) &&
968
strcmp(master_row[0],
969
global_system_variables.time_zone->get_name()->ptr()))
972
_("The slave I/O thread stops because master and slave have"
973
" different values for the TIME_ZONE global variable."
974
" The values must be equal for replication to work");
975
err_code= ER_SLAVE_FATAL_ERROR;
976
sprintf(err_buff, ER(err_code), errmsg);
977
err_msg.append(err_buff);
979
drizzle_free_result(master_res);
985
if (mi->heartbeat_period != 0.0)
988
const char query_format[]= "SET @master_heartbeat_period= %s";
989
char query[sizeof(query_format) - 2 + sizeof(llbuf)];
991
the period is an uint64_t of nano-secs.
993
llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
994
sprintf(query, query_format, llbuf);
996
if (drizzle_real_query(drizzle, query, strlen(query))
997
&& !check_io_slave_killed(mi->io_session, mi, NULL))
999
err_msg.append("The slave I/O thread stops because querying master with '");
1000
err_msg.append(query);
1001
err_msg.append("' failed;");
1002
err_msg.append(" error: ");
1003
err_code= drizzle_errno(drizzle);
1004
err_msg.qs_append(err_code);
1005
err_msg.append(" '");
1006
err_msg.append(drizzle_error(drizzle));
1007
err_msg.append("'");
1008
drizzle_free_result(drizzle_store_result(drizzle));
1011
drizzle_free_result(drizzle_store_result(drizzle));
1015
if (err_msg.length() != 0)
1017
sql_print_error("%s",err_msg.ptr());
1018
assert(err_code != 0);
1019
mi->report(ERROR_LEVEL, err_code, "%s",err_msg.ptr());
1027
/*
  Block the slave I/O thread (rli->mi->io_session) while the relay logs use
  more space than allowed.
  Waits on rli->log_space_cond under rli->log_space_lock until one of:
    - rli->log_space_total no longer exceeds rli->log_space_limit,
    - the I/O thread has been asked to stop (io_slave_killed()),
    - rli->ignore_log_space_limit is set.
  Returns true when the wait ended because the I/O thread was killed.
*/
static bool wait_for_relay_log_space(Relay_log_info* rli)
1029
bool slave_killed=0;
1030
Master_info* mi = rli->mi;
1031
const char *save_proc_info;
1032
Session* session = mi->io_session;
1034
pthread_mutex_lock(&rli->log_space_lock);
1035
save_proc_info= session->enter_cond(&rli->log_space_cond,
1036
&rli->log_space_lock,
1037
_("Waiting for the slave SQL thread "
1038
"to free enough relay log space"));
1039
/* The conditions are re-evaluated after every wakeup. */
while (rli->log_space_limit < rli->log_space_total &&
1040
!(slave_killed=io_slave_killed(session,mi)) &&
1041
!rli->ignore_log_space_limit)
1042
pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1043
/* NOTE(review): exit_cond() presumably releases log_space_lock, since no explicit unlock follows - confirm. */
session->exit_cond(save_proc_info);
1044
return(slave_killed);
1049
Builds a Rotate from the ignored events' info and writes it to relay log.
1052
write_ignored_events_info_to_relay_log()
1053
session pointer to I/O thread's session
1057
Slave I/O thread, going to die, must leave a durable trace of the
1058
ignored events' end position for the use of the slave SQL thread, by
1059
calling this function. Only that thread can call it (see assertion).
1061
static void write_ignored_events_info_to_relay_log(Session *session __attribute__((unused)),
1064
Relay_log_info *rli= &mi->rli;
1065
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1067
assert(session == mi->io_session);
1068
pthread_mutex_lock(log_lock);
1069
if (rli->ign_master_log_name_end[0])
1071
Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
1072
0, rli->ign_master_log_pos_end,
1073
Rotate_log_event::DUP_NAME);
1074
rli->ign_master_log_name_end[0]= 0;
1075
/* can unlock before writing as slave SQL session will soon see our Rotate */
1076
pthread_mutex_unlock(log_lock);
1077
if (likely((bool)ev))
1079
ev->server_id= 0; // don't be ignored by slave SQL thread
1080
if (unlikely(rli->relay_log.append(ev)))
1081
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
1082
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
1083
_("failed to write a Rotate event"
1084
" to the relay log, SHOW SLAVE STATUS may be"
1086
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1088
sql_print_error(_("Failed to flush master info file"));
1092
mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
1093
ER(ER_SLAVE_CREATE_EVENT_FAILURE),
1094
_("Rotate_event (out of memory?),"
1095
" SHOW SLAVE STATUS may be inaccurate"));
1098
pthread_mutex_unlock(log_lock);
1103
/*
  Send a COM_REGISTER_SLAVE command describing this server to the master
  reached through drizzle.
  *suppress_warnings is reset to false on entry and set to true when the
  command fails with ER_NET_READ_INTERRUPTED. Any other failure is
  reported through mi->report() unless the I/O thread has been killed.
  Returns 0 without sending anything when the report strings are too long
  for the packet buffer.
*/
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
1104
bool *suppress_warnings)
1106
unsigned char buf[1024], *pos= buf;
1107
uint32_t report_host_len, report_user_len=0, report_password_len=0;
1109
*suppress_warnings= false;
1112
report_host_len= strlen(report_host);
1114
report_user_len= strlen(report_user);
1115
if (report_password)
1116
report_password_len= strlen(report_password);
1117
/* 30 is a good safety margin */
1118
if (report_host_len + report_user_len + report_password_len + 30 >
1120
return(0); // safety
1122
/*
  Packet layout: 4-byte server_id, then host, user and password written
  with net_store_data(), a 2-byte report_port and two 4-byte zero fields.
*/
int4store(pos, server_id); pos+= 4;
1123
pos= net_store_data(pos, (unsigned char*) report_host, report_host_len);
1124
pos= net_store_data(pos, (unsigned char*) report_user, report_user_len);
1125
pos= net_store_data(pos, (unsigned char*) report_password, report_password_len);
1126
int2store(pos, (uint16_t) report_port); pos+= 2;
1127
int4store(pos, 0); pos+= 4;
1128
/* The master will fill in master_id */
1129
int4store(pos, 0); pos+= 4;
1131
if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
1133
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1135
*suppress_warnings= true; // Suppress reconnect warning
1137
else if (!check_io_slave_killed(mi->io_session, mi, NULL))
1140
/* NOTE(review): buf here is presumably a separate local char buffer, not the unsigned char packet buffer - confirm. */
snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle),
1141
drizzle_errno(drizzle));
1142
mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1143
ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1151
/*
  Sends the slave status result set for one master connection to the
  client: first the column metadata collected in field_list, then a single
  row whose values are read from mi and mi->rli.

  The I/O state string is read under mi->run_lock; all other values are read
  while holding mi->data_lock and mi->rli.data_lock (taken in that order).

  NOTE(review): the return statements are not visible in this extract;
  presumably true is returned when sending fails - confirm.
*/
bool show_master_info(Session* session, Master_info* mi)
1153
// TODO: fix this for multi-master
1154
List<Item> field_list;
1155
Protocol *protocol= session->protocol;
1157
/* Column definitions; the order must match the store() calls below. */
field_list.push_back(new Item_empty_string("Slave_IO_State",
1159
field_list.push_back(new Item_empty_string("Master_Host",
1161
field_list.push_back(new Item_empty_string("Master_User",
1163
field_list.push_back(new Item_return_int("Master_Port", 7,
1164
DRIZZLE_TYPE_LONG));
1165
field_list.push_back(new Item_return_int("Connect_Retry", 10,
1166
DRIZZLE_TYPE_LONG));
1167
field_list.push_back(new Item_empty_string("Master_Log_File",
1169
field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
1170
DRIZZLE_TYPE_LONGLONG));
1171
field_list.push_back(new Item_empty_string("Relay_Log_File",
1173
field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
1174
DRIZZLE_TYPE_LONGLONG));
1175
field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
1177
field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
1178
field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1179
field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
1180
field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
1181
field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
1182
field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
1183
field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
1184
field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
1186
field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
1187
field_list.push_back(new Item_empty_string("Last_Error", 20));
1188
field_list.push_back(new Item_return_int("Skip_Counter", 10,
1189
DRIZZLE_TYPE_LONG));
1190
field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
1191
DRIZZLE_TYPE_LONGLONG));
1192
field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
1193
DRIZZLE_TYPE_LONGLONG));
1194
field_list.push_back(new Item_empty_string("Until_Condition", 6));
1195
field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1196
field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1197
DRIZZLE_TYPE_LONGLONG));
1198
field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1199
DRIZZLE_TYPE_LONGLONG));
1200
field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
1201
field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1202
field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
1203
field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
1205
if (protocol->send_fields(&field_list,
1206
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1211
String *packet= &session->packet;
1212
protocol->prepare_for_resend();
1215
slave_running can be accessed without run_lock but not other
1216
non-volotile members like mi->io_session, which is guarded by the mutex.
1218
/* Slave_IO_State: empty string when there is no I/O session. */
pthread_mutex_lock(&mi->run_lock);
1219
protocol->store(mi->io_session ? mi->io_session->get_proc_info() : "", &my_charset_bin);
1220
pthread_mutex_unlock(&mi->run_lock);
1222
/* Everything below is read under both data locks. */
pthread_mutex_lock(&mi->data_lock);
1223
pthread_mutex_lock(&mi->rli.data_lock);
1224
protocol->store(mi->getHostname(), &my_charset_bin);
1225
protocol->store(mi->getUsername(), &my_charset_bin);
1226
protocol->store((uint32_t) mi->getPort());
1227
protocol->store(mi->getConnectionRetry());
1228
protocol->store(mi->getLogName(), &my_charset_bin);
1229
protocol->store((uint64_t) mi->getLogPosition());
1230
/* Relay_Log_File: the relay log name with its directory part skipped. */
protocol->store(mi->rli.group_relay_log_name.c_str() +
1231
dirname_length(mi->rli.group_relay_log_name.c_str()),
1233
protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1234
protocol->store(mi->rli.group_master_log_name.c_str(), &my_charset_bin);
1235
/* Slave_IO_Running is "Yes" only in the connected state. */
protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1236
"Yes" : "No", &my_charset_bin);
1237
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1238
protocol->store(rpl_filter->get_do_db());
1239
protocol->store(rpl_filter->get_ignore_db());
1242
/* tmp is reused for each of the four table filter lists. */
String tmp(buf, sizeof(buf), &my_charset_bin);
1243
rpl_filter->get_do_table(&tmp);
1244
protocol->store(&tmp);
1245
rpl_filter->get_ignore_table(&tmp);
1246
protocol->store(&tmp);
1247
rpl_filter->get_wild_do_table(&tmp);
1248
protocol->store(&tmp);
1249
rpl_filter->get_wild_ignore_table(&tmp);
1250
protocol->store(&tmp);
1252
/* Last_Errno / Last_Error come from the relay log info (SQL side). */
protocol->store(mi->rli.last_error().number);
1253
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1254
protocol->store((uint32_t) mi->rli.slave_skip_counter);
1255
protocol->store((uint64_t) mi->rli.group_master_log_pos);
1256
protocol->store((uint64_t) mi->rli.log_space_total);
1259
mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
1260
( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
1261
"Relay"), &my_charset_bin);
1262
protocol->store(mi->rli.until_log_name, &my_charset_bin);
1263
protocol->store((uint64_t) mi->rli.until_log_pos);
1266
Seconds_Behind_Master: if SQL thread is running and I/O thread is
1267
connected, we can compute it otherwise show NULL (i.e. unknown).
1269
if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
1270
mi->rli.slave_running)
1272
/* Local clock minus last event timestamp, corrected by the clock offset. */
long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
1273
- mi->clock_diff_with_master);
1275
Apparently on some systems time_diff can be <0. Here are possible
1276
reasons related to MySQL:
1277
- the master is itself a slave of another master whose time is ahead.
1278
- somebody used an explicit SET TIMESTAMP on the master.
1279
Possible reason related to granularity-to-second of time functions
1280
(nothing to do with MySQL), which can explain a value of -1:
1281
assume the master's and slave's time are perfectly synchronized, and
1282
that at slave's connection time, when the master's timestamp is read,
1283
it is at the very end of second 1, and (a very short time later) when
1284
the slave's timestamp is read it is at the very beginning of second
1285
2. Then the recorded value for master is 1 and the recorded value for
1286
slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1287
between timestamp of slave and rli->last_master_timestamp is 0
1288
(i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1289
This confuses users, so we don't go below 0: hence the cmax().
1291
last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1292
special marker to say "consider we have caught up".
1294
protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1295
cmax((long)0, time_diff) : 0));
1299
protocol->store_null();
1303
/* Last_IO_Errno / Last_IO_Error come from the master info (I/O side). */
protocol->store(mi->last_error().number);
1305
protocol->store(mi->last_error().message, &my_charset_bin);
1307
/*
  Last_SQL_Errno / Last_SQL_Error repeat the values already stored for
  Last_Errno / Last_Error: both pairs read mi->rli.last_error().
*/
protocol->store(mi->rli.last_error().number);
1309
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1311
pthread_mutex_unlock(&mi->rli.data_lock);
1312
pthread_mutex_unlock(&mi->data_lock);
1314
/* The row is written from session->packet (the same object as packet). */
if (my_net_write(&session->net, (unsigned char*) session->packet.ptr(), packet->length()))
1322
/**
  Set the option bits a slave thread must run with.

  OPTION_BIG_SELECTS is always switched on; OPTION_BIN_LOG is switched on
  or off according to opt_log_slave_updates; completion_type is reset to 0.

  @param session  Slave thread context whose options are updated.
*/
void set_slave_thread_options(Session* session)
{
  /*
     It's nonsense to constrain the slave threads with max_join_size; if a
     query succeeded on master, we HAVE to execute it. So set
     OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
     (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
     SELECT examining more than 4 billion rows would still fail (yes, because
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
     only for client threads).
  */
  uint64_t options= session->options | OPTION_BIG_SELECTS;
  if (opt_log_slave_updates)
    options|= OPTION_BIN_LOG;
  else
    options&= ~OPTION_BIN_LOG;
  session->options= options;
  session->variables.completion_type= 0;
}
1347
/*
  Prepares a Session to run as a slave thread of the given type: marks it
  as a system thread, skips grant checks, initialises its net structure,
  raises max_allowed_packet, applies the slave option bits, assigns a
  thread id and sets the initial proc info string.

  NOTE(review): the body of the failure branch and the return statements
  are not visible in this extract; the caller treats a non-zero result as
  an initialisation failure.
*/
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type)
1349
int32_t simulate_error= 0;
1350
session->system_thread = (session_type == SLAVE_Session_SQL) ?
1351
SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1352
session->security_ctx->skip_grants();
1353
my_net_init(&session->net, 0);
1355
Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1356
slave threads, since a replication event can become this much larger
1357
than the corresponding packet (query) sent from client to master.
1359
session->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1360
+ MAX_LOG_EVENT_HEADER; /* note, incr over the global not session var */
1361
session->slave_thread = 1;
1362
set_slave_thread_options(session);
1363
session->client_capabilities = CLIENT_LOCAL_FILES;
1364
/* thread_id is a shared counter, so it is advanced under LOCK_thread_count. */
pthread_mutex_lock(&LOCK_thread_count);
1365
session->thread_id= session->variables.pseudo_thread_id= thread_id++;
1366
pthread_mutex_unlock(&LOCK_thread_count);
1368
/*
  NOTE(review): both bits are set unconditionally here, so the
  "simulate_error & (1<< session_type)" term below is non-zero for either
  thread type and the failure branch is always taken. This looks like a
  leftover of a debug-only error injection hook - confirm and guard/remove.
*/
simulate_error|= (1 << SLAVE_Session_IO);
1369
simulate_error|= (1 << SLAVE_Session_SQL);
1370
if (init_thr_lock() || session->store_globals() || simulate_error & (1<< session_type))
1377
if (session_type == SLAVE_Session_SQL)
1378
session->set_proc_info("Waiting for the next event in relay log");
1380
session->set_proc_info("Waiting for master update");
1381
session->version=refresh_version;
1382
session->set_time();
1387
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1388
void* thread_killed_arg)
1391
thr_alarm_t alarmed;
1393
thr_alarm_init(&alarmed);
1394
time_t start_time= my_time(0);
1395
time_t end_time= start_time+sec;
1397
while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1401
The only reason we are asking for alarm is so that
1402
we will be woken up in case of murder, so if we do not get killed,
1403
set the alarm so it goes off after we wake up naturally
1405
thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
1407
thr_end_alarm(&alarmed);
1409
if ((*thread_killed)(session,thread_killed_arg))
1411
start_time= my_time(0);
1417
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
1418
bool *suppress_warnings)
1420
unsigned char buf[FN_REFLEN + 10];
1422
int32_t binlog_flags = 0; // for now
1423
const char* logname = mi->getLogName();
1425
*suppress_warnings= false;
1427
// TODO if big log files: Change next to int8store()
1428
int4store(buf, (uint32_t) mi->getLogPosition());
1429
int2store(buf + 4, binlog_flags);
1430
int4store(buf + 6, server_id);
1431
len = (uint32_t) strlen(logname);
1432
memcpy(buf + 10, logname,len);
1433
if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
1436
Something went wrong, so we will just reconnect and retry later
1437
in the future, we should do a better error analysis, but for
1438
now we just fill up the error log :-)
1440
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1441
*suppress_warnings= true; // Suppress reconnect warning
1443
sql_print_error(_("Error on COM_BINLOG_DUMP: %d %s, will retry in %d secs"),
1444
drizzle_errno(drizzle), drizzle_error(drizzle),
1453
Read one event from the master
1457
DRIZZLE DRIZZLE connection
1458
mi Master connection information
1459
suppress_warnings TRUE when a normal net read timeout has caused us to
1460
try a reconnect. We do not want to print anything to
1461
the error log in this case because this a anormal
1462
event in an idle server.
1465
'packet_error' Error
1466
number Length of packet
1469
static uint32_t read_event(DRIZZLE *drizzle,
1470
Master_info *mi __attribute__((unused)),
1471
bool* suppress_warnings)
1475
*suppress_warnings= false;
1477
my_real_read() will time us out
1478
We check if we were told to die, and if not, try reading again
1480
if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
1481
return(packet_error);
1483
len = cli_safe_read(drizzle);
1484
if (len == packet_error || (int32_t) len < 1)
1486
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1489
We are trying a normal reconnect after a read timeout;
1490
we suppress prints to .err file as long as the reconnect
1491
happens without problems
1493
*suppress_warnings= true;
1496
sql_print_error(_("Error reading packet from server: %s ( server_errno=%d)"),
1497
drizzle_error(drizzle), drizzle_errno(drizzle));
1498
return(packet_error);
1501
/* Check if eof packet */
1502
if (len < 8 && drizzle->net.read_pos[0] == 254)
1504
sql_print_information(_("Slave: received end packet from server, apparent "
1505
"master shutdown: %s"),
1506
drizzle_error(drizzle));
1507
return(packet_error);
1514
int32_t check_expected_error(Session* session __attribute__((unused)),
1515
Relay_log_info const *rli __attribute__((unused)),
1516
int32_t expected_error)
1518
switch (expected_error) {
1519
case ER_NET_READ_ERROR:
1520
case ER_NET_ERROR_ON_WRITE:
1521
case ER_QUERY_INTERRUPTED:
1522
case ER_SERVER_SHUTDOWN:
1523
case ER_NEW_ABORTING_CONNECTION:
1532
Check if the current error is of temporary nature of not.
1533
Some errors are temporary in nature, such as
1534
ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. Ndb also signals
1535
that the error is temporary by pushing a warning with the error code
1536
ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1538
static int32_t has_temporary_error(Session *session)
1540
if (session->is_fatal_error)
1543
if (session->main_da.is_error())
1545
session->clear_error();
1546
my_error(ER_LOCK_DEADLOCK, MYF(0));
1550
If there is no message in Session, we can't say if it's a temporary
1551
error or not. This is currently the case for Incident_log_event,
1552
which sets no message. Return FALSE.
1554
if (!session->is_error())
1558
Temporary error codes:
1559
currently, InnoDB deadlock detected by InnoDB or lock
1560
wait timeout (innodb_lock_wait_timeout exceeded
1562
if (session->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
1563
session->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
1571
Applies the given event and advances the relay log position.
1573
In essence, this function does:
1576
ev->apply_event(rli);
1577
ev->update_pos(rli);
1580
But it also does some maintainance, such as skipping events if
1581
needed and reporting errors.
1583
If the @c skip flag is set, then it is tested whether the event
1584
should be skipped, by looking at the slave_skip_counter and the
1585
server id. The skip flag should be set when calling this from a
1586
replication thread but not set when executing an explicit BINLOG
1591
@retval 1 Error calling ev->apply_event().
1593
@retval 2 No error calling ev->apply_event(), but error calling
1596
int32_t apply_event_and_update_pos(Log_event* ev, Session* session, Relay_log_info* rli,
1599
int32_t exec_res= 0;
1602
Execute the event to change the database and update the binary
1603
log coordinates, but first we set some data that is needed for
1606
The event will be executed unless it is supposed to be skipped.
1608
Queries originating from this server must be skipped. Low-level
1609
events (Format_description_log_event, Rotate_log_event,
1610
Stop_log_event) from this server must also be skipped. But for
1611
those we don't want to modify 'group_master_log_pos', because
1612
these events did not exist on the master.
1613
Format_description_log_event is not completely skipped.
1615
Skip queries specified by the user in 'slave_skip_counter'. We
1616
can't however skip events that has something to do with the log
1619
Filtering on own server id is extremely important, to ignore
1620
execution of events created by the creation/rotation of the relay
1621
log (remember that now the relay log starts with its Format_desc,
1625
session->server_id = ev->server_id; // use the original server id for logging
1626
session->set_time(); // time the query
1627
session->lex->current_select= 0;
1629
ev->when= my_time(0);
1630
ev->session = session; // because up to this point, ev->session == 0
1634
int32_t reason= ev->shall_skip(rli);
1635
if (reason == Log_event::EVENT_SKIP_COUNT)
1636
--rli->slave_skip_counter;
1637
pthread_mutex_unlock(&rli->data_lock);
1638
if (reason == Log_event::EVENT_SKIP_NOT)
1639
exec_res= ev->apply_event(rli);
1642
exec_res= ev->apply_event(rli);
1646
int32_t error= ev->update_pos(rli);
1648
The update should not fail, so print an error message and
1649
return an error code.
1651
TODO: Replace this with a decent error message when merged
1652
with BUG#24954 (which adds several new error message).
1657
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
1658
_("It was not possible to update the positions"
1659
" of the relay log information: the slave may"
1660
" be in an inconsistent state."
1661
" Stopped in %s position %s"),
1662
rli->group_relay_log_name.c_str(),
1663
llstr(rli->group_relay_log_pos, buf));
1668
return(exec_res ? 1 : 0);
1673
Top-level function for executing the next event from the relay log.
1675
This function reads the event from the relay log, executes it, and
1676
advances the relay log position. It also handles errors, etc.
1678
This function may fail to apply the event for the following reasons:
1680
- The position specified by the UNTIL condition of the START SLAVE
1683
- It was not possible to read the event from the log.
1685
- The slave is killed.
1687
- An error occurred when applying the event, and the event has been
1688
tried slave_trans_retries times. If the event has been retried
1689
fewer times, 0 is returned.
1691
- init_master_info or init_relay_log_pos failed. (These are called
1692
if a failure occurs when applying the event.)
1694
- An error occurred when updating the binlog position.
1696
@retval 0 The event was applied.
1698
@retval 1 The event was not applied.
1700
/*
  Reads the next event from the relay log with next_event(), applies it
  through apply_event_and_update_pos() and, when the failure is classified
  as temporary by has_temporary_error(), rewinds to the start of the group
  and retries up to slave_trans_retries times.

  rli->data_lock is taken on entry and released on the paths shown below.

  NOTE(review): several short statements of this function (returns, the
  deletion of the event, local buffer declarations) are not visible in this
  extract; the comments only describe what the visible lines do.
*/
static int32_t exec_relay_log_event(Session* session, Relay_log_info* rli)
1703
We acquire this mutex since we need it for all operations except
1704
event execution. But we will release it in places where we will
1705
wait for something for example inside of next_event().
1707
pthread_mutex_lock(&rli->data_lock);
1709
Log_event * ev = next_event(rli);
1711
assert(rli->sql_session==session);
1713
/* Killed while waiting for the event: drop the lock taken above. */
if (sql_slave_killed(session,rli))
1715
pthread_mutex_unlock(&rli->data_lock);
1724
This tests if the position of the beginning of the current event
1725
hits the UNTIL barrier.
1727
/*
  Inside a group, or for an event without a log position, the group start
  position is tested; otherwise the start of this event, computed as
  log_pos - data_written.
*/
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
1728
rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
1729
rli->group_master_log_pos :
1730
ev->log_pos - ev->data_written))
1733
sql_print_information(_("Slave SQL thread stopped because it reached its"
1734
" UNTIL position %s"),
1735
llstr(rli->until_pos(), buf));
1737
Setting abort_slave flag because we do not want additional message about
1738
error in query execution to be printed.
1740
rli->abort_slave= 1;
1741
pthread_mutex_unlock(&rli->data_lock);
1745
/* skip == true: the callee releases rli->data_lock before applying. */
exec_res= apply_event_and_update_pos(ev, session, rli, true);
1748
Format_description_log_event should not be deleted because it will be
1749
used to read info about the relay log's format; it will be deleted when
1750
the SQL thread does not need it, i.e. when this thread terminates.
1752
if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
1758
update_log_pos failed: this should not happen, so we don't
1764
/* Retry logic is active only when slave_trans_retries is non-zero. */
if (slave_trans_retries)
1766
int32_t temp_err= 0;
1767
if (exec_res && (temp_err= has_temporary_error(session)))
1771
We were in a transaction which has been rolled back because of a
1773
let's seek back to BEGIN log event and retry it all again.
1774
Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
1775
there is no rollback since 5.0.13 (ref: manual).
1776
We have to not only seek but also
1777
a) init_master_info(), to seek back to hot relay log's start for later
1778
(for when we will come back to this hot log after re-processing the
1779
possibly existing old logs where BEGIN is: check_binlog_magic() will
1780
then need the cache to be at position 0 (see comments at beginning of
1781
init_master_info()).
1782
b) init_relay_log_pos(), because the BEGIN may be an older relay log.
1784
if (rli->trans_retries < slave_trans_retries)
1786
if (rli->mi->init_master_info(0, 0, SLAVE_SQL))
1787
sql_print_error(_("Failed to initialize the master info structure"));
1788
else if (init_relay_log_pos(rli,
1789
rli->group_relay_log_name.c_str(),
1790
rli->group_relay_log_pos,
1792
sql_print_error(_("Error initializing relay log position: %s"),
1797
end_trans(session, ROLLBACK);
1798
/* chance for concurrent connection to get more locks */
1799
/* Back-off grows with the retry count, capped at MAX_SLAVE_RETRY_PAUSE seconds. */
safe_sleep(session, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1800
(CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1801
pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1802
rli->trans_retries++;
1803
rli->retried_trans++;
1804
pthread_mutex_unlock(&rli->data_lock);
1808
/* NOTE(review): %lu assumes slave_trans_retries is an unsigned long - confirm. */
sql_print_error(_("Slave SQL thread retried transaction %lu time(s) "
1809
"in vain, giving up. Consider raising the value of "
1810
"the slave_transaction_retries variable."),
1811
slave_trans_retries);
1813
else if ((exec_res && !temp_err) ||
1814
(opt_using_transactions &&
1815
rli->group_relay_log_pos == rli->event_relay_log_pos))
1818
Only reset the retry counter if the entire group succeeded
1819
or failed with a non-transient error. On a successful
1820
event, the execution will proceed as usual; in the case of a
1821
non-transient error, the slave will stop with an error.
1823
rli->trans_retries= 0; // restart from fresh
1828
/* No event could be obtained from next_event(): report a relay log read failure. */
pthread_mutex_unlock(&rli->data_lock);
1829
rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
1830
ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
1831
_("Could not parse relay log event entry. The possible reasons "
1832
"are: the master's binary log is corrupted (you can check this "
1833
"by running 'mysqlbinlog' on the binary log), the slave's "
1834
"relay log is corrupted (you can check this by running "
1835
"'mysqlbinlog' on the relay log), a network problem, or a bug "
1836
"in the master's or slave's DRIZZLE code. If you want to check "
1837
"the master's binary log or slave's relay log, you will be "
1838
"able to know their names by issuing 'SHOW SLAVE STATUS' "
1845
@brief Try to reconnect slave IO thread.
1847
@details Terminates current connection to master, sleeps for
1848
@c mi->connect_retry msecs and initiates new connection with
1849
@c safe_reconnect(). Variable pointed by @c retry_count is increased -
1850
if it exceeds @c master_retry_count then connection is not re-established
1851
and function signals error.
1852
Unless @c suppres_warnings is TRUE, a warning is put in the server error log
1853
when reconnecting. The warning message and messages used to report errors
1854
are taken from @c messages array. In case @c master_retry_count is exceeded,
1855
no messages are added to the log.
1857
@param[in] session Thread context.
1858
@param[in] DRIZZLE DRIZZLE connection.
1859
@param[in] mi Master connection information.
1860
@param[in,out] retry_count Number of attempts to reconnect.
1861
@param[in] suppress_warnings TRUE when a normal net read timeout
1862
has caused to reconnecting.
1863
@param[in] messages Messages to print/log, see
1864
reconnect_messages[] array.
1867
@retval 1 There was an error.
1870
static int32_t try_to_reconnect(Session *session, DRIZZLE *drizzle, Master_info *mi,
1871
uint32_t *retry_count, bool suppress_warnings,
1872
const char *messages[SLAVE_RECON_MSG_MAX])
1874
mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1875
session->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
1876
drizzle_disconnect(drizzle);
1877
if ((*retry_count)++)
1879
if (*retry_count > master_retry_count)
1880
return 1; // Don't retry forever
1881
safe_sleep(session, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1884
if (check_io_slave_killed(session, mi,
1885
_(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1887
session->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
1888
if (!suppress_warnings)
1890
char buf[256], llbuff[22];
1891
snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1892
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
1894
Raise a warining during registering on master/requesting dump.
1895
Log a message reading event.
1897
if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
1899
mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1900
ER(ER_SLAVE_MASTER_COM_FAILURE),
1901
_(messages[SLAVE_RECON_MSG_COMMAND]), buf);
1905
sql_print_information("%s",buf);
1908
if (safe_reconnect(session, drizzle, mi, 1) || io_slave_killed(session, mi))
1910
if (global_system_variables.log_warnings)
1911
sql_print_information("%s",_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1918
/* Slave I/O Thread entry point */
1920
pthread_handler_t handle_slave_io(void *arg)
1922
Session *session; // needs to be first for thread_stack
1924
Master_info *mi = (Master_info*)arg;
1925
Relay_log_info *rli= &mi->rli;
1927
uint32_t retry_count;
1928
bool suppress_warnings;
1929
uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
1936
pthread_mutex_lock(&mi->run_lock);
1937
/* Inform waiting threads that slave has started */
1940
mi->events_till_disconnect = disconnect_slave_event_count;
1942
session= new Session;
1943
Session_CHECK_SENTRY(session);
1944
mi->io_session = session;
1946
pthread_detach_this_thread();
1947
session->thread_stack= (char*) &session; // remember where our stack is
1948
if (init_slave_thread(session, SLAVE_Session_IO))
1950
pthread_cond_broadcast(&mi->start_cond);
1951
pthread_mutex_unlock(&mi->run_lock);
1952
sql_print_error(_("Failed during slave I/O thread initialization"));
1955
pthread_mutex_lock(&LOCK_thread_count);
1956
threads.append(session);
1957
pthread_mutex_unlock(&LOCK_thread_count);
1958
mi->slave_running = 1;
1959
mi->abort_slave = 0;
1960
pthread_mutex_unlock(&mi->run_lock);
1961
pthread_cond_broadcast(&mi->start_cond);
1963
if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
1965
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
1966
ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
1970
session->set_proc_info("Connecting to master");
1971
// we can get killed during safe_connect
1972
if (!safe_connect(session, drizzle, mi))
1974
sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
1975
"replication started in log '%s' at position %s"),
1976
mi->getUsername(), mi->getHostname(), mi->getPort(),
1978
llstr(mi->getLogPosition(), llbuff));
1980
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
1981
thread, since a replication event can become this much larger than
1982
the corresponding packet (query) sent from client to master.
1984
drizzle->net.max_packet_size= session->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
1988
sql_print_information(_("Slave I/O thread killed while connecting to master"));
1994
// TODO: the assignment below should be under mutex (5.0)
1995
mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
1996
session->slave_net = &drizzle->net;
1997
session->set_proc_info("Checking master version");
1998
if (get_master_version_and_clock(drizzle, mi))
2001
if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
2004
Register ourselves with the master.
2006
session->set_proc_info("Registering slave on master");
2007
if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2009
if (!check_io_slave_killed(session, mi, "Slave I/O thread killed "
2010
"while registering slave on master"))
2012
sql_print_error(_("Slave I/O thread couldn't register on master"));
2013
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2014
reconnect_messages[SLAVE_RECON_ACT_REG]))
2021
if (!retry_count_reg)
2024
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2025
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2026
reconnect_messages[SLAVE_RECON_ACT_REG]))
2032
while (!io_slave_killed(session,mi))
2034
session->set_proc_info("Requesting binlog dump");
2035
if (request_dump(drizzle, mi, &suppress_warnings))
2037
sql_print_error(_("Failed on request_dump()"));
2038
if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while \
2039
requesting master dump")) ||
2040
try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2041
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2045
if (!retry_count_dump)
2048
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2049
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2050
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2055
while (!io_slave_killed(session,mi))
2059
We say "waiting" because read_event() will wait if there's nothing to
2060
read. But if there's something to read, it will not wait. The
2061
important thing is to not confuse users by saying "reading" whereas
2062
we're in fact receiving nothing.
2064
session->set_proc_info(_("Waiting for master to send event"));
2065
event_len= read_event(drizzle, mi, &suppress_warnings);
2066
if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while "
2069
if (!retry_count_event)
2071
retry_count_event++;
2072
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2073
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2074
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2079
if (event_len == packet_error)
2081
uint32_t drizzle_error_number= drizzle_errno(drizzle);
2082
switch (drizzle_error_number) {
2083
case CR_NET_PACKET_TOO_LARGE:
2084
sql_print_error(_("Log entry on master is longer than "
2085
"max_allowed_packet (%ld) on "
2086
"slave. If the entry is correct, restart the "
2087
"server with a higher value of "
2088
"max_allowed_packet"),
2089
session->variables.max_allowed_packet);
2091
case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2092
sql_print_error(ER(drizzle_error_number), drizzle_error_number,
2093
drizzle_error(drizzle));
2095
case EE_OUTOFMEMORY:
2096
case ER_OUTOFMEMORY:
2098
_("Stopping slave I/O thread due to out-of-memory error from master"));
2101
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2102
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2105
} // if (event_len == packet_error)
2107
retry_count=0; // ok event, reset retry counter
2108
session->set_proc_info(_("Queueing master event to the relay log"));
2109
if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2115
sql_print_error(_("Failed to flush master info file"));
2119
See if the relay logs take too much space.
2120
We don't lock mi->rli.log_space_lock here; this dirty read saves time
2121
and does not introduce any problem:
2122
- if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
2123
the clean value is 0), then we are reading only one more event as we
2124
should, and we'll block only at the next event. No big deal.
2125
- if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
2126
the clean value is 1), then we are going into wait_for_relay_log_space()
2127
for no reason, but this function will do a clean read, notice the clean
2128
value and exit immediately.
2130
if (rli->log_space_limit && rli->log_space_limit <
2131
rli->log_space_total &&
2132
!rli->ignore_log_space_limit)
2133
if (wait_for_relay_log_space(rli))
2135
sql_print_error(_("Slave I/O thread aborted while waiting for "
2136
"relay log space"));
2144
// print the current replication position
2145
sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2147
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
2148
pthread_mutex_lock(&LOCK_thread_count);
2149
session->query = session->db = 0; // extra safety
2150
session->query_length= session->db_length= 0;
2151
pthread_mutex_unlock(&LOCK_thread_count);
2155
Here we need to clear the active VIO before closing the
2156
connection with the master. The reason is that Session::awake()
2157
might be called from terminate_slave_thread() because somebody
2158
issued a STOP SLAVE. If that happens, the close_active_vio()
2159
can be called in the middle of closing the VIO associated with
2160
the 'drizzle' object, causing a crash.
2162
drizzle_close(drizzle);
2165
write_ignored_events_info_to_relay_log(session, mi);
2166
session->set_proc_info(_("Waiting for slave mutex on exit"));
2167
pthread_mutex_lock(&mi->run_lock);
2169
/* Forget the relay log's format */
2170
delete mi->rli.relay_log.description_event_for_queue;
2171
mi->rli.relay_log.description_event_for_queue= 0;
2172
assert(session->net.buff != 0);
2173
net_end(&session->net); // destructor will not free it, because net.vio is 0
2174
close_thread_tables(session);
2175
pthread_mutex_lock(&LOCK_thread_count);
2176
Session_CHECK_SENTRY(session);
2178
pthread_mutex_unlock(&LOCK_thread_count);
2180
mi->slave_running= 0;
2183
Note: the order of the two following calls (first broadcast, then unlock)
2184
is important. Otherwise a killer_thread can execute between the calls and
2185
delete the mi structure leading to a crash! (see BUG#25306 for details)
2187
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
2188
pthread_mutex_unlock(&mi->run_lock);
2191
return(0); // Can't return anything here
2195
/* Slave SQL Thread entry point */
2197
/*
  Slave SQL thread body.

  @param arg  Cast to Master_info*; the thread operates on its embedded
              Relay_log_info (rli).

  Steps visible in this function, in order:
    - under rli->run_lock: create the Session, publish it in
      rli->sql_session, bump rli->slave_run_id and set rli->slave_running
    - init_slave_thread(); on failure start_cond is broadcast, run_lock is
      released and an error is logged
    - restore temporary tables from rli->save_temporary_tables and append
      the session to the global threads list under LOCK_thread_count
    - clear rli->abort_slave, release run_lock, broadcast start_cond
    - clear rli->ignore_log_space_limit under log_space_lock and reset
      rli->trans_retries
    - init_relay_log_pos() at group_relay_log_name / group_relay_log_pos
    - run sys_init_slave if it has a value
    - check the UNTIL condition once before reading any event
    - call exec_relay_log_event() while sql_slave_killed() is false; when
      it returns nonzero and the thread was not killed, the session error
      and the warning list are written to the error log
    - teardown: cleanup_context(), clear query/db/catalog, set
      slave_running to 0 under run_lock + data_lock, delete
      description_event_for_exec, broadcast data_cond, hand the temporary
      tables back to rli, net_end(), clear rli->sql_session, then
      broadcast stop_cond before releasing run_lock

  @return 0 (the only return statement in the function)
*/
pthread_handler_t handle_slave_sql(void *arg)
2199
Session *session; /* needs to be first for thread_stack */
2200
char llbuff[22],llbuff1[22];
2202
Relay_log_info* rli = &((Master_info*)arg)->rli;
2207
assert(rli->inited);
2208
pthread_mutex_lock(&rli->run_lock);
2209
assert(!rli->slave_running);
2211
/* Seed the per-run abort countdown from the global abort_slave_event_count. */
rli->events_till_abort = abort_slave_event_count;
2213
session = new Session;
2214
session->thread_stack = (char*)&session; // remember where our stack is
2215
rli->sql_session= session;
2217
/* Inform waiting threads that slave has started */
2218
rli->slave_run_id++;
2219
rli->slave_running = 1;
2221
pthread_detach_this_thread();
2222
if (init_slave_thread(session, SLAVE_Session_SQL))
2225
TODO: this is currently broken - slave start and change master
2226
will be stuck if we fail here
2228
/* Failure path: wake threads waiting on start_cond before logging the error. */
pthread_cond_broadcast(&rli->start_cond);
2229
pthread_mutex_unlock(&rli->run_lock);
2230
sql_print_error(_("Failed during slave thread initialization"));
2233
session->init_for_queries();
2234
session->temporary_tables = rli->save_temporary_tables; // restore temp tables
2235
pthread_mutex_lock(&LOCK_thread_count);
2236
threads.append(session);
2237
pthread_mutex_unlock(&LOCK_thread_count);
2239
We are going to set slave_running to 1. Assuming slave I/O thread is
2240
alive and connected, this is going to make Seconds_Behind_Master be 0
2241
i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
2242
the moment we start we can think we are caught up, and the next second we
2243
start receiving data so we realize we are not caught up and
2244
Seconds_Behind_Master grows. No big deal.
2246
rli->abort_slave = 0;
2247
pthread_mutex_unlock(&rli->run_lock);
2248
pthread_cond_broadcast(&rli->start_cond);
2251
Reset errors for a clean start (otherwise, if the master is idle, the SQL
2252
thread may execute no Query_log_event, so the error will remain even
2253
though there's no problem anymore). Do not reset the master timestamp
2254
(imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
2255
as we are not sure that we are going to receive a query, we want to
2256
remember the last master timestamp (to say how many seconds behind we are
2258
But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
2262
//tell the I/O thread to take relay_log_space_limit into account from now on
2263
pthread_mutex_lock(&rli->log_space_lock);
2264
rli->ignore_log_space_limit= 0;
2265
pthread_mutex_unlock(&rli->log_space_lock);
2266
rli->trans_retries= 0; // start from "no error"
2268
if (init_relay_log_pos(rli,
2269
rli->group_relay_log_name.c_str(),
2270
rli->group_relay_log_pos,
2271
1 /*need data lock*/, &errmsg,
2272
1 /*look for a description_event*/))
2274
sql_print_error(_("Error initializing relay log position: %s"),
2278
Session_CHECK_SENTRY(session);
2279
assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2281
Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2282
correct position when it's called just after my_b_seek() (the questionable
2283
stuff is those "seek is done on next read" comments in the my_b_seek()
2285
The crude reality is that this assertion randomly fails whereas
2286
replication seems to work fine. And there is no easy explanation why it
2287
fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
2288
init_relay_log_pos() called above). Maybe the assertion would be
2289
meaningful if we held rli->data_lock between the my_b_seek() and the
2292
assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2293
assert(rli->sql_session == session);
2295
if (global_system_variables.log_warnings)
2296
sql_print_information(_("Slave SQL thread initialized, "
2297
"starting replication in log '%s' at "
2298
"position %s, relay log '%s' position: %s"),
2300
llstr(rli->group_master_log_pos,llbuff),
2301
rli->group_relay_log_name.c_str(),
2302
llstr(rli->group_relay_log_pos,llbuff1));
2304
/* execute init_slave variable */
2305
if (sys_init_slave.value_length)
2307
execute_init_command(session, &sys_init_slave, &LOCK_sys_init_slave);
2308
if (session->is_slave_error)
2310
sql_print_error(_("Slave SQL thread aborted. "
2311
"Can't execute init_slave query"));
2317
First check until condition - probably there is nothing to execute. We
2318
do not want to wait for next event in this case.
2320
pthread_mutex_lock(&rli->data_lock);
2321
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
2322
rli->is_until_satisfied(rli->group_master_log_pos))
2325
sql_print_information(_("Slave SQL thread stopped because it reached its"
2326
" UNTIL position %s"), llstr(rli->until_pos(), buf));
2327
pthread_mutex_unlock(&rli->data_lock);
2330
pthread_mutex_unlock(&rli->data_lock);
2332
/* Read queries from the IO/THREAD until this thread is killed */
2334
while (!sql_slave_killed(session,rli))
2336
session->set_proc_info(_("Reading event from the relay log"));
2337
assert(rli->sql_session == session);
2338
Session_CHECK_SENTRY(session);
2339
if (exec_relay_log_event(session,rli))
2341
// do not scare the user if SQL thread was simply killed or stopped
2342
if (!sql_slave_killed(session,rli))
2345
retrieve as much info as possible from the session and, error
2346
codes and warnings and print this to the error log as to
2347
allow the user to locate the error
2349
uint32_t const last_errno= rli->last_error().number;
2351
if (session->is_error())
2353
char const *const errmsg= session->main_da.message();
2355
if (last_errno == 0)
2357
rli->report(ERROR_LEVEL, session->main_da.sql_errno(), "%s", errmsg);
2359
else if (last_errno != session->main_da.sql_errno())
2361
sql_print_error(_("Slave (additional info): %s Error_code: %d"),
2362
errmsg, session->main_da.sql_errno());
2366
/* Print any warnings issued */
2367
List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
2370
Added controlled slave thread cancel for replication
2371
of user-defined variables.
2373
bool udf_error = false;
2376
if (err->code == ER_CANT_OPEN_LIBRARY)
2378
sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
2381
sql_print_error(_("Error loading user-defined library, slave SQL "
2382
"thread aborted. Install the missing library, "
2383
"and restart the slave SQL thread with "
2384
"\"SLAVE START\". We stopped at log '%s' "
2386
RPL_LOG_NAME, llstr(rli->group_master_log_pos,
2389
sql_print_error(_("Error running query, slave SQL thread aborted. "
2390
"Fix the problem, and restart "
2391
"the slave SQL thread with \"SLAVE START\". "
2392
"We stopped at log '%s' position %s"),
2394
llstr(rli->group_master_log_pos, llbuff));
2400
/* Thread stopped. Print the current replication position to the log */
2401
sql_print_information(_("Slave SQL thread exiting, replication stopped in "
2402
"log '%s' at position %s"),
2404
llstr(rli->group_master_log_pos,llbuff));
2409
Some events set some playgrounds, which won't be cleared because thread
2410
stops. Stopping of this thread may not be known to these events ("stop"
2411
request is detected only by the present function, not by events), so we
2412
must "proactively" clear playgrounds:
2414
rli->cleanup_context(session, 1);
2415
pthread_mutex_lock(&LOCK_thread_count);
2417
Some extra safety, which should not been needed (normally, event deletion
2418
should already have done these assignments (each event which sets these
2419
variables is supposed to set them to 0 before terminating)).
2421
session->query= session->db= session->catalog= 0;
2422
session->query_length= session->db_length= 0;
2423
pthread_mutex_unlock(&LOCK_thread_count);
2424
session->set_proc_info("Waiting for slave mutex on exit");
2425
pthread_mutex_lock(&rli->run_lock);
2426
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
2427
pthread_mutex_lock(&rli->data_lock);
2428
assert(rli->slave_running == 1); // tracking buffer overrun
2429
/* When master_pos_wait() wakes up it will check this and terminate */
2430
rli->slave_running= 0;
2431
/* Forget the relay log's format */
2432
delete rli->relay_log.description_event_for_exec;
2433
rli->relay_log.description_event_for_exec= 0;
2434
/* Wake up master_pos_wait() */
2435
pthread_mutex_unlock(&rli->data_lock);
2436
pthread_cond_broadcast(&rli->data_cond);
2437
rli->ignore_log_space_limit= 0; /* don't need any lock */
2438
rli->save_temporary_tables = session->temporary_tables;
2441
TODO: see if we can do this conditionally in next_event() instead
2442
to avoid unneeded position re-init
2444
session->temporary_tables = 0; // remove tempation from destructor to close them
2445
assert(session->net.buff != 0);
2446
net_end(&session->net); // destructor will not free it, because we are weird
2447
assert(rli->sql_session == session);
2448
Session_CHECK_SENTRY(session);
2449
rli->sql_session= 0;
2450
pthread_mutex_lock(&LOCK_thread_count);
2451
Session_CHECK_SENTRY(session);
2453
pthread_mutex_unlock(&LOCK_thread_count);
2455
Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
2456
is important. Otherwise a killer_thread can execute between the calls and
2457
delete the mi structure leading to a crash! (see BUG#25306 for details)
2459
pthread_cond_broadcast(&rli->stop_cond);
2460
pthread_mutex_unlock(&rli->run_lock); // tell the world we are done
2464
return(0); // Can't return anything here
2469
process_io_create_file()
2472
/*
  Handle a Create_file event on the I/O thread: request the file from the
  master and store its contents in the relay log.

  @param mi   Master info; mi->io_session, mi->drizzle->net, mi->file_id and
              mi->rli.relay_log are used.
  @param cev  The Create_file event; it is checked with is_valid() and
              asserted to be inited_from_old.

  Behaviour that can be read from the body:
    - if rpl_filter->db_ok(cev->db) is false, skip_load_data_infile(net)
      is called
    - a fresh file id is taken from mi->file_id and stored in both the
      session and the event; the session server_id is set from the event
    - the file is requested with net_request_file() and read packet by
      packet with my_net_read(); a zero-length packet means end of file
    - blocks are appended to the relay log as the Create_file event itself
      or as Append_block events, and an Execute_load event is appended at
      end of file; successful appends are followed by
      harvest_bytes_written() to update mi->rli.log_space_total
    - append failures are reported as ER_SLAVE_RELAY_LOG_WRITE_FAILURE

  NOTE(review): confirm where cev_not_written is cleared, which branch each
  append belongs to, and which value is returned on each path.
*/
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
2476
bool cev_not_written;
2477
Session *session = mi->io_session;
2478
NET *net = &mi->drizzle->net;
2480
if (unlikely(!cev->is_valid()))
2483
if (!rpl_filter->db_ok(cev->db))
2485
skip_load_data_infile(net);
2488
assert(cev->inited_from_old);
2489
/* Allocate the next file id and record it in both the session and the event. */
session->file_id = cev->file_id = mi->file_id++;
2490
session->server_id = cev->server_id;
2491
cev_not_written = 1;
2493
if (unlikely(net_request_file(net,cev->fname)))
2495
sql_print_error(_("Slave I/O: failed requesting download of '%s'"),
2501
This dummy block is so we could instantiate Append_block_log_event
2502
once and then modify it slightly instead of doing it multiple times
2506
Append_block_log_event aev(session,0,0,0,0);
2510
if (unlikely((num_bytes=my_net_read(net)) == packet_error))
2512
sql_print_error(_("Network read error downloading '%s' from master"),
2516
if (unlikely(!num_bytes)) /* eof */
2518
/* 3.23 master wants it */
2519
net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0);
2521
If we wrote Create_file_log_event, then we need to write
2522
Execute_load_log_event. If we did not write Create_file_log_event,
2523
then this is an empty file and we can just do as if the LOAD DATA
2524
INFILE had not existed, i.e. write nothing.
2526
if (unlikely(cev_not_written))
2528
Execute_load_log_event xev(session,0,0);
2529
xev.log_pos = cev->log_pos;
2530
if (unlikely(mi->rli.relay_log.append(&xev)))
2532
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2533
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2534
_("error writing Exec_load event to relay log"));
2537
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2540
if (unlikely(cev_not_written))
2542
cev->block = net->read_pos;
2543
cev->block_len = num_bytes;
2544
if (unlikely(mi->rli.relay_log.append(cev)))
2546
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2547
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2548
_("error writing Create_file event to relay log"));
2552
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2556
aev.block = net->read_pos;
2557
aev.block_len = num_bytes;
2558
aev.log_pos = cev->log_pos;
2559
if (unlikely(mi->rli.relay_log.append(&aev)))
2561
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2562
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2563
_("error writing Append_block event to relay log"));
2566
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
2577
Start using a new binary log on the master
2581
mi master_info for the slave
2582
rev The rotate log event read from the binary log
2585
Updates the master info with the place in the next binary
2586
log where we should start reading.
2587
Rotate the relay log to avoid mixed-format relay logs.
2590
We assume we already locked mi->data_lock
2594
1 Log event is illegal
2598
/*
  Apply a Rotate event received from the master.

  @param mi   Master info; the caller must hold mi->data_lock (asserted).
  @param rev  Rotate event; checked with rev->is_valid().

  Sets the master log name and position in mi from rev->new_log_ident and
  rev->pos, bumps mi->events_till_disconnect when
  disconnect_slave_event_count is nonzero, replaces
  description_event_for_queue with a version 3 Format_description_log_event
  when the current one has binlog_version >= 4, and finally calls
  rotate_relay_log(mi).
*/
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
2600
safe_mutex_assert_owner(&mi->data_lock);
2602
if (unlikely(!rev->is_valid()))
2605
/* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2606
mi->setLogName(rev->new_log_ident);
2607
mi->setLogPosition(rev->pos);
2609
If we do not do this, we will be getting the first
2610
rotate event forever, so we need to not disconnect after one.
2612
/* Only relevant when the disconnect_slave_event_count debugging counter is set. */
if (disconnect_slave_event_count)
2613
mi->events_till_disconnect++;
2616
If description_event_for_queue is format <4, there is conversion in the
2617
relay log to the slave's format (4). And Rotate can mean upgrade or
2618
nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
2619
no need to reset description_event_for_queue now. And if it's nothing (same
2620
master version as before), no need (still using the slave's format).
2622
if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
2624
delete mi->rli.relay_log.description_event_for_queue;
2625
/* start from format 3 (DRIZZLE 4.0) again */
2626
mi->rli.relay_log.description_event_for_queue= new
2627
Format_description_log_event(3);
2630
Rotate the relay log makes binlog format detection easier (at next slave
2631
start or mysqlbinlog)
2633
rotate_relay_log(mi); /* will take the right mutexes */
2638
Reads a 3.23 event and converts it to the slave's format. This code was
2639
copied from DRIZZLE 4.0.
2641
/*
  Convert one event in the 3.23 binlog format to the slave format and
  store it in the relay log.

  @param mi   Master info that owns the relay log (mi->rli).
  @param buf  Raw event bytes received from the master.
  The body also uses event_len as the length of buf in bytes (presumably
  the remaining parameter; confirm against the full signature).

  Flow that can be read from the body:
    - for a LOAD_EVENT the bytes are copied into a heap buffer one byte
      larger than the event, a 0 terminator is appended, event_len is
      incremented and the new length is stored at EVENT_LEN_OFFSET
    - the bytes are parsed with Log_event::read_log_event() using
      description_event_for_queue; a parse failure is logged and the
      temporary buffer is freed
    - under mi->data_lock the event log_pos is seeded from
      mi->getLogPosition() and the type code is dispatched: rotate handling
      goes through process_io_rotate(); CREATE_FILE_EVENT goes through
      process_io_create_file(), after which the master position is advanced
      by inc_pos, the lock is released and the temporary buffer is freed
    - any other event, unless ignore_event is set, gets log_pos advanced by
      event_len and is appended to the relay log, followed by
      harvest_bytes_written()
    - finally the master position is advanced by inc_pos and the lock is
      released
*/
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
2644
const char *errmsg = 0;
2646
bool ignore_event= 0;
2648
Relay_log_info *rli= &mi->rli;
2651
If we get Load event, we need to pass a non-reusable buffer
2652
to read_log_event, so we do a trick
2654
if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
2656
if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
2658
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2659
ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
2662
memcpy(tmp_buf,buf,event_len);
2664
Create_file constructor wants a 0 as last char of buffer, this 0 will
2665
serve as the string-termination char for the file's name (which is at the
2667
We must increment event_len, otherwise the event constructor will not see
2668
this end 0, which leads to segfault.
2670
tmp_buf[event_len++]=0;
2671
int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
2672
buf = (const char*)tmp_buf;
2675
This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
2676
send the loaded file, and write it to the relay log in the form of
2677
Append_block/Exec_load (the SQL thread needs the data, as that thread is not
2678
connected to the master).
2680
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2681
mi->rli.relay_log.description_event_for_queue);
2684
sql_print_error(_("Read invalid event from master: '%s', "
2685
"master could be corrupt but a more likely cause "
2686
"of this is a bug"),
2688
free((char*) tmp_buf);
2692
pthread_mutex_lock(&mi->data_lock);
2693
ev->log_pos= mi->getLogPosition(); /* 3.23 events don't contain log_pos */
2694
switch (ev->get_type_code()) {
2700
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2703
pthread_mutex_unlock(&mi->data_lock);
2708
case CREATE_FILE_EVENT:
2710
Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
2711
queue_old_event() which is for 3.23 events which don't comprise
2712
CREATE_FILE_EVENT. This is because read_log_event() above has just
2713
transformed LOAD_EVENT into CREATE_FILE_EVENT.
2716
/* We come here when and only when tmp_buf != 0 */
2717
assert(tmp_buf != 0);
2719
ev->log_pos+= inc_pos;
2720
int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2722
mi->incrementLogPosition(inc_pos);
2723
pthread_mutex_unlock(&mi->data_lock);
2724
free((char*)tmp_buf);
2731
if (likely(!ignore_event))
2735
Don't do it for fake Rotate events (see comment in
2736
Log_event::Log_event(const char* buf...) in log_event.cc).
2738
ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
2739
if (unlikely(rli->relay_log.append(ev)))
2742
pthread_mutex_unlock(&mi->data_lock);
2745
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2748
mi->incrementLogPosition(inc_pos);
2749
pthread_mutex_unlock(&mi->data_lock);
2754
Reads a 4.0 event and converts it to the slave's format. This code was copied
2755
from queue_binlog_ver_1_event(), with some affordable simplifications.
2757
/*
  Convert one event in binlog format version 3 to the slave format and
  store it in the relay log.

  @param mi   Master info that owns the relay log (mi->rli).
  @param buf  Raw event bytes received from the master.
  The body also uses event_len as the length of buf in bytes (presumably
  the remaining parameter; confirm against the full signature).

  Flow that can be read from the body:
    - the bytes are parsed with Log_event::read_log_event() using
      description_event_for_queue; a parse failure is logged and tmp_buf
      is passed to free()
    - under mi->data_lock the type code is dispatched; rotate handling
      goes through process_io_rotate()
    - the event is appended to the relay log, the written bytes are
      harvested into rli->log_space_total, the master position is advanced
      by inc_pos and the lock is released

  NOTE(review): no assignment to tmp_buf appears in this function; confirm
  it is always null when it reaches free().
*/
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
2760
const char *errmsg = 0;
2763
Relay_log_info *rli= &mi->rli;
2765
/* read_log_event() will adjust log_pos to be end_log_pos */
2766
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2767
mi->rli.relay_log.description_event_for_queue);
2770
sql_print_error(_("Read invalid event from master: '%s', "
2771
"master could be corrupt but a more likely cause of "
2774
free((char*) tmp_buf);
2777
pthread_mutex_lock(&mi->data_lock);
2778
switch (ev->get_type_code()) {
2782
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2785
pthread_mutex_unlock(&mi->data_lock);
2794
if (unlikely(rli->relay_log.append(ev)))
2797
pthread_mutex_unlock(&mi->data_lock);
2800
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2802
mi->incrementLogPosition(inc_pos);
2804
pthread_mutex_unlock(&mi->data_lock);
2811
Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
2812
(exactly, slave's) format. To do the conversion, we create a 5.0 event from
2813
the 3.23/4.0 bytes, then write this event to the relay log.
2816
Test this code before release - it has to be tested on a separate
2817
setup with 3.23 master or 4.0 master
2820
/*
  Dispatch an old-format event to the converter that matches the binlog
  version recorded in mi->rli.relay_log.description_event_for_queue.

  @param mi   Master info that owns the relay log.
  @param buf  Raw event bytes; forwarded unchanged together with event_len.

  @return the result of queue_binlog_ver_1_event() or
          queue_binlog_ver_3_event(); any other version takes the default
          branch, which is commented as unsupported.
*/
static int32_t queue_old_event(Master_info *mi, const char *buf,
2823
switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
2826
return(queue_binlog_ver_1_event(mi,buf,event_len));
2828
return(queue_binlog_ver_3_event(mi,buf,event_len));
2829
default: /* unsupported format; eg version 2 */
2837
If the event is 3.23/4.0, passes it to queue_old_event() which will convert
2838
it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
2839
no format conversion, it's pure read/write of bytes.
2840
So a 5.0.0 slave's relay log can contain events in the slave's format or in
2844
/*
  Queue one event received from the master into the relay log.

  @param mi         Master info that owns the relay log (mi->rli).
  @param buf        Raw event bytes.
  @param event_len  Length of buf in bytes.

  Flow that can be read from the body:
    - if description_event_for_queue has binlog_version < 4 and the event
      is not a FORMAT_DESCRIPTION_EVENT, the work is delegated to
      queue_old_event()
    - otherwise, under mi->data_lock, the type byte at EVENT_TYPE_OFFSET is
      dispatched:
        * a Rotate event is constructed from buf and passed to
          process_io_rotate(); failure sets
          ER_SLAVE_RELAY_LOG_WRITE_FAILURE
        * FORMAT_DESCRIPTION_EVENT is parsed and replaces
          description_event_for_queue; inc_pos is event_len unless the
          log position field of the event is 0
        * HEARTBEAT_LOG_EVENT is constructed from buf, counted in
          mi->received_heartbeats, checked against the local log name and
          position, and then jumps to skip_relay_logging
    - under the relay log lock, an event whose server id equals
      ::server_id while replicate_same_server_id is off is not written;
      for events other than Format_desc, Rotate and Stop the master
      position is still advanced and remembered in
      rli->ign_master_log_name_end / ign_master_log_pos_end;
      relay_log.signal_update() wakes the SQL thread
    - any other event is written with relay_log.appendv(); on success the
      master position is advanced by inc_pos and the written bytes are
      harvested, on failure ER_SLAVE_RELAY_LOG_WRITE_FAILURE is set
    - errors are reported through mi->report()

  NOTE(review): the heartbeat coordinate check calls
  mi->setLogName(hb.get_log_ident()) inside the condition. setLogName() is
  used as a setter in process_io_rotate(), while the adjacent comment talks
  about comparing; confirm this does not overwrite the stored log name.

  NOTE(review): the memcpy into rli->ign_master_log_name_end copies
  FN_REFLEN bytes from mi->getLogName(); this assumes the source buffer is
  at least FN_REFLEN bytes long. TODO confirm.
*/
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
2848
uint32_t inc_pos= 0;
2849
Relay_log_info *rli= &mi->rli;
2850
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
2853
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
2854
buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
2855
return(queue_old_event(mi,buf,event_len));
2857
pthread_mutex_lock(&mi->data_lock);
2859
switch (buf[EVENT_TYPE_OFFSET]) {
2862
We needn't write this event to the relay log. Indeed, it just indicates a
2863
master server shutdown. The only thing this does is cleaning. But
2864
cleaning is already done on a per-master-thread basis (as the master
2865
server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
2866
prepared statements' deletion are TODO only when we binlog prep stmts).
2868
We don't even increment mi->master_log_pos, because we may be just after
2869
a Rotate event. Btw, in a few milliseconds we are going to have a Start
2870
event from the next binlog (unless the master is presently running
2876
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
2877
if (unlikely(process_io_rotate(mi,&rev)))
2879
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2883
Now the I/O thread has just changed its mi->master_log_name, so
2884
incrementing mi->master_log_pos is nonsense.
2889
case FORMAT_DESCRIPTION_EVENT:
2892
Create an event, and save it (when we rotate the relay log, we will have
2893
to write this event again).
2896
We are the only thread which reads/writes description_event_for_queue.
2897
The relay_log struct does not move (though some members of it can
2898
change), so we needn't any lock (no rli->data_lock, no log lock).
2900
Format_description_log_event* tmp;
2902
if (!(tmp= (Format_description_log_event*)
2903
Log_event::read_log_event(buf, event_len, &errmsg,
2904
mi->rli.relay_log.description_event_for_queue)))
2906
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2909
delete mi->rli.relay_log.description_event_for_queue;
2910
mi->rli.relay_log.description_event_for_queue= tmp;
2912
Though this does some conversion to the slave's format, this will
2913
preserve the master's binlog format version, and number of event types.
2916
If the event was not requested by the slave (the slave did not ask for
2917
it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
2919
inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
2923
case HEARTBEAT_LOG_EVENT:
2926
HB (heartbeat) cannot come before RL (Relay)
2929
Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
2932
error= ER_SLAVE_HEARTBEAT_FAILURE;
2933
error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2934
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2935
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2936
error_msg.append(STRING_WITH_LEN(" log_pos "));
2937
llstr(hb.log_pos, llbuf);
2938
error_msg.append(llbuf, strlen(llbuf));
2941
mi->received_heartbeats++;
2943
compare local and event's versions of log_file, log_pos.
2945
Heartbeat is sent only after an event corresponding to the corrdinates
2946
the heartbeat carries.
2947
Slave can not have a difference in coordinates except in the only
2948
special case when mi->master_log_name, master_log_pos have never
2949
been updated by Rotate event i.e when slave does not have any history
2950
with the master (and thereafter mi->master_log_pos is NULL).
2952
TODO: handling `when' for SHOW SLAVE STATUS' snds behind
2954
if ((mi->setLogName(hb.get_log_ident()) && mi->getLogName() != NULL)
2955
|| mi->getLogPosition() != hb.log_pos)
2957
/* missed events of heartbeat from the past */
2958
error= ER_SLAVE_HEARTBEAT_FAILURE;
2959
error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
2960
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2961
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2962
error_msg.append(STRING_WITH_LEN(" log_pos "));
2963
llstr(hb.log_pos, llbuf);
2964
error_msg.append(llbuf, strlen(llbuf));
2967
goto skip_relay_logging;
2977
If this event is originating from this server, don't queue it.
2978
We don't check this for 3.23 events because it's simpler like this; 3.23
2979
will be filtered anyway by the SQL slave thread which also tests the
2980
server id (we must also keep this test in the SQL thread, in case somebody
2981
upgrades a 4.0 slave which has a not-filtered relay log).
2983
ANY event coming from ourselves can be ignored: it is obvious for queries;
2984
for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
2985
(--log-slave-updates would not log that) unless this slave is also its
2986
direct master (an unsupported, useless setup!).
2989
pthread_mutex_lock(log_lock);
2991
if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
2992
!mi->rli.replicate_same_server_id)
2995
Do not write it to the relay log.
2996
a) We still want to increment mi->master_log_pos, so that we won't
2997
re-read this event from the master if the slave IO thread is now
2998
stopped/restarted (more efficient if the events we are ignoring are big
3000
b) We want to record that we are skipping events, for the information of
3001
the slave SQL thread, otherwise that thread may let
3002
rli->group_relay_log_pos stay too small if the last binlog's event is
3004
But events which were generated by this slave and which do not exist in
3005
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
3008
if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
3009
buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3010
buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3012
mi->incrementLogPosition(inc_pos);
3013
memcpy(rli->ign_master_log_name_end, mi->getLogName(), FN_REFLEN);
3014
assert(rli->ign_master_log_name_end[0]);
3015
rli->ign_master_log_pos_end= mi->getLogPosition();
3017
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3021
/* write the event to the relay log */
3022
if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3024
mi->incrementLogPosition(inc_pos);
3025
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3029
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
3031
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
3033
pthread_mutex_unlock(log_lock);
3038
pthread_mutex_unlock(&mi->data_lock);
3040
mi->report(ERROR_LEVEL, error, ER(error),
3041
(error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
3042
_("could not queue event from master") :
3048
/*
  Release the file resources held by a Relay_log_info.

  @param rli  Relay log information to shut down.

  Closes the info file (IO_CACHE and descriptor) when rli->info_fd is open,
  closes the current relay log reader (cache_buf and cur_log_fd) when it is
  open and resets cur_log_fd to -1, closes the relay log itself with
  LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT, folds the bytes written by that
  close into rli->log_space_total, and finally drops the temporary tables
  kept in rli.
*/
void end_relay_log_info(Relay_log_info* rli)
3052
/* A negative descriptor means the file is not open. */
if (rli->info_fd >= 0)
3054
end_io_cache(&rli->info_file);
3055
(void) my_close(rli->info_fd, MYF(MY_WME));
3058
if (rli->cur_log_fd >= 0)
3060
end_io_cache(&rli->cache_buf);
3061
(void)my_close(rli->cur_log_fd, MYF(MY_WME));
3062
rli->cur_log_fd = -1;
3065
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
3066
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3068
Delete the slave's temporary tables from memory.
3069
In the future there will be other actions than this, to ensure persistance
3070
of slave's temp tables after shutdown.
3072
rli->close_temporary_tables();
3077
Try to connect until successful or slave killed
3081
session Thread handler for slave
3082
DRIZZLE DRIZZLE connection handle
3083
mi Replication handle
3090
/*
  First-time connect to the master.

  Thin wrapper around connect_to_master() that passes 0 for both of its
  trailing arguments (reconnect and suppress_warnings), so a fresh
  connection is made and warnings are not suppressed.

  @return whatever connect_to_master() returns
*/
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi)
3092
return(connect_to_master(session, drizzle, mi, 0, 0));
3101
Try to connect until successful or slave killed or we have retried
3102
master_retry_count times
3105
/*
  Connect or reconnect to the master, retrying while the I/O thread has
  not been killed.

  @param session            I/O thread session; passed to io_slave_killed()
                            and safe_sleep().
  @param drizzle            Client connection handle.
  @param mi                 Master info providing host, user, password,
                            port and retry settings.
  @param reconnect          If true drizzle_reconnect() is used, otherwise
                            drizzle_connect().
  @param suppress_warnings  If true the success message is not logged; it
                            is reset to 0 whenever a new error code is
                            seen.

  @return slave_was_killed, which holds the result of io_slave_killed()
          from the loop condition.
          NOTE(review): confirm what is returned once err_count reaches
          master_retry_count.

  Connect and read timeouts are both set from slave_net_timeout. A failed
  attempt is reported only when its error code differs from the previous
  one. Between attempts the thread sleeps through safe_sleep() using
  mi->connect_retry. After a successful connect drizzle->reconnect is set
  to 1.
*/
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
3106
bool reconnect, bool suppress_warnings)
3108
int32_t slave_was_killed;
3109
int32_t last_errno= -2; // impossible error
3110
uint32_t err_count=0;
3113
mi->events_till_disconnect = disconnect_slave_event_count;
3114
uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
3115
if (opt_slave_compressed_protocol)
3116
/*
  NOTE(review): this is a plain assignment, so CLIENT_REMEMBER_OPTIONS set
  just above is dropped when compression is enabled. Confirm whether the
  flag was meant to be OR-ed in instead.
*/
client_flag=CLIENT_COMPRESS; /* We will use compression */
3118
drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3119
drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3121
while (!(slave_was_killed = io_slave_killed(session,mi)) &&
3122
(reconnect ? drizzle_reconnect(drizzle) != 0 :
3123
drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
3124
mi->getPort(), 0, client_flag) == 0))
3126
/* Don't repeat last error */
3127
if ((int32_t)drizzle_errno(drizzle) != last_errno)
3129
last_errno=drizzle_errno(drizzle);
3130
suppress_warnings= 0;
3131
mi->report(ERROR_LEVEL, last_errno,
3132
_("error %s to master '%s@%s:%d'"
3133
" - retry-time: %d retries: %u"),
3134
(reconnect ? _("reconnecting") : _("connecting")),
3135
mi->getUsername(), mi->getHostname(), mi->getPort(),
3136
mi->getConnectionRetry(), master_retry_count);
3139
By default we try forever. The reason is that failure will trigger
3140
master election, so if the user did not set master_retry_count we
3141
do not want to have election triggered on the first failure to
3144
if (++err_count == master_retry_count)
3149
safe_sleep(session,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3153
if (!slave_was_killed)
3157
if (!suppress_warnings && global_system_variables.log_warnings)
3158
sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3159
"replication resumed in log '%s' at "
3160
"position %s"), mi->getUsername(),
3161
mi->getHostname(), mi->getPort(),
3163
llstr(mi->getLogPosition(),llbuff));
3166
drizzle->reconnect= 1;
3167
return(slave_was_killed);
3175
Try to connect until successful or slave killed or we have retried
3176
master_retry_count times
3179
/*
  Re-establish a lost connection to the master.

  Thin wrapper around connect_to_master() that passes 1 for its reconnect
  argument and forwards suppress_warnings unchanged.

  @return whatever connect_to_master() returns
*/
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
3180
bool suppress_warnings)
3182
return(connect_to_master(session, drizzle, mi, 1, suppress_warnings));
3187
Store the file and position where the execute-slave thread is in the
3191
flush_relay_log_info()
3192
rli Relay log information
3195
- As this is only called by the slave thread, we don't need to
3196
have a lock on this.
3197
- If there is an active transaction, then we don't update the position
3198
in the relay log. This is to ensure that we re-execute statements
3199
if we die in the middle of a transaction that was rolled back.
3200
- As a transaction never spans binary logs, we don't have to handle the
3201
case where we do a relay-log-rotation in the middle of the transaction.
3202
If this would not be the case, we would have to ensure that we
3203
don't delete the relay log file where the transaction started when
3204
we switch to a new relay log file.
3207
- Change the log file information to a binary format to avoid calling
3215
/*
  Store the relay log position information held in rli.

  @param rli  Relay log information to flush.

  The first thing checked is rli->no_storage.
  NOTE(review): confirm the return convention (which bool value means
  failure) and what the no_storage branch returns.
*/
bool flush_relay_log_info(Relay_log_info* rli)
3219
if (unlikely(rli->no_storage))
3227
Called when we notice that the current "hot" log got rotated under our feet.
3230
/*
  Switch the SQL thread from the shared relay log cache to a private
  read-only cache on the same relay log file.

  @param rli     Relay log information; on entry rli->cur_log must not be
                 &rli->cache_buf and rli->cur_log_fd must be -1 (both
                 asserted).
  @param errmsg  Passed through to open_binlog().

  rli->cur_log is pointed at rli->cache_buf, the file named by
  rli->event_relay_log_name is opened into it with open_binlog() and the
  descriptor is stored in rli->cur_log_fd. The read position is then
  restored to rli->event_relay_log_pos, which is first raised to at least
  BIN_LOG_HEADER_SIZE.
  NOTE(review): confirm what is returned when open_binlog() yields a
  negative descriptor.
*/
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
3232
assert(rli->cur_log != &rli->cache_buf);
3233
assert(rli->cur_log_fd == -1);
3235
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3236
if ((rli->cur_log_fd=open_binlog(cur_log, rli->event_relay_log_name.c_str(), errmsg)) < 0)
3239
We want to start exactly where we was before:
3240
relay_log_pos Current log pos
3241
pending Number of bytes already processed from the event
3243
/* Never seek to a position inside the binlog header. */
rli->event_relay_log_pos= cmax(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3244
my_b_seek(cur_log,rli->event_relay_log_pos);
3249
/*
  Read the next event from the relay log described by rli.

  Preconditions: rli->sql_session is non-NULL (asserted) and the caller
  already owns rli->data_lock (safe_mutex_assert_owner).

  The body loops while sql_slave_killed(session, rli) is false. Steps that
  are visible in this excerpt:
    - hot_log is set when cur_log is not rli->cache_buf. For a hot log the
      relay log lock (log_lock) is taken, and reopen_relay_log() is called
      when relay_log.get_open_count() differs from
      rli->cur_log_old_open_count.
    - Log_event::read_log_event() is tried on cur_log with
      rli->relay_log.description_event_for_exec; on success
      rli->future_event_relay_log_pos is set from my_b_tell(cur_log).
    - cur_log->error < 0 sets an I/O error message; cur_log->error == 0 is
      treated as EOF.
    - EOF handling that saves and zeroes rli->last_master_timestamp:
      if rli->ign_master_log_name_end is non-empty a Rotate_log_event is
      created from it (server_id forced to 0); otherwise data_lock is
      released, rli->ignore_log_space_limit is set under log_space_lock,
      log_space_cond is broadcast, wait_for_update_relay_log() is called,
      then data_lock is re-acquired and the timestamp restored.
    - EOF handling that closes the current file (end_io_cache, my_close,
      cur_log_fd reset to -1), calls relay_log.purge_first_log() under
      relay_log_purge, uses relay_log.find_next_log() to advance, resets
      rli->event_relay_log_pos to BIN_LOG_HEADER_SIZE, and then either
      attaches to the active log via relay_log.get_log_file() and
      check_binlog_magic() or opens the next file with open_binlog().
    - A failed read is reported with sql_print_error(), the read position
      is moved back to rli->event_relay_log_pos, errmsg is set and the loop
      is left with break.

  NOTE(review): braces, else branches, gotos, the declarations of hot_log and
  ev, and all return statements are missing from this excerpt, so the exact
  nesting of the steps above and the returned value (presumably the event
  read, or NULL on kill or error) must be confirmed against the full file.
*/
static Log_event* next_event(Relay_log_info* rli)
3252
IO_CACHE* cur_log = rli->cur_log;
3253
pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3254
const char* errmsg=0;
3255
Session* session = rli->sql_session;
3257
assert(session != 0);
3259
if (abort_slave_event_count && !rli->events_till_abort--)
3263
For most operations we need to protect rli members with data_lock,
3264
so we assume calling function acquired this mutex for us and we will
3265
hold it for the most of the loop below However, we will release it
3266
whenever it is worth the hassle, and in the cases when we go into a
3267
pthread_cond_wait() with the non-data_lock mutex
3269
safe_mutex_assert_owner(&rli->data_lock);
3271
while (!sql_slave_killed(session,rli))
3274
We can have two kinds of log reading:
3276
rli->cur_log points at the IO_CACHE of relay_log, which
3277
is actively being updated by the I/O thread. We need to be careful
3278
in this case and make sure that we are not looking at a stale log that
3279
has already been rotated. If it has been, we reopen the log.
3281
The other case is much simpler:
3282
We just have a read only log that nobody else will be updating.
3285
if ((hot_log = (cur_log != &rli->cache_buf)))
3287
assert(rli->cur_log_fd == -1); // foreign descriptor
3288
pthread_mutex_lock(log_lock);
3291
Reading xxx_file_id is safe because the log will only
3292
be rotated when we hold relay_log.LOCK_log
3294
if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
3296
// The master has switched to a new log file; Reopen the old log file
3297
cur_log=reopen_relay_log(rli, &errmsg);
3298
pthread_mutex_unlock(log_lock);
3299
if (!cur_log) // No more log files
3301
hot_log=0; // Using old binary log
3305
As there is no guarantee that the relay is open (for example, an I/O
3306
error during a write by the slave I/O thread may have closed it), we
3309
if (!my_b_inited(cur_log))
3311
assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3312
assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
3315
Relay log is always in new format - if the master is 3.23, the
3316
I/O thread will convert the format for us.
3317
A problem: the description event may be in a previous relay log. So if
3318
the slave has been shutdown meanwhile, we would have to look in old relay
3319
logs, which may even have been deleted. So we need to write this
3320
description event at the beginning of the relay log.
3321
When the relay log is created when the I/O thread starts, easy: the
3322
master will send the description event and we will queue it.
3323
But if the relay log is created by new_file(): then the solution is:
3324
DRIZZLE_BIN_LOG::open() will write the buffered description event.
3326
if ((ev=Log_event::read_log_event(cur_log,0,
3327
rli->relay_log.description_event_for_exec)))
3330
assert(session==rli->sql_session);
3332
read it while we have a lock, to avoid a mutex lock in
3333
inc_event_relay_log_pos()
3335
rli->future_event_relay_log_pos= my_b_tell(cur_log);
3337
pthread_mutex_unlock(log_lock);
3340
assert(session==rli->sql_session);
3341
if (opt_reckless_slave) // For mysql-test
3343
if (cur_log->error < 0)
3345
errmsg = "slave SQL thread aborted because of I/O error";
3347
pthread_mutex_unlock(log_lock);
3350
if (!cur_log->error) /* EOF */
3353
On a hot log, EOF means that there are no more updates to
3354
process and we must block until I/O thread adds some and
3355
signals us to continue
3360
We say in Seconds_Behind_Master that we have "caught up". Note that
3361
for example if network link is broken but I/O slave thread hasn't
3362
noticed it (slave_net_timeout not elapsed), then we'll say "caught
3363
up" whereas we're not really caught up. Fixing that would require
3364
internally cutting timeout in smaller pieces in network read, no
3365
thanks. Another example: SQL has caught up on I/O, now I/O has read
3366
a new event and is queuing it; the false "0" will exist until SQL
3367
finishes executing the new event; it will be look abnormal only if
3368
the events have old timestamps (then you get "many", 0, "many").
3370
Transient phases like this can be fixed with implemeting
3371
Heartbeat event which provides the slave the status of the
3372
master at time the master does not have any new update to send.
3373
Seconds_Behind_Master would be zero only when master has no
3374
more updates in binlog for slave. The heartbeat can be sent
3375
in a (small) fraction of slave_net_timeout. Until it's done
3376
rli->last_master_timestamp is temporarely (for time of
3377
waiting for the following event) reset whenever EOF is
3380
time_t save_timestamp= rli->last_master_timestamp;
3381
rli->last_master_timestamp= 0;
3383
assert(rli->relay_log.get_open_count() ==
3384
rli->cur_log_old_open_count);
3386
if (rli->ign_master_log_name_end[0])
3388
/* We generate and return a Rotate, to make our positions advance */
3389
ev= new Rotate_log_event(rli->ign_master_log_name_end,
3390
0, rli->ign_master_log_pos_end,
3391
Rotate_log_event::DUP_NAME);
3392
rli->ign_master_log_name_end[0]= 0;
3393
pthread_mutex_unlock(log_lock);
3396
errmsg= "Slave SQL thread failed to create a Rotate event "
3397
"(out of memory?), SHOW SLAVE STATUS may be inaccurate";
3400
ev->server_id= 0; // don't be ignored by slave SQL thread
3405
We can, and should release data_lock while we are waiting for
3406
update. If we do not, show slave status will block
3408
pthread_mutex_unlock(&rli->data_lock);
3412
- the I/O thread has reached log_space_limit
3413
- the SQL thread has read all relay logs, but cannot purge for some
3415
* it has already purged all logs except the current one
3416
* there are other logs than the current one but they're involved in
3417
a transaction that finishes in the current one (or is not finished)
3419
Wake up the possibly waiting I/O thread, and set a boolean asking
3420
the I/O thread to temporarily ignore the log_space_limit
3421
constraint, because we do not want the I/O thread to block because of
3422
space (it's ok if it blocks for any other reason (e.g. because the
3423
master does not send anything). Then the I/O thread stops waiting
3424
and reads more events.
3425
The SQL thread decides when the I/O thread should take log_space_limit
3426
into account again : ignore_log_space_limit is reset to 0
3427
in purge_first_log (when the SQL thread purges the just-read relay
3428
log), and also when the SQL thread starts. We should also reset
3429
ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
3430
fact, no need as RESET SLAVE requires that the slave
3431
be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
3434
pthread_mutex_lock(&rli->log_space_lock);
3435
// prevent the I/O thread from blocking next times
3436
rli->ignore_log_space_limit= 1;
3438
If the I/O thread is blocked, unblock it. Ok to broadcast
3439
after unlock, because the mutex is only destroyed in
3440
~Relay_log_info(), i.e. when rli is destroyed, and rli will
3441
not be destroyed before we exit the present function.
3443
pthread_mutex_unlock(&rli->log_space_lock);
3444
pthread_cond_broadcast(&rli->log_space_cond);
3445
// Note that wait_for_update_relay_log unlocks lock_log !
3446
rli->relay_log.wait_for_update_relay_log(rli->sql_session);
3447
// re-acquire data lock since we released it earlier
3448
pthread_mutex_lock(&rli->data_lock);
3449
rli->last_master_timestamp= save_timestamp;
3453
If the log was not hot, we need to move to the next log in
3454
sequence. The next log could be hot or cold, we deal with both
3455
cases separately after doing some common initialization
3457
end_io_cache(cur_log);
3458
assert(rli->cur_log_fd >= 0);
3459
my_close(rli->cur_log_fd, MYF(MY_WME));
3460
rli->cur_log_fd = -1;
3462
if (relay_log_purge)
3465
purge_first_log will properly set up relay log coordinates in rli.
3466
If the group's coordinates are equal to the event's coordinates
3467
(i.e. the relay log was not rotated in the middle of a group),
3468
we can purge this relay log too.
3469
We do uint64_t and string comparisons, this may be slow but
3470
- purging the last relay log is nice (it can save 1GB of disk), so we
3471
like to detect the case where we can do it, and given this,
3472
- I see no better detection method
3473
- purge_first_log is not called that often
3475
if (rli->relay_log.purge_first_log
3477
rli->group_relay_log_pos == rli->event_relay_log_pos
3478
&& !strcmp(rli->group_relay_log_name.c_str(), rli->event_relay_log_name.c_str())))
3480
errmsg = "Error purging processed logs";
3487
If hot_log is set, then we already have a lock on
3488
LOCK_log. If not, we have to get the lock.
3490
According to Sasha, the only time this code will ever be executed
3491
is if we are recovering from a bug.
3493
if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
3495
errmsg = "error switching to the next log";
3498
rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3499
rli->event_relay_log_name.assign(rli->linfo.log_file_name);
3500
flush_relay_log_info(rli);
3504
Now we want to open this next log. To know if it's a hot log (the one
3505
being written by the I/O thread now) or a cold log, we can use
3506
is_active(); if it is hot, we use the I/O cache; if it's cold we open
3507
the file normally. But if is_active() reports that the log is hot, this
3508
may change between the test and the consequence of the test. So we may
3509
open the I/O cache whereas the log is now cold, which is nonsense.
3510
To guard against this, we need to have LOCK_log.
3513
if (!hot_log) /* if hot_log, we already have this mutex */
3514
pthread_mutex_lock(log_lock);
3515
if (rli->relay_log.is_active(rli->linfo.log_file_name))
3518
if (global_system_variables.log_warnings)
3519
sql_print_information(_("next log '%s' is currently active"),
3520
rli->linfo.log_file_name);
3522
rli->cur_log= cur_log= rli->relay_log.get_log_file();
3523
rli->cur_log_old_open_count= rli->relay_log.get_open_count();
3524
assert(rli->cur_log_fd == -1);
3527
Read pointer has to be at the start since we are the only
3529
We must keep the LOCK_log to read the 4 first bytes, as this is a hot
3530
log (same as when we call read_log_event() above: for a hot log we
3533
if (check_binlog_magic(cur_log,&errmsg))
3535
if (!hot_log) pthread_mutex_unlock(log_lock);
3538
if (!hot_log) pthread_mutex_unlock(log_lock);
3541
if (!hot_log) pthread_mutex_unlock(log_lock);
3543
if we get here, the log was not hot, so we will have to open it
3544
ourselves. We are sure that the log is still not hot now (a log can get
3545
from hot to cold, but not from cold to hot). No need for LOCK_log.
3548
if (global_system_variables.log_warnings)
3549
sql_print_information(_("next log '%s' is not active"),
3550
rli->linfo.log_file_name);
3552
// open_binlog() will check the magic header
3553
if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
3560
Read failed with a non-EOF error.
3561
TODO: come up with something better to handle this error
3564
pthread_mutex_unlock(log_lock);
3565
sql_print_error(_("Slave SQL thread: I/O error reading "
3566
"event(errno: %d cur_log->error: %d)"),
3567
my_errno,cur_log->error);
3568
// set read position to the beginning of the event
3569
my_b_seek(cur_log,rli->event_relay_log_pos);
3570
/* otherwise, we have had a partial read */
3571
errmsg = _("Aborting slave SQL thread because of partial event read");
3572
break; // To end of function
3575
if (!errmsg && global_system_variables.log_warnings)
3577
sql_print_information(_("Error reading relay log event: %s"),
3578
_("slave SQL thread was killed"));
3584
sql_print_error(_("Error reading relay log event: %s"), errmsg);
3589
Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
3590
because of size is simpler because when we do it we already have all relevant
3591
locks; here we don't, so this function is mainly taking locks).
3592
Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
3596
/*
  Rotate the relay log that belongs to the given master.

  @param mi  Master info; the relay log state used is the embedded mi->rli.

  mi->run_lock is taken at the start and released at the end. In between,
  rli->relay_log.new_file() is called, followed by
  rli->relay_log.harvest_bytes_written(&rli->log_space_total) so that
  log_space_total is brought up to date immediately. Returns nothing.

  NOTE(review): the inited test described in the body text is not visible in
  this excerpt -- confirm against the full file.
*/
void rotate_relay_log(Master_info* mi)
3598
Relay_log_info* rli= &mi->rli;
3600
/* We don't lock rli->run_lock. This would lead to deadlocks. */
3601
pthread_mutex_lock(&mi->run_lock);
3604
We need to test inited because otherwise, new_file() will attempt to lock
3605
LOCK_log, which may not be inited (if we're not a slave).
3612
/* If the relay log is closed, new_file() will do nothing. */
3613
rli->relay_log.new_file();
3616
We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
3617
be counted, so imagine a succession of FLUSH LOGS and assume the slave
3618
threads are started:
3619
relay_log_space decreases by the size of the deleted relay log, but does
3620
not increase, so flush-after-flush we may become negative, which is wrong.
3621
Even if this will be corrected as soon as a query is replicated on the
3622
slave (because the I/O thread will then call harvest_bytes_written() which
3623
will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
3624
output in SHOW SLAVE STATUS meanwhile. So we harvest now.
3625
If the log is closed, then this will just harvest the last writes, probably
3626
0 as they probably have been harvested.
3628
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3630
pthread_mutex_unlock(&mi->run_lock);
3636
Detects, based on master's version (as found in the relay log), if master
3638
@param rli Relay_log_info which tells the master's version
3639
@param bug_id Number of the bug as found in bugs.mysql.com
3640
@param report bool report error message, default TRUE
3641
@return true if master has the bug, false if it does not.
3643
/*
  Look up the master version in a static table of version ranges affected
  by known bugs.

  @param rli     Supplies the master version through
                 rli->relay_log.description_event_for_exec->server_version_split,
                 which is asserted to be 3 bytes wide.
  @param bug_id  Bug number to search for in versions_for_all_bugs.
  @param report  NOTE(review): not referenced in the visible lines; presumably
                 it gates the two error reports below -- confirm.

  A table row matches when its bug_id equals the bug_id argument and
  introduced_in <= master version < fixed_in, where the 3 version bytes are
  compared with memcmp. The table holds two ranges each for bugs 24432 and
  33029. For a matching row, a short message is issued through
  my_printf_error() and a verbose one through rli->report(); the verbose
  message names the first fixed version taken from fixed_in.

  NOTE(review): the bug_id field declaration, the loop header and the return
  statements are not visible in this excerpt.
*/
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
3645
struct st_version_range_for_one_bug {
3647
const unsigned char introduced_in[3]; // first version with bug
3648
const unsigned char fixed_in[3]; // first version with fix
3650
static struct st_version_range_for_one_bug versions_for_all_bugs[]=
3652
{24432, { 5, 0, 24 }, { 5, 0, 38 } },
3653
{24432, { 5, 1, 12 }, { 5, 1, 17 } },
3654
{33029, { 5, 0, 0 }, { 5, 0, 58 } },
3655
{33029, { 5, 1, 0 }, { 5, 1, 12 } },
3657
const unsigned char *master_ver=
3658
rli->relay_log.description_event_for_exec->server_version_split;
3660
assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3663
i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3665
const unsigned char *introduced_in= versions_for_all_bugs[i].introduced_in,
3666
*fixed_in= versions_for_all_bugs[i].fixed_in;
3667
if ((versions_for_all_bugs[i].bug_id == bug_id) &&
3668
(memcmp(introduced_in, master_ver, 3) <= 0) &&
3669
(memcmp(fixed_in, master_ver, 3) > 0))
3674
// a short message for SHOW SLAVE STATUS (message length constraints)
3675
my_printf_error(ER_UNKNOWN_ERROR,
3676
_("master may suffer from"
3677
" http://bugs.mysql.com/bug.php?id=%u"
3678
" so slave stops; check error log on slave"
3679
" for more info"), MYF(0), bug_id);
3680
// a verbose message for the error log
3681
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
3682
_("According to the master's version ('%s'),"
3683
" it is probable that master suffers from this bug:"
3684
" http://bugs.mysql.com/bug.php?id=%u"
3685
" and thus replicating the current binary log event"
3686
" may make the slave's data become different from the"
3688
" To take no risk, slave refuses to replicate"
3689
" this event and stops."
3690
" We recommend that all updates be stopped on the"
3691
" master and slave, that the data of both be"
3692
" manually synchronized,"
3693
" that master's binary logs be deleted,"
3694
" that master be upgraded to a version at least"
3695
" equal to '%d.%d.%d'. Then replication can be"
3697
rli->relay_log.description_event_for_exec->server_version,
3699
fixed_in[0], fixed_in[1], fixed_in[2]);
3707
BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
3708
exclusive, if one statement in a SP generated AUTO_INCREMENT value
3709
by the top statement, all statements after it would be considered
3710
generated AUTO_INCREMENT value by the top statement, and an
3711
erroneous INSERT_ID value might be associated with these statements,
3712
which could cause duplicate entry error and stop the slave.
3714
Detect buggy master to work around.
3716
/*
  Ask whether the master feeding the given session is affected by bug 33029.

  rpl_master_has_bug() is consulted only when active_mi is set and session
  is the SQL-thread session of active_mi (active_mi->rli.sql_session). It is
  called with bug_id 33029 and report set to false, and its result is
  returned.

  NOTE(review): the value returned for any other session is not visible in
  this excerpt.
*/
bool rpl_master_erroneous_autoinc(Session *session)
3718
if (active_mi && active_mi->rli.sql_session == session)
3720
Relay_log_info *rli= &active_mi->rli;
3721
return rpl_master_has_bug(rli, 33029, false);
3726
/*
  Explicit instantiations of I_List_iterator for i_string and i_string_pair,
  compiled only when HAVE_EXPLICIT_TEMPLATE_INSTANTIATION is defined.
*/
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
3727
template class I_List_iterator<i_string>;
3728
template class I_List_iterator<i_string_pair>;
3732
@} (end of group Replication)