1
/* Copyright (C) 2000-2003 DRIZZLE AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
18
@addtogroup Replication
23
@brief Code to run the io thread and the sql thread on the
26
#include <drizzled/server_includes.h>
28
#include <storage/myisam/myisam.h>
29
#include <drizzled/rpl_mi.h>
30
#include <drizzled/rpl_rli.h>
31
#include <drizzled/sql_repl.h>
32
#include <drizzled/rpl_filter.h>
33
#include <mysys/thr_alarm.h>
34
#include <libdrizzle/errmsg.h>
35
#include <mysys/mysys_err.h>
36
#include <drizzled/error.h>
37
#include <drizzled/sql_parse.h>
38
#include <drizzled/gettext.h>
41
#if TIME_WITH_SYS_TIME
42
# include <sys/time.h>
46
# include <sys/time.h>
52
#include <drizzled/tztime.h>
54
#include "rpl_tblmap.h"
56
/*
  File-scope slave configuration and state.
  NOTE(review): MAX_SLAVE_RETRY_PAUSE, use_slave_mask, slave_load_tmpdir,
  replicate_same_server_id, relay_log_space_limit and the event counters
  below are not referenced anywhere in this view of the file; their meaning
  can only be inferred from their names — confirm before relying on them.
*/
#define MAX_SLAVE_RETRY_PAUSE 5
57
bool use_slave_mask = 0;
58
/*
  Error codes the slave is allowed to skip. Initialised and filled by
  init_slave_skip_errors() (bitmap_init / bitmap_set_bit / bitmap_set_all).
*/
MY_BITMAP slave_error_mask;
60
/*
  Callback type that safe_sleep() takes as its thread_killed argument
  (together with an opaque thread_killed_arg); presumably polled to detect
  that the sleeping thread was killed.
*/
typedef bool (*CHECK_KILLED_FUNC)(Session*,void*);
62
char* slave_load_tmpdir = 0;
63
/*
  The single master connection. It is allocated with `new Master_info`
  during slave initialisation and torn down at shutdown (terminate threads,
  then end_master_info()); both of those code paths hold LOCK_active_mi.
*/
Master_info *active_mi= 0;
64
bool replicate_same_server_id;
65
uint64_t relay_log_space_limit = 0;
68
When slave thread exits, we need to remember the temporary tables so we
69
can re-use them on slave start.
71
TODO: move the vars below under Master_info
74
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
75
int32_t events_till_abort = -1;
77
/*
  Which step of the I/O thread's master protocol failed and triggered a
  reconnect. Used as the first index of reconnect_messages[][].
  NOTE(review): the remaining enumerators (SLAVE_RECON_ACT_MAX is used as
  the array bound of reconnect_messages) are not visible in this view.
*/
enum enum_slave_reconnect_actions
79
/* Registering with the master failed. */
SLAVE_RECON_ACT_REG= 0,
80
/* The binlog dump request failed. */
SLAVE_RECON_ACT_DUMP= 1,
81
/* Reading an event from the master failed. */
SLAVE_RECON_ACT_EVENT= 2,
85
/*
  Which message of a reconnect sequence is wanted. Used as the second index
  of reconnect_messages[][]; each enumerator selects the entry at the same
  position in every row of that table.
  NOTE(review): SLAVE_RECON_MSG_MAX (the array's second bound) is not
  visible in this view.
*/
enum enum_slave_reconnect_messages
87
/* Waiting before the reconnect attempt. */
SLAVE_RECON_MSG_WAIT= 0,
88
/* Thread was killed while waiting to reconnect. */
SLAVE_RECON_MSG_KILLED_WAITING= 1,
89
/* Reconnecting now. */
SLAVE_RECON_MSG_AFTER= 2,
90
/* Failure text; the table entries take a log name and a position (%s, %s). */
SLAVE_RECON_MSG_FAILED= 3,
91
/* Name of the failing command, e.g. "COM_REGISTER_SLAVE". */
SLAVE_RECON_MSG_COMMAND= 4,
92
/* Thread was killed during or after the reconnect. */
SLAVE_RECON_MSG_KILLED_AFTER= 5,
96
/*
  Messages used around a slave I/O reconnect, indexed as
  reconnect_messages[enum_slave_reconnect_actions][enum_slave_reconnect_messages].
  Rows: failed registration, failed binlog dump request, failed event read.
  The SLAVE_RECON_MSG_FAILED entries are format strings taking the master
  log name and position (two %s).
  Entries wrapped in N_() are marked for translation without being
  translated here. Changing their text changes the msgid, so any existing
  translation catalogs need the same update.
  NOTE(review): the COMMAND entries of the dump and event rows are not
  visible in this view.
*/
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
99
N_("Waiting to reconnect after a failed registration on master"),
100
N_("Slave I/O thread killed while waiting to reconnect after a "
101
"failed registration on master"),
102
N_("Reconnecting after a failed registration on master"),
103
N_("failed registering on master, reconnecting to try again, "
104
"log '%s' at position %s"),
105
"COM_REGISTER_SLAVE",
106
N_("Slave I/O thread killed during or after reconnect")
109
N_("Waiting to reconnect after a failed binlog dump request"),
110
N_("Slave I/O thread killed while retrying master dump"),
111
N_("Reconnecting after a failed binlog dump request"),
112
N_("failed dump request, reconnecting to try again, "
113
"log '%s' at position %s"),
115
N_("Slave I/O thread killed during or after reconnect")
118
N_("Waiting to reconnect after a failed master event read"),
119
N_("Slave I/O thread killed while waiting to reconnect "
120
"after a failed read"),
121
N_("Reconnecting after a failed master event read"),
122
N_("Slave I/O thread: Failed reading log event, "
123
"reconnecting to retry, log '%s' at position %s"),
125
N_("Slave I/O thread killed during or after a "
126
"reconnect done to recover from failed read")
131
/* Which slave thread is being set up; passed to init_slave_thread(). */
typedef enum { SLAVE_Session_IO, SLAVE_Session_SQL} SLAVE_Session_TYPE;
133
/*
  Forward declarations of file-local helpers used by the slave I/O and SQL
  threads.
  NOTE(review): the last parameter line of the terminate_slave_thread()
  prototype is not visible in this view; its definition documents a
  skip_lock flag.
*/
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
134
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
135
static bool wait_for_relay_log_space(Relay_log_info* rli);
136
static inline bool io_slave_killed(Session* session,Master_info* mi);
137
static inline bool sql_slave_killed(Session* session,Relay_log_info* rli);
138
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type);
139
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi);
140
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
141
bool suppress_warnings);
142
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
143
bool reconnect, bool suppress_warnings);
144
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
145
void* thread_killed_arg);
146
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
147
static Log_event* next_event(Relay_log_info* rli);
148
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
149
static int32_t terminate_slave_thread(Session *session,
150
pthread_mutex_t* term_lock,
151
pthread_cond_t* term_cond,
152
volatile uint32_t *slave_running,
154
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info);
157
Find out which replication threads are running
161
mask Return value here
162
mi master_info for slave
163
inverse If set, returns which threads are not running
166
Get a bit mask for which threads are running so that we can later restart
170
mask If inverse == 0, running threads
171
If inverse == 1, stopped threads
174
/*
  Build a SLAVE_IO / SLAVE_SQL bit mask from mi->slave_running and
  mi->rli.slave_running. When inverse is set (per the function's doc
  comment), both bits are flipped with XOR, so the mask names the stopped
  threads instead of the running ones.
  NOTE(review): the conditions guarding the two `|=` lines (presumably
  set_io / set_sql and inverse) and the final store into *mask are not
  visible in this view. `register` is deprecated in C++11 and removed in
  C++17.
*/
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
176
bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
177
register int32_t tmp_mask=0;
180
tmp_mask |= SLAVE_IO;
182
tmp_mask |= SLAVE_SQL;
184
/* Flip both bits: running <-> stopped. */
tmp_mask^= (SLAVE_IO | SLAVE_SQL);
194
/*
  Take both slave run locks: the I/O thread's mi->run_lock first, then the
  SQL thread's mi->rli.run_lock. unlock_slave_threads() releases them in
  reverse order. Any other code that needs both locks should acquire them
  in this same order to avoid lock-order inversion.
*/
void lock_slave_threads(Master_info* mi)
196
//TODO: see if we can do this without dual mutex
197
pthread_mutex_lock(&mi->run_lock);
198
pthread_mutex_lock(&mi->rli.run_lock);
204
unlock_slave_threads()
207
/*
  Release the two run locks taken by lock_slave_threads(), in reverse
  acquisition order: mi->rli.run_lock (SQL thread) first, then mi->run_lock
  (I/O thread).
*/
void unlock_slave_threads(Master_info* mi)
209
//TODO: see if we can do this without dual mutex
210
pthread_mutex_unlock(&mi->rli.run_lock);
211
pthread_mutex_unlock(&mi->run_lock);
216
/*
  NOTE(review): the enclosing function's signature is not visible in this
  view (assumed to be the slave initialisation routine), and neither are
  several of its lines (braces, the allocation-failure check, returns).
  Visible behaviour, all under LOCK_active_mi:
  - allocate active_mi;
  - load it with init_master_info() for both threads (SLAVE_IO | SLAVE_SQL);
  - when active_mi->host is non-empty and opt_skip_slave_start is not set,
    start both slave threads with start_slave_threads(), taking the run
    mutexes and not waiting for the threads to start.
*/
/* Initialize slave structures */
221
This is called when mysqld starts. Before client connections are
222
accepted. However bootstrap may conflict with us if it does START SLAVE.
223
So it's safer to take the lock.
225
pthread_mutex_lock(&LOCK_active_mi);
227
TODO: re-write this to interate through the list of files
230
active_mi= new Master_info;
233
If master_host is not specified, try to read it from the master_info file.
234
If master_host is specified, create the master_info file if it doesn't
239
sql_print_error(_("Failed to allocate memory for the master info structure"));
243
if (active_mi->init_master_info(master_info_file, relay_log_info_file, (SLAVE_IO | SLAVE_SQL)))
245
sql_print_error(_("Failed to initialize the master info structure"));
249
/* If server id is not set, start_slave_thread() will say it */
251
if (active_mi->host[0] && !opt_skip_slave_start)
253
if (start_slave_threads(1 /* need mutex */,
254
0 /* no wait for start*/,
258
SLAVE_IO | SLAVE_SQL))
260
sql_print_error(_("Failed to create slave threads"));
/*
  NOTE(review): two unlocks are visible (original lines 264 and 268),
  presumably one on an error path and one on the success path. Confirm that
  every path releases LOCK_active_mi exactly once.
*/
264
pthread_mutex_unlock(&LOCK_active_mi);
268
pthread_mutex_unlock(&LOCK_active_mi);
274
Init function to set up array for errors that should be skipped for slave
277
init_slave_skip_errors()
278
arg List of errors numbers to skip, separated with ','
281
Called from get_options() in mysqld.cc on start-up
284
/*
  Parse the skip-errors option value into slave_error_mask.
  The bitmap is sized MAX_SLAVE_ERROR bits. arg is either the word "all"
  (after optional leading whitespace), which sets every bit, or a list of
  decimal error codes. Codes >= MAX_SLAVE_ERROR are ignored.
  If bitmap_init() fails, a message is printed to stderr.
  NOTE(review): the declarations of p/err_code, the loop header, and what
  happens after the out-of-memory message are not visible in this view.
*/
void init_slave_skip_errors(const char* arg)
288
if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
290
fprintf(stderr, "Badly out of memory, please check your system status\n");
294
/* Skip leading whitespace. */
for (;my_isspace(system_charset_info,*arg);++arg)
296
/*
  Both lengths are 4, so the comparison includes "all"'s terminating NUL.
  Presumably this makes only the exact word match (not e.g. "all,1062").
*/
if (!my_strnncoll(system_charset_info,(unsigned char*)arg,4,(const unsigned char*)"all",4))
298
bitmap_set_all(&slave_error_mask);
304
/* Presumably str2int() returns NULL once no further number can be parsed. */
if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
306
if (err_code < MAX_SLAVE_ERROR)
307
bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
308
/* Advance to the next digit, skipping separators, or stop at end of string. */
while (!my_isdigit(system_charset_info,*p) && *p)
315
/*
  Stop the slave I/O and/or SQL thread selected by thread_mask
  (SLAVE_IO, SLAVE_SQL; SLAVE_FORCE_ALL selects both). Each thread is
  stopped through terminate_slave_thread(), with that thread's run lock
  (mi->run_lock for I/O, mi->rli.run_lock for SQL).
  NOTE(review): not visible in this view:
  - the guard before the early `return(0)`;
  - the assignment that flags the I/O thread to abort (the SQL branch sets
    mi->rli.abort_slave=1);
  - how error and force_all are used after each call.
*/
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
318
return(0); /* successfully do nothing */
319
int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
320
pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
322
if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
325
if ((error=terminate_slave_thread(mi->io_session,io_lock,
332
if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
334
/* Ask the SQL thread to stop before waiting for it. */
mi->rli.abort_slave=1;
335
if ((error=terminate_slave_thread(mi->rli.sql_session,sql_lock,
337
&mi->rli.slave_running,
347
Wait for a slave thread to terminate.
349
This function is called after requesting the thread to terminate
350
(by setting @c abort_slave member of @c Relay_log_info or @c
351
Master_info structure to 1). Termination of the thread is
352
controlled with the predicate <code>*slave_running</code>.
354
Function will acquire @c term_lock before waiting on the condition
355
unless @c skip_lock is true in which case the mutex should be owned
356
by the caller of this function and will remain acquired after
357
return from the function.
360
Associated lock to use when waiting for @c term_cond
363
Condition that is signalled when the thread has terminated
366
Pointer to predicate to check for slave thread termination
369
If @c true the lock will not be acquired before waiting on
370
the condition. In this case, it is assumed that the calling
371
function acquires the lock before calling this function.
376
/*
  Repeatedly wake a slave thread until it clears *slave_running.
  In each round:
  - send thr_client_alarm with pthread_kill() (only when DONT_USE_THR_ALARM
    is not defined);
  - call session->awake(), holding session->LOCK_delete;
  - wait on term_cond for up to 2 seconds.
  Returns ER_SLAVE_NOT_RUNNING if the thread was not running.
  NOTE(review): the skip_lock parameter, the conditions around the
  lock/unlock of term_lock (per the doc comment, only taken when skip_lock
  is false), the not-running check, and the #endif are not visible in this
  view. `err` is only read by assert(), so it is unused when NDEBUG is
  defined.
*/
terminate_slave_thread(Session *session,
377
pthread_mutex_t* term_lock,
378
pthread_cond_t* term_cond,
379
volatile uint32_t *slave_running,
385
pthread_mutex_lock(term_lock);
387
safe_mutex_assert_owner(term_lock);
392
pthread_mutex_unlock(term_lock);
393
return(ER_SLAVE_NOT_RUNNING);
395
assert(session != 0);
396
Session_CHECK_SENTRY(session);
399
Is is critical to test if the slave is running. Otherwise, we might
400
be referening freed memory trying to kick it
403
/* Re-signal until the thread reports it has stopped. */
while (*slave_running) // Should always be true
405
/* LOCK_delete is held while signalling and awaking the session. */
pthread_mutex_lock(&session->LOCK_delete);
406
#ifndef DONT_USE_THR_ALARM
408
Error codes from pthread_kill are:
409
EINVAL: invalid signal number (can't happen)
410
ESRCH: thread already killed (can happen, should be ignored)
412
int32_t err= pthread_kill(session->real_id, thr_client_alarm);
413
assert(err != EINVAL);
415
session->awake(Session::NOT_KILLED);
416
pthread_mutex_unlock(&session->LOCK_delete);
419
There is a small chance that slave thread might miss the first
420
alarm. To protect againts it, resend the signal until it reacts
422
struct timespec abstime;
423
/* Deadline 2 seconds from now; a timeout just triggers another round. */
set_timespec(abstime,2);
424
error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
425
assert(error == ETIMEDOUT || error == 0);
428
assert(*slave_running == 0);
431
pthread_mutex_unlock(term_lock);
436
/*
  Create a slave thread running h_func with mi as its argument.
  The thread uses the global connection_attrib, with its scheduling
  priority set to CONNECT_PRIOR.
  When both start_cond and cond_lock are supplied, wait on start_cond until
  *slave_run_id differs from the value sampled before pthread_create().
  The new thread is presumably responsible for bumping slave_run_id (not
  visible here).

  Visible error returns:
  - ER_BAD_SLAVE: server id not set;
  - ER_SLAVE_MUST_STOP: presumably the thread is already running;
  - ER_SLAVE_THREAD: pthread_create() failed;
  - session->killed_errno(): the waiting session was killed.

  NOTE(review): start_slave_threads() passes a NULL start_lock when it is
  called without need_slave_mutex. The lock/unlock calls on start_lock are
  therefore presumably guarded by `if (start_lock)` in lines not visible
  here; the same applies to the conditions around each early return.
*/
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
437
pthread_mutex_t *cond_lock,
438
pthread_cond_t *start_cond,
439
volatile uint32_t *slave_running,
440
volatile uint32_t *slave_run_id,
450
pthread_mutex_lock(start_lock);
454
pthread_cond_broadcast(start_cond);
456
pthread_mutex_unlock(start_lock);
457
sql_print_error(_("Server id not set, will not start slave"));
458
return(ER_BAD_SLAVE);
464
pthread_cond_broadcast(start_cond);
466
pthread_mutex_unlock(start_lock);
467
return(ER_SLAVE_MUST_STOP);
469
/* Sampled before creating the thread so the wait below can detect the change. */
start_id= *slave_run_id;
472
struct sched_param tmp_sched_param;
474
memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
475
tmp_sched_param.sched_priority= CONNECT_PRIOR;
476
(void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
478
if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
481
pthread_mutex_unlock(start_lock);
482
return(ER_SLAVE_THREAD);
484
if (start_cond && cond_lock) // caller has cond_lock
486
Session* session = current_session;
487
/* Predicate loop: also tolerates spurious wakeups of pthread_cond_wait(). */
while (start_id == *slave_run_id)
489
const char* old_msg = session->enter_cond(start_cond,cond_lock,
490
"Waiting for slave thread to start");
491
pthread_cond_wait(start_cond,cond_lock);
492
session->exit_cond(old_msg);
493
pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
495
return(session->killed_errno());
499
pthread_mutex_unlock(start_lock);
505
start_slave_threads()
508
SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
509
sense to do that for starting a slave--we always care if it actually
510
started the threads that were not previously running
513
/*
  Start the slave I/O and/or SQL thread selected by thread_mask through
  start_slave_thread().
  - When need_slave_mutex is false, lock_io/lock_sql stay NULL, so no run
    lock is handed to start_slave_thread().
  - cond_io/cond_sql and lock_cond_* are the start condition and lock used
    to wait for each thread to start. They are presumably only set when
    wait_for_start is true; the guarding condition is not visible here.
  - The SQL thread is started only if starting the I/O thread (when
    requested) succeeded.
  - Afterwards the I/O part of thread_mask is terminated again. This is
    presumably guarded by `if (error)` so that a failed SQL start does not
    leave a lone I/O thread; confirm.
*/
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
514
Master_info* mi, const char*, const char*,
517
pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
518
pthread_cond_t* cond_io=0,*cond_sql=0;
521
if (need_slave_mutex)
523
lock_io = &mi->run_lock;
524
lock_sql = &mi->rli.run_lock;
528
cond_io = &mi->start_cond;
529
cond_sql = &mi->rli.start_cond;
530
lock_cond_io = &mi->run_lock;
531
lock_cond_sql = &mi->rli.run_lock;
534
if (thread_mask & SLAVE_IO)
535
error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
537
&mi->slave_running, &mi->slave_run_id,
538
mi, 1); //high priority, to read the most possible
539
if (!error && (thread_mask & SLAVE_SQL))
541
error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
543
&mi->rli.slave_running, &mi->rli.slave_run_id,
546
terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
553
/*
  Per-master shutdown callback. It is intended for
  list_walk(&master_list, ...) once multi-master support exists (per the
  TODO in the shutdown routine).
  NOTE(review): the body is not visible in this view.
*/
static int32_t end_slave_on_walk(Master_info* mi, unsigned char* /*unused*/)
562
Free all resources used by slave
571
This is called when the server terminates, in close_connections().
572
It terminates slave threads. However, some CHANGE MASTER etc may still be
573
running presently. If a START SLAVE was in progress, the mutex lock below
574
will make us wait until slave threads have started, and START SLAVE
575
returns, then we terminate them here.
577
/*
  Shutdown path; the enclosing function's signature is not visible in this
  view. Under LOCK_active_mi it stops both slave threads (SLAVE_FORCE_ALL)
  and then calls active_mi->end_master_info().
  NOTE(review): active_mi is dereferenced unconditionally in the visible
  lines. A NULL check is presumably on the missing original line 584;
  confirm.
*/
pthread_mutex_lock(&LOCK_active_mi);
581
TODO: replace the line below with
582
list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
583
once multi-master code is ready.
585
terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
586
active_mi->end_master_info();
590
pthread_mutex_unlock(&LOCK_active_mi);
595
/*
  Tell whether the slave I/O thread should stop: returns true if any of
  mi->abort_slave, the global abort_loop, or session->killed is set.
  Must be called with mi's own I/O session while the thread is running
  (both are asserted).
*/
static bool io_slave_killed(Session* session, Master_info* mi)
597
assert(mi->io_session == session);
598
assert(mi->slave_running); // tracking buffer overrun
599
return(mi->abort_slave || abort_loop || session->killed);
603
/*
  Tell whether the slave SQL thread should stop. A stop is requested when
  any of abort_loop, session->killed or rli->abort_slave is set.
  Per the comment below, the thread may get a grace period in an unsafe
  situation: 60 seconds measured from rli->last_event_start_time. If the
  grace period is exceeded, an ERROR_LEVEL report warns that tables may
  need checking.
  NOTE(review): the unsafe-situation test, the outcome of the
  last_event_start_time == 0 check, and the return statements are not
  visible in this view.
*/
static bool sql_slave_killed(Session* session, Relay_log_info* rli)
605
assert(rli->sql_session == session);
606
assert(rli->slave_running == 1);// tracking buffer overrun
607
if (abort_loop || session->killed || rli->abort_slave)
610
If we are in an unsafe situation (stopping could corrupt replication),
611
we give one minute to the slave SQL thread of grace before really
612
terminating, in the hope that it will be able to read more events and
613
the unsafe situation will soon be left. Note that this one minute starts
614
from the last time anything happened in the slave SQL thread. So it's
615
really one minute of idleness, we don't timeout if the slave SQL thread
618
if (rli->last_event_start_time == 0)
620
/* Grace period: 60 seconds since the last event started. */
if (difftime(time(0), rli->last_event_start_time) > 60)
622
rli->report(ERROR_LEVEL, 0,
623
_("SQL thread had to stop in an unsafe situation, in "
624
"the middle of applying updates to a "
625
"non-transactional table without any primary key. "
626
"There is a risk of duplicate updates when the slave "
627
"SQL thread is restarted. Please check your tables' "
628
"contents after restart."));
637
skip_load_data_infile()
640
This is used to tell a 3.23 master to break send_file()
643
/*
  Decline a master's file transfer. The steps are:
  - request "/dev/null" via net_request_file();
  - read and discard one response packet;
  - write an empty command with code 0.
  All return values are deliberately ignored (void casts).
*/
void skip_load_data_infile(NET *net)
645
(void)net_request_file(net, "/dev/null");
646
(void)my_net_read(net); // discard response
647
(void)net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0); // ok
652
/*
  Send a file-request packet: command byte 251 followed by fname, which is
  sent without its terminating NUL (length strlen(fname)).
  251 (0xFB) is presumably the MySQL-protocol "LOCAL INFILE request"
  marker; confirm for Drizzle.
  Returns net_write_command()'s result unchanged.
*/
bool net_request_file(NET* net, const char* fname)
654
return(net_write_command(net, 251, (unsigned char*) fname, strlen(fname),
655
(unsigned char*) "", 0));
659
From other comments and tests in code, it looks like
660
sometimes Query_log_event and Load_log_event can have db == 0
661
(see rewrite_db() above for example)
662
(cases where this happens are unclear; it may be when the master is 3.23).
665
/*
  Return db, or "" when db is NULL, so the result can always be printed
  with %s.
*/
const char *print_slave_db_safe(const char* db)
667
return((db ? db : ""));
670
/*
  Read one line of f into var, bounded by max_size as passed to
  my_b_gets().
  - The last character read is overwritten with NUL (per its comment, to
    drop the newline).
  - When the line was truncated, the rest of it, up to and including '\n'
    or EOF, is consumed and discarded.
  - If nothing was read and default_val is non-NULL, default_val is copied
    into var, truncated to max_size-1 characters.
  NOTE(review): the declarations of length/c, the conditions around the
  NUL store and the discard loop, and the return values are not visible
  in this view.
*/
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
671
const char *default_val)
675
if ((length=my_b_gets(f,var, max_size)))
677
char* last_p = var + length -1;
679
*last_p = 0; // if we stopped on newline, kill it
683
If we truncated a line or stopped on last char, remove all chars
684
up to and including newline.
687
while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
691
else if (default_val)
693
strmake(var, default_val, max_size-1);
700
/*
  Read one line of f into a local buffer and, presumably, parse it into
  *var; otherwise fall back to default_val.
  NOTE(review): the parsing, the buffer declaration and the return values
  are not visible in this view. The fallback branch tests default_val for
  non-zero, so a default of 0 is presumably never applied; confirm callers
  expect that.
*/
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
705
if (my_b_gets(f, buf, sizeof(buf)))
710
else if (default_val)
718
/*
  Read one line of f and parse it into *var with sscanf("%f"); the parse
  counts as failed unless exactly one value is converted. Otherwise fall
  back to default_val.
  NOTE(review): the buffer declaration, the failure handling and the return
  values are not visible in this view. The fallback branch tests
  default_val != 0.0, so a default of 0.0 is presumably never applied.
*/
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
723
if (my_b_gets(f, buf, sizeof(buf)))
725
if (sscanf(buf, "%f", var) != 1)
730
else if (default_val != 0.0)
738
/*
  Check io_slave_killed(). If the I/O thread was killed, info is non-NULL
  and log_warnings is enabled, info is logged with sql_print_information().
  Presumably returns whether the thread was killed; the return statements
  are not visible in this view.
*/
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info)
740
if (io_slave_killed(session, mi))
742
if (info && global_system_variables.log_warnings)
743
sql_print_information("%s",info);
751
Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
752
relying on the binlog's version. This is not perfect: imagine an upgrade
753
of the master without waiting that all slaves are in sync with the master;
754
then a slave could be fooled about the binlog's format. This is what happens
755
when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
756
slaves are fooled. So we do this only to distinguish between 3.23 and more
757
recent masters (it's too late to change things for 3.23).
764
/*
  Inspect a freshly connected master and prepare the I/O thread for it.

  Visible steps, in order:
  - Replace mi->rli.relay_log.description_event_for_queue with a new
    Format_description_log_event. Its binlog version (1, 3 or 4) is chosen
    by a switch on the first character of drizzle->server_version.
  - Compute mi->clock_diff_with_master from "SELECT UNIX_TIMESTAMP()". If
    that query fails and the I/O thread was not killed, use 0 and log a
    warning.
  - Refuse a master whose server id equals ours, unless
    rli.replicate_same_server_id is set.
  - For masters whose version starts with '4' only, require equal
    COLLATION_SERVER and TIME_ZONE global values.
  - When mi->heartbeat_period is non-zero, send
    "SET @master_heartbeat_period= <nanoseconds>".
  Fatal problems are accumulated in err_msg. If there are any, they are
  logged with sql_print_error() and reported through mi->report() with
  err_code.

  Changes in this revision:
  - All formatting into the fixed-size local buffers err_buff and query now
    uses snprintf(..., sizeof ...), so an over-long error-table format can
    no longer overrun the stack buffer.
  - The equal-server-id message no longer prints "doesnot".

  NOTE(review): the switch's case labels, the error_buf/err_code/llbuf
  declarations, the errmsg assignment lines before two of the messages,
  and the return statements are not visible in this view.
*/
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
767
String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
768
char err_buff[MAX_SLAVE_ERRMSG];
769
const char* errmsg= 0;
771
DRIZZLE_RES *master_res= 0;
772
DRIZZLE_ROW master_row;
776
Free old description_event_for_queue (that is needed if we are in
779
delete mi->rli.relay_log.description_event_for_queue;
780
mi->rli.relay_log.description_event_for_queue= 0;
782
if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
784
errmsg = _("Master reported unrecognized DRIZZLE version");
785
err_code= ER_SLAVE_FATAL_ERROR;
786
// Bounded write: err_buff is a fixed-size local array.
snprintf(err_buff, sizeof(err_buff), ER(err_code), errmsg);
787
err_msg.append(err_buff);
792
Note the following switch will bug when we have DRIZZLE branch 30 ;)
794
switch (*drizzle->server_version)
799
errmsg = _("Master reported unrecognized DRIZZLE version");
800
err_code= ER_SLAVE_FATAL_ERROR;
801
snprintf(err_buff, sizeof(err_buff), ER(err_code), errmsg);
802
err_msg.append(err_buff);
805
mi->rli.relay_log.description_event_for_queue= new
806
Format_description_log_event(1, drizzle->server_version);
809
mi->rli.relay_log.description_event_for_queue= new
810
Format_description_log_event(3, drizzle->server_version);
814
Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
815
take the early steps (like tests for "is this a 3.23 master") which we
816
have to take before we receive the real master's Format_desc which will
817
override this one. Note that the Format_desc we create below is garbage
818
(it has the format of the *slave*); it's only good to help know if the
819
master is 3.23, 4.0, etc.
821
mi->rli.relay_log.description_event_for_queue= new
822
Format_description_log_event(4, drizzle->server_version);
828
This does not mean that a 5.0 slave will be able to read a 6.0 master; but
829
as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
830
can't read a 6.0 master, this will show up when the slave can't read some
831
events sent by the master, and there will be error messages.
834
if (err_msg.length() != 0)
837
/* as we are here, we tried to allocate the event */
838
if (!mi->rli.relay_log.description_event_for_queue)
840
errmsg= _("default Format_description_log_event");
841
err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
842
snprintf(err_buff, sizeof(err_buff), ER(err_code), errmsg);
843
err_msg.append(err_buff);
848
Compare the master and slave's clock. Do not die if master's clock is
849
unavailable (very old master not supporting UNIX_TIMESTAMP()?).
852
if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
853
(master_res= drizzle_store_result(drizzle)) &&
854
(master_row= drizzle_fetch_row(master_res)))
856
mi->clock_diff_with_master=
857
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
859
else if (!check_io_slave_killed(mi->io_session, mi, NULL))
861
mi->clock_diff_with_master= 0; /* The "most sensible" value */
862
sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
863
"do not trust column Seconds_Behind_Master of SHOW "
864
"SLAVE STATUS. Error: %s (%d)"),
865
drizzle_error(drizzle), drizzle_errno(drizzle));
868
drizzle_free_result(master_res);
871
Check that the master's server id and ours are different. Because if they
872
are equal (which can result from a simple copy of master's datadir to slave,
873
thus copying some drizzle.cnf), replication will work but all events will be
875
Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
877
Note: we could have put a @@SERVER_ID in the previous SELECT
878
UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
880
if (!drizzle_real_query(drizzle,
881
STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
882
(master_res= drizzle_store_result(drizzle)))
884
if ((master_row= drizzle_fetch_row(master_res)) &&
885
(::server_id == strtoul(master_row[1], 0, 10)) &&
886
!mi->rli.replicate_same_server_id)
889
_("The slave I/O thread stops because master and slave have equal "
890
"DRIZZLE server ids; these ids must be different "
891
"for replication to work (or "
892
"the --replicate-same-server-id option must be used "
893
"on slave but this does "
894
"not always make sense; please check the manual before using it).");
895
err_code= ER_SLAVE_FATAL_ERROR;
896
snprintf(err_buff, sizeof(err_buff), ER(err_code), errmsg);
897
err_msg.append(err_buff);
899
drizzle_free_result(master_res);
905
Check that the master's global character_set_server and ours are the same.
906
Not fatal if query fails (old master?).
907
Note that we don't check for equality of global character_set_client and
908
collation_connection (neither do we prevent their setting in
909
set_var.cc). That's because from what I (Guilhem) have tested, the global
910
values of these 2 are never used (new connections don't use them).
911
We don't test equality of global collation_database either as it is
912
going to be deprecated (made read-only) in 4.1 very soon.
913
The test is only relevant if master < 5.0.3 (we'll test only if it's older
914
than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
915
charset info in each binlog event.
916
We don't do it for 3.23 because masters <3.23.50 hang on
917
SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
918
test only if master is 4.x.
921
/* redundant with rest of code but safer against later additions */
922
if (*drizzle->server_version == '3')
925
if ((*drizzle->server_version == '4') &&
926
!drizzle_real_query(drizzle,
927
STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
928
(master_res= drizzle_store_result(drizzle)))
930
if ((master_row= drizzle_fetch_row(master_res)) &&
931
strcmp(master_row[0], global_system_variables.collation_server->name))
934
_("The slave I/O thread stops because master and slave have"
935
" different values for the COLLATION_SERVER global variable."
936
" The values must be equal for replication to work");
937
err_code= ER_SLAVE_FATAL_ERROR;
938
snprintf(err_buff, sizeof(err_buff), ER(err_code), errmsg);
939
err_msg.append(err_buff);
941
drizzle_free_result(master_res);
947
Perform analogous check for time zone. Theoretically we also should
948
perform check here to verify that SYSTEM time zones are the same on
949
slave and master, but we can't rely on value of @@system_time_zone
950
variable (it is time zone abbreviation) since it is determined at start
951
time and so could differ for slave and master even if they are really
952
in the same system time zone. So we are omitting this check and just
953
relying on documentation. Also according to Monty there are many users
954
who are using replication between servers in various time zones. Hence
955
such check would break everything for them. (And now everything will
956
work for them because by default both their master and slave will have
958
This check is only necessary for 4.x masters (and < 5.0.4 masters but
961
if ((*drizzle->server_version == '4') &&
962
!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
963
(master_res= drizzle_store_result(drizzle)))
965
if ((master_row= drizzle_fetch_row(master_res)) &&
966
strcmp(master_row[0],
967
global_system_variables.time_zone->get_name()->ptr()))
970
_("The slave I/O thread stops because master and slave have"
971
" different values for the TIME_ZONE global variable."
972
" The values must be equal for replication to work");
973
err_code= ER_SLAVE_FATAL_ERROR;
974
snprintf(err_buff, sizeof(err_buff), ER(err_code), errmsg);
975
err_msg.append(err_buff);
977
drizzle_free_result(master_res);
983
if (mi->heartbeat_period != 0.0)
986
const char query_format[]= "SET @master_heartbeat_period= %s";
987
/* Format length minus the two "%s" characters, plus room for llbuf. */
char query[sizeof(query_format) - 2 + sizeof(llbuf)];
989
the period is an uint64_t of nano-secs.
991
llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
992
snprintf(query, sizeof(query), query_format, llbuf);
994
if (drizzle_real_query(drizzle, query, strlen(query))
995
&& !check_io_slave_killed(mi->io_session, mi, NULL))
997
err_msg.append("The slave I/O thread stops because querying master with '");
998
err_msg.append(query);
999
err_msg.append("' failed;");
1000
err_msg.append(" error: ");
1001
err_code= drizzle_errno(drizzle);
1002
err_msg.qs_append(err_code);
1003
err_msg.append(" '");
1004
err_msg.append(drizzle_error(drizzle));
1005
err_msg.append("'");
1006
drizzle_free_result(drizzle_store_result(drizzle));
1009
drizzle_free_result(drizzle_store_result(drizzle));
1013
if (err_msg.length() != 0)
1015
sql_print_error("%s",err_msg.ptr());
1016
assert(err_code != 0);
1017
mi->report(ERROR_LEVEL, err_code, "%s",err_msg.ptr());
1025
/*
  Block the slave I/O thread while the relay logs use more space than
  allowed (rli->log_space_total > rli->log_space_limit).
  The wait ends when:
  - space is freed and log_space_cond is signalled (the proc-info message
    suggests the SQL thread does this);
  - io_slave_killed() reports a kill; or
  - rli->ignore_log_space_limit is set.
  Returns the result of the last io_slave_killed() check. It is false when
  the space limit was never exceeded, because the check is then never
  evaluated.
  No explicit unlock appears here: session->exit_cond() is relied on to
  release rli->log_space_lock (start_slave_thread() documents that
  exit_cond() releases the mutex).
*/
static bool wait_for_relay_log_space(Relay_log_info* rli)
1027
bool slave_killed=0;
1028
Master_info* mi = rli->mi;
1029
const char *save_proc_info;
1030
Session* session = mi->io_session;
1032
pthread_mutex_lock(&rli->log_space_lock);
1033
save_proc_info= session->enter_cond(&rli->log_space_cond,
1034
&rli->log_space_lock,
1035
_("Waiting for the slave SQL thread "
1036
"to free enough relay log space"));
1037
/* Predicate loop: re-checks all conditions after every wakeup. */
while (rli->log_space_limit < rli->log_space_total &&
1038
!(slave_killed=io_slave_killed(session,mi)) &&
1039
!rli->ignore_log_space_limit)
1040
pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1041
session->exit_cond(save_proc_info);
1042
return(slave_killed);
1047
Builds a Rotate from the ignored events' info and writes it to relay log.
1050
write_ignored_events_info_to_relay_log()
1051
session pointer to I/O thread's session
1055
Slave I/O thread, going to die, must leave a durable trace of the
1056
ignored events' end position for the use of the slave SQL thread, by
1057
calling this function. Only that thread can call it (see assertion).
1059
/*
  If the I/O thread recorded the end of a run of ignored master events
  (rli->ign_master_log_name_end non-empty), append a Rotate_log_event for
  that name/position to the relay log. This lets the SQL thread see how
  far the master log was consumed.
  - The recorded name is cleared under the relay log's lock, and the lock
    is released before the event is appended.
  - The event's server_id is set to 0 so the SQL thread does not ignore it.
  - Failures are reported through mi->report(): a relay-log append failure,
    or a failure to create the event.
  Must be called from the I/O thread (asserted).
  NOTE(review): the second parameter line, the flush_master_info call
  behind the "Failed to flush master info file" message, and any
  `delete ev` are not visible in this view; confirm ev is freed. The
  `(bool)ev` check only helps if `new` here returns NULL on failure
  instead of throwing.
*/
static void write_ignored_events_info_to_relay_log(Session *session,
1062
Relay_log_info *rli= &mi->rli;
1063
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1065
assert(session == mi->io_session);
1066
pthread_mutex_lock(log_lock);
1067
if (rli->ign_master_log_name_end[0])
1069
Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
1070
0, rli->ign_master_log_pos_end,
1071
Rotate_log_event::DUP_NAME);
1072
rli->ign_master_log_name_end[0]= 0;
1073
/* can unlock before writing as slave SQL session will soon see our Rotate */
1074
pthread_mutex_unlock(log_lock);
1075
if (likely((bool)ev))
1077
ev->server_id= 0; // don't be ignored by slave SQL thread
1078
if (unlikely(rli->relay_log.append(ev)))
1079
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
1080
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
1081
_("failed to write a Rotate event"
1082
" to the relay log, SHOW SLAVE STATUS may be"
1084
/* Account the bytes just written in the relay-log space total. */
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1086
sql_print_error(_("Failed to flush master info file"));
1090
mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
1091
ER(ER_SLAVE_CREATE_EVENT_FAILURE),
1092
_("Rotate_event (out of memory?),"
1093
" SHOW SLAVE STATUS may be inaccurate"));
1096
pthread_mutex_unlock(log_lock);
1101
/*
  Send COM_REGISTER_SLAVE to the master. The packet contains, in order:
  - 4 bytes: our server_id;
  - report_host, length-prefixed via net_store_data();
  - empty user;
  - empty password;
  - 2 bytes: report_port;
  - 4 bytes: 0 (meaning not stated here);
  - 4 bytes: 0 for master_id, which the master fills in.
  Returns 0 without sending anything when the host/user/password lengths
  plus a 30-byte margin would not fit.
  On a command failure:
  - ER_NET_READ_INTERRUPTED only sets *suppress_warnings;
  - any other error is reported via mi->report(), unless the I/O thread was
    killed.
  NOTE(review): not visible in this view:
  - the NULL check before strlen(report_host);
  - the right-hand side of the size test;
  - the return statements;
  - the declaration of the `buf` used by snprintf(). snprintf() needs a
    char buffer, so it is presumably a separate local that shadows the
    unsigned char packet buffer.
*/
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
1102
bool *suppress_warnings)
1104
unsigned char buf[1024], *pos= buf;
1105
uint32_t report_host_len, report_user_len=0, report_password_len=0;
1107
*suppress_warnings= false;
1110
report_host_len= strlen(report_host);
1111
/* 30 is a good safety margin */
1112
if (report_host_len + report_user_len + report_password_len + 30 >
1114
return(0); // safety
1116
int4store(pos, server_id); pos+= 4;
1117
pos= net_store_data(pos, (unsigned char*) report_host, report_host_len);
1118
pos= net_store_data(pos, NULL, report_user_len);
1119
pos= net_store_data(pos, NULL, report_password_len);
1120
int2store(pos, (uint16_t) report_port); pos+= 2;
1121
int4store(pos, 0); pos+= 4;
1122
/* The master will fill in master_id */
1123
int4store(pos, 0); pos+= 4;
1125
if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
1127
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1129
*suppress_warnings= true; // Suppress reconnect warning
1131
else if (!check_io_slave_killed(mi->io_session, mi, NULL))
1134
snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle),
1135
drizzle_errno(drizzle));
1136
mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1137
ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1145
bool show_master_info(Session* session, Master_info* mi)
1147
// TODO: fix this for multi-master
1148
List<Item> field_list;
1149
Protocol *protocol= session->protocol;
1151
field_list.push_back(new Item_empty_string("Slave_IO_State",
1153
field_list.push_back(new Item_empty_string("Master_Host",
1155
field_list.push_back(new Item_empty_string("Master_User",
1157
field_list.push_back(new Item_return_int("Master_Port", 7,
1158
DRIZZLE_TYPE_LONG));
1159
field_list.push_back(new Item_return_int("Connect_Retry", 10,
1160
DRIZZLE_TYPE_LONG));
1161
field_list.push_back(new Item_empty_string("Master_Log_File",
1163
field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
1164
DRIZZLE_TYPE_LONGLONG));
1165
field_list.push_back(new Item_empty_string("Relay_Log_File",
1167
field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
1168
DRIZZLE_TYPE_LONGLONG));
1169
field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
1171
field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
1172
field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1173
field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
1174
field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
1175
field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
1176
field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
1177
field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
1178
field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
1180
field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
1181
field_list.push_back(new Item_empty_string("Last_Error", 20));
1182
field_list.push_back(new Item_return_int("Skip_Counter", 10,
1183
DRIZZLE_TYPE_LONG));
1184
field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
1185
DRIZZLE_TYPE_LONGLONG));
1186
field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
1187
DRIZZLE_TYPE_LONGLONG));
1188
field_list.push_back(new Item_empty_string("Until_Condition", 6));
1189
field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1190
field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1191
DRIZZLE_TYPE_LONGLONG));
1192
field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1193
DRIZZLE_TYPE_LONGLONG));
1194
field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
1195
field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1196
field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
1197
field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
1199
if (protocol->send_fields(&field_list,
1200
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1205
String *packet= &session->packet;
1206
protocol->prepare_for_resend();
1209
slave_running can be accessed without run_lock but not other
1210
non-volotile members like mi->io_session, which is guarded by the mutex.
1212
pthread_mutex_lock(&mi->run_lock);
1213
protocol->store(mi->io_session ? mi->io_session->get_proc_info() : "", &my_charset_bin);
1214
pthread_mutex_unlock(&mi->run_lock);
1216
pthread_mutex_lock(&mi->data_lock);
1217
pthread_mutex_lock(&mi->rli.data_lock);
1218
protocol->store(mi->getHostname(), &my_charset_bin);
1219
protocol->store(mi->getUsername(), &my_charset_bin);
1220
protocol->store((uint32_t) mi->getPort());
1221
protocol->store(mi->getConnectionRetry());
1222
protocol->store(mi->getLogName(), &my_charset_bin);
1223
protocol->store((uint64_t) mi->getLogPosition());
1224
protocol->store(mi->rli.group_relay_log_name.c_str() +
1225
dirname_length(mi->rli.group_relay_log_name.c_str()),
1227
protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1228
protocol->store(mi->rli.group_master_log_name.c_str(), &my_charset_bin);
1229
protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1230
"Yes" : "No", &my_charset_bin);
1231
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1232
protocol->store(rpl_filter->get_do_db());
1233
protocol->store(rpl_filter->get_ignore_db());
1236
String tmp(buf, sizeof(buf), &my_charset_bin);
1237
rpl_filter->get_do_table(&tmp);
1238
protocol->store(&tmp);
1239
rpl_filter->get_ignore_table(&tmp);
1240
protocol->store(&tmp);
1241
rpl_filter->get_wild_do_table(&tmp);
1242
protocol->store(&tmp);
1243
rpl_filter->get_wild_ignore_table(&tmp);
1244
protocol->store(&tmp);
1246
protocol->store(mi->rli.last_error().number);
1247
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1248
protocol->store((uint32_t) mi->rli.slave_skip_counter);
1249
protocol->store((uint64_t) mi->rli.group_master_log_pos);
1250
protocol->store((uint64_t) mi->rli.log_space_total);
1253
mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
1254
( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
1255
"Relay"), &my_charset_bin);
1256
protocol->store(mi->rli.until_log_name, &my_charset_bin);
1257
protocol->store((uint64_t) mi->rli.until_log_pos);
1260
Seconds_Behind_Master: if SQL thread is running and I/O thread is
1261
connected, we can compute it otherwise show NULL (i.e. unknown).
1263
if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
1264
mi->rli.slave_running)
1266
long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
1267
- mi->clock_diff_with_master);
1269
Apparently on some systems time_diff can be <0. Here are possible
1270
reasons related to MySQL:
1271
- the master is itself a slave of another master whose time is ahead.
1272
- somebody used an explicit SET TIMESTAMP on the master.
1273
Possible reason related to granularity-to-second of time functions
1274
(nothing to do with MySQL), which can explain a value of -1:
1275
assume the master's and slave's time are perfectly synchronized, and
1276
that at slave's connection time, when the master's timestamp is read,
1277
it is at the very end of second 1, and (a very short time later) when
1278
the slave's timestamp is read it is at the very beginning of second
1279
2. Then the recorded value for master is 1 and the recorded value for
1280
slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1281
between timestamp of slave and rli->last_master_timestamp is 0
1282
(i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1283
This confuses users, so we don't go below 0: hence the cmax().
1285
last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1286
special marker to say "consider we have caught up".
1288
protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1289
cmax((long)0, time_diff) : 0));
1293
protocol->store_null();
1297
protocol->store(mi->last_error().number);
1299
protocol->store(mi->last_error().message, &my_charset_bin);
1301
protocol->store(mi->rli.last_error().number);
1303
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1305
pthread_mutex_unlock(&mi->rli.data_lock);
1306
pthread_mutex_unlock(&mi->data_lock);
1308
if (my_net_write(&session->net, (unsigned char*) session->packet.ptr(), packet->length()))
1316
void set_slave_thread_options(Session* session)
1319
It's nonsense to constrain the slave threads with max_join_size; if a
1320
query succeeded on master, we HAVE to execute it. So set
1321
OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
1322
(and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
1323
SELECT examining more than 4 billion rows would still fail (yes, because
1324
when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1325
only for client threads.
1327
uint64_t options= session->options | OPTION_BIG_SELECTS;
1328
if (opt_log_slave_updates)
1329
options|= OPTION_BIN_LOG;
1331
options&= ~OPTION_BIN_LOG;
1332
session->options= options;
1333
session->variables.completion_type= 0;
1341
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type)
1343
int32_t simulate_error= 0;
1344
session->system_thread = (session_type == SLAVE_Session_SQL) ?
1345
SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1346
session->security_ctx->skip_grants();
1347
my_net_init(&session->net, 0);
1349
Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1350
slave threads, since a replication event can become this much larger
1351
than the corresponding packet (query) sent from client to master.
1353
session->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1354
+ MAX_LOG_EVENT_HEADER; /* note, incr over the global not session var */
1355
session->slave_thread = 1;
1356
set_slave_thread_options(session);
1357
session->client_capabilities = CLIENT_LOCAL_FILES;
1358
pthread_mutex_lock(&LOCK_thread_count);
1359
session->thread_id= session->variables.pseudo_thread_id= thread_id++;
1360
pthread_mutex_unlock(&LOCK_thread_count);
1362
simulate_error|= (1 << SLAVE_Session_IO);
1363
simulate_error|= (1 << SLAVE_Session_SQL);
1364
if (init_thr_lock() || session->store_globals() || simulate_error & (1<< session_type))
1371
if (session_type == SLAVE_Session_SQL)
1372
session->set_proc_info("Waiting for the next event in relay log");
1374
session->set_proc_info("Waiting for master update");
1375
session->version=refresh_version;
1376
session->set_time();
1381
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1382
void* thread_killed_arg)
1385
thr_alarm_t alarmed;
1387
thr_alarm_init(&alarmed);
1388
time_t start_time= my_time(0);
1389
time_t end_time= start_time+sec;
1391
while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1395
The only reason we are asking for alarm is so that
1396
we will be woken up in case of murder, so if we do not get killed,
1397
set the alarm so it goes off after we wake up naturally
1399
thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
1401
thr_end_alarm(&alarmed);
1403
if ((*thread_killed)(session,thread_killed_arg))
1405
start_time= my_time(0);
1411
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
1412
bool *suppress_warnings)
1414
unsigned char buf[FN_REFLEN + 10];
1416
int32_t binlog_flags = 0; // for now
1417
const char* logname = mi->getLogName();
1419
*suppress_warnings= false;
1421
// TODO if big log files: Change next to int8store()
1422
int4store(buf, (uint32_t) mi->getLogPosition());
1423
int2store(buf + 4, binlog_flags);
1424
int4store(buf + 6, server_id);
1425
len = (uint32_t) strlen(logname);
1426
memcpy(buf + 10, logname,len);
1427
if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
1430
Something went wrong, so we will just reconnect and retry later
1431
in the future, we should do a better error analysis, but for
1432
now we just fill up the error log :-)
1434
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1435
*suppress_warnings= true; // Suppress reconnect warning
1437
sql_print_error(_("Error on COM_BINLOG_DUMP: %d %s, will retry in %d secs"),
1438
drizzle_errno(drizzle), drizzle_error(drizzle),
1447
Read one event from the master
1451
DRIZZLE DRIZZLE connection
1452
mi Master connection information
1453
suppress_warnings TRUE when a normal net read timeout has caused us to
1454
try a reconnect. We do not want to print anything to
1455
the error log in this case because this a anormal
1456
event in an idle server.
1459
'packet_error' Error
1460
number Length of packet
1463
static uint32_t read_event(DRIZZLE *drizzle,
1465
bool* suppress_warnings)
1469
*suppress_warnings= false;
1471
my_real_read() will time us out
1472
We check if we were told to die, and if not, try reading again
1474
if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
1475
return(packet_error);
1477
len = cli_safe_read(drizzle);
1478
if (len == packet_error || (int32_t) len < 1)
1480
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1483
We are trying a normal reconnect after a read timeout;
1484
we suppress prints to .err file as long as the reconnect
1485
happens without problems
1487
*suppress_warnings= true;
1490
sql_print_error(_("Error reading packet from server: %s ( server_errno=%d)"),
1491
drizzle_error(drizzle), drizzle_errno(drizzle));
1492
return(packet_error);
1495
/* Check if eof packet */
1496
if (len < 8 && drizzle->net.read_pos[0] == 254)
1498
sql_print_information(_("Slave: received end packet from server, apparent "
1499
"master shutdown: %s"),
1500
drizzle_error(drizzle));
1501
return(packet_error);
1508
int32_t check_expected_error(Session*, Relay_log_info const *,
1509
int32_t expected_error)
1511
switch (expected_error) {
1512
case ER_NET_READ_ERROR:
1513
case ER_NET_ERROR_ON_WRITE:
1514
case ER_QUERY_INTERRUPTED:
1515
case ER_SERVER_SHUTDOWN:
1516
case ER_NEW_ABORTING_CONNECTION:
1525
Check if the current error is of temporary nature of not.
1526
Some errors are temporary in nature, such as
1527
ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. Ndb also signals
1528
that the error is temporary by pushing a warning with the error code
1529
ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1531
static int32_t has_temporary_error(Session *session)
1533
if (session->is_fatal_error)
1536
if (session->main_da.is_error())
1538
session->clear_error();
1539
my_error(ER_LOCK_DEADLOCK, MYF(0));
1543
If there is no message in Session, we can't say if it's a temporary
1544
error or not. This is currently the case for Incident_log_event,
1545
which sets no message. Return FALSE.
1547
if (!session->is_error())
1551
Temporary error codes:
1552
currently, InnoDB deadlock detected by InnoDB or lock
1553
wait timeout (innodb_lock_wait_timeout exceeded
1555
if (session->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
1556
session->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
1564
Applies the given event and advances the relay log position.
1566
In essence, this function does:
1569
ev->apply_event(rli);
1570
ev->update_pos(rli);
1573
But it also does some maintainance, such as skipping events if
1574
needed and reporting errors.
1576
If the @c skip flag is set, then it is tested whether the event
1577
should be skipped, by looking at the slave_skip_counter and the
1578
server id. The skip flag should be set when calling this from a
1579
replication thread but not set when executing an explicit BINLOG
1584
@retval 1 Error calling ev->apply_event().
1586
@retval 2 No error calling ev->apply_event(), but error calling
1589
int32_t apply_event_and_update_pos(Log_event* ev, Session* session, Relay_log_info* rli,
1592
int32_t exec_res= 0;
1595
Execute the event to change the database and update the binary
1596
log coordinates, but first we set some data that is needed for
1599
The event will be executed unless it is supposed to be skipped.
1601
Queries originating from this server must be skipped. Low-level
1602
events (Format_description_log_event, Rotate_log_event,
1603
Stop_log_event) from this server must also be skipped. But for
1604
those we don't want to modify 'group_master_log_pos', because
1605
these events did not exist on the master.
1606
Format_description_log_event is not completely skipped.
1608
Skip queries specified by the user in 'slave_skip_counter'. We
1609
can't however skip events that has something to do with the log
1612
Filtering on own server id is extremely important, to ignore
1613
execution of events created by the creation/rotation of the relay
1614
log (remember that now the relay log starts with its Format_desc,
1618
session->server_id = ev->server_id; // use the original server id for logging
1619
session->set_time(); // time the query
1620
session->lex->current_select= 0;
1622
ev->when= my_time(0);
1623
ev->session = session; // because up to this point, ev->session == 0
1627
int32_t reason= ev->shall_skip(rli);
1628
if (reason == Log_event::EVENT_SKIP_COUNT)
1629
--rli->slave_skip_counter;
1630
pthread_mutex_unlock(&rli->data_lock);
1631
if (reason == Log_event::EVENT_SKIP_NOT)
1632
exec_res= ev->apply_event(rli);
1635
exec_res= ev->apply_event(rli);
1639
int32_t error= ev->update_pos(rli);
1641
The update should not fail, so print an error message and
1642
return an error code.
1644
TODO: Replace this with a decent error message when merged
1645
with BUG#24954 (which adds several new error message).
1650
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
1651
_("It was not possible to update the positions"
1652
" of the relay log information: the slave may"
1653
" be in an inconsistent state."
1654
" Stopped in %s position %s"),
1655
rli->group_relay_log_name.c_str(),
1656
llstr(rli->group_relay_log_pos, buf));
1661
return(exec_res ? 1 : 0);
1666
Top-level function for executing the next event from the relay log.
1668
This function reads the event from the relay log, executes it, and
1669
advances the relay log position. It also handles errors, etc.
1671
This function may fail to apply the event for the following reasons:
1673
- The position specfied by the UNTIL condition of the START SLAVE
1676
- It was not possible to read the event from the log.
1678
- The slave is killed.
1680
- An error occurred when applying the event, and the event has been
1681
tried slave_trans_retries times. If the event has been retried
1682
fewer times, 0 is returned.
1684
- init_master_info or init_relay_log_pos failed. (These are called
1685
if a failure occurs when applying the event.)</li>
1687
- An error occurred when updating the binlog position.
1689
@retval 0 The event was applied.
1691
@retval 1 The event was not applied.
1693
static int32_t exec_relay_log_event(Session* session, Relay_log_info* rli)
1696
We acquire this mutex since we need it for all operations except
1697
event execution. But we will release it in places where we will
1698
wait for something for example inside of next_event().
1700
pthread_mutex_lock(&rli->data_lock);
1702
Log_event * ev = next_event(rli);
1704
assert(rli->sql_session==session);
1706
if (sql_slave_killed(session,rli))
1708
pthread_mutex_unlock(&rli->data_lock);
1717
This tests if the position of the beginning of the current event
1718
hits the UNTIL barrier.
1720
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
1721
rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
1722
rli->group_master_log_pos :
1723
ev->log_pos - ev->data_written))
1726
sql_print_information(_("Slave SQL thread stopped because it reached its"
1727
" UNTIL position %s"),
1728
llstr(rli->until_pos(), buf));
1730
Setting abort_slave flag because we do not want additional message about
1731
error in query execution to be printed.
1733
rli->abort_slave= 1;
1734
pthread_mutex_unlock(&rli->data_lock);
1738
exec_res= apply_event_and_update_pos(ev, session, rli, true);
1741
Format_description_log_event should not be deleted because it will be
1742
used to read info about the relay log's format; it will be deleted when
1743
the SQL thread does not need it, i.e. when this thread terminates.
1745
if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
1751
update_log_pos failed: this should not happen, so we don't
1757
if (slave_trans_retries)
1759
int32_t temp_err= 0;
1760
if (exec_res && (temp_err= has_temporary_error(session)))
1764
We were in a transaction which has been rolled back because of a
1766
let's seek back to BEGIN log event and retry it all again.
1767
Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
1768
there is no rollback since 5.0.13 (ref: manual).
1769
We have to not only seek but also
1770
a) init_master_info(), to seek back to hot relay log's start for later
1771
(for when we will come back to this hot log after re-processing the
1772
possibly existing old logs where BEGIN is: check_binlog_magic() will
1773
then need the cache to be at position 0 (see comments at beginning of
1774
init_master_info()).
1775
b) init_relay_log_pos(), because the BEGIN may be an older relay log.
1777
if (rli->trans_retries < slave_trans_retries)
1779
if (rli->mi->init_master_info(0, 0, SLAVE_SQL))
1780
sql_print_error(_("Failed to initialize the master info structure"));
1781
else if (init_relay_log_pos(rli,
1782
rli->group_relay_log_name.c_str(),
1783
rli->group_relay_log_pos,
1785
sql_print_error(_("Error initializing relay log position: %s"),
1790
end_trans(session, ROLLBACK);
1791
/* chance for concurrent connection to get more locks */
1792
safe_sleep(session, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1793
(CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1794
pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1795
rli->trans_retries++;
1796
rli->retried_trans++;
1797
pthread_mutex_unlock(&rli->data_lock);
1801
sql_print_error(_("Slave SQL thread retried transaction %lu time(s) "
1802
"in vain, giving up. Consider raising the value of "
1803
"the slave_transaction_retries variable."),
1804
slave_trans_retries);
1806
else if ((exec_res && !temp_err) ||
1807
(opt_using_transactions &&
1808
rli->group_relay_log_pos == rli->event_relay_log_pos))
1811
Only reset the retry counter if the entire group succeeded
1812
or failed with a non-transient error. On a successful
1813
event, the execution will proceed as usual; in the case of a
1814
non-transient error, the slave will stop with an error.
1816
rli->trans_retries= 0; // restart from fresh
1821
pthread_mutex_unlock(&rli->data_lock);
1822
rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
1823
ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
1824
_("Could not parse relay log event entry. The possible reasons "
1825
"are: the master's binary log is corrupted (you can check this "
1826
"by running 'mysqlbinlog' on the binary log), the slave's "
1827
"relay log is corrupted (you can check this by running "
1828
"'mysqlbinlog' on the relay log), a network problem, or a bug "
1829
"in the master's or slave's DRIZZLE code. If you want to check "
1830
"the master's binary log or slave's relay log, you will be "
1831
"able to know their names by issuing 'SHOW SLAVE STATUS' "
1838
@brief Try to reconnect slave IO thread.
1840
@details Terminates current connection to master, sleeps for
1841
@c mi->connect_retry msecs and initiates new connection with
1842
@c safe_reconnect(). Variable pointed by @c retry_count is increased -
1843
if it exceeds @c master_retry_count then connection is not re-established
1844
and function signals error.
1845
Unless @c suppres_warnings is TRUE, a warning is put in the server error log
1846
when reconnecting. The warning message and messages used to report errors
1847
are taken from @c messages array. In case @c master_retry_count is exceeded,
1848
no messages are added to the log.
1850
@param[in] session Thread context.
1851
@param[in] DRIZZLE DRIZZLE connection.
1852
@param[in] mi Master connection information.
1853
@param[in,out] retry_count Number of attempts to reconnect.
1854
@param[in] suppress_warnings TRUE when a normal net read timeout
1855
has caused to reconnecting.
1856
@param[in] messages Messages to print/log, see
1857
reconnect_messages[] array.
1860
@retval 1 There was an error.
1863
static int32_t try_to_reconnect(Session *session, DRIZZLE *drizzle, Master_info *mi,
1864
uint32_t *retry_count, bool suppress_warnings,
1865
const char *messages[SLAVE_RECON_MSG_MAX])
1867
mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1868
session->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
1869
drizzle_disconnect(drizzle);
1870
if ((*retry_count)++)
1872
if (*retry_count > master_retry_count)
1873
return 1; // Don't retry forever
1874
safe_sleep(session, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1877
if (check_io_slave_killed(session, mi,
1878
_(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1880
session->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
1881
if (!suppress_warnings)
1883
char buf[256], llbuff[22];
1884
snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1885
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
1887
Raise a warining during registering on master/requesting dump.
1888
Log a message reading event.
1890
if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
1892
mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1893
ER(ER_SLAVE_MASTER_COM_FAILURE),
1894
_(messages[SLAVE_RECON_MSG_COMMAND]), buf);
1898
sql_print_information("%s",buf);
1901
if (safe_reconnect(session, drizzle, mi, 1) || io_slave_killed(session, mi))
1903
if (global_system_variables.log_warnings)
1904
sql_print_information("%s",_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1911
/* Slave I/O Thread entry point */
1913
pthread_handler_t handle_slave_io(void *arg)
1915
Session *session; // needs to be first for thread_stack
1917
Master_info *mi = (Master_info*)arg;
1918
Relay_log_info *rli= &mi->rli;
1920
uint32_t retry_count;
1921
bool suppress_warnings;
1922
uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
1929
pthread_mutex_lock(&mi->run_lock);
1930
/* Inform waiting threads that slave has started */
1933
mi->events_till_disconnect = disconnect_slave_event_count;
1935
session= new Session;
1936
Session_CHECK_SENTRY(session);
1937
mi->io_session = session;
1939
pthread_detach_this_thread();
1940
session->thread_stack= (char*) &session; // remember where our stack is
1941
if (init_slave_thread(session, SLAVE_Session_IO))
1943
pthread_cond_broadcast(&mi->start_cond);
1944
pthread_mutex_unlock(&mi->run_lock);
1945
sql_print_error(_("Failed during slave I/O thread initialization"));
1948
pthread_mutex_lock(&LOCK_thread_count);
1949
threads.append(session);
1950
pthread_mutex_unlock(&LOCK_thread_count);
1951
mi->slave_running = 1;
1952
mi->abort_slave = 0;
1953
pthread_mutex_unlock(&mi->run_lock);
1954
pthread_cond_broadcast(&mi->start_cond);
1956
if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
1958
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
1959
ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
1963
session->set_proc_info("Connecting to master");
1964
// we can get killed during safe_connect
1965
if (!safe_connect(session, drizzle, mi))
1967
sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
1968
"replication started in log '%s' at position %s"),
1969
mi->getUsername(), mi->getHostname(), mi->getPort(),
1971
llstr(mi->getLogPosition(), llbuff));
1973
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
1974
thread, since a replication event can become this much larger than
1975
the corresponding packet (query) sent from client to master.
1977
drizzle->net.max_packet_size= session->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
1981
sql_print_information(_("Slave I/O thread killed while connecting to master"));
1987
// TODO: the assignment below should be under mutex (5.0)
1988
mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
1989
session->slave_net = &drizzle->net;
1990
session->set_proc_info("Checking master version");
1991
if (get_master_version_and_clock(drizzle, mi))
1994
if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
1997
Register ourselves with the master.
1999
session->set_proc_info("Registering slave on master");
2000
if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2002
if (!check_io_slave_killed(session, mi, "Slave I/O thread killed "
2003
"while registering slave on master"))
2005
sql_print_error(_("Slave I/O thread couldn't register on master"));
2006
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2007
reconnect_messages[SLAVE_RECON_ACT_REG]))
2014
if (!retry_count_reg)
2017
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2018
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2019
reconnect_messages[SLAVE_RECON_ACT_REG]))
2025
while (!io_slave_killed(session,mi))
2027
session->set_proc_info("Requesting binlog dump");
2028
if (request_dump(drizzle, mi, &suppress_warnings))
2030
sql_print_error(_("Failed on request_dump()"));
2031
if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while \
2032
requesting master dump")) ||
2033
try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2034
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2038
if (!retry_count_dump)
2041
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2042
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2043
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2048
while (!io_slave_killed(session,mi))
2052
We say "waiting" because read_event() will wait if there's nothing to
2053
read. But if there's something to read, it will not wait. The
2054
important thing is to not confuse users by saying "reading" whereas
2055
we're in fact receiving nothing.
2057
session->set_proc_info(_("Waiting for master to send event"));
2058
event_len= read_event(drizzle, mi, &suppress_warnings);
2059
if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while "
2062
if (!retry_count_event)
2064
retry_count_event++;
2065
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2066
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2067
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2072
if (event_len == packet_error)
2074
uint32_t drizzle_error_number= drizzle_errno(drizzle);
2075
switch (drizzle_error_number) {
2076
case CR_NET_PACKET_TOO_LARGE:
2077
sql_print_error(_("Log entry on master is longer than "
2078
"max_allowed_packet (%ld) on "
2079
"slave. If the entry is correct, restart the "
2080
"server with a higher value of "
2081
"max_allowed_packet"),
2082
session->variables.max_allowed_packet);
2084
case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2085
sql_print_error(ER(drizzle_error_number), drizzle_error_number,
2086
drizzle_error(drizzle));
2088
case EE_OUTOFMEMORY:
2089
case ER_OUTOFMEMORY:
2091
_("Stopping slave I/O thread due to out-of-memory error from master"));
2094
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2095
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2098
} // if (event_len == packet_error)
2100
retry_count=0; // ok event, reset retry counter
2101
session->set_proc_info(_("Queueing master event to the relay log"));
2102
if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2108
sql_print_error(_("Failed to flush master info file"));
2112
See if the relay logs take too much space.
2113
We don't lock mi->rli.log_space_lock here; this dirty read saves time
2114
and does not introduce any problem:
2115
- if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
2116
the clean value is 0), then we are reading only one more event as we
2117
should, and we'll block only at the next event. No big deal.
2118
- if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
2119
the clean value is 1), then we are going into wait_for_relay_log_space()
2120
for no reason, but this function will do a clean read, notice the clean
2121
value and exit immediately.
2123
if (rli->log_space_limit && rli->log_space_limit <
2124
rli->log_space_total &&
2125
!rli->ignore_log_space_limit)
2126
if (wait_for_relay_log_space(rli))
2128
sql_print_error(_("Slave I/O thread aborted while waiting for "
2129
"relay log space"));
2137
// print the current replication position
2138
sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2140
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
2141
pthread_mutex_lock(&LOCK_thread_count);
2142
session->query = session->db = 0; // extra safety
2143
session->query_length= session->db_length= 0;
2144
pthread_mutex_unlock(&LOCK_thread_count);
2148
Here we need to clear the active VIO before closing the
2149
connection with the master. The reason is that Session::awake()
2150
might be called from terminate_slave_thread() because somebody
2151
issued a STOP SLAVE. If that happends, the close_active_vio()
2152
can be called in the middle of closing the VIO associated with
2153
the 'mysql' object, causing a crash.
2155
drizzle_close(drizzle);
2158
write_ignored_events_info_to_relay_log(session, mi);
2159
session->set_proc_info(_("Waiting for slave mutex on exit"));
2160
pthread_mutex_lock(&mi->run_lock);
2162
/* Forget the relay log's format */
2163
delete mi->rli.relay_log.description_event_for_queue;
2164
mi->rli.relay_log.description_event_for_queue= 0;
2165
assert(session->net.buff != 0);
2166
net_end(&session->net); // destructor will not free it, because net.vio is 0
2167
close_thread_tables(session);
2168
pthread_mutex_lock(&LOCK_thread_count);
2169
Session_CHECK_SENTRY(session);
2171
pthread_mutex_unlock(&LOCK_thread_count);
2173
mi->slave_running= 0;
2176
Note: the order of the two following calls (first broadcast, then unlock)
2177
is important. Otherwise a killer_thread can execute between the calls and
2178
delete the mi structure leading to a crash! (see BUG#25306 for details)
2180
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
2181
pthread_mutex_unlock(&mi->run_lock);
2184
return(0); // Can't return anything here
2188
// Thread entry point for the slave SQL thread. `arg` is the Master_info whose
// embedded Relay_log_info (rli) this thread serves. The thread creates its
// own Session, restores temporary tables saved by a previous run, positions
// itself in the relay log with init_relay_log_pos(), runs the init_slave
// statement if one is configured, and then applies events with
// exec_relay_log_event() until sql_slave_killed() reports that it must stop.
// On exit it hands the session's temporary tables back to
// rli->save_temporary_tables, clears rli->slave_running and rli->sql_session,
// and broadcasts rli->stop_cond while still holding rli->run_lock.
// The return value carries no information.
/* Slave SQL Thread entry point */
2190
pthread_handler_t handle_slave_sql(void *arg)
2192
Session *session; /* needs to be first for thread_stack */
2193
char llbuff[22],llbuff1[22];
2195
Relay_log_info* rli = &((Master_info*)arg)->rli;
2200
assert(rli->inited);
2201
// run_lock is held until the thread is registered and marked as running;
// start_cond is then broadcast so the thread that started us can proceed.
pthread_mutex_lock(&rli->run_lock);
2202
assert(!rli->slave_running);
2204
rli->events_till_abort = abort_slave_event_count;
2206
session = new Session;
2207
session->thread_stack = (char*)&session; // remember where our stack is
2208
rli->sql_session= session;
2210
/* Inform waiting threads that slave has started */
2211
rli->slave_run_id++;
2212
rli->slave_running = 1;
2214
pthread_detach_this_thread();
2215
if (init_slave_thread(session, SLAVE_Session_SQL))
2218
TODO: this is currently broken - slave start and change master
2219
will be stuck if we fail here
2221
// Initialization failed: still wake the waiter on start_cond and release
// run_lock before logging the failure.
pthread_cond_broadcast(&rli->start_cond);
2222
pthread_mutex_unlock(&rli->run_lock);
2223
sql_print_error(_("Failed during slave thread initialization"));
2226
session->init_for_queries();
2227
session->temporary_tables = rli->save_temporary_tables; // restore temp tables
2228
// Make the session visible in the global thread list.
pthread_mutex_lock(&LOCK_thread_count);
2229
threads.append(session);
2230
pthread_mutex_unlock(&LOCK_thread_count);
2232
We are going to set slave_running to 1. Assuming slave I/O thread is
2233
alive and connected, this is going to make Seconds_Behind_Master be 0
2234
i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
2235
the moment we start we can think we are caught up, and the next second we
2236
start receiving data so we realize we are not caught up and
2237
Seconds_Behind_Master grows. No big deal.
2239
rli->abort_slave = 0;
2240
pthread_mutex_unlock(&rli->run_lock);
2241
pthread_cond_broadcast(&rli->start_cond);
2244
Reset errors for a clean start (otherwise, if the master is idle, the SQL
2245
thread may execute no Query_log_event, so the error will remain even
2246
though there's no problem anymore). Do not reset the master timestamp
2247
(imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
2248
as we are not sure that we are going to receive a query, we want to
2249
remember the last master timestamp (to say how many seconds behind we are
2251
But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
2255
//tell the I/O thread to take relay_log_space_limit into account from now on
2256
pthread_mutex_lock(&rli->log_space_lock);
2257
rli->ignore_log_space_limit= 0;
2258
pthread_mutex_unlock(&rli->log_space_lock);
2259
rli->trans_retries= 0; // start from "no error"
2261
if (init_relay_log_pos(rli,
2262
rli->group_relay_log_name.c_str(),
2263
rli->group_relay_log_pos,
2264
1 /*need data lock*/, &errmsg,
2265
1 /*look for a description_event*/))
2267
sql_print_error(_("Error initializing relay log position: %s"),
2271
Session_CHECK_SENTRY(session);
2272
assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2274
Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2275
correct position when it's called just after my_b_seek() (the questionable
2276
stuff is those "seek is done on next read" comments in the my_b_seek()
2278
The crude reality is that this assertion randomly fails whereas
2279
replication seems to work fine. And there is no easy explanation why it
2280
fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
2281
init_relay_log_pos() called above). Maybe the assertion would be
2282
meaningful if we held rli->data_lock between the my_b_seek() and the
2285
assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2286
assert(rli->sql_session == session);
2288
if (global_system_variables.log_warnings)
2289
sql_print_information(_("Slave SQL thread initialized, "
2290
"starting replication in log '%s' at "
2291
"position %s, relay log '%s' position: %s"),
2293
llstr(rli->group_master_log_pos,llbuff),
2294
rli->group_relay_log_name.c_str(),
2295
llstr(rli->group_relay_log_pos,llbuff1));
2297
/* execute init_slave variable */
2298
if (sys_init_slave.value_length)
2300
execute_init_command(session, &sys_init_slave, &LOCK_sys_init_slave);
2301
if (session->is_slave_error)
2303
sql_print_error(_("Slave SQL thread aborted. "
2304
"Can't execute init_slave query"));
2310
First check until condition - probably there is nothing to execute. We
2311
do not want to wait for next event in this case.
2313
// The UNTIL check below is evaluated once, under data_lock, before the
// main apply loop.
pthread_mutex_lock(&rli->data_lock);
2314
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
2315
rli->is_until_satisfied(rli->group_master_log_pos))
2318
sql_print_information(_("Slave SQL thread stopped because it reached its"
2319
" UNTIL position %s"), llstr(rli->until_pos(), buf));
2320
pthread_mutex_unlock(&rli->data_lock);
2323
pthread_mutex_unlock(&rli->data_lock);
2325
/* Read queries from the IO/THREAD until this thread is killed */
2327
while (!sql_slave_killed(session,rli))
2329
session->set_proc_info(_("Reading event from the relay log"));
2330
assert(rli->sql_session == session);
2331
Session_CHECK_SENTRY(session);
2332
if (exec_relay_log_event(session,rli))
2334
// do not scare the user if SQL thread was simply killed or stopped
2335
if (!sql_slave_killed(session,rli))
2338
retrieve as much info as possible from the session and, error
2339
codes and warnings and print this to the error log as to
2340
allow the user to locate the error
2342
uint32_t const last_errno= rli->last_error().number;
2344
if (session->is_error())
2346
char const *const errmsg= session->main_da.message();
2348
// Report the session error through rli only if no error was recorded yet;
// a different error code is logged as additional info instead.
if (last_errno == 0)
2350
rli->report(ERROR_LEVEL, session->main_da.sql_errno(), "%s", errmsg);
2352
else if (last_errno != session->main_da.sql_errno())
2354
sql_print_error(_("Slave (additional info): %s Error_code: %d"),
2355
errmsg, session->main_da.sql_errno());
2359
/* Print any warnings issued */
2360
List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
2363
Added controlled slave thread cancel for replication
2364
of user-defined variables.
2366
bool udf_error = false;
2369
if (err->code == ER_CANT_OPEN_LIBRARY)
2371
sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
2374
sql_print_error(_("Error loading user-defined library, slave SQL "
2375
"thread aborted. Install the missing library, "
2376
"and restart the slave SQL thread with "
2377
"\"SLAVE START\". We stopped at log '%s' "
2379
RPL_LOG_NAME, llstr(rli->group_master_log_pos,
2382
sql_print_error(_("Error running query, slave SQL thread aborted. "
2383
"Fix the problem, and restart "
2384
"the slave SQL thread with \"SLAVE START\". "
2385
"We stopped at log '%s' position %s"),
2387
llstr(rli->group_master_log_pos, llbuff));
2393
/* Thread stopped. Print the current replication position to the log */
2394
sql_print_information(_("Slave SQL thread exiting, replication stopped in "
2395
"log '%s' at position %s"),
2397
llstr(rli->group_master_log_pos,llbuff));
2402
Some events set some playgrounds, which won't be cleared because thread
2403
stops. Stopping of this thread may not be known to these events ("stop"
2404
request is detected only by the present function, not by events), so we
2405
must "proactively" clear playgrounds:
2407
rli->cleanup_context(session, 1);
2408
pthread_mutex_lock(&LOCK_thread_count);
2410
Some extra safety, which should not been needed (normally, event deletion
2411
should already have done these assignments (each event which sets these
2412
variables is supposed to set them to 0 before terminating)).
2414
session->query= session->db= session->catalog= 0;
2415
session->query_length= session->db_length= 0;
2416
pthread_mutex_unlock(&LOCK_thread_count);
2417
// NOTE(review): unlike the other set_proc_info() calls in this function,
// this message is not wrapped in _(), so it is not marked for translation;
// confirm whether that is intentional.
session->set_proc_info("Waiting for slave mutex on exit");
2418
pthread_mutex_lock(&rli->run_lock);
2419
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
2420
pthread_mutex_lock(&rli->data_lock);
2421
assert(rli->slave_running == 1); // tracking buffer overrun
2422
/* When master_pos_wait() wakes up it will check this and terminate */
2423
rli->slave_running= 0;
2424
/* Forget the relay log's format */
2425
delete rli->relay_log.description_event_for_exec;
2426
rli->relay_log.description_event_for_exec= 0;
2427
/* Wake up master_pos_wait() */
2428
pthread_mutex_unlock(&rli->data_lock);
2429
pthread_cond_broadcast(&rli->data_cond);
2430
// NOTE(review): at thread start ignore_log_space_limit is written under
// rli->log_space_lock, but here it is written with no lock at all; confirm
// the "don't need any lock" claim still holds with respect to the I/O thread.
rli->ignore_log_space_limit= 0; /* don't need any lock */
2431
// Hand the session's temporary tables to rli so the next run of this
// thread can restore them.
rli->save_temporary_tables = session->temporary_tables;
2434
TODO: see if we can do this conditionally in next_event() instead
2435
to avoid unneeded position re-init
2437
session->temporary_tables = 0; // remove tempation from destructor to close them
2438
assert(session->net.buff != 0);
2439
net_end(&session->net); // destructor will not free it, because we are weird
2440
assert(rli->sql_session == session);
2441
Session_CHECK_SENTRY(session);
2442
rli->sql_session= 0;
2443
pthread_mutex_lock(&LOCK_thread_count);
2444
Session_CHECK_SENTRY(session);
2446
pthread_mutex_unlock(&LOCK_thread_count);
2448
Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
2449
is important. Otherwise a killer_thread can execute between the calls and
2450
delete the mi structure leading to a crash! (see BUG#25306 for details)
2452
pthread_cond_broadcast(&rli->stop_cond);
2453
pthread_mutex_unlock(&rli->run_lock); // tell the world we are done
2457
return(0); // Can't return anything here
2462
// Handle a Create_file_log_event built by the I/O thread from an old-master
// LOAD DATA event (the event is asserted to be inited_from_old). Unless
// rpl_filter rejects the event's database (then the file transfer is
// skipped), request the file named in cev->fname from the master and copy it
// into the relay log packet by packet:
//   - first data packet: written as the Create_file event itself (cev->block);
//   - later packets: written as Append_block events (aev);
//   - end of file: an Execute_load event closes the sequence.
// Relay-log write failures are reported through mi->report().
process_io_create_file()
2465
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
2469
bool cev_not_written;
2470
Session *session = mi->io_session;
2471
NET *net = &mi->drizzle->net;
2473
if (unlikely(!cev->is_valid()))
2476
if (!rpl_filter->db_ok(cev->db))
2478
skip_load_data_infile(net);
2481
assert(cev->inited_from_old);
2482
// Assign a slave-local file id, shared by the event and the I/O session.
session->file_id = cev->file_id = mi->file_id++;
2483
session->server_id = cev->server_id;
2484
cev_not_written = 1;
2486
if (unlikely(net_request_file(net,cev->fname)))
2488
sql_print_error(_("Slave I/O: failed requesting download of '%s'"),
2494
This dummy block is so we could instantiate Append_block_log_event
2495
once and then modify it slightly instead of doing it multiple times
2499
Append_block_log_event aev(session,0,0,0,0);
2503
if (unlikely((num_bytes=my_net_read(net)) == packet_error))
2505
sql_print_error(_("Network read error downloading '%s' from master"),
2509
if (unlikely(!num_bytes)) /* eof */
2511
/* 3.23 master wants it */
2512
// Acknowledge end of file with an empty command packet.
net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0);
2514
If we wrote Create_file_log_event, then we need to write
2515
Execute_load_log_event. If we did not write Create_file_log_event,
2516
then this is an empty file and we can just do as if the LOAD DATA
2517
INFILE had not existed, i.e. write nothing.
2519
// Per the comment above: nothing more is written when the Create_file
// event was never written (empty file); otherwise an Execute_load event
// is appended to close the sequence.
if (unlikely(cev_not_written))
2521
Execute_load_log_event xev(session,0,0);
2522
xev.log_pos = cev->log_pos;
2523
if (unlikely(mi->rli.relay_log.append(&xev)))
2525
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2526
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2527
_("error writing Exec_load event to relay log"));
2530
// Account the bytes just written in rli.log_space_total.
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2533
// First data packet: carry it inside the Create_file event itself.
if (unlikely(cev_not_written))
2535
cev->block = net->read_pos;
2536
cev->block_len = num_bytes;
2537
if (unlikely(mi->rli.relay_log.append(cev)))
2539
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2540
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2541
_("error writing Create_file event to relay log"));
2545
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2549
// Subsequent data packets: reuse the single Append_block event.
aev.block = net->read_pos;
2550
aev.block_len = num_bytes;
2551
aev.log_pos = cev->log_pos;
2552
if (unlikely(mi->rli.relay_log.append(&aev)))
2554
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2555
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2556
_("error writing Append_block event to relay log"));
2559
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
2570
// Apply a Rotate event received from the master. The caller must hold
// mi->data_lock (asserted below). The function:
//   - records the new master binlog name and position from `rev` in mi;
//   - replaces description_event_for_queue with a fresh format-3
//     description when the current one is format >= 4;
//   - rotates the relay log.
Start using a new binary log on the master
2574
mi master_info for the slave
2575
rev The rotate log event read from the binary log
2578
Updates the master info with the place in the next binary
2579
log where we should start reading.
2580
Rotate the relay log to avoid mixed-format relay logs.
2583
We assume we already locked mi->data_lock
2587
1 Log event is illegal
2591
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
2593
safe_mutex_assert_owner(&mi->data_lock);
2595
if (unlikely(!rev->is_valid()))
2598
/* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2599
mi->setLogName(rev->new_log_ident);
2600
mi->setLogPosition(rev->pos);
2602
If we do not do this, we will be getting the first
2603
rotate event forever, so we need to not disconnect after one.
2605
if (disconnect_slave_event_count)
2606
mi->events_till_disconnect++;
2609
If description_event_for_queue is format <4, there is conversion in the
2610
relay log to the slave's format (4). And Rotate can mean upgrade or
2611
nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
2612
no need to reset description_event_for_queue now. And if it's nothing (same
2613
master version as before), no need (still using the slave's format).
2615
if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
2617
delete mi->rli.relay_log.description_event_for_queue;
2618
/* start from format 3 (DRIZZLE 4.0) again */
2619
mi->rli.relay_log.description_event_for_queue= new
2620
Format_description_log_event(3);
2623
Rotate the relay log makes binlog format detection easier (at next slave
2624
start or mysqlbinlog)
2626
rotate_relay_log(mi); /* will take the right mutexes */
2631
// Queue one event from a master that writes binlog format 1 (3.23).
// The event is parsed with read_log_event() using
// description_event_for_queue. Then, with mi->data_lock held:
//   - Rotate events go to process_io_rotate();
//   - CREATE_FILE events (produced by read_log_event() from a LOAD_EVENT)
//     go to process_io_create_file();
//   - other events are appended to the relay log unless ignore_event is set.
// Because 3.23 events carry no log_pos, the event's log_pos is taken from
// mi's current position. LOAD_EVENTs are first copied into a private
// NUL-terminated buffer (tmp_buf), which is freed with free().
Reads a 3.23 event and converts it to the slave's format. This code was
2632
copied from DRIZZLE 4.0.
2634
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
2637
const char *errmsg = 0;
2639
bool ignore_event= 0;
2641
Relay_log_info *rli= &mi->rli;
2644
If we get Load event, we need to pass a non-reusable buffer
2645
to read_log_event, so we do a trick
2647
if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
2649
// One extra byte for the terminating 0 appended below.
if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
2651
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2652
ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
2655
memcpy(tmp_buf,buf,event_len);
2657
Create_file constructor wants a 0 as last char of buffer, this 0 will
2658
serve as the string-termination char for the file's name (which is at the
2660
We must increment event_len, otherwise the event constructor will not see
2661
this end 0, which leads to segfault.
2663
tmp_buf[event_len++]=0;
2664
// Keep the length stored in the event header in sync with the new size.
int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
2665
buf = (const char*)tmp_buf;
2668
This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
2669
send the loaded file, and write it to the relay log in the form of
2670
Append_block/Exec_load (the SQL thread needs the data, as that thread is not
2671
connected to the master).
2673
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2674
mi->rli.relay_log.description_event_for_queue);
2677
sql_print_error(_("Read invalid event from master: '%s', "
2678
"master could be corrupt but a more likely cause "
2679
"of this is a bug"),
2681
free((char*) tmp_buf);
2685
pthread_mutex_lock(&mi->data_lock);
2686
ev->log_pos= mi->getLogPosition(); /* 3.23 events don't contain log_pos */
2687
switch (ev->get_type_code()) {
2693
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2696
pthread_mutex_unlock(&mi->data_lock);
2701
case CREATE_FILE_EVENT:
2703
Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
2704
queue_old_event() which is for 3.23 events which don't comprise
2705
CREATE_FILE_EVENT. This is because read_log_event() above has just
2706
transformed LOAD_EVENT into CREATE_FILE_EVENT.
2709
/* We come here when and only when tmp_buf != 0 */
2710
assert(tmp_buf != 0);
2712
ev->log_pos+= inc_pos;
2713
int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2715
mi->incrementLogPosition(inc_pos);
2716
pthread_mutex_unlock(&mi->data_lock);
2717
free((char*)tmp_buf);
2724
if (likely(!ignore_event))
2728
Don't do it for fake Rotate events (see comment in
2729
Log_event::Log_event(const char* buf...) in log_event.cc).
2731
ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
2732
if (unlikely(rli->relay_log.append(ev)))
2735
pthread_mutex_unlock(&mi->data_lock);
2738
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2741
mi->incrementLogPosition(inc_pos);
2742
pthread_mutex_unlock(&mi->data_lock);
2747
// Queue one event from a master that writes binlog format 3 (4.0); a
// simplified variant of queue_binlog_ver_1_event(). The event is parsed
// with read_log_event() using description_event_for_queue.
// Rotate events are handed to process_io_rotate(). Appended events advance
// mi's position by inc_pos. All of this happens under mi->data_lock.
Reads a 4.0 event and converts it to the slave's format. This code was copied
2748
from queue_binlog_ver_1_event(), with some affordable simplifications.
2750
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
2753
const char *errmsg = 0;
2756
Relay_log_info *rli= &mi->rli;
2758
/* read_log_event() will adjust log_pos to be end_log_pos */
2759
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2760
mi->rli.relay_log.description_event_for_queue);
2763
sql_print_error(_("Read invalid event from master: '%s', "
2764
"master could be corrupt but a more likely cause of "
2767
free((char*) tmp_buf);
2770
pthread_mutex_lock(&mi->data_lock);
2771
switch (ev->get_type_code()) {
2775
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2778
pthread_mutex_unlock(&mi->data_lock);
2787
if (unlikely(rli->relay_log.append(ev)))
2790
pthread_mutex_unlock(&mi->data_lock);
2793
// Account the bytes just written in rli->log_space_total.
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2795
mi->incrementLogPosition(inc_pos);
2797
pthread_mutex_unlock(&mi->data_lock);
2804
// Route an event from a pre-format-4 master to the converter matching the
// binlog_version of mi->rli.relay_log.description_event_for_queue.
// Presumably version 1 goes to queue_binlog_ver_1_event() and version 3 to
// queue_binlog_ver_3_event() — confirm against the case labels. Any other
// version reaches the "unsupported format" default.
Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
2805
(exactly, slave's) format. To do the conversion, we create a 5.0 event from
2806
the 3.23/4.0 bytes, then write this event to the relay log.
2809
Test this code before release - it has to be tested on a separate
2810
setup with 3.23 master or 4.0 master
2813
static int32_t queue_old_event(Master_info *mi, const char *buf,
2816
switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
2819
return(queue_binlog_ver_1_event(mi,buf,event_len));
2821
return(queue_binlog_ver_3_event(mi,buf,event_len));
2822
default: /* unsupported format; eg version 2 */
2830
// Queue one raw event received from the master (buf, event_len) into the
// relay log.
// Events are delegated to queue_old_event() when description_event_for_queue
// has binlog_version < 4, unless the event is a FORMAT_DESCRIPTION_EVENT.
// Otherwise, with mi->data_lock held, the event type decides the handling:
//   - stop events are not written;
//   - ROTATE events update the master coordinates via process_io_rotate();
//   - FORMAT_DESCRIPTION events replace description_event_for_queue;
//   - HEARTBEAT events are counted, compared against the current coordinates,
//     and skip relay logging.
// Remaining events are appended with relay_log.appendv() under the relay
// log's lock (log_lock). The exception is events carrying this server's id
// while replicate_same_server_id is off: they are skipped, and the skip is
// recorded in rli->ign_master_log_name_end / ign_master_log_pos_end.
// Errors are reported through mi->report().
If the event is 3.23/4.0, passes it to queue_old_event() which will convert
2831
it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
2832
no format conversion, it's pure read/write of bytes.
2833
So a 5.0.0 slave's relay log can contain events in the slave's format or in
2837
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
2841
uint32_t inc_pos= 0;
2842
Relay_log_info *rli= &mi->rli;
2843
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
2846
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
2847
buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
2848
return(queue_old_event(mi,buf,event_len));
2850
pthread_mutex_lock(&mi->data_lock);
2852
switch (buf[EVENT_TYPE_OFFSET]) {
2855
We needn't write this event to the relay log. Indeed, it just indicates a
2856
master server shutdown. The only thing this does is cleaning. But
2857
cleaning is already done on a per-master-thread basis (as the master
2858
server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
2859
prepared statements' deletion are TODO only when we binlog prep stmts).
2861
We don't even increment mi->master_log_pos, because we may be just after
2862
a Rotate event. Btw, in a few milliseconds we are going to have a Start
2863
event from the next binlog (unless the master is presently running
2869
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
2870
if (unlikely(process_io_rotate(mi,&rev)))
2872
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2876
Now the I/O thread has just changed its mi->master_log_name, so
2877
incrementing mi->master_log_pos is nonsense.
2882
case FORMAT_DESCRIPTION_EVENT:
2885
Create an event, and save it (when we rotate the relay log, we will have
2886
to write this event again).
2889
We are the only thread which reads/writes description_event_for_queue.
2890
The relay_log struct does not move (though some members of it can
2891
change), so we needn't any lock (no rli->data_lock, no log lock).
2893
Format_description_log_event* tmp;
2895
if (!(tmp= (Format_description_log_event*)
2896
Log_event::read_log_event(buf, event_len, &errmsg,
2897
mi->rli.relay_log.description_event_for_queue)))
2899
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2902
delete mi->rli.relay_log.description_event_for_queue;
2903
mi->rli.relay_log.description_event_for_queue= tmp;
2905
Though this does some conversion to the slave's format, this will
2906
preserve the master's binlog format version, and number of event types.
2909
If the event was not requested by the slave (the slave did not ask for
2910
it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
2912
inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
2916
case HEARTBEAT_LOG_EVENT:
2919
HB (heartbeat) cannot come before RL (Relay)
2922
Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
2925
error= ER_SLAVE_HEARTBEAT_FAILURE;
2926
error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2927
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2928
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2929
error_msg.append(STRING_WITH_LEN(" log_pos "));
2930
llstr(hb.log_pos, llbuf);
2931
error_msg.append(llbuf, strlen(llbuf));
2934
mi->received_heartbeats++;
2936
compare local and event's versions of log_file, log_pos.
2938
Heartbeat is sent only after an event corresponding to the corrdinates
2939
the heartbeat carries.
2940
Slave can not have a difference in coordinates except in the only
2941
special case when mi->master_log_name, master_log_pos have never
2942
been updated by Rotate event i.e when slave does not have any history
2943
with the master (and thereafter mi->master_log_pos is NULL).
2945
TODO: handling `when' for SHOW SLAVE STATUS' snds behind
2947
// NOTE(review): setLogName() is used as a setter elsewhere in this file
// (process_io_rotate() switches to the rotated log name with it). Calling it
// inside this condition appears to overwrite mi's log name with the
// heartbeat's rather than compare the two, which the comment above
// describes. Confirm what setLogName() returns; a string comparison of
// mi->getLogName() with hb.get_log_ident() is likely what was intended.
if ((mi->setLogName(hb.get_log_ident()) && mi->getLogName() != NULL)
2948
|| mi->getLogPosition() != hb.log_pos)
2950
/* missed events of heartbeat from the past */
2951
error= ER_SLAVE_HEARTBEAT_FAILURE;
2952
error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
2953
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2954
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2955
error_msg.append(STRING_WITH_LEN(" log_pos "));
2956
llstr(hb.log_pos, llbuf);
2957
error_msg.append(llbuf, strlen(llbuf));
2960
goto skip_relay_logging;
2970
If this event is originating from this server, don't queue it.
2971
We don't check this for 3.23 events because it's simpler like this; 3.23
2972
will be filtered anyway by the SQL slave thread which also tests the
2973
server id (we must also keep this test in the SQL thread, in case somebody
2974
upgrades a 4.0 slave which has a not-filtered relay log).
2976
ANY event coming from ourselves can be ignored: it is obvious for queries;
2977
for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
2978
(--log-slave-updates would not log that) unless this slave is also its
2979
direct master (an unsupported, useless setup!).
2982
// The relay log's own lock (log_lock) is held from here until the event
// has been either skipped or appended.
pthread_mutex_lock(log_lock);
2984
if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
2985
!mi->rli.replicate_same_server_id)
2988
Do not write it to the relay log.
2989
a) We still want to increment mi->master_log_pos, so that we won't
2990
re-read this event from the master if the slave IO thread is now
2991
stopped/restarted (more efficient if the events we are ignoring are big
2993
b) We want to record that we are skipping events, for the information of
2994
the slave SQL thread, otherwise that thread may let
2995
rli->group_relay_log_pos stay too small if the last binlog's event is
2997
But events which were generated by this slave and which do not exist in
2998
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
3001
if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
3002
buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3003
buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3005
mi->incrementLogPosition(inc_pos);
3006
// NOTE(review): this copies a fixed FN_REFLEN bytes from mi->getLogName().
// If getLogName() returns a pointer into storage shorter than FN_REFLEN
// (e.g. a std::string's c_str()), this reads past its end. Confirm the
// backing storage, or copy only up to the terminating NUL.
memcpy(rli->ign_master_log_name_end, mi->getLogName(), FN_REFLEN);
3007
assert(rli->ign_master_log_name_end[0]);
3008
rli->ign_master_log_pos_end= mi->getLogPosition();
3010
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3014
/* write the event to the relay log */
3015
if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3017
mi->incrementLogPosition(inc_pos);
3018
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3022
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
3024
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
3026
pthread_mutex_unlock(log_lock);
3031
pthread_mutex_unlock(&mi->data_lock);
3033
mi->report(ERROR_LEVEL, error, ER(error),
3034
(error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
3035
_("could not queue event from master") :
3041
// Release the resources held by a Relay_log_info:
//   - the relay-log info file (info_file / info_fd), if open;
//   - the currently open relay log file (cache_buf / cur_log_fd), if open;
//     cur_log_fd is reset to -1;
//   - the relay log itself, closed with LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT,
//     after which the bytes written are harvested into rli->log_space_total;
//   - the slave's in-memory temporary tables.
void end_relay_log_info(Relay_log_info* rli)
3045
if (rli->info_fd >= 0)
3047
end_io_cache(&rli->info_file);
3048
(void) my_close(rli->info_fd, MYF(MY_WME));
3051
if (rli->cur_log_fd >= 0)
3053
end_io_cache(&rli->cache_buf);
3054
(void)my_close(rli->cur_log_fd, MYF(MY_WME));
3055
rli->cur_log_fd = -1;
3058
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
3059
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3061
Delete the slave's temporary tables from memory.
3062
In the future there will be other actions than this, to ensure persistance
3063
of slave's temp tables after shutdown.
3065
rli->close_temporary_tables();
3070
Try to connect until successful or slave killed
3074
session Thread handler for slave
3075
DRIZZLE DRIZZLE connection handle
3076
mi Replication handle
3083
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi)
3085
return(connect_to_master(session, drizzle, mi, 0, 0));
3094
// Connect `drizzle` to the master described by mi. When `reconnect` is true,
// drizzle_reconnect() is used instead of a fresh drizzle_connect().
// Attempts repeat, with a safe_sleep() of mi->connect_retry between them,
// until one succeeds or io_slave_killed() reports the I/O thread was killed.
// Failures are counted against master_retry_count (see the comment inside
// the loop). Each distinct error code is reported once via mi->report().
// On success an informational message is logged unless suppress_warnings is
// set (it is cleared whenever a new error is reported), and
// drizzle->reconnect is enabled.
// Returns the last io_slave_killed() result (nonzero if the thread was killed).
Try to connect until successful or slave killed or we have retried
3095
master_retry_count times
3098
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
3099
bool reconnect, bool suppress_warnings)
3101
int32_t slave_was_killed;
3102
int32_t last_errno= -2; // impossible error
3103
uint32_t err_count=0;
3106
mi->events_till_disconnect = disconnect_slave_event_count;
3107
uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
3108
if (opt_slave_compressed_protocol)
3109
// NOTE(review): this assignment replaces CLIENT_REMEMBER_OPTIONS instead of
// adding CLIENT_COMPRESS to it (|=). Confirm that dropping
// CLIENT_REMEMBER_OPTIONS when compression is enabled is intended.
client_flag=CLIENT_COMPRESS; /* We will use compression */
3111
drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3112
drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3114
while (!(slave_was_killed = io_slave_killed(session,mi)) &&
3115
(reconnect ? drizzle_reconnect(drizzle) != 0 :
3116
drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
3117
mi->getPort(), 0, client_flag) == 0))
3119
/* Don't repeat last error */
3120
if ((int32_t)drizzle_errno(drizzle) != last_errno)
3122
last_errno=drizzle_errno(drizzle);
3123
suppress_warnings= 0;
3124
mi->report(ERROR_LEVEL, last_errno,
3125
_("error %s to master '%s@%s:%d'"
3126
" - retry-time: %d retries: %u"),
3127
(reconnect ? _("reconnecting") : _("connecting")),
3128
mi->getUsername(), mi->getHostname(), mi->getPort(),
3129
mi->getConnectionRetry(), master_retry_count);
3132
By default we try forever. The reason is that failure will trigger
3133
master election, so if the user did not set master_retry_count we
3134
do not want to have election triggered on the first failure to
3137
if (++err_count == master_retry_count)
3142
// NOTE(review): the retry-time printed above comes from
// mi->getConnectionRetry(), while the sleep uses mi->connect_retry
// directly; presumably the same value — confirm they cannot diverge.
safe_sleep(session,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3146
if (!slave_was_killed)
3150
if (!suppress_warnings && global_system_variables.log_warnings)
3151
sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3152
"replication resumed in log '%s' at "
3153
"position %s"), mi->getUsername(),
3154
mi->getHostname(), mi->getPort(),
3156
llstr(mi->getLogPosition(),llbuff));
3159
drizzle->reconnect= 1;
3160
return(slave_was_killed);
3168
Try to connect until successful or slave killed or we have retried
3169
master_retry_count times
3172
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
3173
bool suppress_warnings)
3175
return(connect_to_master(session, drizzle, mi, 1, suppress_warnings));
3180
// Persist the SQL thread's relay-log and master-binlog coordinates held in
// rli; see the notes below for the locking and transaction assumptions.
// NOTE(review): presumably nothing is written when rli->no_storage is set —
// confirm against the branch that follows that check.
Store the file and position where the execute-slave thread are in the
3184
flush_relay_log_info()
3185
rli Relay log information
3188
- As this is only called by the slave thread, we don't need to
3189
have a lock on this.
3190
- If there is an active transaction, then we don't update the position
3191
in the relay log. This is to ensure that we re-execute statements
3192
if we die in the middle of an transaction that was rolled back.
3193
- As a transaction never spans binary logs, we don't have to handle the
3194
case where we do a relay-log-rotation in the middle of the transaction.
3195
If this would not be the case, we would have to ensure that we
3196
don't delete the relay log file where the transaction started when
3197
we switch to a new relay log file.
3200
- Change the log file information to a binary format to avoid calling
3208
bool flush_relay_log_info(Relay_log_info* rli)
3212
if (unlikely(rli->no_storage))
3220
// Switch the SQL thread from the "hot" relay log (the I/O thread's cache) to
// its own IO_CACHE, rli->cache_buf. The function opens the file named by
// rli->event_relay_log_name with open_binlog(), passing errmsg through.
// It then seeks to rli->event_relay_log_pos, first clamped to at least
// BIN_LOG_HEADER_SIZE.
// Preconditions (asserted): not already reading from cache_buf, and no file
// descriptor currently open in rli->cur_log_fd.
// next_event() treats a null result from this function as "no more log
// files".
Called when we notice that the current "hot" log got rotated under our feet.
3223
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
3225
assert(rli->cur_log != &rli->cache_buf);
3226
assert(rli->cur_log_fd == -1);
3228
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3229
if ((rli->cur_log_fd=open_binlog(cur_log, rli->event_relay_log_name.c_str(), errmsg)) < 0)
3232
We want to start exactly where we was before:
3233
relay_log_pos Current log pos
3234
pending Number of bytes already processed from the event
3236
// Never seek into the binlog header.
rli->event_relay_log_pos= cmax(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3237
my_b_seek(cur_log,rli->event_relay_log_pos);
3242
static Log_event* next_event(Relay_log_info* rli)
3245
IO_CACHE* cur_log = rli->cur_log;
3246
pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3247
const char* errmsg=0;
3248
Session* session = rli->sql_session;
3250
assert(session != 0);
3252
if (abort_slave_event_count && !rli->events_till_abort--)
3256
For most operations we need to protect rli members with data_lock,
3257
so we assume calling function acquired this mutex for us and we will
3258
hold it for the most of the loop below However, we will release it
3259
whenever it is worth the hassle, and in the cases when we go into a
3260
pthread_cond_wait() with the non-data_lock mutex
3262
safe_mutex_assert_owner(&rli->data_lock);
3264
while (!sql_slave_killed(session,rli))
3267
We can have two kinds of log reading:
3269
rli->cur_log points at the IO_CACHE of relay_log, which
3270
is actively being updated by the I/O thread. We need to be careful
3271
in this case and make sure that we are not looking at a stale log that
3272
has already been rotated. If it has been, we reopen the log.
3274
The other case is much simpler:
3275
We just have a read only log that nobody else will be updating.
3278
if ((hot_log = (cur_log != &rli->cache_buf)))
3280
assert(rli->cur_log_fd == -1); // foreign descriptor
3281
pthread_mutex_lock(log_lock);
3284
Reading xxx_file_id is safe because the log will only
3285
be rotated when we hold relay_log.LOCK_log
3287
if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
3289
// The master has switched to a new log file; Reopen the old log file
3290
cur_log=reopen_relay_log(rli, &errmsg);
3291
pthread_mutex_unlock(log_lock);
3292
if (!cur_log) // No more log files
3294
hot_log=0; // Using old binary log
3298
As there is no guarantee that the relay is open (for example, an I/O
3299
error during a write by the slave I/O thread may have closed it), we
3302
if (!my_b_inited(cur_log))
3304
assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3305
assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
3308
Relay log is always in new format - if the master is 3.23, the
3309
I/O thread will convert the format for us.
3310
A problem: the description event may be in a previous relay log. So if
3311
the slave has been shutdown meanwhile, we would have to look in old relay
3312
logs, which may even have been deleted. So we need to write this
3313
description event at the beginning of the relay log.
3314
When the relay log is created when the I/O thread starts, easy: the
3315
master will send the description event and we will queue it.
3316
But if the relay log is created by new_file(): then the solution is:
3317
DRIZZLE_BIN_LOG::open() will write the buffered description event.
3319
if ((ev=Log_event::read_log_event(cur_log,0,
3320
rli->relay_log.description_event_for_exec)))
3323
assert(session==rli->sql_session);
3325
read it while we have a lock, to avoid a mutex lock in
3326
inc_event_relay_log_pos()
3328
rli->future_event_relay_log_pos= my_b_tell(cur_log);
3330
pthread_mutex_unlock(log_lock);
3333
assert(session==rli->sql_session);
3334
if (opt_reckless_slave) // For mysql-test
3336
if (cur_log->error < 0)
3338
errmsg = "slave SQL thread aborted because of I/O error";
3340
pthread_mutex_unlock(log_lock);
3343
if (!cur_log->error) /* EOF */
3346
On a hot log, EOF means that there are no more updates to
3347
process and we must block until I/O thread adds some and
3348
signals us to continue
3353
We say in Seconds_Behind_Master that we have "caught up". Note that
3354
for example if network link is broken but I/O slave thread hasn't
3355
noticed it (slave_net_timeout not elapsed), then we'll say "caught
3356
up" whereas we're not really caught up. Fixing that would require
3357
internally cutting timeout in smaller pieces in network read, no
3358
thanks. Another example: SQL has caught up on I/O, now I/O has read
3359
a new event and is queuing it; the false "0" will exist until SQL
3360
finishes executing the new event; it will be look abnormal only if
3361
the events have old timestamps (then you get "many", 0, "many").
3363
Transient phases like this can be fixed with implemeting
3364
Heartbeat event which provides the slave the status of the
3365
master at time the master does not have any new update to send.
3366
Seconds_Behind_Master would be zero only when master has no
3367
more updates in binlog for slave. The heartbeat can be sent
3368
in a (small) fraction of slave_net_timeout. Until it's done
3369
rli->last_master_timestamp is temporarely (for time of
3370
waiting for the following event) reset whenever EOF is
3373
time_t save_timestamp= rli->last_master_timestamp;
3374
rli->last_master_timestamp= 0;
3376
assert(rli->relay_log.get_open_count() ==
3377
rli->cur_log_old_open_count);
3379
if (rli->ign_master_log_name_end[0])
3381
/* We generate and return a Rotate, to make our positions advance */
3382
ev= new Rotate_log_event(rli->ign_master_log_name_end,
3383
0, rli->ign_master_log_pos_end,
3384
Rotate_log_event::DUP_NAME);
3385
rli->ign_master_log_name_end[0]= 0;
3386
pthread_mutex_unlock(log_lock);
3389
errmsg= "Slave SQL thread failed to create a Rotate event "
3390
"(out of memory?), SHOW SLAVE STATUS may be inaccurate";
3393
ev->server_id= 0; // don't be ignored by slave SQL thread
3398
We can, and should release data_lock while we are waiting for
3399
update. If we do not, show slave status will block
3401
pthread_mutex_unlock(&rli->data_lock);
3405
- the I/O thread has reached log_space_limit
3406
- the SQL thread has read all relay logs, but cannot purge for some
3408
* it has already purged all logs except the current one
3409
* there are other logs than the current one but they're involved in
3410
a transaction that finishes in the current one (or is not finished)
3412
Wake up the possibly waiting I/O thread, and set a boolean asking
3413
the I/O thread to temporarily ignore the log_space_limit
3414
constraint, because we do not want the I/O thread to block because of
3415
space (it's ok if it blocks for any other reason (e.g. because the
3416
master does not send anything). Then the I/O thread stops waiting
3417
and reads more events.
3418
The SQL thread decides when the I/O thread should take log_space_limit
3419
into account again : ignore_log_space_limit is reset to 0
3420
in purge_first_log (when the SQL thread purges the just-read relay
3421
log), and also when the SQL thread starts. We should also reset
3422
ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
3423
fact, no need as RESET SLAVE requires that the slave
3424
be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
3427
pthread_mutex_lock(&rli->log_space_lock);
3428
// prevent the I/O thread from blocking next times
3429
rli->ignore_log_space_limit= 1;
3431
If the I/O thread is blocked, unblock it. Ok to broadcast
3432
after unlock, because the mutex is only destroyed in
3433
~Relay_log_info(), i.e. when rli is destroyed, and rli will
3434
not be destroyed before we exit the present function.
3436
pthread_mutex_unlock(&rli->log_space_lock);
3437
pthread_cond_broadcast(&rli->log_space_cond);
3438
// Note that wait_for_update_relay_log unlocks lock_log !
3439
rli->relay_log.wait_for_update_relay_log(rli->sql_session);
3440
// re-acquire data lock since we released it earlier
3441
pthread_mutex_lock(&rli->data_lock);
3442
rli->last_master_timestamp= save_timestamp;
3446
If the log was not hot, we need to move to the next log in
3447
sequence. The next log could be hot or cold, we deal with both
3448
cases separately after doing some common initialization
3450
end_io_cache(cur_log);
3451
assert(rli->cur_log_fd >= 0);
3452
my_close(rli->cur_log_fd, MYF(MY_WME));
3453
rli->cur_log_fd = -1;
3455
if (relay_log_purge)
3458
purge_first_log will properly set up relay log coordinates in rli.
3459
If the group's coordinates are equal to the event's coordinates
3460
(i.e. the relay log was not rotated in the middle of a group),
3461
we can purge this relay log too.
3462
We do uint64_t and string comparisons, this may be slow but
3463
- purging the last relay log is nice (it can save 1GB of disk), so we
3464
like to detect the case where we can do it, and given this,
3465
- I see no better detection method
3466
- purge_first_log is not called that often
3468
if (rli->relay_log.purge_first_log
3470
rli->group_relay_log_pos == rli->event_relay_log_pos
3471
&& !strcmp(rli->group_relay_log_name.c_str(), rli->event_relay_log_name.c_str())))
3473
errmsg = "Error purging processed logs";
3480
If hot_log is set, then we already have a lock on
3481
LOCK_log. If not, we have to get the lock.
3483
According to Sasha, the only time this code will ever be executed
3484
is if we are recovering from a bug.
3486
if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
3488
errmsg = "error switching to the next log";
3491
rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3492
rli->event_relay_log_name.assign(rli->linfo.log_file_name);
3493
flush_relay_log_info(rli);
3497
Now we want to open this next log. To know if it's a hot log (the one
3498
being written by the I/O thread now) or a cold log, we can use
3499
is_active(); if it is hot, we use the I/O cache; if it's cold we open
3500
the file normally. But if is_active() reports that the log is hot, this
3501
may change between the test and the consequence of the test. So we may
3502
open the I/O cache whereas the log is now cold, which is nonsense.
3503
To guard against this, we need to have LOCK_log.
3506
if (!hot_log) /* if hot_log, we already have this mutex */
3507
pthread_mutex_lock(log_lock);
3508
if (rli->relay_log.is_active(rli->linfo.log_file_name))
3511
if (global_system_variables.log_warnings)
3512
sql_print_information(_("next log '%s' is currently active"),
3513
rli->linfo.log_file_name);
3515
rli->cur_log= cur_log= rli->relay_log.get_log_file();
3516
rli->cur_log_old_open_count= rli->relay_log.get_open_count();
3517
assert(rli->cur_log_fd == -1);
3520
Read pointer has to be at the start since we are the only
3522
We must keep the LOCK_log to read the 4 first bytes, as this is a hot
3523
log (same as when we call read_log_event() above: for a hot log we
3526
if (check_binlog_magic(cur_log,&errmsg))
3528
if (!hot_log) pthread_mutex_unlock(log_lock);
3531
if (!hot_log) pthread_mutex_unlock(log_lock);
3534
if (!hot_log) pthread_mutex_unlock(log_lock);
3536
if we get here, the log was not hot, so we will have to open it
3537
ourselves. We are sure that the log is still not hot now (a log can get
3538
from hot to cold, but not from cold to hot). No need for LOCK_log.
3541
if (global_system_variables.log_warnings)
3542
sql_print_information(_("next log '%s' is not active"),
3543
rli->linfo.log_file_name);
3545
// open_binlog() will check the magic header
3546
if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
3553
Read failed with a non-EOF error.
3554
TODO: come up with something better to handle this error
3557
pthread_mutex_unlock(log_lock);
3558
sql_print_error(_("Slave SQL thread: I/O error reading "
3559
"event(errno: %d cur_log->error: %d)"),
3560
my_errno,cur_log->error);
3561
// set read position to the beginning of the event
3562
my_b_seek(cur_log,rli->event_relay_log_pos);
3563
/* otherwise, we have had a partial read */
3564
errmsg = _("Aborting slave SQL thread because of partial event read");
3565
break; // To end of function
3568
if (!errmsg && global_system_variables.log_warnings)
3570
sql_print_information(_("Error reading relay log event: %s"),
3571
_("slave SQL thread was killed"));
3577
sql_print_error(_("Error reading relay log event: %s"), errmsg);
3582
Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
3583
because of size is simpler because when we do it we already have all relevant
3584
locks; here we don't, so this function is mainly taking locks).
3585
Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
3589
void rotate_relay_log(Master_info* mi)
3591
Relay_log_info* rli= &mi->rli;
3593
/* We don't lock rli->run_lock. This would lead to deadlocks. */
3594
pthread_mutex_lock(&mi->run_lock);
3597
We need to test inited because otherwise, new_file() will attempt to lock
3598
LOCK_log, which may not be inited (if we're not a slave).
3605
/* If the relay log is closed, new_file() will do nothing. */
3606
rli->relay_log.new_file();
3609
We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
3610
be counted, so imagine a succession of FLUSH LOGS and assume the slave
3611
threads are started:
3612
relay_log_space decreases by the size of the deleted relay log, but does
3613
not increase, so flush-after-flush we may become negative, which is wrong.
3614
Even if this will be corrected as soon as a query is replicated on the
3615
slave (because the I/O thread will then call harvest_bytes_written() which
3616
will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
3617
output in SHOW SLAVE STATUS meanwhile. So we harvest now.
3618
If the log is closed, then this will just harvest the last writes, probably
3619
0 as they probably have been harvested.
3621
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3623
pthread_mutex_unlock(&mi->run_lock);
3629
Detects, based on master's version (as found in the relay log), if master
3631
@param rli Relay_log_info which tells the master's version
3632
@param bug_id Number of the bug as found in bugs.mysql.com
3633
@param report bool report error message, default TRUE
3634
@return true if master has the bug, FALSE if it does not.
3636
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
3638
struct st_version_range_for_one_bug {
3640
const unsigned char introduced_in[3]; // first version with bug
3641
const unsigned char fixed_in[3]; // first version with fix
3643
static struct st_version_range_for_one_bug versions_for_all_bugs[]=
3645
{24432, { 5, 0, 24 }, { 5, 0, 38 } },
3646
{24432, { 5, 1, 12 }, { 5, 1, 17 } },
3647
{33029, { 5, 0, 0 }, { 5, 0, 58 } },
3648
{33029, { 5, 1, 0 }, { 5, 1, 12 } },
3650
const unsigned char *master_ver=
3651
rli->relay_log.description_event_for_exec->server_version_split;
3653
assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3656
i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3658
const unsigned char *introduced_in= versions_for_all_bugs[i].introduced_in,
3659
*fixed_in= versions_for_all_bugs[i].fixed_in;
3660
if ((versions_for_all_bugs[i].bug_id == bug_id) &&
3661
(memcmp(introduced_in, master_ver, 3) <= 0) &&
3662
(memcmp(fixed_in, master_ver, 3) > 0))
3667
// a short message for SHOW SLAVE STATUS (message length constraints)
3668
my_printf_error(ER_UNKNOWN_ERROR,
3669
_("master may suffer from"
3670
" http://bugs.mysql.com/bug.php?id=%u"
3671
" so slave stops; check error log on slave"
3672
" for more info"), MYF(0), bug_id);
3673
// a verbose message for the error log
3674
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
3675
_("According to the master's version ('%s'),"
3676
" it is probable that master suffers from this bug:"
3677
" http://bugs.mysql.com/bug.php?id=%u"
3678
" and thus replicating the current binary log event"
3679
" may make the slave's data become different from the"
3681
" To take no risk, slave refuses to replicate"
3682
" this event and stops."
3683
" We recommend that all updates be stopped on the"
3684
" master and slave, that the data of both be"
3685
" manually synchronized,"
3686
" that master's binary logs be deleted,"
3687
" that master be upgraded to a version at least"
3688
" equal to '%d.%d.%d'. Then replication can be"
3690
rli->relay_log.description_event_for_exec->server_version,
3692
fixed_in[0], fixed_in[1], fixed_in[2]);
3700
BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
3701
exclusive, if one statement in a SP generated AUTO_INCREMENT value
3702
by the top statement, all statements after it would be considered
3703
generated AUTO_INCREMENT value by the top statement, and a
3704
erroneous INSERT_ID value might be associated with these statement,
3705
which could cause duplicate entry error and stop the slave.
3707
Detect buggy master to work around.
3709
bool rpl_master_erroneous_autoinc(Session *session)
3711
if (active_mi && active_mi->rli.sql_session == session)
3713
Relay_log_info *rli= &active_mi->rli;
3714
return rpl_master_has_bug(rli, 33029, false);
3719
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
3720
template class I_List_iterator<i_string>;
3721
template class I_List_iterator<i_string_pair>;
3725
@} (end of group Replication)