1
/* Copyright (C) 2000-2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
18
@addtogroup Replication
23
@brief Code to run the io thread and the sql thread on the
27
#include "mysql_priv.h"
34
#include "rpl_filter.h"
35
#include "repl_failsafe.h"
36
#include <thr_alarm.h>
38
#include <sql_common.h>
40
#include <mysys_err.h>
42
#ifdef HAVE_REPLICATION
44
#include "rpl_tblmap.h"
46
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
48
#define MAX_SLAVE_RETRY_PAUSE 5
49
bool use_slave_mask = 0;
50
MY_BITMAP slave_error_mask;
52
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
54
char* slave_load_tmpdir = 0;
55
Master_info *active_mi= 0;
56
my_bool replicate_same_server_id;
57
uint64_t relay_log_space_limit = 0;
60
When slave thread exits, we need to remember the temporary tables so we
61
can re-use them on slave start.
63
TODO: move the vars below under Master_info
66
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
67
int32_t events_till_abort = -1;
69
enum enum_slave_reconnect_actions
71
SLAVE_RECON_ACT_REG= 0,
72
SLAVE_RECON_ACT_DUMP= 1,
73
SLAVE_RECON_ACT_EVENT= 2,
77
enum enum_slave_reconnect_messages
79
SLAVE_RECON_MSG_WAIT= 0,
80
SLAVE_RECON_MSG_KILLED_WAITING= 1,
81
SLAVE_RECON_MSG_AFTER= 2,
82
SLAVE_RECON_MSG_FAILED= 3,
83
SLAVE_RECON_MSG_COMMAND= 4,
84
SLAVE_RECON_MSG_KILLED_AFTER= 5,
88
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
91
"Waiting to reconnect after a failed registration on master",
92
"Slave I/O thread killed while waitnig to reconnect after a failed \
93
registration on master",
94
"Reconnecting after a failed registration on master",
95
"failed registering on master, reconnecting to try again, \
96
log '%s' at postion %s",
98
"Slave I/O thread killed during or after reconnect"
101
"Waiting to reconnect after a failed binlog dump request",
102
"Slave I/O thread killed while retrying master dump",
103
"Reconnecting after a failed binlog dump request",
104
"failed dump request, reconnecting to try again, log '%s' at postion %s",
106
"Slave I/O thread killed during or after reconnect"
109
"Waiting to reconnect after a failed master event read",
110
"Slave I/O thread killed while waiting to reconnect after a failed read",
111
"Reconnecting after a failed master event read",
112
"Slave I/O thread: Failed reading log event, reconnecting to retry, \
113
log '%s' at postion %s",
115
"Slave I/O thread killed during or after a reconnect done to recover from \
121
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
123
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
124
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
125
static bool wait_for_relay_log_space(Relay_log_info* rli);
126
static inline bool io_slave_killed(THD* thd,Master_info* mi);
127
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
128
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
129
static int32_t safe_connect(THD* thd, MYSQL* mysql, Master_info* mi);
130
static int32_t safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
131
bool suppress_warnings);
132
static int32_t connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
133
bool reconnect, bool suppress_warnings);
134
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
135
void* thread_killed_arg);
136
static int32_t get_master_version_and_clock(MYSQL* mysql, Master_info* mi);
137
static Log_event* next_event(Relay_log_info* rli);
138
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
139
static int32_t terminate_slave_thread(THD *thd,
140
pthread_mutex_t* term_lock,
141
pthread_cond_t* term_cond,
142
volatile uint32_t *slave_running,
144
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
147
Find out which replications threads are running
151
mask Return value here
152
mi master_info for slave
153
inverse If set, returns which threads are not running
156
Get a bit mask for which threads are running so that we can later restart
160
mask If inverse == 0, running threads
161
If inverse == 1, stopped threads
164
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
166
bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
167
register int32_t tmp_mask=0;
170
tmp_mask |= SLAVE_IO;
172
tmp_mask |= SLAVE_SQL;
174
tmp_mask^= (SLAVE_IO | SLAVE_SQL);
184
void lock_slave_threads(Master_info* mi)
186
//TODO: see if we can do this without dual mutex
187
pthread_mutex_lock(&mi->run_lock);
188
pthread_mutex_lock(&mi->rli.run_lock);
194
unlock_slave_threads()
197
void unlock_slave_threads(Master_info* mi)
199
//TODO: see if we can do this without dual mutex
200
pthread_mutex_unlock(&mi->rli.run_lock);
201
pthread_mutex_unlock(&mi->run_lock);
206
/* Initialize slave structures */
211
This is called when mysqld starts. Before client connections are
212
accepted. However bootstrap may conflict with us if it does START SLAVE.
213
So it's safer to take the lock.
215
pthread_mutex_lock(&LOCK_active_mi);
217
TODO: re-write this to interate through the list of files
220
active_mi= new Master_info;
223
If master_host is not specified, try to read it from the master_info file.
224
If master_host is specified, create the master_info file if it doesn't
229
sql_print_error("Failed to allocate memory for the master info structure");
233
if (init_master_info(active_mi,master_info_file,relay_log_info_file,
234
1, (SLAVE_IO | SLAVE_SQL)))
236
sql_print_error("Failed to initialize the master info structure");
240
/* If server id is not set, start_slave_thread() will say it */
242
if (active_mi->host[0] && !opt_skip_slave_start)
244
if (start_slave_threads(1 /* need mutex */,
245
0 /* no wait for start*/,
249
SLAVE_IO | SLAVE_SQL))
251
sql_print_error("Failed to create slave threads");
255
pthread_mutex_unlock(&LOCK_active_mi);
259
pthread_mutex_unlock(&LOCK_active_mi);
265
Init function to set up array for errors that should be skipped for slave
268
init_slave_skip_errors()
269
arg List of errors numbers to skip, separated with ','
272
Called from get_options() in mysqld.cc on start-up
275
void init_slave_skip_errors(const char* arg)
279
if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
281
fprintf(stderr, "Badly out of memory, please check your system status\n");
285
for (;my_isspace(system_charset_info,*arg);++arg)
287
if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
289
bitmap_set_all(&slave_error_mask);
295
if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
297
if (err_code < MAX_SLAVE_ERROR)
298
bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
299
while (!my_isdigit(system_charset_info,*p) && *p)
306
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
309
return(0); /* successfully do nothing */
310
int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
311
pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
313
if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
316
if ((error=terminate_slave_thread(mi->io_thd,io_lock,
323
if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
325
mi->rli.abort_slave=1;
326
if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
328
&mi->rli.slave_running,
338
Wait for a slave thread to terminate.
340
This function is called after requesting the thread to terminate
341
(by setting @c abort_slave member of @c Relay_log_info or @c
342
Master_info structure to 1). Termination of the thread is
343
controlled with the the predicate <code>*slave_running</code>.
345
Function will acquire @c term_lock before waiting on the condition
346
unless @c skip_lock is true in which case the mutex should be owned
347
by the caller of this function and will remain acquired after
348
return from the function.
351
Associated lock to use when waiting for @c term_cond
354
Condition that is signalled when the thread has terminated
357
Pointer to predicate to check for slave thread termination
360
If @c true the lock will not be acquired before waiting on
361
the condition. In this case, it is assumed that the calling
362
function acquires the lock before calling this function.
367
terminate_slave_thread(THD *thd,
368
pthread_mutex_t* term_lock,
369
pthread_cond_t* term_cond,
370
volatile uint32_t *slave_running,
376
pthread_mutex_lock(term_lock);
378
safe_mutex_assert_owner(term_lock);
383
pthread_mutex_unlock(term_lock);
384
return(ER_SLAVE_NOT_RUNNING);
387
THD_CHECK_SENTRY(thd);
390
Is is critical to test if the slave is running. Otherwise, we might
391
be referening freed memory trying to kick it
394
while (*slave_running) // Should always be true
396
pthread_mutex_lock(&thd->LOCK_delete);
397
#ifndef DONT_USE_THR_ALARM
399
Error codes from pthread_kill are:
400
EINVAL: invalid signal number (can't happen)
401
ESRCH: thread already killed (can happen, should be ignored)
403
int32_t err= pthread_kill(thd->real_id, thr_client_alarm);
404
assert(err != EINVAL);
406
thd->awake(THD::NOT_KILLED);
407
pthread_mutex_unlock(&thd->LOCK_delete);
410
There is a small chance that slave thread might miss the first
411
alarm. To protect againts it, resend the signal until it reacts
413
struct timespec abstime;
414
set_timespec(abstime,2);
415
error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
416
assert(error == ETIMEDOUT || error == 0);
419
assert(*slave_running == 0);
422
pthread_mutex_unlock(term_lock);
427
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
428
pthread_mutex_t *cond_lock,
429
pthread_cond_t *start_cond,
430
volatile uint32_t *slave_running,
431
volatile uint32_t *slave_run_id,
441
pthread_mutex_lock(start_lock);
445
pthread_cond_broadcast(start_cond);
447
pthread_mutex_unlock(start_lock);
448
sql_print_error("Server id not set, will not start slave");
449
return(ER_BAD_SLAVE);
455
pthread_cond_broadcast(start_cond);
457
pthread_mutex_unlock(start_lock);
458
return(ER_SLAVE_MUST_STOP);
460
start_id= *slave_run_id;
463
struct sched_param tmp_sched_param;
465
memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
466
tmp_sched_param.sched_priority= CONNECT_PRIOR;
467
(void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
469
if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
472
pthread_mutex_unlock(start_lock);
473
return(ER_SLAVE_THREAD);
475
if (start_cond && cond_lock) // caller has cond_lock
477
THD* thd = current_thd;
478
while (start_id == *slave_run_id)
480
const char* old_msg = thd->enter_cond(start_cond,cond_lock,
481
"Waiting for slave thread to start");
482
pthread_cond_wait(start_cond,cond_lock);
483
thd->exit_cond(old_msg);
484
pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
486
return(thd->killed_errno());
490
pthread_mutex_unlock(start_lock);
496
start_slave_threads()
499
SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
500
sense to do that for starting a slave--we always care if it actually
501
started the threads that were not previously running
504
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
506
const char* master_info_fname __attribute__((__unused__)),
507
const char* slave_info_fname __attribute__((__unused__)),
510
pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
511
pthread_cond_t* cond_io=0,*cond_sql=0;
514
if (need_slave_mutex)
516
lock_io = &mi->run_lock;
517
lock_sql = &mi->rli.run_lock;
521
cond_io = &mi->start_cond;
522
cond_sql = &mi->rli.start_cond;
523
lock_cond_io = &mi->run_lock;
524
lock_cond_sql = &mi->rli.run_lock;
527
if (thread_mask & SLAVE_IO)
528
error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
530
&mi->slave_running, &mi->slave_run_id,
531
mi, 1); //high priority, to read the most possible
532
if (!error && (thread_mask & SLAVE_SQL))
534
error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
536
&mi->rli.slave_running, &mi->rli.slave_run_id,
539
terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
546
static int32_t end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
555
Free all resources used by slave
564
This is called when the server terminates, in close_connections().
565
It terminates slave threads. However, some CHANGE MASTER etc may still be
566
running presently. If a START SLAVE was in progress, the mutex lock below
567
will make us wait until slave threads have started, and START SLAVE
568
returns, then we terminate them here.
570
pthread_mutex_lock(&LOCK_active_mi);
574
TODO: replace the line below with
575
list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
576
once multi-master code is ready.
578
terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
579
end_master_info(active_mi);
583
pthread_mutex_unlock(&LOCK_active_mi);
588
static bool io_slave_killed(THD* thd, Master_info* mi)
590
assert(mi->io_thd == thd);
591
assert(mi->slave_running); // tracking buffer overrun
592
return(mi->abort_slave || abort_loop || thd->killed);
596
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
598
assert(rli->sql_thd == thd);
599
assert(rli->slave_running == 1);// tracking buffer overrun
600
if (abort_loop || thd->killed || rli->abort_slave)
603
If we are in an unsafe situation (stopping could corrupt replication),
604
we give one minute to the slave SQL thread of grace before really
605
terminating, in the hope that it will be able to read more events and
606
the unsafe situation will soon be left. Note that this one minute starts
607
from the last time anything happened in the slave SQL thread. So it's
608
really one minute of idleness, we don't timeout if the slave SQL thread
611
if (rli->last_event_start_time == 0)
613
if (difftime(time(0), rli->last_event_start_time) > 60)
615
rli->report(ERROR_LEVEL, 0,
616
"SQL thread had to stop in an unsafe situation, in "
617
"the middle of applying updates to a "
618
"non-transactional table without any primary key. "
619
"There is a risk of duplicate updates when the slave "
620
"SQL thread is restarted. Please check your tables' "
621
"contents after restart.");
630
skip_load_data_infile()
633
This is used to tell a 3.23 master to break send_file()
636
void skip_load_data_infile(NET *net)
638
(void)net_request_file(net, "/dev/null");
639
(void)my_net_read(net); // discard response
640
(void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok
645
bool net_request_file(NET* net, const char* fname)
647
return(net_write_command(net, 251, (uchar*) fname, strlen(fname),
652
From other comments and tests in code, it looks like
653
sometimes Query_log_event and Load_log_event can have db == 0
654
(see rewrite_db() above for example)
655
(cases where this happens are unclear; it may be when the master is 3.23).
658
const char *print_slave_db_safe(const char* db)
660
return((db ? db : ""));
663
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
664
const char *default_val)
668
if ((length=my_b_gets(f,var, max_size)))
670
char* last_p = var + length -1;
672
*last_p = 0; // if we stopped on newline, kill it
676
If we truncated a line or stopped on last char, remove all chars
677
up to and including newline.
680
while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
684
else if (default_val)
686
strmake(var, default_val, max_size-1);
693
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
698
if (my_b_gets(f, buf, sizeof(buf)))
703
else if (default_val)
711
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
716
if (my_b_gets(f, buf, sizeof(buf)))
718
if (sscanf(buf, "%f", var) != 1)
723
else if (default_val != 0.0)
731
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
733
if (io_slave_killed(thd, mi))
735
if (info && global_system_variables.log_warnings)
736
sql_print_information(info);
744
Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
745
relying on the binlog's version. This is not perfect: imagine an upgrade
746
of the master without waiting that all slaves are in sync with the master;
747
then a slave could be fooled about the binlog's format. This is what happens
748
when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
749
slaves are fooled. So we do this only to distinguish between 3.23 and more
750
recent masters (it's too late to change things for 3.23).
757
static int32_t get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
760
String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
761
char err_buff[MAX_SLAVE_ERRMSG];
762
const char* errmsg= 0;
764
MYSQL_RES *master_res= 0;
765
MYSQL_ROW master_row;
769
Free old description_event_for_queue (that is needed if we are in
772
delete mi->rli.relay_log.description_event_for_queue;
773
mi->rli.relay_log.description_event_for_queue= 0;
775
if (!my_isdigit(&my_charset_bin,*mysql->server_version))
777
errmsg = "Master reported unrecognized MySQL version";
778
err_code= ER_SLAVE_FATAL_ERROR;
779
sprintf(err_buff, ER(err_code), errmsg);
780
err_msg.append(err_buff);
785
Note the following switch will bug when we have MySQL branch 30 ;)
787
switch (*mysql->server_version)
792
errmsg = "Master reported unrecognized MySQL version";
793
err_code= ER_SLAVE_FATAL_ERROR;
794
sprintf(err_buff, ER(err_code), errmsg);
795
err_msg.append(err_buff);
798
mi->rli.relay_log.description_event_for_queue= new
799
Format_description_log_event(1, mysql->server_version);
802
mi->rli.relay_log.description_event_for_queue= new
803
Format_description_log_event(3, mysql->server_version);
807
Master is MySQL >=5.0. Give a default Format_desc event, so that we can
808
take the early steps (like tests for "is this a 3.23 master") which we
809
have to take before we receive the real master's Format_desc which will
810
override this one. Note that the Format_desc we create below is garbage
811
(it has the format of the *slave*); it's only good to help know if the
812
master is 3.23, 4.0, etc.
814
mi->rli.relay_log.description_event_for_queue= new
815
Format_description_log_event(4, mysql->server_version);
821
This does not mean that a 5.0 slave will be able to read a 6.0 master; but
822
as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
823
can't read a 6.0 master, this will show up when the slave can't read some
824
events sent by the master, and there will be error messages.
827
if (err_msg.length() != 0)
830
/* as we are here, we tried to allocate the event */
831
if (!mi->rli.relay_log.description_event_for_queue)
833
errmsg= "default Format_description_log_event";
834
err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
835
sprintf(err_buff, ER(err_code), errmsg);
836
err_msg.append(err_buff);
841
Compare the master and slave's clock. Do not die if master's clock is
842
unavailable (very old master not supporting UNIX_TIMESTAMP()?).
845
if (!mysql_real_query(mysql, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
846
(master_res= mysql_store_result(mysql)) &&
847
(master_row= mysql_fetch_row(master_res)))
849
mi->clock_diff_with_master=
850
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
852
else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
854
mi->clock_diff_with_master= 0; /* The "most sensible" value */
855
sql_print_warning("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
856
"do not trust column Seconds_Behind_Master of SHOW "
857
"SLAVE STATUS. Error: %s (%d)",
858
mysql_error(mysql), mysql_errno(mysql));
861
mysql_free_result(master_res);
864
Check that the master's server id and ours are different. Because if they
865
are equal (which can result from a simple copy of master's datadir to slave,
866
thus copying some my.cnf), replication will work but all events will be
868
Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
870
Note: we could have put a @@SERVER_ID in the previous SELECT
871
UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
873
if (!mysql_real_query(mysql,
874
STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
875
(master_res= mysql_store_result(mysql)))
877
if ((master_row= mysql_fetch_row(master_res)) &&
878
(::server_id == strtoul(master_row[1], 0, 10)) &&
879
!mi->rli.replicate_same_server_id)
882
"The slave I/O thread stops because master and slave have equal"
883
" MySQL server ids; these ids must be different for replication to work (or"
884
" the --replicate-same-server-id option must be used on slave but this does"
885
" not always make sense; please check the manual before using it).";
886
err_code= ER_SLAVE_FATAL_ERROR;
887
sprintf(err_buff, ER(err_code), errmsg);
888
err_msg.append(err_buff);
890
mysql_free_result(master_res);
896
Check that the master's global character_set_server and ours are the same.
897
Not fatal if query fails (old master?).
898
Note that we don't check for equality of global character_set_client and
899
collation_connection (neither do we prevent their setting in
900
set_var.cc). That's because from what I (Guilhem) have tested, the global
901
values of these 2 are never used (new connections don't use them).
902
We don't test equality of global collation_database either as it's is
903
going to be deprecated (made read-only) in 4.1 very soon.
904
The test is only relevant if master < 5.0.3 (we'll test only if it's older
905
than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
906
charset info in each binlog event.
907
We don't do it for 3.23 because masters <3.23.50 hang on
908
SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
909
test only if master is 4.x.
912
/* redundant with rest of code but safer against later additions */
913
if (*mysql->server_version == '3')
916
if ((*mysql->server_version == '4') &&
917
!mysql_real_query(mysql,
918
STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
919
(master_res= mysql_store_result(mysql)))
921
if ((master_row= mysql_fetch_row(master_res)) &&
922
strcmp(master_row[0], global_system_variables.collation_server->name))
925
"The slave I/O thread stops because master and slave have"
926
" different values for the COLLATION_SERVER global variable."
927
" The values must be equal for replication to work";
928
err_code= ER_SLAVE_FATAL_ERROR;
929
sprintf(err_buff, ER(err_code), errmsg);
930
err_msg.append(err_buff);
932
mysql_free_result(master_res);
938
Perform analogous check for time zone. Theoretically we also should
939
perform check here to verify that SYSTEM time zones are the same on
940
slave and master, but we can't rely on value of @@system_time_zone
941
variable (it is time zone abbreviation) since it determined at start
942
time and so could differ for slave and master even if they are really
943
in the same system time zone. So we are omiting this check and just
944
relying on documentation. Also according to Monty there are many users
945
who are using replication between servers in various time zones. Hence
946
such check will broke everything for them. (And now everything will
947
work for them because by default both their master and slave will have
949
This check is only necessary for 4.x masters (and < 5.0.4 masters but
952
if ((*mysql->server_version == '4') &&
953
!mysql_real_query(mysql, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
954
(master_res= mysql_store_result(mysql)))
956
if ((master_row= mysql_fetch_row(master_res)) &&
957
strcmp(master_row[0],
958
global_system_variables.time_zone->get_name()->ptr()))
961
"The slave I/O thread stops because master and slave have"
962
" different values for the TIME_ZONE global variable."
963
" The values must be equal for replication to work";
964
err_code= ER_SLAVE_FATAL_ERROR;
965
sprintf(err_buff, ER(err_code), errmsg);
966
err_msg.append(err_buff);
968
mysql_free_result(master_res);
974
if (mi->heartbeat_period != 0.0)
977
const char query_format[]= "SET @master_heartbeat_period= %s";
978
char query[sizeof(query_format) - 2 + sizeof(llbuf)];
980
the period is an uint64_t of nano-secs.
982
llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
983
sprintf(query, query_format, llbuf);
985
if (mysql_real_query(mysql, query, strlen(query))
986
&& !check_io_slave_killed(mi->io_thd, mi, NULL))
988
err_msg.append("The slave I/O thread stops because querying master with '");
989
err_msg.append(query);
990
err_msg.append("' failed;");
991
err_msg.append(" error: ");
992
err_code= mysql_errno(mysql);
993
err_msg.qs_append(err_code);
994
err_msg.append(" '");
995
err_msg.append(mysql_error(mysql));
997
mysql_free_result(mysql_store_result(mysql));
1000
mysql_free_result(mysql_store_result(mysql));
1004
if (err_msg.length() != 0)
1006
sql_print_error(err_msg.ptr());
1007
assert(err_code != 0);
1008
mi->report(ERROR_LEVEL, err_code, err_msg.ptr());
1016
static bool wait_for_relay_log_space(Relay_log_info* rli)
1018
bool slave_killed=0;
1019
Master_info* mi = rli->mi;
1020
const char *save_proc_info;
1021
THD* thd = mi->io_thd;
1023
pthread_mutex_lock(&rli->log_space_lock);
1024
save_proc_info= thd->enter_cond(&rli->log_space_cond,
1025
&rli->log_space_lock,
1027
Waiting for the slave SQL thread to free enough relay log space");
1028
while (rli->log_space_limit < rli->log_space_total &&
1029
!(slave_killed=io_slave_killed(thd,mi)) &&
1030
!rli->ignore_log_space_limit)
1031
pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1032
thd->exit_cond(save_proc_info);
1033
return(slave_killed);
1038
Builds a Rotate from the ignored events' info and writes it to relay log.
1041
write_ignored_events_info_to_relay_log()
1042
thd pointer to I/O thread's thd
1046
Slave I/O thread, going to die, must leave a durable trace of the
1047
ignored events' end position for the use of the slave SQL thread, by
1048
calling this function. Only that thread can call it (see assertion).
1050
static void write_ignored_events_info_to_relay_log(THD *thd __attribute__((__unused__)),
1053
Relay_log_info *rli= &mi->rli;
1054
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1056
assert(thd == mi->io_thd);
1057
pthread_mutex_lock(log_lock);
1058
if (rli->ign_master_log_name_end[0])
1060
Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
1061
0, rli->ign_master_log_pos_end,
1062
Rotate_log_event::DUP_NAME);
1063
rli->ign_master_log_name_end[0]= 0;
1064
/* can unlock before writing as slave SQL thd will soon see our Rotate */
1065
pthread_mutex_unlock(log_lock);
1066
if (likely((bool)ev))
1068
ev->server_id= 0; // don't be ignored by slave SQL thread
1069
if (unlikely(rli->relay_log.append(ev)))
1070
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
1071
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
1072
"failed to write a Rotate event"
1073
" to the relay log, SHOW SLAVE STATUS may be"
1075
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1076
if (flush_master_info(mi, 1))
1077
sql_print_error("Failed to flush master info file");
1081
mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
1082
ER(ER_SLAVE_CREATE_EVENT_FAILURE),
1083
"Rotate_event (out of memory?),"
1084
" SHOW SLAVE STATUS may be inaccurate");
1087
pthread_mutex_unlock(log_lock);
1092
int32_t register_slave_on_master(MYSQL* mysql, Master_info *mi,
1093
bool *suppress_warnings)
1095
uchar buf[1024], *pos= buf;
1096
uint32_t report_host_len, report_user_len=0, report_password_len=0;
1098
*suppress_warnings= false;
1101
report_host_len= strlen(report_host);
1103
report_user_len= strlen(report_user);
1104
if (report_password)
1105
report_password_len= strlen(report_password);
1106
/* 30 is a good safety margin */
1107
if (report_host_len + report_user_len + report_password_len + 30 >
1109
return(0); // safety
1111
int4store(pos, server_id); pos+= 4;
1112
pos= net_store_data(pos, (uchar*) report_host, report_host_len);
1113
pos= net_store_data(pos, (uchar*) report_user, report_user_len);
1114
pos= net_store_data(pos, (uchar*) report_password, report_password_len);
1115
int2store(pos, (uint16) report_port); pos+= 2;
1116
int4store(pos, rpl_recovery_rank); pos+= 4;
1117
/* The master will fill in master_id */
1118
int4store(pos, 0); pos+= 4;
1120
if (simple_command(mysql, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
1122
if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
1124
*suppress_warnings= true; // Suppress reconnect warning
1126
else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
1129
snprintf(buf, sizeof(buf), "%s (Errno: %d)", mysql_error(mysql),
1130
mysql_errno(mysql));
1131
mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1132
ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1140
bool show_master_info(THD* thd, Master_info* mi)
1142
// TODO: fix this for multi-master
1143
List<Item> field_list;
1144
Protocol *protocol= thd->protocol;
1146
field_list.push_back(new Item_empty_string("Slave_IO_State",
1148
field_list.push_back(new Item_empty_string("Master_Host",
1150
field_list.push_back(new Item_empty_string("Master_User",
1152
field_list.push_back(new Item_return_int("Master_Port", 7,
1154
field_list.push_back(new Item_return_int("Connect_Retry", 10,
1156
field_list.push_back(new Item_empty_string("Master_Log_File",
1158
field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
1159
MYSQL_TYPE_LONGLONG));
1160
field_list.push_back(new Item_empty_string("Relay_Log_File",
1162
field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
1163
MYSQL_TYPE_LONGLONG));
1164
field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
1166
field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
1167
field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1168
field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
1169
field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
1170
field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
1171
field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
1172
field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
1173
field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
1175
field_list.push_back(new Item_return_int("Last_Errno", 4, MYSQL_TYPE_LONG));
1176
field_list.push_back(new Item_empty_string("Last_Error", 20));
1177
field_list.push_back(new Item_return_int("Skip_Counter", 10,
1179
field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
1180
MYSQL_TYPE_LONGLONG));
1181
field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
1182
MYSQL_TYPE_LONGLONG));
1183
field_list.push_back(new Item_empty_string("Until_Condition", 6));
1184
field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1185
field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1186
MYSQL_TYPE_LONGLONG));
1187
field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
1188
field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
1189
sizeof(mi->ssl_ca)));
1190
field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
1191
sizeof(mi->ssl_capath)));
1192
field_list.push_back(new Item_empty_string("Master_SSL_Cert",
1193
sizeof(mi->ssl_cert)));
1194
field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
1195
sizeof(mi->ssl_cipher)));
1196
field_list.push_back(new Item_empty_string("Master_SSL_Key",
1197
sizeof(mi->ssl_key)));
1198
field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1199
MYSQL_TYPE_LONGLONG));
1200
field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
1202
field_list.push_back(new Item_return_int("Last_IO_Errno", 4, MYSQL_TYPE_LONG));
1203
field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1204
field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, MYSQL_TYPE_LONG));
1205
field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
1207
if (protocol->send_fields(&field_list,
1208
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1213
String *packet= &thd->packet;
1214
protocol->prepare_for_resend();
1217
slave_running can be accessed without run_lock but not other
1218
non-volotile members like mi->io_thd, which is guarded by the mutex.
1220
pthread_mutex_lock(&mi->run_lock);
1221
protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
1222
pthread_mutex_unlock(&mi->run_lock);
1224
pthread_mutex_lock(&mi->data_lock);
1225
pthread_mutex_lock(&mi->rli.data_lock);
1226
protocol->store(mi->host, &my_charset_bin);
1227
protocol->store(mi->user, &my_charset_bin);
1228
protocol->store((uint32) mi->port);
1229
protocol->store((uint32) mi->connect_retry);
1230
protocol->store(mi->master_log_name, &my_charset_bin);
1231
protocol->store((uint64_t) mi->master_log_pos);
1232
protocol->store(mi->rli.group_relay_log_name +
1233
dirname_length(mi->rli.group_relay_log_name),
1235
protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1236
protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
1237
protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
1238
"Yes" : "No", &my_charset_bin);
1239
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1240
protocol->store(rpl_filter->get_do_db());
1241
protocol->store(rpl_filter->get_ignore_db());
1244
String tmp(buf, sizeof(buf), &my_charset_bin);
1245
rpl_filter->get_do_table(&tmp);
1246
protocol->store(&tmp);
1247
rpl_filter->get_ignore_table(&tmp);
1248
protocol->store(&tmp);
1249
rpl_filter->get_wild_do_table(&tmp);
1250
protocol->store(&tmp);
1251
rpl_filter->get_wild_ignore_table(&tmp);
1252
protocol->store(&tmp);
1254
protocol->store(mi->rli.last_error().number);
1255
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1256
protocol->store((uint32) mi->rli.slave_skip_counter);
1257
protocol->store((uint64_t) mi->rli.group_master_log_pos);
1258
protocol->store((uint64_t) mi->rli.log_space_total);
1261
mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
1262
( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
1263
"Relay"), &my_charset_bin);
1264
protocol->store(mi->rli.until_log_name, &my_charset_bin);
1265
protocol->store((uint64_t) mi->rli.until_log_pos);
1267
protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
1268
protocol->store(mi->ssl_ca, &my_charset_bin);
1269
protocol->store(mi->ssl_capath, &my_charset_bin);
1270
protocol->store(mi->ssl_cert, &my_charset_bin);
1271
protocol->store(mi->ssl_cipher, &my_charset_bin);
1272
protocol->store(mi->ssl_key, &my_charset_bin);
1275
Seconds_Behind_Master: if SQL thread is running and I/O thread is
1276
connected, we can compute it otherwise show NULL (i.e. unknown).
1278
if ((mi->slave_running == MYSQL_SLAVE_RUN_CONNECT) &&
1279
mi->rli.slave_running)
1281
long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
1282
- mi->clock_diff_with_master);
1284
Apparently on some systems time_diff can be <0. Here are possible
1285
reasons related to MySQL:
1286
- the master is itself a slave of another master whose time is ahead.
1287
- somebody used an explicit SET TIMESTAMP on the master.
1288
Possible reason related to granularity-to-second of time functions
1289
(nothing to do with MySQL), which can explain a value of -1:
1290
assume the master's and slave's time are perfectly synchronized, and
1291
that at slave's connection time, when the master's timestamp is read,
1292
it is at the very end of second 1, and (a very short time later) when
1293
the slave's timestamp is read it is at the very beginning of second
1294
2. Then the recorded value for master is 1 and the recorded value for
1295
slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1296
between timestamp of slave and rli->last_master_timestamp is 0
1297
(i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1298
This confuses users, so we don't go below 0: hence the max().
1300
last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1301
special marker to say "consider we have caught up".
1303
protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1304
max(0, time_diff) : 0));
1308
protocol->store_null();
1310
protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
1313
protocol->store(mi->last_error().number);
1315
protocol->store(mi->last_error().message, &my_charset_bin);
1317
protocol->store(mi->rli.last_error().number);
1319
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1321
pthread_mutex_unlock(&mi->rli.data_lock);
1322
pthread_mutex_unlock(&mi->data_lock);
1324
if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
1332
void set_slave_thread_options(THD* thd)
1335
It's nonsense to constrain the slave threads with max_join_size; if a
1336
query succeeded on master, we HAVE to execute it. So set
1337
OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
1338
(and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
1339
SELECT examining more than 4 billion rows would still fail (yes, because
1340
when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1341
only for client threads.
1343
uint64_t options= thd->options | OPTION_BIG_SELECTS;
1344
if (opt_log_slave_updates)
1345
options|= OPTION_BIN_LOG;
1347
options&= ~OPTION_BIN_LOG;
1348
thd->options= options;
1349
thd->variables.completion_type= 0;
1353
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
1355
thd->variables.character_set_client=
1356
global_system_variables.character_set_client;
1357
thd->variables.collation_connection=
1358
global_system_variables.collation_connection;
1359
thd->variables.collation_server=
1360
global_system_variables.collation_server;
1361
thd->update_charset();
1364
We use a const cast here since the conceptual (and externally
1365
visible) behavior of the function is to set the default charset of
1366
the thread. That the cache has to be invalidated is a secondary
1369
const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
1377
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
1379
int32_t simulate_error= 0;
1380
thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
1381
SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1382
thd->security_ctx->skip_grants();
1383
my_net_init(&thd->net, 0);
1385
Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1386
slave threads, since a replication event can become this much larger
1387
than the corresponding packet (query) sent from client to master.
1389
thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1390
+ MAX_LOG_EVENT_HEADER; /* note, incr over the global not session var */
1391
thd->slave_thread = 1;
1392
thd->enable_slow_log= opt_log_slow_slave_statements;
1393
set_slave_thread_options(thd);
1394
thd->client_capabilities = CLIENT_LOCAL_FILES;
1395
pthread_mutex_lock(&LOCK_thread_count);
1396
thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
1397
pthread_mutex_unlock(&LOCK_thread_count);
1399
simulate_error|= (1 << SLAVE_THD_IO);
1400
simulate_error|= (1 << SLAVE_THD_SQL);
1401
if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
1408
if (thd_type == SLAVE_THD_SQL)
1409
thd_proc_info(thd, "Waiting for the next event in relay log");
1411
thd_proc_info(thd, "Waiting for master update");
1412
thd->version=refresh_version;
1418
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1419
void* thread_killed_arg)
1422
thr_alarm_t alarmed;
1424
thr_alarm_init(&alarmed);
1425
time_t start_time= my_time(0);
1426
time_t end_time= start_time+sec;
1428
while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1432
The only reason we are asking for alarm is so that
1433
we will be woken up in case of murder, so if we do not get killed,
1434
set the alarm so it goes off after we wake up naturally
1436
thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
1438
thr_end_alarm(&alarmed);
1440
if ((*thread_killed)(thd,thread_killed_arg))
1442
start_time= my_time(0);
1448
static int32_t request_dump(MYSQL* mysql, Master_info* mi,
1449
bool *suppress_warnings)
1451
uchar buf[FN_REFLEN + 10];
1453
int32_t binlog_flags = 0; // for now
1454
char* logname = mi->master_log_name;
1456
*suppress_warnings= false;
1458
// TODO if big log files: Change next to int8store()
1459
int4store(buf, (uint32_t) mi->master_log_pos);
1460
int2store(buf + 4, binlog_flags);
1461
int4store(buf + 6, server_id);
1462
len = (uint32_t) strlen(logname);
1463
memcpy(buf + 10, logname,len);
1464
if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
1467
Something went wrong, so we will just reconnect and retry later
1468
in the future, we should do a better error analysis, but for
1469
now we just fill up the error log :-)
1471
if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
1472
*suppress_warnings= true; // Suppress reconnect warning
1474
sql_print_error("Error on COM_BINLOG_DUMP: %d %s, will retry in %d secs",
1475
mysql_errno(mysql), mysql_error(mysql),
1484
Read one event from the master
1488
mysql MySQL connection
1489
mi Master connection information
1490
suppress_warnings TRUE when a normal net read timeout has caused us to
1491
try a reconnect. We do not want to print anything to
1492
the error log in this case because this a anormal
1493
event in an idle server.
1496
'packet_error' Error
1497
number Length of packet
1500
static uint32_t read_event(MYSQL* mysql,
1501
Master_info *mi __attribute__((__unused__)),
1502
bool* suppress_warnings)
1506
*suppress_warnings= false;
1508
my_real_read() will time us out
1509
We check if we were told to die, and if not, try reading again
1511
if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
1512
return(packet_error);
1514
len = cli_safe_read(mysql);
1515
if (len == packet_error || (int32_t) len < 1)
1517
if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
1520
We are trying a normal reconnect after a read timeout;
1521
we suppress prints to .err file as long as the reconnect
1522
happens without problems
1524
*suppress_warnings= true;
1527
sql_print_error("Error reading packet from server: %s ( server_errno=%d)",
1528
mysql_error(mysql), mysql_errno(mysql));
1529
return(packet_error);
1532
/* Check if eof packet */
1533
if (len < 8 && mysql->net.read_pos[0] == 254)
1535
sql_print_information("Slave: received end packet from server, apparent "
1536
"master shutdown: %s",
1537
mysql_error(mysql));
1538
return(packet_error);
1545
int32_t check_expected_error(THD* thd __attribute__((__unused__)),
1546
Relay_log_info const *rli __attribute__((__unused__)),
1547
int32_t expected_error)
1549
switch (expected_error) {
1550
case ER_NET_READ_ERROR:
1551
case ER_NET_ERROR_ON_WRITE:
1552
case ER_QUERY_INTERRUPTED:
1553
case ER_SERVER_SHUTDOWN:
1554
case ER_NEW_ABORTING_CONNECTION:
1563
Check if the current error is of temporary nature of not.
1564
Some errors are temporary in nature, such as
1565
ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. Ndb also signals
1566
that the error is temporary by pushing a warning with the error code
1567
ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1569
static int32_t has_temporary_error(THD *thd)
1571
if (thd->is_fatal_error)
1574
if (thd->main_da.is_error())
1577
my_error(ER_LOCK_DEADLOCK, MYF(0));
1581
If there is no message in THD, we can't say if it's a temporary
1582
error or not. This is currently the case for Incident_log_event,
1583
which sets no message. Return FALSE.
1585
if (!thd->is_error())
1589
Temporary error codes:
1590
currently, InnoDB deadlock detected by InnoDB or lock
1591
wait timeout (innodb_lock_wait_timeout exceeded
1593
if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
1594
thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
1602
Applies the given event and advances the relay log position.
1604
In essence, this function does:
1607
ev->apply_event(rli);
1608
ev->update_pos(rli);
1611
But it also does some maintainance, such as skipping events if
1612
needed and reporting errors.
1614
If the @c skip flag is set, then it is tested whether the event
1615
should be skipped, by looking at the slave_skip_counter and the
1616
server id. The skip flag should be set when calling this from a
1617
replication thread but not set when executing an explicit BINLOG
1622
@retval 1 Error calling ev->apply_event().
1624
@retval 2 No error calling ev->apply_event(), but error calling
1627
int32_t apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
1630
int32_t exec_res= 0;
1633
Execute the event to change the database and update the binary
1634
log coordinates, but first we set some data that is needed for
1637
The event will be executed unless it is supposed to be skipped.
1639
Queries originating from this server must be skipped. Low-level
1640
events (Format_description_log_event, Rotate_log_event,
1641
Stop_log_event) from this server must also be skipped. But for
1642
those we don't want to modify 'group_master_log_pos', because
1643
these events did not exist on the master.
1644
Format_description_log_event is not completely skipped.
1646
Skip queries specified by the user in 'slave_skip_counter'. We
1647
can't however skip events that has something to do with the log
1650
Filtering on own server id is extremely important, to ignore
1651
execution of events created by the creation/rotation of the relay
1652
log (remember that now the relay log starts with its Format_desc,
1656
thd->server_id = ev->server_id; // use the original server id for logging
1657
thd->set_time(); // time the query
1658
thd->lex->current_select= 0;
1660
ev->when= my_time(0);
1661
ev->thd = thd; // because up to this point, ev->thd == 0
1665
int32_t reason= ev->shall_skip(rli);
1666
if (reason == Log_event::EVENT_SKIP_COUNT)
1667
--rli->slave_skip_counter;
1668
pthread_mutex_unlock(&rli->data_lock);
1669
if (reason == Log_event::EVENT_SKIP_NOT)
1670
exec_res= ev->apply_event(rli);
1673
exec_res= ev->apply_event(rli);
1677
int32_t error= ev->update_pos(rli);
1679
The update should not fail, so print an error message and
1680
return an error code.
1682
TODO: Replace this with a decent error message when merged
1683
with BUG#24954 (which adds several new error message).
1688
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
1689
"It was not possible to update the positions"
1690
" of the relay log information: the slave may"
1691
" be in an inconsistent state."
1692
" Stopped in %s position %s",
1693
rli->group_relay_log_name,
1694
llstr(rli->group_relay_log_pos, buf));
1699
return(exec_res ? 1 : 0);
1704
Top-level function for executing the next event from the relay log.
1706
This function reads the event from the relay log, executes it, and
1707
advances the relay log position. It also handles errors, etc.
1709
This function may fail to apply the event for the following reasons:
1711
- The position specfied by the UNTIL condition of the START SLAVE
1714
- It was not possible to read the event from the log.
1716
- The slave is killed.
1718
- An error occurred when applying the event, and the event has been
1719
tried slave_trans_retries times. If the event has been retried
1720
fewer times, 0 is returned.
1722
- init_master_info or init_relay_log_pos failed. (These are called
1723
if a failure occurs when applying the event.)</li>
1725
- An error occurred when updating the binlog position.
1727
@retval 0 The event was applied.
1729
@retval 1 The event was not applied.
1731
static int32_t exec_relay_log_event(THD* thd, Relay_log_info* rli)
1734
We acquire this mutex since we need it for all operations except
1735
event execution. But we will release it in places where we will
1736
wait for something for example inside of next_event().
1738
pthread_mutex_lock(&rli->data_lock);
1740
Log_event * ev = next_event(rli);
1742
assert(rli->sql_thd==thd);
1744
if (sql_slave_killed(thd,rli))
1746
pthread_mutex_unlock(&rli->data_lock);
1755
This tests if the position of the beginning of the current event
1756
hits the UNTIL barrier.
1758
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
1759
rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
1760
rli->group_master_log_pos :
1761
ev->log_pos - ev->data_written))
1764
sql_print_information("Slave SQL thread stopped because it reached its"
1765
" UNTIL position %s", llstr(rli->until_pos(), buf));
1767
Setting abort_slave flag because we do not want additional message about
1768
error in query execution to be printed.
1770
rli->abort_slave= 1;
1771
pthread_mutex_unlock(&rli->data_lock);
1775
exec_res= apply_event_and_update_pos(ev, thd, rli, true);
1778
Format_description_log_event should not be deleted because it will be
1779
used to read info about the relay log's format; it will be deleted when
1780
the SQL thread does not need it, i.e. when this thread terminates.
1782
if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
1788
update_log_pos failed: this should not happen, so we don't
1794
if (slave_trans_retries)
1796
int32_t temp_err= 0;
1797
if (exec_res && (temp_err= has_temporary_error(thd)))
1801
We were in a transaction which has been rolled back because of a
1803
let's seek back to BEGIN log event and retry it all again.
1804
Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
1805
there is no rollback since 5.0.13 (ref: manual).
1806
We have to not only seek but also
1807
a) init_master_info(), to seek back to hot relay log's start for later
1808
(for when we will come back to this hot log after re-processing the
1809
possibly existing old logs where BEGIN is: check_binlog_magic() will
1810
then need the cache to be at position 0 (see comments at beginning of
1811
init_master_info()).
1812
b) init_relay_log_pos(), because the BEGIN may be an older relay log.
1814
if (rli->trans_retries < slave_trans_retries)
1816
if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
1817
sql_print_error("Failed to initialize the master info structure");
1818
else if (init_relay_log_pos(rli,
1819
rli->group_relay_log_name,
1820
rli->group_relay_log_pos,
1822
sql_print_error("Error initializing relay log position: %s",
1827
end_trans(thd, ROLLBACK);
1828
/* chance for concurrent connection to get more locks */
1829
safe_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
1830
(CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1831
pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1832
rli->trans_retries++;
1833
rli->retried_trans++;
1834
pthread_mutex_unlock(&rli->data_lock);
1838
sql_print_error("Slave SQL thread retried transaction %lu time(s) "
1839
"in vain, giving up. Consider raising the value of "
1840
"the slave_transaction_retries variable.",
1841
slave_trans_retries);
1843
else if ((exec_res && !temp_err) ||
1844
(opt_using_transactions &&
1845
rli->group_relay_log_pos == rli->event_relay_log_pos))
1848
Only reset the retry counter if the entire group succeeded
1849
or failed with a non-transient error. On a successful
1850
event, the execution will proceed as usual; in the case of a
1851
non-transient error, the slave will stop with an error.
1853
rli->trans_retries= 0; // restart from fresh
1858
pthread_mutex_unlock(&rli->data_lock);
1859
rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
1860
ER(ER_SLAVE_RELAY_LOG_READ_FAILURE), "\
1861
Could not parse relay log event entry. The possible reasons are: the master's \
1862
binary log is corrupted (you can check this by running 'mysqlbinlog' on the \
1863
binary log), the slave's relay log is corrupted (you can check this by running \
1864
'mysqlbinlog' on the relay log), a network problem, or a bug in the master's \
1865
or slave's MySQL code. If you want to check the master's binary log or slave's \
1866
relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' \
1874
@brief Try to reconnect slave IO thread.
1876
@details Terminates current connection to master, sleeps for
1877
@c mi->connect_retry msecs and initiates new connection with
1878
@c safe_reconnect(). Variable pointed by @c retry_count is increased -
1879
if it exceeds @c master_retry_count then connection is not re-established
1880
and function signals error.
1881
Unless @c suppres_warnings is TRUE, a warning is put in the server error log
1882
when reconnecting. The warning message and messages used to report errors
1883
are taken from @c messages array. In case @c master_retry_count is exceeded,
1884
no messages are added to the log.
1886
@param[in] thd Thread context.
1887
@param[in] mysql MySQL connection.
1888
@param[in] mi Master connection information.
1889
@param[in,out] retry_count Number of attempts to reconnect.
1890
@param[in] suppress_warnings TRUE when a normal net read timeout
1891
has caused to reconnecting.
1892
@param[in] messages Messages to print/log, see
1893
reconnect_messages[] array.
1896
@retval 1 There was an error.
1899
static int32_t try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
1900
uint32_t *retry_count, bool suppress_warnings,
1901
const char *messages[SLAVE_RECON_MSG_MAX])
1903
mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
1904
thd->proc_info= messages[SLAVE_RECON_MSG_WAIT];
1905
#ifdef SIGNAL_WITH_VIO_CLOSE
1906
thd->clear_active_vio();
1909
if ((*retry_count)++)
1911
if (*retry_count > master_retry_count)
1912
return 1; // Don't retry forever
1913
safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1916
if (check_io_slave_killed(thd, mi, messages[SLAVE_RECON_MSG_KILLED_WAITING]))
1918
thd->proc_info = messages[SLAVE_RECON_MSG_AFTER];
1919
if (!suppress_warnings)
1921
char buf[256], llbuff[22];
1922
snprintf(buf, sizeof(buf), messages[SLAVE_RECON_MSG_FAILED],
1923
IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
1925
Raise a warining during registering on master/requesting dump.
1926
Log a message reading event.
1928
if (messages[SLAVE_RECON_MSG_COMMAND][0])
1930
mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1931
ER(ER_SLAVE_MASTER_COM_FAILURE),
1932
messages[SLAVE_RECON_MSG_COMMAND], buf);
1936
sql_print_information(buf);
1939
if (safe_reconnect(thd, mysql, mi, 1) || io_slave_killed(thd, mi))
1941
if (global_system_variables.log_warnings)
1942
sql_print_information(messages[SLAVE_RECON_MSG_KILLED_AFTER]);
1949
/* Slave I/O Thread entry point */
1951
pthread_handler_t handle_slave_io(void *arg)
1953
THD *thd; // needs to be first for thread_stack
1955
Master_info *mi = (Master_info*)arg;
1956
Relay_log_info *rli= &mi->rli;
1958
uint32_t retry_count;
1959
bool suppress_warnings;
1960
uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
1967
pthread_mutex_lock(&mi->run_lock);
1968
/* Inform waiting threads that slave has started */
1971
mi->events_till_disconnect = disconnect_slave_event_count;
1974
THD_CHECK_SENTRY(thd);
1977
pthread_detach_this_thread();
1978
thd->thread_stack= (char*) &thd; // remember where our stack is
1979
if (init_slave_thread(thd, SLAVE_THD_IO))
1981
pthread_cond_broadcast(&mi->start_cond);
1982
pthread_mutex_unlock(&mi->run_lock);
1983
sql_print_error("Failed during slave I/O thread initialization");
1986
pthread_mutex_lock(&LOCK_thread_count);
1987
threads.append(thd);
1988
pthread_mutex_unlock(&LOCK_thread_count);
1989
mi->slave_running = 1;
1990
mi->abort_slave = 0;
1991
pthread_mutex_unlock(&mi->run_lock);
1992
pthread_cond_broadcast(&mi->start_cond);
1994
if (!(mi->mysql = mysql = mysql_init(NULL)))
1996
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
1997
ER(ER_SLAVE_FATAL_ERROR), "error in mysql_init()");
2001
thd_proc_info(thd, "Connecting to master");
2002
// we can get killed during safe_connect
2003
if (!safe_connect(thd, mysql, mi))
2005
sql_print_information("Slave I/O thread: connected to master '%s@%s:%d',"
2006
"replication started in log '%s' at position %s",
2007
mi->user, mi->host, mi->port,
2009
llstr(mi->master_log_pos,llbuff));
2011
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
2012
thread, since a replication event can become this much larger than
2013
the corresponding packet (query) sent from client to master.
2015
mysql->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
2019
sql_print_information("Slave I/O thread killed while connecting to master");
2025
// TODO: the assignment below should be under mutex (5.0)
2026
mi->slave_running= MYSQL_SLAVE_RUN_CONNECT;
2027
thd->slave_net = &mysql->net;
2028
thd_proc_info(thd, "Checking master version");
2029
if (get_master_version_and_clock(mysql, mi))
2032
if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
2035
Register ourselves with the master.
2037
thd_proc_info(thd, "Registering slave on master");
2038
if (register_slave_on_master(mysql, mi, &suppress_warnings))
2040
if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
2041
"while registering slave on master"))
2043
sql_print_error("Slave I/O thread couldn't register on master");
2044
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2045
reconnect_messages[SLAVE_RECON_ACT_REG]))
2052
if (!retry_count_reg)
2055
sql_print_information("Forcing to reconnect slave I/O thread");
2056
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2057
reconnect_messages[SLAVE_RECON_ACT_REG]))
2063
while (!io_slave_killed(thd,mi))
2065
thd_proc_info(thd, "Requesting binlog dump");
2066
if (request_dump(mysql, mi, &suppress_warnings))
2068
sql_print_error("Failed on request_dump()");
2069
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
2070
requesting master dump") ||
2071
try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2072
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2076
if (!retry_count_dump)
2079
sql_print_information("Forcing to reconnect slave I/O thread");
2080
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2081
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2086
while (!io_slave_killed(thd,mi))
2090
We say "waiting" because read_event() will wait if there's nothing to
2091
read. But if there's something to read, it will not wait. The
2092
important thing is to not confuse users by saying "reading" whereas
2093
we're in fact receiving nothing.
2095
thd_proc_info(thd, "Waiting for master to send event");
2096
event_len= read_event(mysql, mi, &suppress_warnings);
2097
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
2100
if (!retry_count_event)
2102
retry_count_event++;
2103
sql_print_information("Forcing to reconnect slave I/O thread");
2104
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2105
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2110
if (event_len == packet_error)
2112
uint32_t mysql_error_number= mysql_errno(mysql);
2113
switch (mysql_error_number) {
2114
case CR_NET_PACKET_TOO_LARGE:
2116
Log entry on master is longer than max_allowed_packet (%ld) on \
2117
slave. If the entry is correct, restart the server with a higher value of \
2118
max_allowed_packet",
2119
thd->variables.max_allowed_packet);
2121
case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2122
sql_print_error(ER(mysql_error_number), mysql_error_number,
2123
mysql_error(mysql));
2125
case EE_OUTOFMEMORY:
2126
case ER_OUTOFMEMORY:
2128
Stopping slave I/O thread due to out-of-memory error from master");
2131
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2132
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2135
} // if (event_len == packet_error)
2137
retry_count=0; // ok event, reset retry counter
2138
thd_proc_info(thd, "Queueing master event to the relay log");
2139
if (queue_event(mi,(const char*)mysql->net.read_pos + 1, event_len))
2143
if (flush_master_info(mi, 1))
2145
sql_print_error("Failed to flush master info file");
2149
See if the relay logs take too much space.
2150
We don't lock mi->rli.log_space_lock here; this dirty read saves time
2151
and does not introduce any problem:
2152
- if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
2153
the clean value is 0), then we are reading only one more event as we
2154
should, and we'll block only at the next event. No big deal.
2155
- if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
2156
the clean value is 1), then we are going into wait_for_relay_log_space()
2157
for no reason, but this function will do a clean read, notice the clean
2158
value and exit immediately.
2160
if (rli->log_space_limit && rli->log_space_limit <
2161
rli->log_space_total &&
2162
!rli->ignore_log_space_limit)
2163
if (wait_for_relay_log_space(rli))
2165
sql_print_error("Slave I/O thread aborted while waiting for relay \
2174
// print the current replication position
2175
sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
2176
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
2177
VOID(pthread_mutex_lock(&LOCK_thread_count));
2178
thd->query = thd->db = 0; // extra safety
2179
thd->query_length= thd->db_length= 0;
2180
VOID(pthread_mutex_unlock(&LOCK_thread_count));
2184
Here we need to clear the active VIO before closing the
2185
connection with the master. The reason is that THD::awake()
2186
might be called from terminate_slave_thread() because somebody
2187
issued a STOP SLAVE. If that happends, the close_active_vio()
2188
can be called in the middle of closing the VIO associated with
2189
the 'mysql' object, causing a crash.
2191
#ifdef SIGNAL_WITH_VIO_CLOSE
2192
thd->clear_active_vio();
2197
write_ignored_events_info_to_relay_log(thd, mi);
2198
thd_proc_info(thd, "Waiting for slave mutex on exit");
2199
pthread_mutex_lock(&mi->run_lock);
2201
/* Forget the relay log's format */
2202
delete mi->rli.relay_log.description_event_for_queue;
2203
mi->rli.relay_log.description_event_for_queue= 0;
2204
// TODO: make rpl_status part of Master_info
2205
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2206
assert(thd->net.buff != 0);
2207
net_end(&thd->net); // destructor will not free it, because net.vio is 0
2208
close_thread_tables(thd);
2209
pthread_mutex_lock(&LOCK_thread_count);
2210
THD_CHECK_SENTRY(thd);
2212
pthread_mutex_unlock(&LOCK_thread_count);
2214
mi->slave_running= 0;
2217
Note: the order of the two following calls (first broadcast, then unlock)
2218
is important. Otherwise a killer_thread can execute between the calls and
2219
delete the mi structure leading to a crash! (see BUG#25306 for details)
2221
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
2222
pthread_mutex_unlock(&mi->run_lock);
2225
return(0); // Can't return anything here
2229
/* Slave SQL Thread entry point */
2231
pthread_handler_t handle_slave_sql(void *arg)
2233
THD *thd; /* needs to be first for thread_stack */
2234
char llbuff[22],llbuff1[22];
2236
Relay_log_info* rli = &((Master_info*)arg)->rli;
2241
assert(rli->inited);
2242
pthread_mutex_lock(&rli->run_lock);
2243
assert(!rli->slave_running);
2245
rli->events_till_abort = abort_slave_event_count;
2248
thd->thread_stack = (char*)&thd; // remember where our stack is
2251
/* Inform waiting threads that slave has started */
2252
rli->slave_run_id++;
2253
rli->slave_running = 1;
2255
pthread_detach_this_thread();
2256
if (init_slave_thread(thd, SLAVE_THD_SQL))
2259
TODO: this is currently broken - slave start and change master
2260
will be stuck if we fail here
2262
pthread_cond_broadcast(&rli->start_cond);
2263
pthread_mutex_unlock(&rli->run_lock);
2264
sql_print_error("Failed during slave thread initialization");
2267
thd->init_for_queries();
2268
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
2269
pthread_mutex_lock(&LOCK_thread_count);
2270
threads.append(thd);
2271
pthread_mutex_unlock(&LOCK_thread_count);
2273
We are going to set slave_running to 1. Assuming slave I/O thread is
2274
alive and connected, this is going to make Seconds_Behind_Master be 0
2275
i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
2276
the moment we start we can think we are caught up, and the next second we
2277
start receiving data so we realize we are not caught up and
2278
Seconds_Behind_Master grows. No big deal.
2280
rli->abort_slave = 0;
2281
pthread_mutex_unlock(&rli->run_lock);
2282
pthread_cond_broadcast(&rli->start_cond);
2285
Reset errors for a clean start (otherwise, if the master is idle, the SQL
2286
thread may execute no Query_log_event, so the error will remain even
2287
though there's no problem anymore). Do not reset the master timestamp
2288
(imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
2289
as we are not sure that we are going to receive a query, we want to
2290
remember the last master timestamp (to say how many seconds behind we are
2292
But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
2296
//tell the I/O thread to take relay_log_space_limit into account from now on
2297
pthread_mutex_lock(&rli->log_space_lock);
2298
rli->ignore_log_space_limit= 0;
2299
pthread_mutex_unlock(&rli->log_space_lock);
2300
rli->trans_retries= 0; // start from "no error"
2302
if (init_relay_log_pos(rli,
2303
rli->group_relay_log_name,
2304
rli->group_relay_log_pos,
2305
1 /*need data lock*/, &errmsg,
2306
1 /*look for a description_event*/))
2308
sql_print_error("Error initializing relay log position: %s",
2312
THD_CHECK_SENTRY(thd);
2313
assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2315
Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2316
correct position when it's called just after my_b_seek() (the questionable
2317
stuff is those "seek is done on next read" comments in the my_b_seek()
2319
The crude reality is that this assertion randomly fails whereas
2320
replication seems to work fine. And there is no easy explanation why it
2321
fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
2322
init_relay_log_pos() called above). Maybe the assertion would be
2323
meaningful if we held rli->data_lock between the my_b_seek() and the
2326
assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2327
assert(rli->sql_thd == thd);
2329
if (global_system_variables.log_warnings)
2330
sql_print_information("Slave SQL thread initialized, starting replication in \
2331
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
2332
llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
2333
llstr(rli->group_relay_log_pos,llbuff1));
2335
/* execute init_slave variable */
2336
if (sys_init_slave.value_length)
2338
execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
2339
if (thd->is_slave_error)
2342
Slave SQL thread aborted. Can't execute init_slave query");
2348
First check until condition - probably there is nothing to execute. We
2349
do not want to wait for next event in this case.
2351
pthread_mutex_lock(&rli->data_lock);
2352
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
2353
rli->is_until_satisfied(rli->group_master_log_pos))
2356
sql_print_information("Slave SQL thread stopped because it reached its"
2357
" UNTIL position %s", llstr(rli->until_pos(), buf));
2358
pthread_mutex_unlock(&rli->data_lock);
2361
pthread_mutex_unlock(&rli->data_lock);
2363
/* Read queries from the IO/THREAD until this thread is killed */
2365
while (!sql_slave_killed(thd,rli))
2367
thd_proc_info(thd, "Reading event from the relay log");
2368
assert(rli->sql_thd == thd);
2369
THD_CHECK_SENTRY(thd);
2370
if (exec_relay_log_event(thd,rli))
2372
// do not scare the user if SQL thread was simply killed or stopped
2373
if (!sql_slave_killed(thd,rli))
2376
retrieve as much info as possible from the thd and, error
2377
codes and warnings and print this to the error log as to
2378
allow the user to locate the error
2380
uint32 const last_errno= rli->last_error().number;
2382
if (thd->is_error())
2384
char const *const errmsg= thd->main_da.message();
2386
if (last_errno == 0)
2388
rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), errmsg);
2390
else if (last_errno != thd->main_da.sql_errno())
2392
sql_print_error("Slave (additional info): %s Error_code: %d",
2393
errmsg, thd->main_da.sql_errno());
2397
/* Print any warnings issued */
2398
List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
2401
Added controlled slave thread cancel for replication
2402
of user-defined variables.
2404
bool udf_error = false;
2407
if (err->code == ER_CANT_OPEN_LIBRARY)
2409
sql_print_warning("Slave: %s Error_code: %d",err->msg, err->code);
2412
sql_print_error("Error loading user-defined library, slave SQL "
2413
"thread aborted. Install the missing library, and restart the "
2414
"slave SQL thread with \"SLAVE START\". We stopped at log '%s' "
2415
"position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,
2419
Error running query, slave SQL thread aborted. Fix the problem, and restart \
2420
the slave SQL thread with \"SLAVE START\". We stopped at log \
2421
'%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff));
2427
/* Thread stopped. Print the current replication position to the log */
2428
sql_print_information("Slave SQL thread exiting, replication stopped in log "
2429
"'%s' at position %s",
2430
RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
2435
Some events set some playgrounds, which won't be cleared because thread
2436
stops. Stopping of this thread may not be known to these events ("stop"
2437
request is detected only by the present function, not by events), so we
2438
must "proactively" clear playgrounds:
2440
rli->cleanup_context(thd, 1);
2441
VOID(pthread_mutex_lock(&LOCK_thread_count));
2443
Some extra safety, which should not been needed (normally, event deletion
2444
should already have done these assignments (each event which sets these
2445
variables is supposed to set them to 0 before terminating)).
2447
thd->query= thd->db= thd->catalog= 0;
2448
thd->query_length= thd->db_length= 0;
2449
VOID(pthread_mutex_unlock(&LOCK_thread_count));
2450
thd_proc_info(thd, "Waiting for slave mutex on exit");
2451
pthread_mutex_lock(&rli->run_lock);
2452
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
2453
pthread_mutex_lock(&rli->data_lock);
2454
assert(rli->slave_running == 1); // tracking buffer overrun
2455
/* When master_pos_wait() wakes up it will check this and terminate */
2456
rli->slave_running= 0;
2457
/* Forget the relay log's format */
2458
delete rli->relay_log.description_event_for_exec;
2459
rli->relay_log.description_event_for_exec= 0;
2460
/* Wake up master_pos_wait() */
2461
pthread_mutex_unlock(&rli->data_lock);
2462
pthread_cond_broadcast(&rli->data_cond);
2463
rli->ignore_log_space_limit= 0; /* don't need any lock */
2464
/* we die so won't remember charset - re-update them on next thread start */
2465
rli->cached_charset_invalidate();
2466
rli->save_temporary_tables = thd->temporary_tables;
2469
TODO: see if we can do this conditionally in next_event() instead
2470
to avoid unneeded position re-init
2472
thd->temporary_tables = 0; // remove tempation from destructor to close them
2473
assert(thd->net.buff != 0);
2474
net_end(&thd->net); // destructor will not free it, because we are weird
2475
assert(rli->sql_thd == thd);
2476
THD_CHECK_SENTRY(thd);
2478
pthread_mutex_lock(&LOCK_thread_count);
2479
THD_CHECK_SENTRY(thd);
2481
pthread_mutex_unlock(&LOCK_thread_count);
2483
Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
2484
is important. Otherwise a killer_thread can execute between the calls and
2485
delete the mi structure leading to a crash! (see BUG#25306 for details)
2487
pthread_cond_broadcast(&rli->stop_cond);
2488
pthread_mutex_unlock(&rli->run_lock); // tell the world we are done
2492
return(0); // Can't return anything here
2497
process_io_create_file()
2500
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
2504
bool cev_not_written;
2505
THD *thd = mi->io_thd;
2506
NET *net = &mi->mysql->net;
2508
if (unlikely(!cev->is_valid()))
2511
if (!rpl_filter->db_ok(cev->db))
2513
skip_load_data_infile(net);
2516
assert(cev->inited_from_old);
2517
thd->file_id = cev->file_id = mi->file_id++;
2518
thd->server_id = cev->server_id;
2519
cev_not_written = 1;
2521
if (unlikely(net_request_file(net,cev->fname)))
2523
sql_print_error("Slave I/O: failed requesting download of '%s'",
2529
This dummy block is so we could instantiate Append_block_log_event
2530
once and then modify it slightly instead of doing it multiple times
2534
Append_block_log_event aev(thd,0,0,0,0);
2538
if (unlikely((num_bytes=my_net_read(net)) == packet_error))
2540
sql_print_error("Network read error downloading '%s' from master",
2544
if (unlikely(!num_bytes)) /* eof */
2546
/* 3.23 master wants it */
2547
net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0);
2549
If we wrote Create_file_log_event, then we need to write
2550
Execute_load_log_event. If we did not write Create_file_log_event,
2551
then this is an empty file and we can just do as if the LOAD DATA
2552
INFILE had not existed, i.e. write nothing.
2554
if (unlikely(cev_not_written))
2556
Execute_load_log_event xev(thd,0,0);
2557
xev.log_pos = cev->log_pos;
2558
if (unlikely(mi->rli.relay_log.append(&xev)))
2560
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2561
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2562
"error writing Exec_load event to relay log");
2565
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2568
if (unlikely(cev_not_written))
2570
cev->block = net->read_pos;
2571
cev->block_len = num_bytes;
2572
if (unlikely(mi->rli.relay_log.append(cev)))
2574
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2575
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2576
"error writing Create_file event to relay log");
2580
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2584
aev.block = net->read_pos;
2585
aev.block_len = num_bytes;
2586
aev.log_pos = cev->log_pos;
2587
if (unlikely(mi->rli.relay_log.append(&aev)))
2589
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2590
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2591
"error writing Append_block event to relay log");
2594
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
2605
Start using a new binary log on the master
2609
mi master_info for the slave
2610
rev The rotate log event read from the binary log
2613
Updates the master info with the place in the next binary
2614
log where we should start reading.
2615
Rotate the relay log to avoid mixed-format relay logs.
2618
We assume we already locked mi->data_lock
2622
1 Log event is illegal
2626
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
2628
safe_mutex_assert_owner(&mi->data_lock);
2630
if (unlikely(!rev->is_valid()))
2633
/* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2634
memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
2635
mi->master_log_pos= rev->pos;
2637
If we do not do this, we will be getting the first
2638
rotate event forever, so we need to not disconnect after one.
2640
if (disconnect_slave_event_count)
2641
mi->events_till_disconnect++;
2644
If description_event_for_queue is format <4, there is conversion in the
2645
relay log to the slave's format (4). And Rotate can mean upgrade or
2646
nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
2647
no need to reset description_event_for_queue now. And if it's nothing (same
2648
master version as before), no need (still using the slave's format).
2650
if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
2652
delete mi->rli.relay_log.description_event_for_queue;
2653
/* start from format 3 (MySQL 4.0) again */
2654
mi->rli.relay_log.description_event_for_queue= new
2655
Format_description_log_event(3);
2658
Rotate the relay log makes binlog format detection easier (at next slave
2659
start or mysqlbinlog)
2661
rotate_relay_log(mi); /* will take the right mutexes */
2666
Reads a 3.23 event and converts it to the slave's format. This code was
2667
copied from MySQL 4.0.
2669
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
2672
const char *errmsg = 0;
2674
bool ignore_event= 0;
2676
Relay_log_info *rli= &mi->rli;
2679
If we get Load event, we need to pass a non-reusable buffer
2680
to read_log_event, so we do a trick
2682
if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
2684
if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
2686
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2687
ER(ER_SLAVE_FATAL_ERROR), "Memory allocation failed");
2690
memcpy(tmp_buf,buf,event_len);
2692
Create_file constructor wants a 0 as last char of buffer, this 0 will
2693
serve as the string-termination char for the file's name (which is at the
2695
We must increment event_len, otherwise the event constructor will not see
2696
this end 0, which leads to segfault.
2698
tmp_buf[event_len++]=0;
2699
int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
2700
buf = (const char*)tmp_buf;
2703
This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
2704
send the loaded file, and write it to the relay log in the form of
2705
Append_block/Exec_load (the SQL thread needs the data, as that thread is not
2706
connected to the master).
2708
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2709
mi->rli.relay_log.description_event_for_queue);
2712
sql_print_error("Read invalid event from master: '%s',\
2713
master could be corrupt but a more likely cause of this is a bug",
2715
my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
2719
pthread_mutex_lock(&mi->data_lock);
2720
ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
2721
switch (ev->get_type_code()) {
2727
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2730
pthread_mutex_unlock(&mi->data_lock);
2735
case CREATE_FILE_EVENT:
2737
Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
2738
queue_old_event() which is for 3.23 events which don't comprise
2739
CREATE_FILE_EVENT. This is because read_log_event() above has just
2740
transformed LOAD_EVENT into CREATE_FILE_EVENT.
2743
/* We come here when and only when tmp_buf != 0 */
2744
assert(tmp_buf != 0);
2746
ev->log_pos+= inc_pos;
2747
int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2749
mi->master_log_pos += inc_pos;
2750
pthread_mutex_unlock(&mi->data_lock);
2751
my_free((char*)tmp_buf, MYF(0));
2758
if (likely(!ignore_event))
2762
Don't do it for fake Rotate events (see comment in
2763
Log_event::Log_event(const char* buf...) in log_event.cc).
2765
ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
2766
if (unlikely(rli->relay_log.append(ev)))
2769
pthread_mutex_unlock(&mi->data_lock);
2772
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2775
mi->master_log_pos+= inc_pos;
2776
pthread_mutex_unlock(&mi->data_lock);
2781
Reads a 4.0 event and converts it to the slave's format. This code was copied
2782
from queue_binlog_ver_1_event(), with some affordable simplifications.
2784
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
2787
const char *errmsg = 0;
2790
Relay_log_info *rli= &mi->rli;
2792
/* read_log_event() will adjust log_pos to be end_log_pos */
2793
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2794
mi->rli.relay_log.description_event_for_queue);
2797
sql_print_error("Read invalid event from master: '%s',\
2798
master could be corrupt but a more likely cause of this is a bug",
2800
my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
2803
pthread_mutex_lock(&mi->data_lock);
2804
switch (ev->get_type_code()) {
2808
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2811
pthread_mutex_unlock(&mi->data_lock);
2820
if (unlikely(rli->relay_log.append(ev)))
2823
pthread_mutex_unlock(&mi->data_lock);
2826
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2828
mi->master_log_pos+= inc_pos;
2830
pthread_mutex_unlock(&mi->data_lock);
2837
Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
2838
(exactly, slave's) format. To do the conversion, we create a 5.0 event from
2839
the 3.23/4.0 bytes, then write this event to the relay log.
2842
Test this code before release - it has to be tested on a separate
2843
setup with 3.23 master or 4.0 master
2846
static int32_t queue_old_event(Master_info *mi, const char *buf,
2849
switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
2852
return(queue_binlog_ver_1_event(mi,buf,event_len));
2854
return(queue_binlog_ver_3_event(mi,buf,event_len));
2855
default: /* unsupported format; eg version 2 */
2863
If the event is 3.23/4.0, passes it to queue_old_event() which will convert
2864
it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
2865
no format conversion, it's pure read/write of bytes.
2866
So a 5.0.0 slave's relay log can contain events in the slave's format or in
2870
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
2875
Relay_log_info *rli= &mi->rli;
2876
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
2879
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
2880
buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
2881
return(queue_old_event(mi,buf,event_len));
2883
pthread_mutex_lock(&mi->data_lock);
2885
switch (buf[EVENT_TYPE_OFFSET]) {
2888
We needn't write this event to the relay log. Indeed, it just indicates a
2889
master server shutdown. The only thing this does is cleaning. But
2890
cleaning is already done on a per-master-thread basis (as the master
2891
server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
2892
prepared statements' deletion are TODO only when we binlog prep stmts).
2894
We don't even increment mi->master_log_pos, because we may be just after
2895
a Rotate event. Btw, in a few milliseconds we are going to have a Start
2896
event from the next binlog (unless the master is presently running
2902
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
2903
if (unlikely(process_io_rotate(mi,&rev)))
2905
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2909
Now the I/O thread has just changed its mi->master_log_name, so
2910
incrementing mi->master_log_pos is nonsense.
2915
case FORMAT_DESCRIPTION_EVENT:
2918
Create an event, and save it (when we rotate the relay log, we will have
2919
to write this event again).
2922
We are the only thread which reads/writes description_event_for_queue.
2923
The relay_log struct does not move (though some members of it can
2924
change), so we needn't any lock (no rli->data_lock, no log lock).
2926
Format_description_log_event* tmp;
2928
if (!(tmp= (Format_description_log_event*)
2929
Log_event::read_log_event(buf, event_len, &errmsg,
2930
mi->rli.relay_log.description_event_for_queue)))
2932
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2935
delete mi->rli.relay_log.description_event_for_queue;
2936
mi->rli.relay_log.description_event_for_queue= tmp;
2938
Though this does some conversion to the slave's format, this will
2939
preserve the master's binlog format version, and number of event types.
2942
If the event was not requested by the slave (the slave did not ask for
2943
it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
2945
inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
2949
case HEARTBEAT_LOG_EVENT:
2952
HB (heartbeat) cannot come before RL (Relay)
2955
Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
2958
error= ER_SLAVE_HEARTBEAT_FAILURE;
2959
error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2960
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2961
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2962
error_msg.append(STRING_WITH_LEN(" log_pos "));
2963
llstr(hb.log_pos, llbuf);
2964
error_msg.append(llbuf, strlen(llbuf));
2967
mi->received_heartbeats++;
2969
compare local and event's versions of log_file, log_pos.
2971
Heartbeat is sent only after an event corresponding to the corrdinates
2972
the heartbeat carries.
2973
Slave can not have a difference in coordinates except in the only
2974
special case when mi->master_log_name, master_log_pos have never
2975
been updated by Rotate event i.e when slave does not have any history
2976
with the master (and thereafter mi->master_log_pos is NULL).
2978
TODO: handling `when' for SHOW SLAVE STATUS' snds behind
2980
if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
2981
&& mi->master_log_name != NULL)
2982
|| mi->master_log_pos != hb.log_pos)
2984
/* missed events of heartbeat from the past */
2985
error= ER_SLAVE_HEARTBEAT_FAILURE;
2986
error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
2987
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2988
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2989
error_msg.append(STRING_WITH_LEN(" log_pos "));
2990
llstr(hb.log_pos, llbuf);
2991
error_msg.append(llbuf, strlen(llbuf));
2994
goto skip_relay_logging;
3004
If this event is originating from this server, don't queue it.
3005
We don't check this for 3.23 events because it's simpler like this; 3.23
3006
will be filtered anyway by the SQL slave thread which also tests the
3007
server id (we must also keep this test in the SQL thread, in case somebody
3008
upgrades a 4.0 slave which has a not-filtered relay log).
3010
ANY event coming from ourselves can be ignored: it is obvious for queries;
3011
for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
3012
(--log-slave-updates would not log that) unless this slave is also its
3013
direct master (an unsupported, useless setup!).
3016
pthread_mutex_lock(log_lock);
3018
if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
3019
!mi->rli.replicate_same_server_id)
3022
Do not write it to the relay log.
3023
a) We still want to increment mi->master_log_pos, so that we won't
3024
re-read this event from the master if the slave IO thread is now
3025
stopped/restarted (more efficient if the events we are ignoring are big
3027
b) We want to record that we are skipping events, for the information of
3028
the slave SQL thread, otherwise that thread may let
3029
rli->group_relay_log_pos stay too small if the last binlog's event is
3031
But events which were generated by this slave and which do not exist in
3032
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
3035
if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
3036
buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3037
buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3039
mi->master_log_pos+= inc_pos;
3040
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
3041
assert(rli->ign_master_log_name_end[0]);
3042
rli->ign_master_log_pos_end= mi->master_log_pos;
3044
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3048
/* write the event to the relay log */
3049
if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3051
mi->master_log_pos+= inc_pos;
3052
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3056
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
3058
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
3060
pthread_mutex_unlock(log_lock);
3065
pthread_mutex_unlock(&mi->data_lock);
3067
mi->report(ERROR_LEVEL, error, ER(error),
3068
(error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
3069
"could not queue event from master" :
3075
void end_relay_log_info(Relay_log_info* rli)
3079
if (rli->info_fd >= 0)
3081
end_io_cache(&rli->info_file);
3082
(void) my_close(rli->info_fd, MYF(MY_WME));
3085
if (rli->cur_log_fd >= 0)
3087
end_io_cache(&rli->cache_buf);
3088
(void)my_close(rli->cur_log_fd, MYF(MY_WME));
3089
rli->cur_log_fd = -1;
3092
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
3093
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3095
Delete the slave's temporary tables from memory.
3096
In the future there will be other actions than this, to ensure persistance
3097
of slave's temp tables after shutdown.
3099
rli->close_temporary_tables();
3104
Try to connect until successful or slave killed
3108
thd Thread handler for slave
3109
mysql MySQL connection handle
3110
mi Replication handle
3117
static int32_t safe_connect(THD* thd, MYSQL* mysql, Master_info* mi)
3119
return(connect_to_master(thd, mysql, mi, 0, 0));
3128
Try to connect until successful or slave killed or we have retried
3129
master_retry_count times
3132
static int32_t connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
3133
bool reconnect, bool suppress_warnings)
3135
int32_t slave_was_killed;
3136
int32_t last_errno= -2; // impossible error
3137
uint32_t err_count=0;
3140
mi->events_till_disconnect = disconnect_slave_event_count;
3141
uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
3142
if (opt_slave_compressed_protocol)
3143
client_flag=CLIENT_COMPRESS; /* We will use compression */
3145
mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3146
mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3148
mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
3149
/* This one is not strictly needed but we have it here for completeness */
3150
mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
3152
while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3153
(reconnect ? mysql_reconnect(mysql) != 0 :
3154
mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
3155
mi->port, 0, client_flag) == 0))
3157
/* Don't repeat last error */
3158
if ((int32_t)mysql_errno(mysql) != last_errno)
3160
last_errno=mysql_errno(mysql);
3161
suppress_warnings= 0;
3162
mi->report(ERROR_LEVEL, last_errno,
3163
"error %s to master '%s@%s:%d'"
3164
" - retry-time: %d retries: %u",
3165
(reconnect ? "reconnecting" : "connecting"),
3166
mi->user, mi->host, mi->port,
3167
mi->connect_retry, master_retry_count);
3170
By default we try forever. The reason is that failure will trigger
3171
master election, so if the user did not set master_retry_count we
3172
do not want to have election triggered on the first failure to
3175
if (++err_count == master_retry_count)
3179
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
3182
safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3186
if (!slave_was_killed)
3190
if (!suppress_warnings && global_system_variables.log_warnings)
3191
sql_print_information("Slave: connected to master '%s@%s:%d',\
3192
replication resumed in log '%s' at position %s", mi->user,
3195
llstr(mi->master_log_pos,llbuff));
3199
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
3200
general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d",
3201
mi->user, mi->host, mi->port);
3203
#ifdef SIGNAL_WITH_VIO_CLOSE
3204
thd->set_active_vio(mysql->net.vio);
3207
mysql->reconnect= 1;
3208
return(slave_was_killed);
3216
Try to connect until successful or slave killed or we have retried
3217
master_retry_count times
3220
static int32_t safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
3221
bool suppress_warnings)
3223
return(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
3228
Store the file and position where the execute-slave thread are in the
3232
flush_relay_log_info()
3233
rli Relay log information
3236
- As this is only called by the slave thread, we don't need to
3237
have a lock on this.
3238
- If there is an active transaction, then we don't update the position
3239
in the relay log. This is to ensure that we re-execute statements
3240
if we die in the middle of an transaction that was rolled back.
3241
- As a transaction never spans binary logs, we don't have to handle the
3242
case where we do a relay-log-rotation in the middle of the transaction.
3243
If this would not be the case, we would have to ensure that we
3244
don't delete the relay log file where the transaction started when
3245
we switch to a new relay log file.
3248
- Change the log file information to a binary format to avoid calling
3256
bool flush_relay_log_info(Relay_log_info* rli)
3260
if (unlikely(rli->no_storage))
3263
IO_CACHE *file = &rli->info_file;
3264
char buff[FN_REFLEN*2+22*2+4], *pos;
3266
my_b_seek(file, 0L);
3267
pos=strmov(buff, rli->group_relay_log_name);
3269
pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
3271
pos=strmov(pos, rli->group_master_log_name);
3273
pos=int64_t2str(rli->group_master_log_pos, pos, 10);
3275
if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
3277
if (flush_io_cache(file))
3280
/* Flushing the relay log is done by the slave I/O thread */
3286
Called when we notice that the current "hot" log got rotated under our feet.
3289
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
3291
assert(rli->cur_log != &rli->cache_buf);
3292
assert(rli->cur_log_fd == -1);
3294
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3295
if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
3299
We want to start exactly where we was before:
3300
relay_log_pos Current log pos
3301
pending Number of bytes already processed from the event
3303
rli->event_relay_log_pos= max(rli->event_relay_log_pos, BIN_LOG_HEADER_SIZE);
3304
my_b_seek(cur_log,rli->event_relay_log_pos);
3309
static Log_event* next_event(Relay_log_info* rli)
3312
IO_CACHE* cur_log = rli->cur_log;
3313
pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3314
const char* errmsg=0;
3315
THD* thd = rli->sql_thd;
3319
if (abort_slave_event_count && !rli->events_till_abort--)
3323
For most operations we need to protect rli members with data_lock,
3324
so we assume calling function acquired this mutex for us and we will
3325
hold it for the most of the loop below However, we will release it
3326
whenever it is worth the hassle, and in the cases when we go into a
3327
pthread_cond_wait() with the non-data_lock mutex
3329
safe_mutex_assert_owner(&rli->data_lock);
3331
while (!sql_slave_killed(thd,rli))
3334
We can have two kinds of log reading:
3336
rli->cur_log points at the IO_CACHE of relay_log, which
3337
is actively being updated by the I/O thread. We need to be careful
3338
in this case and make sure that we are not looking at a stale log that
3339
has already been rotated. If it has been, we reopen the log.
3341
The other case is much simpler:
3342
We just have a read only log that nobody else will be updating.
3345
if ((hot_log = (cur_log != &rli->cache_buf)))
3347
assert(rli->cur_log_fd == -1); // foreign descriptor
3348
pthread_mutex_lock(log_lock);
3351
Reading xxx_file_id is safe because the log will only
3352
be rotated when we hold relay_log.LOCK_log
3354
if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
3356
// The master has switched to a new log file; Reopen the old log file
3357
cur_log=reopen_relay_log(rli, &errmsg);
3358
pthread_mutex_unlock(log_lock);
3359
if (!cur_log) // No more log files
3361
hot_log=0; // Using old binary log
3365
As there is no guarantee that the relay is open (for example, an I/O
3366
error during a write by the slave I/O thread may have closed it), we
3369
if (!my_b_inited(cur_log))
3371
assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3372
assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
3375
Relay log is always in new format - if the master is 3.23, the
3376
I/O thread will convert the format for us.
3377
A problem: the description event may be in a previous relay log. So if
3378
the slave has been shutdown meanwhile, we would have to look in old relay
3379
logs, which may even have been deleted. So we need to write this
3380
description event at the beginning of the relay log.
3381
When the relay log is created when the I/O thread starts, easy: the
3382
master will send the description event and we will queue it.
3383
But if the relay log is created by new_file(): then the solution is:
3384
MYSQL_BIN_LOG::open() will write the buffered description event.
3386
if ((ev=Log_event::read_log_event(cur_log,0,
3387
rli->relay_log.description_event_for_exec)))
3390
assert(thd==rli->sql_thd);
3392
read it while we have a lock, to avoid a mutex lock in
3393
inc_event_relay_log_pos()
3395
rli->future_event_relay_log_pos= my_b_tell(cur_log);
3397
pthread_mutex_unlock(log_lock);
3400
assert(thd==rli->sql_thd);
3401
if (opt_reckless_slave) // For mysql-test
3403
if (cur_log->error < 0)
3405
errmsg = "slave SQL thread aborted because of I/O error";
3407
pthread_mutex_unlock(log_lock);
3410
if (!cur_log->error) /* EOF */
3413
On a hot log, EOF means that there are no more updates to
3414
process and we must block until I/O thread adds some and
3415
signals us to continue
3420
We say in Seconds_Behind_Master that we have "caught up". Note that
3421
for example if network link is broken but I/O slave thread hasn't
3422
noticed it (slave_net_timeout not elapsed), then we'll say "caught
3423
up" whereas we're not really caught up. Fixing that would require
3424
internally cutting timeout in smaller pieces in network read, no
3425
thanks. Another example: SQL has caught up on I/O, now I/O has read
3426
a new event and is queuing it; the false "0" will exist until SQL
3427
finishes executing the new event; it will be look abnormal only if
3428
the events have old timestamps (then you get "many", 0, "many").
3430
Transient phases like this can be fixed with implemeting
3431
Heartbeat event which provides the slave the status of the
3432
master at time the master does not have any new update to send.
3433
Seconds_Behind_Master would be zero only when master has no
3434
more updates in binlog for slave. The heartbeat can be sent
3435
in a (small) fraction of slave_net_timeout. Until it's done
3436
rli->last_master_timestamp is temporarely (for time of
3437
waiting for the following event) reset whenever EOF is
3440
time_t save_timestamp= rli->last_master_timestamp;
3441
rli->last_master_timestamp= 0;
3443
assert(rli->relay_log.get_open_count() ==
3444
rli->cur_log_old_open_count);
3446
if (rli->ign_master_log_name_end[0])
3448
/* We generate and return a Rotate, to make our positions advance */
3449
ev= new Rotate_log_event(rli->ign_master_log_name_end,
3450
0, rli->ign_master_log_pos_end,
3451
Rotate_log_event::DUP_NAME);
3452
rli->ign_master_log_name_end[0]= 0;
3453
pthread_mutex_unlock(log_lock);
3456
errmsg= "Slave SQL thread failed to create a Rotate event "
3457
"(out of memory?), SHOW SLAVE STATUS may be inaccurate";
3460
ev->server_id= 0; // don't be ignored by slave SQL thread
3465
We can, and should release data_lock while we are waiting for
3466
update. If we do not, show slave status will block
3468
pthread_mutex_unlock(&rli->data_lock);
3472
- the I/O thread has reached log_space_limit
3473
- the SQL thread has read all relay logs, but cannot purge for some
3475
* it has already purged all logs except the current one
3476
* there are other logs than the current one but they're involved in
3477
a transaction that finishes in the current one (or is not finished)
3479
Wake up the possibly waiting I/O thread, and set a boolean asking
3480
the I/O thread to temporarily ignore the log_space_limit
3481
constraint, because we do not want the I/O thread to block because of
3482
space (it's ok if it blocks for any other reason (e.g. because the
3483
master does not send anything). Then the I/O thread stops waiting
3484
and reads more events.
3485
The SQL thread decides when the I/O thread should take log_space_limit
3486
into account again : ignore_log_space_limit is reset to 0
3487
in purge_first_log (when the SQL thread purges the just-read relay
3488
log), and also when the SQL thread starts. We should also reset
3489
ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
3490
fact, no need as RESET SLAVE requires that the slave
3491
be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
3494
pthread_mutex_lock(&rli->log_space_lock);
3495
// prevent the I/O thread from blocking next times
3496
rli->ignore_log_space_limit= 1;
3498
If the I/O thread is blocked, unblock it. Ok to broadcast
3499
after unlock, because the mutex is only destroyed in
3500
~Relay_log_info(), i.e. when rli is destroyed, and rli will
3501
not be destroyed before we exit the present function.
3503
pthread_mutex_unlock(&rli->log_space_lock);
3504
pthread_cond_broadcast(&rli->log_space_cond);
3505
// Note that wait_for_update_relay_log unlocks lock_log !
3506
rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
3507
// re-acquire data lock since we released it earlier
3508
pthread_mutex_lock(&rli->data_lock);
3509
rli->last_master_timestamp= save_timestamp;
3513
If the log was not hot, we need to move to the next log in
3514
sequence. The next log could be hot or cold, we deal with both
3515
cases separately after doing some common initialization
3517
end_io_cache(cur_log);
3518
assert(rli->cur_log_fd >= 0);
3519
my_close(rli->cur_log_fd, MYF(MY_WME));
3520
rli->cur_log_fd = -1;
3522
if (relay_log_purge)
3525
purge_first_log will properly set up relay log coordinates in rli.
3526
If the group's coordinates are equal to the event's coordinates
3527
(i.e. the relay log was not rotated in the middle of a group),
3528
we can purge this relay log too.
3529
We do uint64_t and string comparisons, this may be slow but
3530
- purging the last relay log is nice (it can save 1GB of disk), so we
3531
like to detect the case where we can do it, and given this,
3532
- I see no better detection method
3533
- purge_first_log is not called that often
3535
if (rli->relay_log.purge_first_log
3537
rli->group_relay_log_pos == rli->event_relay_log_pos
3538
&& !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
3540
errmsg = "Error purging processed logs";
3547
If hot_log is set, then we already have a lock on
3548
LOCK_log. If not, we have to get the lock.
3550
According to Sasha, the only time this code will ever be executed
3551
is if we are recovering from a bug.
3553
if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
3555
errmsg = "error switching to the next log";
3558
rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3559
strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
3560
sizeof(rli->event_relay_log_name)-1);
3561
flush_relay_log_info(rli);
3565
Now we want to open this next log. To know if it's a hot log (the one
3566
being written by the I/O thread now) or a cold log, we can use
3567
is_active(); if it is hot, we use the I/O cache; if it's cold we open
3568
the file normally. But if is_active() reports that the log is hot, this
3569
may change between the test and the consequence of the test. So we may
3570
open the I/O cache whereas the log is now cold, which is nonsense.
3571
To guard against this, we need to have LOCK_log.
3574
if (!hot_log) /* if hot_log, we already have this mutex */
3575
pthread_mutex_lock(log_lock);
3576
if (rli->relay_log.is_active(rli->linfo.log_file_name))
3579
if (global_system_variables.log_warnings)
3580
sql_print_information("next log '%s' is currently active",
3581
rli->linfo.log_file_name);
3583
rli->cur_log= cur_log= rli->relay_log.get_log_file();
3584
rli->cur_log_old_open_count= rli->relay_log.get_open_count();
3585
assert(rli->cur_log_fd == -1);
3588
Read pointer has to be at the start since we are the only
3590
We must keep the LOCK_log to read the 4 first bytes, as this is a hot
3591
log (same as when we call read_log_event() above: for a hot log we
3594
if (check_binlog_magic(cur_log,&errmsg))
3596
if (!hot_log) pthread_mutex_unlock(log_lock);
3599
if (!hot_log) pthread_mutex_unlock(log_lock);
3602
if (!hot_log) pthread_mutex_unlock(log_lock);
3604
if we get here, the log was not hot, so we will have to open it
3605
ourselves. We are sure that the log is still not hot now (a log can get
3606
from hot to cold, but not from cold to hot). No need for LOCK_log.
3609
if (global_system_variables.log_warnings)
3610
sql_print_information("next log '%s' is not active",
3611
rli->linfo.log_file_name);
3613
// open_binlog() will check the magic header
3614
if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
3621
Read failed with a non-EOF error.
3622
TODO: come up with something better to handle this error
3625
pthread_mutex_unlock(log_lock);
3626
sql_print_error("Slave SQL thread: I/O error reading \
3627
event(errno: %d cur_log->error: %d)",
3628
my_errno,cur_log->error);
3629
// set read position to the beginning of the event
3630
my_b_seek(cur_log,rli->event_relay_log_pos);
3631
/* otherwise, we have had a partial read */
3632
errmsg = "Aborting slave SQL thread because of partial event read";
3633
break; // To end of function
3636
if (!errmsg && global_system_variables.log_warnings)
3638
sql_print_information("Error reading relay log event: %s",
3639
"slave SQL thread was killed");
3645
sql_print_error("Error reading relay log event: %s", errmsg);
3650
Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
3651
because of size is simpler because when we do it we already have all relevant
3652
locks; here we don't, so this function is mainly taking locks).
3653
Returns nothing as we cannot catch any error (MYSQL_BIN_LOG::new_file()
3657
void rotate_relay_log(Master_info* mi)
3659
Relay_log_info* rli= &mi->rli;
3661
/* We don't lock rli->run_lock. This would lead to deadlocks. */
3662
pthread_mutex_lock(&mi->run_lock);
3665
We need to test inited because otherwise, new_file() will attempt to lock
3666
LOCK_log, which may not be inited (if we're not a slave).
3673
/* If the relay log is closed, new_file() will do nothing. */
3674
rli->relay_log.new_file();
3677
We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
3678
be counted, so imagine a succession of FLUSH LOGS and assume the slave
3679
threads are started:
3680
relay_log_space decreases by the size of the deleted relay log, but does
3681
not increase, so flush-after-flush we may become negative, which is wrong.
3682
Even if this will be corrected as soon as a query is replicated on the
3683
slave (because the I/O thread will then call harvest_bytes_written() which
3684
will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
3685
output in SHOW SLAVE STATUS meanwhile. So we harvest now.
3686
If the log is closed, then this will just harvest the last writes, probably
3687
0 as they probably have been harvested.
3689
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3691
pthread_mutex_unlock(&mi->run_lock);
3697
Detects, based on master's version (as found in the relay log), if master
3699
@param rli Relay_log_info which tells the master's version
3700
@param bug_id Number of the bug as found in bugs.mysql.com
3701
@param report bool report error message, default TRUE
3702
@return true if master has the bug, FALSE if it does not.
3704
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
3706
struct st_version_range_for_one_bug {
3708
const uchar introduced_in[3]; // first version with bug
3709
const uchar fixed_in[3]; // first version with fix
3711
static struct st_version_range_for_one_bug versions_for_all_bugs[]=
3713
{24432, { 5, 0, 24 }, { 5, 0, 38 } },
3714
{24432, { 5, 1, 12 }, { 5, 1, 17 } },
3715
{33029, { 5, 0, 0 }, { 5, 0, 58 } },
3716
{33029, { 5, 1, 0 }, { 5, 1, 12 } },
3718
const uchar *master_ver=
3719
rli->relay_log.description_event_for_exec->server_version_split;
3721
assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3724
i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3726
const uchar *introduced_in= versions_for_all_bugs[i].introduced_in,
3727
*fixed_in= versions_for_all_bugs[i].fixed_in;
3728
if ((versions_for_all_bugs[i].bug_id == bug_id) &&
3729
(memcmp(introduced_in, master_ver, 3) <= 0) &&
3730
(memcmp(fixed_in, master_ver, 3) > 0))
3735
// a short message for SHOW SLAVE STATUS (message length constraints)
3736
my_printf_error(ER_UNKNOWN_ERROR, "master may suffer from"
3737
" http://bugs.mysql.com/bug.php?id=%u"
3738
" so slave stops; check error log on slave"
3739
" for more info", MYF(0), bug_id);
3740
// a verbose message for the error log
3741
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
3742
"According to the master's version ('%s'),"
3743
" it is probable that master suffers from this bug:"
3744
" http://bugs.mysql.com/bug.php?id=%u"
3745
" and thus replicating the current binary log event"
3746
" may make the slave's data become different from the"
3748
" To take no risk, slave refuses to replicate"
3749
" this event and stops."
3750
" We recommend that all updates be stopped on the"
3751
" master and slave, that the data of both be"
3752
" manually synchronized,"
3753
" that master's binary logs be deleted,"
3754
" that master be upgraded to a version at least"
3755
" equal to '%d.%d.%d'. Then replication can be"
3757
rli->relay_log.description_event_for_exec->server_version,
3759
fixed_in[0], fixed_in[1], fixed_in[2]);
3767
BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
3768
exclusive, if one statement in a SP generated AUTO_INCREMENT value
3769
by the top statement, all statements after it would be considered
3770
generated AUTO_INCREMENT value by the top statement, and a
3771
erroneous INSERT_ID value might be associated with these statement,
3772
which could cause duplicate entry error and stop the slave.
3774
Detect buggy master to work around.
3776
bool rpl_master_erroneous_autoinc(THD *thd)
3778
if (active_mi && active_mi->rli.sql_thd == thd)
3780
Relay_log_info *rli= &active_mi->rli;
3781
return rpl_master_has_bug(rli, 33029, false);
3786
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
3787
template class I_List_iterator<i_string>;
3788
template class I_List_iterator<i_string_pair>;
3792
@} (end of group Replication)
3795
#endif /* HAVE_REPLICATION */