1
/* Copyright (C) 2000-2003 DRIZZLE AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
18
@addtogroup Replication
23
@brief Code to run the io thread and the sql thread on the
26
#include <drizzled/server_includes.h>
28
#include <storage/myisam/myisam.h>
32
#include "rpl_filter.h"
33
#include "repl_failsafe.h"
34
#include <mysys/thr_alarm.h>
35
#include <libdrizzle/sql_common.h>
36
#include <libdrizzle/errmsg.h>
37
#include <mysys/mysys_err.h>
38
#include <drizzled/drizzled_error_messages.h>
40
#include "rpl_tblmap.h"
42
/*
  FLAGSTR(V,F): expands to the text of argument F turned into a string
  literal (via #F) with a trailing space when (V) & (F) is non-zero, and
  to the empty string "" otherwise.
*/
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
44
#define MAX_SLAVE_RETRY_PAUSE 5
45
bool use_slave_mask = 0;
46
MY_BITMAP slave_error_mask;
48
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
50
char* slave_load_tmpdir = 0;
51
Master_info *active_mi= 0;
52
bool replicate_same_server_id;
53
uint64_t relay_log_space_limit = 0;
56
When slave thread exits, we need to remember the temporary tables so we
57
can re-use them on slave start.
59
TODO: move the vars below under Master_info
62
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
63
int32_t events_till_abort = -1;
65
/*
  Reconnect "action" codes. SLAVE_RECON_ACT_MAX sizes the first dimension
  of the reconnect_messages table declared further down in this file.
  NOTE(review): judging by the order of the message groups in that table,
  REG = failed registration on master, DUMP = failed binlog dump request,
  EVENT = failed master event read -- confirm against the callers.
*/
enum enum_slave_reconnect_actions
67
SLAVE_RECON_ACT_REG= 0,
68
SLAVE_RECON_ACT_DUMP= 1,
69
SLAVE_RECON_ACT_EVENT= 2,
73
/*
  Reconnect message codes. SLAVE_RECON_MSG_MAX sizes the second dimension
  of the reconnect_messages table declared further down in this file.
  NOTE(review): presumably each enumerator is the column index of the
  matching message within one action's row -- confirm against the table
  initializer and its users.
*/
enum enum_slave_reconnect_messages
75
SLAVE_RECON_MSG_WAIT= 0,
76
SLAVE_RECON_MSG_KILLED_WAITING= 1,
77
SLAVE_RECON_MSG_AFTER= 2,
78
SLAVE_RECON_MSG_FAILED= 3,
79
SLAVE_RECON_MSG_COMMAND= 4,
80
SLAVE_RECON_MSG_KILLED_AFTER= 5,
84
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
87
N_("Waiting to reconnect after a failed registration on master"),
88
N_("Slave I/O thread killed while waitnig to reconnect after a "
89
"failed registration on master"),
90
N_("Reconnecting after a failed registration on master"),
91
N_("failed registering on master, reconnecting to try again, "
92
"log '%s' at postion %s"),
94
N_("Slave I/O thread killed during or after reconnect")
97
N_("Waiting to reconnect after a failed binlog dump request"),
98
N_("Slave I/O thread killed while retrying master dump"),
99
N_("Reconnecting after a failed binlog dump request"),
100
N_("failed dump request, reconnecting to try again, "
101
"log '%s' at postion %s"),
103
N_("Slave I/O thread killed during or after reconnect")
106
N_("Waiting to reconnect after a failed master event read"),
107
N_("Slave I/O thread killed while waiting to reconnect "
108
"after a failed read"),
109
N_("Reconnecting after a failed master event read"),
110
N_("Slave I/O thread: Failed reading log event, "
111
"reconnecting to retry, log '%s' at postion %s"),
113
N_("Slave I/O thread killed during or after a "
114
"reconnect done to recover from failed read")
119
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
121
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
122
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
123
static bool wait_for_relay_log_space(Relay_log_info* rli);
124
static inline bool io_slave_killed(THD* thd,Master_info* mi);
125
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
126
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
127
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi);
128
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
129
bool suppress_warnings);
130
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
131
bool reconnect, bool suppress_warnings);
132
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
133
void* thread_killed_arg);
134
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
135
static Log_event* next_event(Relay_log_info* rli);
136
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
137
static int32_t terminate_slave_thread(THD *thd,
138
pthread_mutex_t* term_lock,
139
pthread_cond_t* term_cond,
140
volatile uint32_t *slave_running,
142
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
145
Find out which replication threads are running
149
mask Return value here
150
mi master_info for slave
151
inverse If set, returns which threads are not running
154
Get a bit mask for which threads are running so that we can later restart
158
mask If inverse == 0, running threads
159
If inverse == 1, stopped threads
162
/*
  Builds a SLAVE_IO / SLAVE_SQL bit mask in the local tmp_mask from the
  running state of the I/O thread (mi->slave_running) and of the SQL
  thread (mi->rli.slave_running). The XOR with (SLAVE_IO | SLAVE_SQL)
  flips both bits, turning a "running" mask into a "stopped" mask.
  NOTE(review): the conditions guarding each statement and the final
  store into *mask are not visible in this excerpt -- confirm.
*/
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
164
bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
165
register int32_t tmp_mask=0;
168
tmp_mask |= SLAVE_IO;
170
tmp_mask |= SLAVE_SQL;
172
tmp_mask^= (SLAVE_IO | SLAVE_SQL);
182
/*
  Acquires both slave run locks of mi: first mi->run_lock, then
  mi->rli.run_lock. unlock_slave_threads() releases them in the
  opposite order.
*/
void lock_slave_threads(Master_info* mi)
184
//TODO: see if we can do this without dual mutex
185
pthread_mutex_lock(&mi->run_lock);
186
pthread_mutex_lock(&mi->rli.run_lock);
192
unlock_slave_threads()
195
/*
  Releases both slave run locks of mi: first mi->rli.run_lock, then
  mi->run_lock, i.e. the reverse of the order in which
  lock_slave_threads() acquires them.
*/
void unlock_slave_threads(Master_info* mi)
197
//TODO: see if we can do this without dual mutex
198
pthread_mutex_unlock(&mi->rli.run_lock);
199
pthread_mutex_unlock(&mi->run_lock);
204
/* Initialize slave structures */
209
This is called when mysqld starts. Before client connections are
210
accepted. However bootstrap may conflict with us if it does START SLAVE.
211
So it's safer to take the lock.
213
pthread_mutex_lock(&LOCK_active_mi);
215
TODO: re-write this to iterate through the list of files
218
active_mi= new Master_info;
221
If master_host is not specified, try to read it from the master_info file.
222
If master_host is specified, create the master_info file if it doesn't
227
sql_print_error(_("Failed to allocate memory for the master info structure"));
231
if (init_master_info(active_mi,master_info_file,relay_log_info_file,
232
1, (SLAVE_IO | SLAVE_SQL)))
234
sql_print_error(_("Failed to initialize the master info structure"));
238
/* If server id is not set, start_slave_thread() will say it */
240
if (active_mi->host[0] && !opt_skip_slave_start)
242
if (start_slave_threads(1 /* need mutex */,
243
0 /* no wait for start*/,
247
SLAVE_IO | SLAVE_SQL))
249
sql_print_error(_("Failed to create slave threads"));
253
pthread_mutex_unlock(&LOCK_active_mi);
257
pthread_mutex_unlock(&LOCK_active_mi);
263
Init function to set up array for errors that should be skipped for slave
266
init_slave_skip_errors()
267
arg List of error numbers to skip, separated by ','
270
Called from get_options() in mysqld.cc on start-up
273
/*
  Fills the global bitmap slave_error_mask from the option string arg.

  The bitmap is initialised with MAX_SLAVE_ERROR bits; a failed
  bitmap_init() prints an out-of-memory message to stderr. Leading
  whitespace in arg is skipped. The comparison against "all" covers
  4 bytes, i.e. it includes the terminating NUL of the literal; on a
  match every bit is set. Otherwise numbers are parsed in base 10 with
  str2int() (accepted range 0..LONG_MAX), the bit of every code below
  MAX_SLAVE_ERROR is set, and non-digit characters are skipped up to the
  next number or the end of the string.
  NOTE(review): the declarations of p and err_code, the loop around the
  parsing and the error exits are not visible in this excerpt -- confirm.
*/
void init_slave_skip_errors(const char* arg)
277
if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
279
fprintf(stderr, "Badly out of memory, please check your system status\n");
283
for (;my_isspace(system_charset_info,*arg);++arg)
285
if (!my_strnncoll(system_charset_info,(unsigned char*)arg,4,(const unsigned char*)"all",4))
287
bitmap_set_all(&slave_error_mask);
293
if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
295
if (err_code < MAX_SLAVE_ERROR)
296
bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
297
while (!my_isdigit(system_charset_info,*p) && *p)
304
/*
  Stops the slave threads selected by thread_mask.

  The I/O thread is handled when SLAVE_IO or SLAVE_FORCE_ALL is set and is
  terminated through terminate_slave_thread() using mi->run_lock; the SQL
  thread is handled when SLAVE_SQL or SLAVE_FORCE_ALL is set: its
  mi->rli.abort_slave flag is raised and terminate_slave_thread() is
  called using mi->rli.run_lock. force_all records whether
  SLAVE_FORCE_ALL was requested.
  NOTE(review): the condition of the early return(0), the remaining
  arguments of both terminate_slave_thread() calls, the use of force_all
  and the final return are not visible in this excerpt -- confirm.
*/
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
307
return(0); /* successfully do nothing */
308
int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
309
pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
311
if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
314
if ((error=terminate_slave_thread(mi->io_thd,io_lock,
321
if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
323
mi->rli.abort_slave=1;
324
if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
326
&mi->rli.slave_running,
336
Wait for a slave thread to terminate.
338
This function is called after requesting the thread to terminate
339
(by setting @c abort_slave member of @c Relay_log_info or @c
340
Master_info structure to 1). Termination of the thread is
341
controlled with the predicate <code>*slave_running</code>.
343
Function will acquire @c term_lock before waiting on the condition
344
unless @c skip_lock is true in which case the mutex should be owned
345
by the caller of this function and will remain acquired after
346
return from the function.
349
Associated lock to use when waiting for @c term_cond
352
Condition that is signalled when the thread has terminated
355
Pointer to predicate to check for slave thread termination
358
If @c true the lock will not be acquired before waiting on
359
the condition. In this case, it is assumed that the calling
360
function acquires the lock before calling this function.
365
terminate_slave_thread(THD *thd,
366
pthread_mutex_t* term_lock,
367
pthread_cond_t* term_cond,
368
volatile uint32_t *slave_running,
374
pthread_mutex_lock(term_lock);
376
safe_mutex_assert_owner(term_lock);
381
pthread_mutex_unlock(term_lock);
382
return(ER_SLAVE_NOT_RUNNING);
385
THD_CHECK_SENTRY(thd);
388
It is critical to test if the slave is running. Otherwise, we might
389
be referencing freed memory trying to kick it
392
while (*slave_running) // Should always be true
394
pthread_mutex_lock(&thd->LOCK_delete);
395
#ifndef DONT_USE_THR_ALARM
397
Error codes from pthread_kill are:
398
EINVAL: invalid signal number (can't happen)
399
ESRCH: thread already killed (can happen, should be ignored)
401
int32_t err= pthread_kill(thd->real_id, thr_client_alarm);
402
assert(err != EINVAL);
404
thd->awake(THD::NOT_KILLED);
405
pthread_mutex_unlock(&thd->LOCK_delete);
408
There is a small chance that slave thread might miss the first
409
alarm. To protect against it, resend the signal until it reacts
411
struct timespec abstime;
412
set_timespec(abstime,2);
413
error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
414
assert(error == ETIMEDOUT || error == 0);
417
assert(*slave_running == 0);
420
pthread_mutex_unlock(term_lock);
425
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
426
pthread_mutex_t *cond_lock,
427
pthread_cond_t *start_cond,
428
volatile uint32_t *slave_running,
429
volatile uint32_t *slave_run_id,
439
pthread_mutex_lock(start_lock);
443
pthread_cond_broadcast(start_cond);
445
pthread_mutex_unlock(start_lock);
446
sql_print_error(_("Server id not set, will not start slave"));
447
return(ER_BAD_SLAVE);
453
pthread_cond_broadcast(start_cond);
455
pthread_mutex_unlock(start_lock);
456
return(ER_SLAVE_MUST_STOP);
458
start_id= *slave_run_id;
461
struct sched_param tmp_sched_param;
463
memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
464
tmp_sched_param.sched_priority= CONNECT_PRIOR;
465
(void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
467
if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
470
pthread_mutex_unlock(start_lock);
471
return(ER_SLAVE_THREAD);
473
if (start_cond && cond_lock) // caller has cond_lock
475
THD* thd = current_thd;
476
while (start_id == *slave_run_id)
478
const char* old_msg = thd->enter_cond(start_cond,cond_lock,
479
"Waiting for slave thread to start");
480
pthread_cond_wait(start_cond,cond_lock);
481
thd->exit_cond(old_msg);
482
pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
484
return(thd->killed_errno());
488
pthread_mutex_unlock(start_lock);
494
start_slave_threads()
497
SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
498
sense to do that for starting a slave--we always care if it actually
499
started the threads that were not previously running
502
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
504
const char* master_info_fname __attribute__((unused)),
505
const char* slave_info_fname __attribute__((unused)),
508
pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
509
pthread_cond_t* cond_io=0,*cond_sql=0;
512
if (need_slave_mutex)
514
lock_io = &mi->run_lock;
515
lock_sql = &mi->rli.run_lock;
519
cond_io = &mi->start_cond;
520
cond_sql = &mi->rli.start_cond;
521
lock_cond_io = &mi->run_lock;
522
lock_cond_sql = &mi->rli.run_lock;
525
if (thread_mask & SLAVE_IO)
526
error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
528
&mi->slave_running, &mi->slave_run_id,
529
mi, 1); //high priority, to read the most possible
530
if (!error && (thread_mask & SLAVE_SQL))
532
error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
534
&mi->rli.slave_running, &mi->rli.slave_run_id,
537
terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
544
static int32_t end_slave_on_walk(Master_info* mi, unsigned char* /*unused*/)
553
Free all resources used by slave
562
This is called when the server terminates, in close_connections().
563
It terminates slave threads. However, some CHANGE MASTER etc may still be
564
running presently. If a START SLAVE was in progress, the mutex lock below
565
will make us wait until slave threads have started, and START SLAVE
566
returns, then we terminate them here.
568
pthread_mutex_lock(&LOCK_active_mi);
572
TODO: replace the line below with
573
list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
574
once multi-master code is ready.
576
terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
577
end_master_info(active_mi);
581
pthread_mutex_unlock(&LOCK_active_mi);
586
/*
  Tells whether the slave I/O thread has been asked to stop.

  Asserts that thd is the I/O thread of mi (mi->io_thd) and that
  mi->slave_running is non-zero, then returns true when any of
  mi->abort_slave, the global abort_loop, or thd->killed is set.
*/
static bool io_slave_killed(THD* thd, Master_info* mi)
588
assert(mi->io_thd == thd);
589
assert(mi->slave_running); // tracking buffer overrun
590
return(mi->abort_slave || abort_loop || thd->killed);
594
/*
  Tells whether the slave SQL thread has been asked to stop.

  Asserts that thd is the SQL thread of rli (rli->sql_thd) and that
  rli->slave_running == 1. A stop request is any of abort_loop,
  thd->killed or rli->abort_slave. When one is pending, the code looks at
  rli->last_event_start_time: it is compared against 0, and when more
  than 60 seconds have elapsed since it (difftime) an error is reported
  through rli->report() warning about possible duplicate updates.
  NOTE(review): the return statements of the individual branches are not
  visible in this excerpt -- confirm which branches return true.
*/
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
596
assert(rli->sql_thd == thd);
597
assert(rli->slave_running == 1);// tracking buffer overrun
598
if (abort_loop || thd->killed || rli->abort_slave)
601
If we are in an unsafe situation (stopping could corrupt replication),
602
we give one minute to the slave SQL thread of grace before really
603
terminating, in the hope that it will be able to read more events and
604
the unsafe situation will soon be left. Note that this one minute starts
605
from the last time anything happened in the slave SQL thread. So it's
606
really one minute of idleness, we don't timeout if the slave SQL thread
609
if (rli->last_event_start_time == 0)
611
if (difftime(time(0), rli->last_event_start_time) > 60)
613
rli->report(ERROR_LEVEL, 0,
614
_("SQL thread had to stop in an unsafe situation, in "
615
"the middle of applying updates to a "
616
"non-transactional table without any primary key. "
617
"There is a risk of duplicate updates when the slave "
618
"SQL thread is restarted. Please check your tables' "
619
"contents after restart."));
628
skip_load_data_infile()
631
This is used to tell a 3.23 master to break send_file()
634
/*
  Requests the file "/dev/null" from the peer via net_request_file(),
  reads one packet with my_net_read() and discards it, then writes a
  command packet with command byte 0 and empty header and payload.
  All three return values are deliberately ignored (cast to void).
*/
void skip_load_data_infile(NET *net)
636
(void)net_request_file(net, "/dev/null");
637
(void)my_net_read(net); // discard response
638
(void)net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0); // ok
643
/*
  Sends a command packet with command byte 251 whose first data argument
  is the file name fname (strlen(fname) bytes, no terminating NUL) and
  whose second data argument is empty.

  fname must be a non-NULL, NUL-terminated string (strlen is applied).
  Returns the result of net_write_command().
*/
bool net_request_file(NET* net, const char* fname)
645
return(net_write_command(net, 251, (unsigned char*) fname, strlen(fname),
646
(unsigned char*) "", 0));
650
From other comments and tests in code, it looks like
651
sometimes Query_log_event and Load_log_event can have db == 0
652
(see rewrite_db() above for example)
653
(cases where this happens are unclear; it may be when the master is 3.23).
656
/*
  Returns db unchanged when it is non-NULL and the empty string ""
  otherwise, so the result can always be printed as a string.
*/
const char *print_slave_db_safe(const char* db)
658
return((db ? db : ""));
661
/*
  Reads one line from the IO_CACHE f into var (at most max_size bytes,
  as passed to my_b_gets()).

  When something was read, the last character read is inspected and can be
  overwritten with NUL to drop a trailing newline; when the line was
  truncated, the rest of it is consumed from f up to and including the
  next '\n' (or until my_b_EOF). When nothing was read and default_val is
  non-NULL, default_val is copied into var with strmake() limited to
  max_size-1 characters.
  NOTE(review): the conditions around the newline handling and all return
  statements are not visible in this excerpt -- confirm.
*/
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
662
const char *default_val)
666
if ((length=my_b_gets(f,var, max_size)))
668
char* last_p = var + length -1;
670
*last_p = 0; // if we stopped on newline, kill it
674
If we truncated a line or stopped on last char, remove all chars
675
up to and including newline.
678
while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
682
else if (default_val)
684
strmake(var, default_val, max_size-1);
691
/*
  Reads one line from the IO_CACHE f into a local buffer with my_b_gets()
  and, presumably, stores the parsed integer in *var (the conversion is
  not visible in this excerpt -- confirm).
  NOTE(review): the fallback branch is taken only when default_val is
  non-zero, so a default of 0 is handled like "no default" -- confirm that
  this is intended.
*/
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
696
if (my_b_gets(f, buf, sizeof(buf)))
701
else if (default_val)
709
/*
  Reads one line from the IO_CACHE f into a local buffer with my_b_gets()
  and parses it into *var with sscanf("%f"); a line from which exactly one
  float cannot be converted takes the failure branch.
  NOTE(review): the fallback branch is taken only when default_val differs
  from 0.0, so a default of 0.0 is handled like "no default"; the bodies
  of the branches and the return values are not visible in this excerpt
  -- confirm.
*/
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
714
if (my_b_gets(f, buf, sizeof(buf)))
716
if (sscanf(buf, "%f", var) != 1)
721
else if (default_val != 0.0)
729
/*
  Checks io_slave_killed(thd, mi); when the I/O thread was asked to stop
  and both info is non-NULL and global_system_variables.log_warnings is
  set, info is written to the log with sql_print_information().
  NOTE(review): info is passed as the first argument of
  sql_print_information(); if that function treats it as a printf-style
  format, a '%' inside info would be interpreted -- callers should pass
  fixed messages only, or the call should become ("%s", info).
  NOTE(review): the return statements are not visible in this excerpt --
  presumably true is returned when the thread was killed; confirm.
*/
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
731
if (io_slave_killed(thd, mi))
733
if (info && global_system_variables.log_warnings)
734
sql_print_information(info);
742
Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
743
relying on the binlog's version. This is not perfect: imagine an upgrade
744
of the master without waiting that all slaves are in sync with the master;
745
then a slave could be fooled about the binlog's format. This is what happens
746
when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
747
slaves are fooled. So we do this only to distinguish between 3.23 and more
748
recent masters (it's too late to change things for 3.23).
755
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
758
String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
759
char err_buff[MAX_SLAVE_ERRMSG];
760
const char* errmsg= 0;
762
DRIZZLE_RES *master_res= 0;
763
DRIZZLE_ROW master_row;
767
Free old description_event_for_queue (that is needed if we are in
770
delete mi->rli.relay_log.description_event_for_queue;
771
mi->rli.relay_log.description_event_for_queue= 0;
773
if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
775
errmsg = _("Master reported unrecognized DRIZZLE version");
776
err_code= ER_SLAVE_FATAL_ERROR;
777
sprintf(err_buff, ER(err_code), errmsg);
778
err_msg.append(err_buff);
783
Note the following switch will bug when we have DRIZZLE branch 30 ;)
785
switch (*drizzle->server_version)
790
errmsg = _("Master reported unrecognized DRIZZLE version");
791
err_code= ER_SLAVE_FATAL_ERROR;
792
sprintf(err_buff, ER(err_code), errmsg);
793
err_msg.append(err_buff);
796
mi->rli.relay_log.description_event_for_queue= new
797
Format_description_log_event(1, drizzle->server_version);
800
mi->rli.relay_log.description_event_for_queue= new
801
Format_description_log_event(3, drizzle->server_version);
805
Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
806
take the early steps (like tests for "is this a 3.23 master") which we
807
have to take before we receive the real master's Format_desc which will
808
override this one. Note that the Format_desc we create below is garbage
809
(it has the format of the *slave*); it's only good to help know if the
810
master is 3.23, 4.0, etc.
812
mi->rli.relay_log.description_event_for_queue= new
813
Format_description_log_event(4, drizzle->server_version);
819
This does not mean that a 5.0 slave will be able to read a 6.0 master; but
820
as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
821
can't read a 6.0 master, this will show up when the slave can't read some
822
events sent by the master, and there will be error messages.
825
if (err_msg.length() != 0)
828
/* as we are here, we tried to allocate the event */
829
if (!mi->rli.relay_log.description_event_for_queue)
831
errmsg= _("default Format_description_log_event");
832
err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
833
sprintf(err_buff, ER(err_code), errmsg);
834
err_msg.append(err_buff);
839
Compare the master and slave's clock. Do not die if master's clock is
840
unavailable (very old master not supporting UNIX_TIMESTAMP()?).
843
if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
844
(master_res= drizzle_store_result(drizzle)) &&
845
(master_row= drizzle_fetch_row(master_res)))
847
mi->clock_diff_with_master=
848
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
850
else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
852
mi->clock_diff_with_master= 0; /* The "most sensible" value */
853
sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
854
"do not trust column Seconds_Behind_Master of SHOW "
855
"SLAVE STATUS. Error: %s (%d)"),
856
drizzle_error(drizzle), drizzle_errno(drizzle));
859
drizzle_free_result(master_res);
862
Check that the master's server id and ours are different. Because if they
863
are equal (which can result from a simple copy of master's datadir to slave,
864
thus copying some my.cnf), replication will work but all events will be
866
Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
868
Note: we could have put a @@SERVER_ID in the previous SELECT
869
UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
871
if (!drizzle_real_query(drizzle,
872
STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
873
(master_res= drizzle_store_result(drizzle)))
875
if ((master_row= drizzle_fetch_row(master_res)) &&
876
(::server_id == strtoul(master_row[1], 0, 10)) &&
877
!mi->rli.replicate_same_server_id)
880
_("The slave I/O thread stops because master and slave have equal "
881
"DRIZZLE server ids; these ids must be different "
882
"for replication to work (or "
883
"the --replicate-same-server-id option must be used "
884
"on slave but this does"
885
"not always make sense; please check the manual before using it).");
886
err_code= ER_SLAVE_FATAL_ERROR;
887
sprintf(err_buff, ER(err_code), errmsg);
888
err_msg.append(err_buff);
890
drizzle_free_result(master_res);
896
Check that the master's global character_set_server and ours are the same.
897
Not fatal if query fails (old master?).
898
Note that we don't check for equality of global character_set_client and
899
collation_connection (neither do we prevent their setting in
900
set_var.cc). That's because from what I (Guilhem) have tested, the global
901
values of these 2 are never used (new connections don't use them).
902
We don't test equality of global collation_database either as it is
903
going to be deprecated (made read-only) in 4.1 very soon.
904
The test is only relevant if master < 5.0.3 (we'll test only if it's older
905
than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
906
charset info in each binlog event.
907
We don't do it for 3.23 because masters <3.23.50 hang on
908
SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
909
test only if master is 4.x.
912
/* redundant with rest of code but safer against later additions */
913
if (*drizzle->server_version == '3')
916
if ((*drizzle->server_version == '4') &&
917
!drizzle_real_query(drizzle,
918
STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
919
(master_res= drizzle_store_result(drizzle)))
921
if ((master_row= drizzle_fetch_row(master_res)) &&
922
strcmp(master_row[0], global_system_variables.collation_server->name))
925
_("The slave I/O thread stops because master and slave have"
926
" different values for the COLLATION_SERVER global variable."
927
" The values must be equal for replication to work");
928
err_code= ER_SLAVE_FATAL_ERROR;
929
sprintf(err_buff, ER(err_code), errmsg);
930
err_msg.append(err_buff);
932
drizzle_free_result(master_res);
938
Perform analogous check for time zone. Theoretically we also should
939
perform check here to verify that SYSTEM time zones are the same on
940
slave and master, but we can't rely on value of @@system_time_zone
941
variable (it is time zone abbreviation) since it determined at start
942
time and so could differ for slave and master even if they are really
943
in the same system time zone. So we are omitting this check and just
944
relying on documentation. Also according to Monty there are many users
945
who are using replication between servers in various time zones. Hence
946
such a check would break everything for them. (And now everything will
947
work for them because by default both their master and slave will have
949
This check is only necessary for 4.x masters (and < 5.0.4 masters but
952
if ((*drizzle->server_version == '4') &&
953
!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
954
(master_res= drizzle_store_result(drizzle)))
956
if ((master_row= drizzle_fetch_row(master_res)) &&
957
strcmp(master_row[0],
958
global_system_variables.time_zone->get_name()->ptr()))
961
_("The slave I/O thread stops because master and slave have"
962
" different values for the TIME_ZONE global variable."
963
" The values must be equal for replication to work");
964
err_code= ER_SLAVE_FATAL_ERROR;
965
sprintf(err_buff, ER(err_code), errmsg);
966
err_msg.append(err_buff);
968
drizzle_free_result(master_res);
974
if (mi->heartbeat_period != 0.0)
977
const char query_format[]= "SET @master_heartbeat_period= %s";
978
char query[sizeof(query_format) - 2 + sizeof(llbuf)];
980
the period is an uint64_t of nano-secs.
982
llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
983
sprintf(query, query_format, llbuf);
985
if (drizzle_real_query(drizzle, query, strlen(query))
986
&& !check_io_slave_killed(mi->io_thd, mi, NULL))
988
err_msg.append("The slave I/O thread stops because querying master with '");
989
err_msg.append(query);
990
err_msg.append("' failed;");
991
err_msg.append(" error: ");
992
err_code= drizzle_errno(drizzle);
993
err_msg.qs_append(err_code);
994
err_msg.append(" '");
995
err_msg.append(drizzle_error(drizzle));
997
drizzle_free_result(drizzle_store_result(drizzle));
1000
drizzle_free_result(drizzle_store_result(drizzle));
1004
if (err_msg.length() != 0)
1006
sql_print_error(err_msg.ptr());
1007
assert(err_code != 0);
1008
mi->report(ERROR_LEVEL, err_code, err_msg.ptr());
1016
/*
  Blocks the slave I/O thread (rli->mi->io_thd) until relay log space is
  available.

  Takes rli->log_space_lock, registers the wait with thd->enter_cond() and
  then waits on rli->log_space_cond for as long as all of the following
  hold: rli->log_space_limit < rli->log_space_total, the I/O thread has
  not been asked to stop (io_slave_killed()), and
  rli->ignore_log_space_limit is not set. The wait is ended with
  thd->exit_cond(); the mutex is not unlocked explicitly here (a comment
  in start_slave_thread() states that exit_cond() releases it).

  Returns the last value obtained from io_slave_killed(), i.e. true when
  the wait ended because the I/O thread was asked to stop; false when the
  loop ended without io_slave_killed() reporting a stop.
*/
static bool wait_for_relay_log_space(Relay_log_info* rli)
1018
bool slave_killed=0;
1019
Master_info* mi = rli->mi;
1020
const char *save_proc_info;
1021
THD* thd = mi->io_thd;
1023
pthread_mutex_lock(&rli->log_space_lock);
1024
save_proc_info= thd->enter_cond(&rli->log_space_cond,
1025
&rli->log_space_lock,
1026
_("Waiting for the slave SQL thread "
1027
"to free enough relay log space"));
1028
while (rli->log_space_limit < rli->log_space_total &&
1029
!(slave_killed=io_slave_killed(thd,mi)) &&
1030
!rli->ignore_log_space_limit)
1031
pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1032
thd->exit_cond(save_proc_info);
1033
return(slave_killed);
1038
Builds a Rotate from the ignored events' info and writes it to relay log.
1041
write_ignored_events_info_to_relay_log()
1042
thd pointer to I/O thread's thd
1046
Slave I/O thread, going to die, must leave a durable trace of the
1047
ignored events' end position for the use of the slave SQL thread, by
1048
calling this function. Only that thread can call it (see assertion).
1050
static void write_ignored_events_info_to_relay_log(THD *thd __attribute__((unused)),
1053
Relay_log_info *rli= &mi->rli;
1054
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1056
assert(thd == mi->io_thd);
1057
pthread_mutex_lock(log_lock);
1058
if (rli->ign_master_log_name_end[0])
1060
Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
1061
0, rli->ign_master_log_pos_end,
1062
Rotate_log_event::DUP_NAME);
1063
rli->ign_master_log_name_end[0]= 0;
1064
/* can unlock before writing as slave SQL thd will soon see our Rotate */
1065
pthread_mutex_unlock(log_lock);
1066
if (likely((bool)ev))
1068
ev->server_id= 0; // don't be ignored by slave SQL thread
1069
if (unlikely(rli->relay_log.append(ev)))
1070
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
1071
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
1072
_("failed to write a Rotate event"
1073
" to the relay log, SHOW SLAVE STATUS may be"
1075
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1076
if (flush_master_info(mi, 1))
1077
sql_print_error(_("Failed to flush master info file"));
1081
mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
1082
ER(ER_SLAVE_CREATE_EVENT_FAILURE),
1083
_("Rotate_event (out of memory?),"
1084
" SHOW SLAVE STATUS may be inaccurate"));
1087
pthread_mutex_unlock(log_lock);
1092
/*
  Builds a COM_REGISTER_SLAVE packet in a 1024-byte local buffer and sends
  it to the master with simple_command().

  Packet layout, in the order written: 4-byte server_id, report_host,
  report_user and report_password (each stored with net_store_data()),
  2-byte report_port, 4-byte rpl_recovery_rank, and a 4-byte 0 that the
  master fills in as master_id. Before the packet is built, the combined
  length of the three strings plus a 30-byte margin is checked and the
  function returns 0 without sending when that check fails.

  *suppress_warnings is reset to false on entry and set to true when the
  command fails with ER_NET_READ_INTERRUPTED. For any other failure, and
  only if the I/O thread has not been asked to stop, the error text and
  number are formatted and reported via mi->report() as
  ER_SLAVE_MASTER_COM_FAILURE.

  NOTE(review): the conditions guarding the strlen() calls on report_host
  and report_user, the right-hand side of the length check, the
  declaration of the buffer used by snprintf() in the failure path, and
  the return values after simple_command() are not visible in this
  excerpt -- confirm.
*/
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
1093
bool *suppress_warnings)
1095
unsigned char buf[1024], *pos= buf;
1096
uint32_t report_host_len, report_user_len=0, report_password_len=0;
1098
*suppress_warnings= false;
1101
report_host_len= strlen(report_host);
1103
report_user_len= strlen(report_user);
1104
if (report_password)
1105
report_password_len= strlen(report_password);
1106
/* 30 is a good safety margin */
1107
if (report_host_len + report_user_len + report_password_len + 30 >
1109
return(0); // safety
1111
int4store(pos, server_id); pos+= 4;
1112
pos= net_store_data(pos, (unsigned char*) report_host, report_host_len);
1113
pos= net_store_data(pos, (unsigned char*) report_user, report_user_len);
1114
pos= net_store_data(pos, (unsigned char*) report_password, report_password_len);
1115
int2store(pos, (uint16_t) report_port); pos+= 2;
1116
int4store(pos, rpl_recovery_rank); pos+= 4;
1117
/* The master will fill in master_id */
1118
int4store(pos, 0); pos+= 4;
1120
if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
1122
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1124
*suppress_warnings= true; // Suppress reconnect warning
1126
else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
1129
snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle),
1130
drizzle_errno(drizzle));
1131
mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1132
ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1140
/*
  Send the slave status result set for one master to the client.

  The column metadata is first assembled in field_list and sent with
  protocol->send_fields(); then a single row is filled from mi and mi->rli
  and written with my_net_write().

  Locking visible here: mi->run_lock is held only while mi->io_thd is read
  for the Slave_IO_State column. mi->data_lock and then mi->rli.data_lock
  are held while the remaining columns are stored, and are released in the
  reverse order.

  Column notes, as far as the visible lines show:
    - Slave_IO_Running is "Yes" only when mi->slave_running equals
      DRIZZLE_SLAVE_RUN_CONNECT; Slave_SQL_Running follows
      mi->rli.slave_running.
    - Last_Errno/Last_Error and Last_SQL_Errno/Last_SQL_Error are both
      taken from mi->rli.last_error(); Last_IO_Errno/Last_IO_Error come
      from mi->last_error().
    - Master_SSL_Allowed is "Ignored" when mi->ssl is set, otherwise "No".
    - Seconds_Behind_Master is computed only when the I/O thread is
      connected and the SQL thread is running; protocol->store_null() is
      used on the other path (the line introducing that path is not
      present in this listing).

  NOTE(review): lines are missing from this listing (braces, comment
  delimiters, several column width arguments, the declaration of buf and
  the return statements), so the return value cannot be documented here.
*/
bool show_master_info(THD* thd, Master_info* mi)
1142
// TODO: fix this for multi-master
1143
List<Item> field_list;
1144
Protocol *protocol= thd->protocol;
1146
field_list.push_back(new Item_empty_string("Slave_IO_State",
1148
field_list.push_back(new Item_empty_string("Master_Host",
1150
field_list.push_back(new Item_empty_string("Master_User",
1152
field_list.push_back(new Item_return_int("Master_Port", 7,
1153
DRIZZLE_TYPE_LONG));
1154
field_list.push_back(new Item_return_int("Connect_Retry", 10,
1155
DRIZZLE_TYPE_LONG));
1156
field_list.push_back(new Item_empty_string("Master_Log_File",
1158
field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
1159
DRIZZLE_TYPE_LONGLONG));
1160
field_list.push_back(new Item_empty_string("Relay_Log_File",
1162
field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
1163
DRIZZLE_TYPE_LONGLONG));
1164
field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
1166
field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
1167
field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1168
field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
1169
field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
1170
field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
1171
field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
1172
field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
1173
field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
1175
field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
1176
field_list.push_back(new Item_empty_string("Last_Error", 20));
1177
field_list.push_back(new Item_return_int("Skip_Counter", 10,
1178
DRIZZLE_TYPE_LONG));
1179
field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
1180
DRIZZLE_TYPE_LONGLONG));
1181
field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
1182
DRIZZLE_TYPE_LONGLONG));
1183
field_list.push_back(new Item_empty_string("Until_Condition", 6));
1184
field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1185
field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1186
DRIZZLE_TYPE_LONGLONG));
1187
field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
1188
field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
1189
sizeof(mi->ssl_ca)));
1190
field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
1191
sizeof(mi->ssl_capath)));
1192
field_list.push_back(new Item_empty_string("Master_SSL_Cert",
1193
sizeof(mi->ssl_cert)));
1194
field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
1195
sizeof(mi->ssl_cipher)));
1196
field_list.push_back(new Item_empty_string("Master_SSL_Key",
1197
sizeof(mi->ssl_key)));
1198
field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1199
DRIZZLE_TYPE_LONGLONG));
1200
field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
1202
field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
1203
field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1204
field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
1205
field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
1207
if (protocol->send_fields(&field_list,
1208
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1213
String *packet= &thd->packet;
1214
protocol->prepare_for_resend();
1217
slave_running can be accessed without run_lock but not other
1218
non-volotile members like mi->io_thd, which is guarded by the mutex.
1220
pthread_mutex_lock(&mi->run_lock);
1221
protocol->store(mi->io_thd ? mi->io_thd->get_proc_info() : "", &my_charset_bin);
1222
pthread_mutex_unlock(&mi->run_lock);
1224
pthread_mutex_lock(&mi->data_lock);
1225
pthread_mutex_lock(&mi->rli.data_lock);
1226
protocol->store(mi->host, &my_charset_bin);
1227
protocol->store(mi->user, &my_charset_bin);
1228
protocol->store((uint32_t) mi->port);
1229
protocol->store((uint32_t) mi->connect_retry);
1230
protocol->store(mi->master_log_name, &my_charset_bin);
1231
protocol->store((uint64_t) mi->master_log_pos);
1232
protocol->store(mi->rli.group_relay_log_name +
1233
dirname_length(mi->rli.group_relay_log_name),
1235
protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1236
protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
1237
protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1238
"Yes" : "No", &my_charset_bin);
1239
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1240
protocol->store(rpl_filter->get_do_db());
1241
protocol->store(rpl_filter->get_ignore_db());
1244
String tmp(buf, sizeof(buf), &my_charset_bin);
1245
rpl_filter->get_do_table(&tmp);
1246
protocol->store(&tmp);
1247
rpl_filter->get_ignore_table(&tmp);
1248
protocol->store(&tmp);
1249
rpl_filter->get_wild_do_table(&tmp);
1250
protocol->store(&tmp);
1251
rpl_filter->get_wild_ignore_table(&tmp);
1252
protocol->store(&tmp);
1254
protocol->store(mi->rli.last_error().number);
1255
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1256
protocol->store((uint32_t) mi->rli.slave_skip_counter);
1257
protocol->store((uint64_t) mi->rli.group_master_log_pos);
1258
protocol->store((uint64_t) mi->rli.log_space_total);
1261
mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
1262
( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
1263
"Relay"), &my_charset_bin);
1264
protocol->store(mi->rli.until_log_name, &my_charset_bin);
1265
protocol->store((uint64_t) mi->rli.until_log_pos);
1267
protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
1268
protocol->store(mi->ssl_ca, &my_charset_bin);
1269
protocol->store(mi->ssl_capath, &my_charset_bin);
1270
protocol->store(mi->ssl_cert, &my_charset_bin);
1271
protocol->store(mi->ssl_cipher, &my_charset_bin);
1272
protocol->store(mi->ssl_key, &my_charset_bin);
1275
Seconds_Behind_Master: if SQL thread is running and I/O thread is
1276
connected, we can compute it otherwise show NULL (i.e. unknown).
1278
if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
1279
mi->rli.slave_running)
1281
long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
1282
- mi->clock_diff_with_master);
1284
Apparently on some systems time_diff can be <0. Here are possible
1285
reasons related to MySQL:
1286
- the master is itself a slave of another master whose time is ahead.
1287
- somebody used an explicit SET TIMESTAMP on the master.
1288
Possible reason related to granularity-to-second of time functions
1289
(nothing to do with MySQL), which can explain a value of -1:
1290
assume the master's and slave's time are perfectly synchronized, and
1291
that at slave's connection time, when the master's timestamp is read,
1292
it is at the very end of second 1, and (a very short time later) when
1293
the slave's timestamp is read it is at the very beginning of second
1294
2. Then the recorded value for master is 1 and the recorded value for
1295
slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1296
between timestamp of slave and rli->last_master_timestamp is 0
1297
(i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1298
This confuses users, so we don't go below 0: hence the cmax().
1300
last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1301
special marker to say "consider we have caught up".
1303
protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1304
cmax((long)0, time_diff) : 0));
1308
protocol->store_null();
1310
protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
1313
protocol->store(mi->last_error().number);
1315
protocol->store(mi->last_error().message, &my_charset_bin);
1317
protocol->store(mi->rli.last_error().number);
1319
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1321
pthread_mutex_unlock(&mi->rli.data_lock);
1322
pthread_mutex_unlock(&mi->data_lock);
1324
if (my_net_write(&thd->net, (unsigned char*) thd->packet.ptr(), packet->length()))
1332
/*
  Set the THD option bits used by slave threads.

  OPTION_BIG_SELECTS is always added to thd->options, OPTION_BIN_LOG is
  adjusted according to opt_log_slave_updates, and
  thd->variables.completion_type is reset to 0.

  NOTE(review): the line between the statement that sets OPTION_BIN_LOG
  and the one that clears it (presumably an else) is not present in this
  listing - confirm against the full file.
*/
void set_slave_thread_options(THD* thd)
1335
It's nonsense to constrain the slave threads with max_join_size; if a
1336
query succeeded on master, we HAVE to execute it. So set
1337
OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
1338
(and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
1339
SELECT examining more than 4 billion rows would still fail (yes, because
1340
when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1341
only for client threads.
1343
uint64_t options= thd->options | OPTION_BIG_SELECTS;
1344
if (opt_log_slave_updates)
1345
options|= OPTION_BIN_LOG;
1347
options&= ~OPTION_BIN_LOG;
1348
thd->options= options;
1349
thd->variables.completion_type= 0;
1353
/*
  Reset the character set and collations of a slave thread to the global
  defaults.

  Copies character_set_client, collation_connection and collation_server
  from global_system_variables into thd->variables, calls
  thd->update_charset(), and then calls cached_charset_invalidate() on rli
  (which is why the const qualifier on rli is cast away).

  thd  thread whose session charset variables are overwritten
  rli  relay log info whose cached charset is invalidated
*/
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
1355
thd->variables.character_set_client=
1356
global_system_variables.character_set_client;
1357
thd->variables.collation_connection=
1358
global_system_variables.collation_connection;
1359
thd->variables.collation_server=
1360
global_system_variables.collation_server;
1361
thd->update_charset();
1364
We use a const cast here since the conceptual (and externally
1365
visible) behavior of the function is to set the default charset of
1366
the thread. That the cache has to be invalidated is a secondary
1369
const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
1377
/*
  Prepare a THD to run as a slave I/O or slave SQL thread.

  Marks the THD as a system slave thread of the requested type, skips
  grant checks, initialises its NET, sets max_allowed_packet to the global
  value plus MAX_LOG_EVENT_HEADER, applies set_slave_thread_options(),
  assigns a fresh thread id under LOCK_thread_count and sets the initial
  proc info string for the thread type.

  thd       thread descriptor to initialise
  thd_type  SLAVE_THD_SQL selects the SQL-thread settings; any other value
            is treated as the I/O thread

  NOTE(review): in the lines visible here both (1 << SLAVE_THD_IO) and
  (1 << SLAVE_THD_SQL) are OR-ed into simulate_error with no condition, so
  the "simulate_error & (1<< thd_type)" term of the check that follows
  would be non-zero for either thread type. This looks like error-injection
  code whose guards were lost; confirm against the full file.
  NOTE(review): the body of that failure branch and the return statements
  are not present in this listing, so return values are not documented.
*/
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
1379
int32_t simulate_error= 0;
1380
thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
1381
SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1382
thd->security_ctx->skip_grants();
1383
my_net_init(&thd->net, 0);
1385
Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1386
slave threads, since a replication event can become this much larger
1387
than the corresponding packet (query) sent from client to master.
1389
thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1390
+ MAX_LOG_EVENT_HEADER; /* note, incr over the global not session var */
1391
thd->slave_thread = 1;
1392
thd->enable_slow_log= opt_log_slow_slave_statements;
1393
set_slave_thread_options(thd);
1394
thd->client_capabilities = CLIENT_LOCAL_FILES;
1395
pthread_mutex_lock(&LOCK_thread_count);
1396
thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
1397
pthread_mutex_unlock(&LOCK_thread_count);
1399
simulate_error|= (1 << SLAVE_THD_IO);
1400
simulate_error|= (1 << SLAVE_THD_SQL);
1401
if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
1408
if (thd_type == SLAVE_THD_SQL)
1409
thd_proc_info(thd, "Waiting for the next event in relay log");
1411
thd_proc_info(thd, "Waiting for master update");
1412
thd->version=refresh_version;
1418
/*
  Sleep for up to sec seconds while staying responsive to a kill request.

  The loop recomputes the remaining time from my_time(0) on every pass,
  arms a thread alarm for twice that remaining time (see the comment in
  the loop), and after each wake-up calls
  thread_killed(thd, thread_killed_arg) to find out whether the thread was
  asked to stop.

  thd                thread descriptor handed to the callback
  sec                total time to sleep, in seconds
  thread_killed      callback polled after each wake-up
  thread_killed_arg  opaque argument forwarded to the callback

  NOTE(review): the declarations of nap_time and alarm_buff, the sleep
  call itself and the return statements are not present in this listing.
*/
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1419
void* thread_killed_arg)
1422
thr_alarm_t alarmed;
1424
thr_alarm_init(&alarmed);
1425
time_t start_time= my_time(0);
1426
time_t end_time= start_time+sec;
1428
while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1432
The only reason we are asking for alarm is so that
1433
we will be woken up in case of murder, so if we do not get killed,
1434
set the alarm so it goes off after we wake up naturally
1436
thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
1438
thr_end_alarm(&alarmed);
1440
if ((*thread_killed)(thd,thread_killed_arg))
1442
start_time= my_time(0);
1448
/*
  Ask the master to start streaming its binary log (COM_BINLOG_DUMP).

  Request layout as built below:
    bytes 0-3   mi->master_log_pos, cast to 32 bits (see the TODO)
    bytes 4-5   binlog_flags (currently 0)
    bytes 6-9   server_id
    bytes 10..  mi->master_log_name, len bytes, no terminating NUL sent

  *suppress_warnings is cleared on entry and set to true when the command
  fails with ER_NET_READ_INTERRUPTED; the other visible failure path logs
  the error with sql_print_error().

  NOTE(review): logname is copied into buf without a visible bounds check.
  buf leaves FN_REFLEN bytes after the 10-byte header, so this presumably
  relies on master_log_name never being longer than that - confirm.
  NOTE(review): the declaration of len, the last argument of the
  sql_print_error() call and the return statements are not present in
  this listing.
*/
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
1449
bool *suppress_warnings)
1451
unsigned char buf[FN_REFLEN + 10];
1453
int32_t binlog_flags = 0; // for now
1454
char* logname = mi->master_log_name;
1456
*suppress_warnings= false;
1458
// TODO if big log files: Change next to int8store()
1459
int4store(buf, (uint32_t) mi->master_log_pos);
1460
int2store(buf + 4, binlog_flags);
1461
int4store(buf + 6, server_id);
1462
len = (uint32_t) strlen(logname);
1463
memcpy(buf + 10, logname,len);
1464
if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
1467
Something went wrong, so we will just reconnect and retry later
1468
in the future, we should do a better error analysis, but for
1469
now we just fill up the error log :-)
1471
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1472
*suppress_warnings= true; // Suppress reconnect warning
1474
sql_print_error(_("Error on COM_BINLOG_DUMP: %d %s, will retry in %d secs"),
1475
drizzle_errno(drizzle), drizzle_error(drizzle),
1484
Read one event from the master
1488
drizzle DRIZZLE connection
1489
mi Master connection information
1490
suppress_warnings TRUE when a normal net read timeout has caused us to
1491
try a reconnect. We do not want to print anything to
1492
the error log in this case because this is a normal
1493
event in an idle server.
1496
'packet_error' Error
1497
number Length of packet
1500
/*
  Read one packet from the master connection with cli_safe_read().

  packet_error is returned on three visible paths: when
  disconnect_slave_event_count is non-zero and mi->events_till_disconnect
  has counted down to zero, when the read fails (len is packet_error or
  less than 1), and when the packet looks like an EOF packet (shorter than
  8 bytes with first byte 254). *suppress_warnings is cleared on entry and
  set to true only for ER_NET_READ_INTERRUPTED; the other visible error
  path logs the failure with sql_print_error().

  NOTE(review): mi is tagged __attribute__((unused)) although
  mi->events_till_disconnect is dereferenced below.
  NOTE(review): the declaration of len and the return statement of the
  success path are not present in this listing.
*/
static uint32_t read_event(DRIZZLE *drizzle,
1501
Master_info *mi __attribute__((unused)),
1502
bool* suppress_warnings)
1506
*suppress_warnings= false;
1508
my_real_read() will time us out
1509
We check if we were told to die, and if not, try reading again
1511
if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
1512
return(packet_error);
1514
len = cli_safe_read(drizzle);
1515
if (len == packet_error || (int32_t) len < 1)
1517
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1520
We are trying a normal reconnect after a read timeout;
1521
we suppress prints to .err file as long as the reconnect
1522
happens without problems
1524
*suppress_warnings= true;
1527
sql_print_error(_("Error reading packet from server: %s ( server_errno=%d)"),
1528
drizzle_error(drizzle), drizzle_errno(drizzle));
1529
return(packet_error);
1532
/* Check if eof packet */
1533
if (len < 8 && drizzle->net.read_pos[0] == 254)
1535
sql_print_information(_("Slave: received end packet from server, apparent "
1536
"master shutdown: %s"),
1537
drizzle_error(drizzle));
1538
return(packet_error);
1545
/*
  Classify an expected error code.

  The switch groups ER_NET_READ_ERROR, ER_NET_ERROR_ON_WRITE,
  ER_QUERY_INTERRUPTED, ER_SERVER_SHUTDOWN and ER_NEW_ABORTING_CONNECTION
  under one set of case labels. thd and rli are marked unused.

  NOTE(review): the statements executed for these cases and for any
  default case are not present in this listing, so the return values
  cannot be documented from here.
*/
int32_t check_expected_error(THD* thd __attribute__((unused)),
1546
Relay_log_info const *rli __attribute__((unused)),
1547
int32_t expected_error)
1549
switch (expected_error) {
1550
case ER_NET_READ_ERROR:
1551
case ER_NET_ERROR_ON_WRITE:
1552
case ER_QUERY_INTERRUPTED:
1553
case ER_SERVER_SHUTDOWN:
1554
case ER_NEW_ABORTING_CONNECTION:
1563
Check if the current error is of temporary nature or not.
1564
Some errors are temporary in nature, such as
1565
ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. Ndb also signals
1566
that the error is temporary by pushing a warning with the error code
1567
ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1569
/*
  Decide whether the error currently recorded in thd is a temporary one.

  Checks visible here, in order: thd->is_fatal_error;
  thd->main_da.is_error() followed by my_error(ER_LOCK_DEADLOCK, MYF(0));
  !thd->is_error(); and finally whether thd->main_da.sql_errno() equals
  ER_LOCK_DEADLOCK or ER_LOCK_WAIT_TIMEOUT.

  NOTE(review): in the visible lines ER_LOCK_DEADLOCK is raised whenever
  an error is already set, with no other condition. If that is really
  unconditional, every error would look like a deadlock to the final
  check. This looks like error-injection code whose guard was lost;
  confirm against the full file.
  NOTE(review): the return statements are not present in this listing.
*/
static int32_t has_temporary_error(THD *thd)
1571
if (thd->is_fatal_error)
1574
if (thd->main_da.is_error())
1577
my_error(ER_LOCK_DEADLOCK, MYF(0));
1581
If there is no message in THD, we can't say if it's a temporary
1582
error or not. This is currently the case for Incident_log_event,
1583
which sets no message. Return FALSE.
1585
if (!thd->is_error())
1589
Temporary error codes:
1590
currently, InnoDB deadlock detected by InnoDB or lock
1591
wait timeout (innodb_lock_wait_timeout exceeded
1593
if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
1594
thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
1602
Applies the given event and advances the relay log position.
1604
In essence, this function does:
1607
ev->apply_event(rli);
1608
ev->update_pos(rli);
1611
But it also does some maintenance, such as skipping events if
1612
needed and reporting errors.
1614
If the @c skip flag is set, then it is tested whether the event
1615
should be skipped, by looking at the slave_skip_counter and the
1616
server id. The skip flag should be set when calling this from a
1617
replication thread but not set when executing an explicit BINLOG
1622
@retval 1 Error calling ev->apply_event().
1624
@retval 2 No error calling ev->apply_event(), but error calling
1627
/*
  Apply one event and advance the relay log position.

  Steps visible in this listing:
    - thd is primed for the event: thd->server_id takes the event's
      server id, thd->set_time() is called and the current select of
      thd->lex is cleared; ev->when and ev->thd are assigned.
    - ev->shall_skip(rli) gives a reason code. For
      Log_event::EVENT_SKIP_COUNT rli->slave_skip_counter is decremented.
      rli->data_lock is released here, so callers on this path are
      presumably expected to hold it - confirm. ev->apply_event(rli) is
      called when the reason is Log_event::EVENT_SKIP_NOT.
    - a second ev->apply_event(rli) call is visible for another path (the
      line introducing it is not present in this listing).
    - ev->update_pos(rli) is called and a failure is reported through
      rli->report() with the current group relay log name and position.
    - the final return maps exec_res to 1 (non-zero) or 0.

  NOTE(review): the last parameter of the signature, the declaration of
  buf, the conditions guarding several statements and the other return
  statements are not present in this listing.
*/
int32_t apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
1630
int32_t exec_res= 0;
1633
Execute the event to change the database and update the binary
1634
log coordinates, but first we set some data that is needed for
1637
The event will be executed unless it is supposed to be skipped.
1639
Queries originating from this server must be skipped. Low-level
1640
events (Format_description_log_event, Rotate_log_event,
1641
Stop_log_event) from this server must also be skipped. But for
1642
those we don't want to modify 'group_master_log_pos', because
1643
these events did not exist on the master.
1644
Format_description_log_event is not completely skipped.
1646
Skip queries specified by the user in 'slave_skip_counter'. We
1647
can't however skip events that has something to do with the log
1650
Filtering on own server id is extremely important, to ignore
1651
execution of events created by the creation/rotation of the relay
1652
log (remember that now the relay log starts with its Format_desc,
1656
thd->server_id = ev->server_id; // use the original server id for logging
1657
thd->set_time(); // time the query
1658
thd->lex->current_select= 0;
1660
ev->when= my_time(0);
1661
ev->thd = thd; // because up to this point, ev->thd == 0
1665
int32_t reason= ev->shall_skip(rli);
1666
if (reason == Log_event::EVENT_SKIP_COUNT)
1667
--rli->slave_skip_counter;
1668
pthread_mutex_unlock(&rli->data_lock);
1669
if (reason == Log_event::EVENT_SKIP_NOT)
1670
exec_res= ev->apply_event(rli);
1673
exec_res= ev->apply_event(rli);
1677
int32_t error= ev->update_pos(rli);
1679
The update should not fail, so print an error message and
1680
return an error code.
1682
TODO: Replace this with a decent error message when merged
1683
with BUG#24954 (which adds several new error message).
1688
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
1689
_("It was not possible to update the positions"
1690
" of the relay log information: the slave may"
1691
" be in an inconsistent state."
1692
" Stopped in %s position %s"),
1693
rli->group_relay_log_name,
1694
llstr(rli->group_relay_log_pos, buf));
1699
return(exec_res ? 1 : 0);
1704
Top-level function for executing the next event from the relay log.
1706
This function reads the event from the relay log, executes it, and
1707
advances the relay log position. It also handles errors, etc.
1709
This function may fail to apply the event for the following reasons:
1711
- The position specified by the UNTIL condition of the START SLAVE
1714
- It was not possible to read the event from the log.
1716
- The slave is killed.
1718
- An error occurred when applying the event, and the event has been
1719
tried slave_trans_retries times. If the event has been retried
1720
fewer times, 0 is returned.
1722
- init_master_info or init_relay_log_pos failed. (These are called
1723
if a failure occurs when applying the event.)
1725
- An error occurred when updating the binlog position.
1727
@retval 0 The event was applied.
1729
@retval 1 The event was not applied.
1731
/*
  Fetch the next event from the relay log and execute it.

  As far as the lines in this listing show:
    - rli->data_lock is taken first and next_event(rli) is called under
      it; the lock is released on the visible early-exit paths (SQL thread
      killed, UNTIL position reached, event could not be read).
    - when an UNTIL condition is set and rli->is_until_satisfied() reports
      that the start of the current event reaches it, the stop is logged
      and rli->abort_slave is set to 1.
    - the event is executed with
      apply_event_and_update_pos(ev, thd, rli, true).
    - events whose type is FORMAT_DESCRIPTION_EVENT are treated specially
      with respect to deletion (see the comment in the body).
    - retry handling applies only when slave_trans_retries is non-zero.
      If execution failed and has_temporary_error(thd) is non-zero while
      rli->trans_retries is below slave_trans_retries, then
      init_master_info() and init_relay_log_pos() are called, the
      transaction is rolled back with end_trans(thd, ROLLBACK), the thread
      sleeps through safe_sleep() for the smaller of rli->trans_retries
      and MAX_SLAVE_RETRY_PAUSE, and rli->trans_retries and
      rli->retried_trans are incremented under rli->data_lock. A
      "retried ... in vain" error is logged on another path.
      rli->trans_retries is reset to 0 when the failure was not temporary,
      or when transactions are in use and the group and event relay log
      positions coincide.
    - when no event could be read, ER_SLAVE_RELAY_LOG_READ_FAILURE is
      reported through rli->report().

  NOTE(review): braces, else lines, several declarations (buf, exec_res)
  and all return statements are not present in this listing, so the exact
  nesting of the branches above and the return values must be confirmed
  against the full file.
*/
static int32_t exec_relay_log_event(THD* thd, Relay_log_info* rli)
1734
We acquire this mutex since we need it for all operations except
1735
event execution. But we will release it in places where we will
1736
wait for something for example inside of next_event().
1738
pthread_mutex_lock(&rli->data_lock);
1740
Log_event * ev = next_event(rli);
1742
assert(rli->sql_thd==thd);
1744
if (sql_slave_killed(thd,rli))
1746
pthread_mutex_unlock(&rli->data_lock);
1755
This tests if the position of the beginning of the current event
1756
hits the UNTIL barrier.
1758
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
1759
rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
1760
rli->group_master_log_pos :
1761
ev->log_pos - ev->data_written))
1764
sql_print_information(_("Slave SQL thread stopped because it reached its"
1765
" UNTIL position %s"),
1766
llstr(rli->until_pos(), buf));
1768
Setting abort_slave flag because we do not want additional message about
1769
error in query execution to be printed.
1771
rli->abort_slave= 1;
1772
pthread_mutex_unlock(&rli->data_lock);
1776
exec_res= apply_event_and_update_pos(ev, thd, rli, true);
1779
Format_description_log_event should not be deleted because it will be
1780
used to read info about the relay log's format; it will be deleted when
1781
the SQL thread does not need it, i.e. when this thread terminates.
1783
if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
1789
update_log_pos failed: this should not happen, so we don't
1795
if (slave_trans_retries)
1797
int32_t temp_err= 0;
1798
if (exec_res && (temp_err= has_temporary_error(thd)))
1802
We were in a transaction which has been rolled back because of a
1804
let's seek back to BEGIN log event and retry it all again.
1805
Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
1806
there is no rollback since 5.0.13 (ref: manual).
1807
We have to not only seek but also
1808
a) init_master_info(), to seek back to hot relay log's start for later
1809
(for when we will come back to this hot log after re-processing the
1810
possibly existing old logs where BEGIN is: check_binlog_magic() will
1811
then need the cache to be at position 0 (see comments at beginning of
1812
init_master_info()).
1813
b) init_relay_log_pos(), because the BEGIN may be an older relay log.
1815
if (rli->trans_retries < slave_trans_retries)
1817
if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
1818
sql_print_error(_("Failed to initialize the master info structure"));
1819
else if (init_relay_log_pos(rli,
1820
rli->group_relay_log_name,
1821
rli->group_relay_log_pos,
1823
sql_print_error(_("Error initializing relay log position: %s"),
1828
end_trans(thd, ROLLBACK);
1829
/* chance for concurrent connection to get more locks */
1830
safe_sleep(thd, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1831
(CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1832
pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1833
rli->trans_retries++;
1834
rli->retried_trans++;
1835
pthread_mutex_unlock(&rli->data_lock);
1839
sql_print_error(_("Slave SQL thread retried transaction %lu time(s) "
1840
"in vain, giving up. Consider raising the value of "
1841
"the slave_transaction_retries variable."),
1842
slave_trans_retries);
1844
else if ((exec_res && !temp_err) ||
1845
(opt_using_transactions &&
1846
rli->group_relay_log_pos == rli->event_relay_log_pos))
1849
Only reset the retry counter if the entire group succeeded
1850
or failed with a non-transient error. On a successful
1851
event, the execution will proceed as usual; in the case of a
1852
non-transient error, the slave will stop with an error.
1854
rli->trans_retries= 0; // restart from fresh
1859
pthread_mutex_unlock(&rli->data_lock);
1860
rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
1861
ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
1862
_("Could not parse relay log event entry. The possible reasons "
1863
"are: the master's binary log is corrupted (you can check this "
1864
"by running 'mysqlbinlog' on the binary log), the slave's "
1865
"relay log is corrupted (you can check this by running "
1866
"'mysqlbinlog' on the relay log), a network problem, or a bug "
1867
"in the master's or slave's DRIZZLE code. If you want to check "
1868
"the master's binary log or slave's relay log, you will be "
1869
"able to know their names by issuing 'SHOW SLAVE STATUS' "
1876
@brief Try to reconnect slave IO thread.
1878
@details Terminates current connection to master, sleeps for
1879
@c mi->connect_retry seconds and initiates new connection with
1880
@c safe_reconnect(). Variable pointed by @c retry_count is increased -
1881
if it exceeds @c master_retry_count then connection is not re-established
1882
and function signals error.
1883
Unless @c suppress_warnings is TRUE, a warning is put in the server error log
1884
when reconnecting. The warning message and messages used to report errors
1885
are taken from @c messages array. In case @c master_retry_count is exceeded,
1886
no messages are added to the log.
1888
@param[in] thd Thread context.
1889
@param[in] drizzle DRIZZLE connection.
1890
@param[in] mi Master connection information.
1891
@param[in,out] retry_count Number of attempts to reconnect.
1892
@param[in] suppress_warnings TRUE when a normal net read timeout
1893
has caused the reconnect.
1894
@param[in] messages Messages to print/log, see
1895
reconnect_messages[] array.
1898
@retval 1 There was an error.
1901
/*
  Drop the current master connection and try to establish a new one.

  Sequence visible in this listing:
    - mi->slave_running is set to DRIZZLE_SLAVE_RUN_NOT_CONNECT, the proc
      info is set to messages[SLAVE_RECON_MSG_WAIT] and the connection is
      closed with drizzle_disconnect().
    - *retry_count is post-incremented. When it was already non-zero, the
      function returns 1 once the new value exceeds master_retry_count,
      and otherwise sleeps for mi->connect_retry through safe_sleep().
    - check_io_slave_killed() is consulted with
      messages[SLAVE_RECON_MSG_KILLED_WAITING].
    - the proc info is set to messages[SLAVE_RECON_MSG_AFTER]. Unless
      suppress_warnings is set, a message built from
      messages[SLAVE_RECON_MSG_FAILED] is reported through mi->report()
      when messages[SLAVE_RECON_MSG_COMMAND] is non-empty, and logged with
      sql_print_information() on the other path.
    - safe_reconnect() is attempted. If it fails or the I/O thread was
      killed, messages[SLAVE_RECON_MSG_KILLED_AFTER] is logged when
      log_warnings is enabled.

  NOTE(review): braces, else lines, the last argument of the safe_sleep()
  call and most return statements are not present in this listing;
  confirm the exact branch nesting and return values in the full file.
*/
static int32_t try_to_reconnect(THD *thd, DRIZZLE *drizzle, Master_info *mi,
1902
uint32_t *retry_count, bool suppress_warnings,
1903
const char *messages[SLAVE_RECON_MSG_MAX])
1905
mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1906
thd->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
1907
drizzle_disconnect(drizzle);
1908
if ((*retry_count)++)
1910
if (*retry_count > master_retry_count)
1911
return 1; // Don't retry forever
1912
safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1915
if (check_io_slave_killed(thd, mi,
1916
_(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1918
thd->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
1919
if (!suppress_warnings)
1921
char buf[256], llbuff[22];
1922
snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1923
IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
1925
Raise a warining during registering on master/requesting dump.
1926
Log a message reading event.
1928
if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
1930
mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1931
ER(ER_SLAVE_MASTER_COM_FAILURE),
1932
_(messages[SLAVE_RECON_MSG_COMMAND]), buf);
1936
sql_print_information(buf);
1939
if (safe_reconnect(thd, drizzle, mi, 1) || io_slave_killed(thd, mi))
1941
if (global_system_variables.log_warnings)
1942
sql_print_information(_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1949
/* Slave I/O Thread entry point */
1951
pthread_handler_t handle_slave_io(void *arg)
1953
THD *thd; // needs to be first for thread_stack
1955
Master_info *mi = (Master_info*)arg;
1956
Relay_log_info *rli= &mi->rli;
1958
uint32_t retry_count;
1959
bool suppress_warnings;
1960
uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
1967
pthread_mutex_lock(&mi->run_lock);
1968
/* Inform waiting threads that slave has started */
1971
mi->events_till_disconnect = disconnect_slave_event_count;
1974
THD_CHECK_SENTRY(thd);
1977
pthread_detach_this_thread();
1978
thd->thread_stack= (char*) &thd; // remember where our stack is
1979
if (init_slave_thread(thd, SLAVE_THD_IO))
1981
pthread_cond_broadcast(&mi->start_cond);
1982
pthread_mutex_unlock(&mi->run_lock);
1983
sql_print_error(_("Failed during slave I/O thread initialization"));
1986
pthread_mutex_lock(&LOCK_thread_count);
1987
threads.append(thd);
1988
pthread_mutex_unlock(&LOCK_thread_count);
1989
mi->slave_running = 1;
1990
mi->abort_slave = 0;
1991
pthread_mutex_unlock(&mi->run_lock);
1992
pthread_cond_broadcast(&mi->start_cond);
1994
if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
1996
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
1997
ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
2001
thd_proc_info(thd, "Connecting to master");
2002
// we can get killed during safe_connect
2003
if (!safe_connect(thd, drizzle, mi))
2005
sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
2006
"replication started in log '%s' at position %s"),
2007
mi->user, mi->host, mi->port,
2009
llstr(mi->master_log_pos,llbuff));
2011
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
2012
thread, since a replication event can become this much larger than
2013
the corresponding packet (query) sent from client to master.
2015
drizzle->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
2019
sql_print_information(_("Slave I/O thread killed while connecting to master"));
2025
// TODO: the assignment below should be under mutex (5.0)
2026
mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
2027
thd->slave_net = &drizzle->net;
2028
thd_proc_info(thd, "Checking master version");
2029
if (get_master_version_and_clock(drizzle, mi))
2032
if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
2035
Register ourselves with the master.
2037
thd_proc_info(thd, "Registering slave on master");
2038
if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2040
if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
2041
"while registering slave on master"))
2043
sql_print_error(_("Slave I/O thread couldn't register on master"));
2044
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2045
reconnect_messages[SLAVE_RECON_ACT_REG]))
2052
if (!retry_count_reg)
2055
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2056
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2057
reconnect_messages[SLAVE_RECON_ACT_REG]))
2063
while (!io_slave_killed(thd,mi))
2065
thd_proc_info(thd, "Requesting binlog dump");
2066
if (request_dump(drizzle, mi, &suppress_warnings))
2068
sql_print_error(_("Failed on request_dump()"));
2069
if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while \
2070
requesting master dump")) ||
2071
try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2072
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2076
if (!retry_count_dump)
2079
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2080
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2081
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2086
while (!io_slave_killed(thd,mi))
2090
We say "waiting" because read_event() will wait if there's nothing to
2091
read. But if there's something to read, it will not wait. The
2092
important thing is to not confuse users by saying "reading" whereas
2093
we're in fact receiving nothing.
2095
thd_proc_info(thd, _("Waiting for master to send event"));
2096
event_len= read_event(drizzle, mi, &suppress_warnings);
2097
if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while "
2100
if (!retry_count_event)
2102
retry_count_event++;
2103
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2104
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2105
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2110
if (event_len == packet_error)
2112
uint32_t drizzle_error_number= drizzle_errno(drizzle);
2113
switch (drizzle_error_number) {
2114
case CR_NET_PACKET_TOO_LARGE:
2115
sql_print_error(_("Log entry on master is longer than "
2116
"max_allowed_packet (%ld) on "
2117
"slave. If the entry is correct, restart the "
2118
"server with a higher value of "
2119
"max_allowed_packet"),
2120
thd->variables.max_allowed_packet);
2122
case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2123
sql_print_error(ER(drizzle_error_number), drizzle_error_number,
2124
drizzle_error(drizzle));
2126
case EE_OUTOFMEMORY:
2127
case ER_OUTOFMEMORY:
2129
_("Stopping slave I/O thread due to out-of-memory error from master"));
2132
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2133
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2136
} // if (event_len == packet_error)
2138
retry_count=0; // ok event, reset retry counter
2139
thd_proc_info(thd, _("Queueing master event to the relay log"));
2140
if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2144
if (flush_master_info(mi, 1))
2146
sql_print_error(_("Failed to flush master info file"));
2150
See if the relay logs take too much space.
2151
We don't lock mi->rli.log_space_lock here; this dirty read saves time
2152
and does not introduce any problem:
2153
- if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
2154
the clean value is 0), then we are reading only one more event as we
2155
should, and we'll block only at the next event. No big deal.
2156
- if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
2157
the clean value is 1), then we are going into wait_for_relay_log_space()
2158
for no reason, but this function will do a clean read, notice the clean
2159
value and exit immediately.
2161
if (rli->log_space_limit && rli->log_space_limit <
2162
rli->log_space_total &&
2163
!rli->ignore_log_space_limit)
2164
if (wait_for_relay_log_space(rli))
2166
sql_print_error(_("Slave I/O thread aborted while waiting for "
2167
"relay log space"));
2175
// print the current replication position
2176
sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2178
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
2179
pthread_mutex_lock(&LOCK_thread_count);
2180
thd->query = thd->db = 0; // extra safety
2181
thd->query_length= thd->db_length= 0;
2182
pthread_mutex_unlock(&LOCK_thread_count);
2186
Here we need to clear the active VIO before closing the
2187
connection with the master. The reason is that THD::awake()
2188
might be called from terminate_slave_thread() because somebody
2189
issued a STOP SLAVE. If that happens, the close_active_vio()
2190
can be called in the middle of closing the VIO associated with
2191
the 'mysql' object, causing a crash.
2193
drizzle_close(drizzle);
2196
write_ignored_events_info_to_relay_log(thd, mi);
2197
thd_proc_info(thd, _("Waiting for slave mutex on exit"));
2198
pthread_mutex_lock(&mi->run_lock);
2200
/* Forget the relay log's format */
2201
delete mi->rli.relay_log.description_event_for_queue;
2202
mi->rli.relay_log.description_event_for_queue= 0;
2203
// TODO: make rpl_status part of Master_info
2204
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2205
assert(thd->net.buff != 0);
2206
net_end(&thd->net); // destructor will not free it, because net.vio is 0
2207
close_thread_tables(thd);
2208
pthread_mutex_lock(&LOCK_thread_count);
2209
THD_CHECK_SENTRY(thd);
2211
pthread_mutex_unlock(&LOCK_thread_count);
2213
mi->slave_running= 0;
2216
Note: the order of the two following calls (first broadcast, then unlock)
2217
is important. Otherwise a killer_thread can execute between the calls and
2218
delete the mi structure leading to a crash! (see BUG#25306 for details)
2220
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
2221
pthread_mutex_unlock(&mi->run_lock);
2224
return(0); // Can't return anything here
2228
/* Slave SQL Thread entry point */
2230
pthread_handler_t handle_slave_sql(void *arg)
2232
THD *thd; /* needs to be first for thread_stack */
2233
char llbuff[22],llbuff1[22];
2235
Relay_log_info* rli = &((Master_info*)arg)->rli;
2240
assert(rli->inited);
2241
pthread_mutex_lock(&rli->run_lock);
2242
assert(!rli->slave_running);
2244
rli->events_till_abort = abort_slave_event_count;
2247
thd->thread_stack = (char*)&thd; // remember where our stack is
2250
/* Inform waiting threads that slave has started */
2251
rli->slave_run_id++;
2252
rli->slave_running = 1;
2254
pthread_detach_this_thread();
2255
if (init_slave_thread(thd, SLAVE_THD_SQL))
2258
TODO: this is currently broken - slave start and change master
2259
will be stuck if we fail here
2261
pthread_cond_broadcast(&rli->start_cond);
2262
pthread_mutex_unlock(&rli->run_lock);
2263
sql_print_error(_("Failed during slave thread initialization"));
2266
thd->init_for_queries();
2267
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
2268
pthread_mutex_lock(&LOCK_thread_count);
2269
threads.append(thd);
2270
pthread_mutex_unlock(&LOCK_thread_count);
2272
We are going to set slave_running to 1. Assuming slave I/O thread is
2273
alive and connected, this is going to make Seconds_Behind_Master be 0
2274
i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
2275
the moment we start we can think we are caught up, and the next second we
2276
start receiving data so we realize we are not caught up and
2277
Seconds_Behind_Master grows. No big deal.
2279
rli->abort_slave = 0;
2280
pthread_mutex_unlock(&rli->run_lock);
2281
pthread_cond_broadcast(&rli->start_cond);
2284
Reset errors for a clean start (otherwise, if the master is idle, the SQL
2285
thread may execute no Query_log_event, so the error will remain even
2286
though there's no problem anymore). Do not reset the master timestamp
2287
(imagine the slave has caught everything, then STOP SLAVE and START SLAVE:
2288
as we are not sure that we are going to receive a query, we want to
2289
remember the last master timestamp (to say how many seconds behind we are
2291
But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
2295
//tell the I/O thread to take relay_log_space_limit into account from now on
2296
pthread_mutex_lock(&rli->log_space_lock);
2297
rli->ignore_log_space_limit= 0;
2298
pthread_mutex_unlock(&rli->log_space_lock);
2299
rli->trans_retries= 0; // start from "no error"
2301
if (init_relay_log_pos(rli,
2302
rli->group_relay_log_name,
2303
rli->group_relay_log_pos,
2304
1 /*need data lock*/, &errmsg,
2305
1 /*look for a description_event*/))
2307
sql_print_error(_("Error initializing relay log position: %s"),
2311
THD_CHECK_SENTRY(thd);
2312
assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2314
Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2315
correct position when it's called just after my_b_seek() (the questionable
2316
stuff is those "seek is done on next read" comments in the my_b_seek()
2318
The crude reality is that this assertion randomly fails whereas
2319
replication seems to work fine. And there is no easy explanation why it
2320
fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
2321
init_relay_log_pos() called above). Maybe the assertion would be
2322
meaningful if we held rli->data_lock between the my_b_seek() and the
2325
assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2326
assert(rli->sql_thd == thd);
2328
if (global_system_variables.log_warnings)
2329
sql_print_information(_("Slave SQL thread initialized, "
2330
"starting replication in log '%s' at "
2331
"position %s, relay log '%s' position: %s"),
2333
llstr(rli->group_master_log_pos,llbuff),
2334
rli->group_relay_log_name,
2335
llstr(rli->group_relay_log_pos,llbuff1));
2337
/* execute init_slave variable */
2338
if (sys_init_slave.value_length)
2340
execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
2341
if (thd->is_slave_error)
2343
sql_print_error(_("Slave SQL thread aborted. "
2344
"Can't execute init_slave query"));
2350
First check until condition - probably there is nothing to execute. We
2351
do not want to wait for next event in this case.
2353
pthread_mutex_lock(&rli->data_lock);
2354
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
2355
rli->is_until_satisfied(rli->group_master_log_pos))
2358
sql_print_information(_("Slave SQL thread stopped because it reached its"
2359
" UNTIL position %s"), llstr(rli->until_pos(), buf));
2360
pthread_mutex_unlock(&rli->data_lock);
2363
pthread_mutex_unlock(&rli->data_lock);
2365
/* Read queries from the IO/THREAD until this thread is killed */
2367
while (!sql_slave_killed(thd,rli))
2369
thd_proc_info(thd, _("Reading event from the relay log"));
2370
assert(rli->sql_thd == thd);
2371
THD_CHECK_SENTRY(thd);
2372
if (exec_relay_log_event(thd,rli))
2374
// do not scare the user if SQL thread was simply killed or stopped
2375
if (!sql_slave_killed(thd,rli))
2378
retrieve as much info as possible from the thd (error
2379
codes and warnings) and print it to the error log so as to
2380
allow the user to locate the error
2382
uint32_t const last_errno= rli->last_error().number;
2384
if (thd->is_error())
2386
char const *const errmsg= thd->main_da.message();
2388
if (last_errno == 0)
2390
rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), errmsg);
2392
else if (last_errno != thd->main_da.sql_errno())
2394
sql_print_error(_("Slave (additional info): %s Error_code: %d"),
2395
errmsg, thd->main_da.sql_errno());
2399
/* Print any warnings issued */
2400
List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
2403
Added controlled slave thread cancel for replication
2404
of user-defined variables.
2406
bool udf_error = false;
2409
if (err->code == ER_CANT_OPEN_LIBRARY)
2411
sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
2414
sql_print_error(_("Error loading user-defined library, slave SQL "
2415
"thread aborted. Install the missing library, "
2416
"and restart the slave SQL thread with "
2417
"\"SLAVE START\". We stopped at log '%s' "
2419
RPL_LOG_NAME, llstr(rli->group_master_log_pos,
2422
sql_print_error(_("Error running query, slave SQL thread aborted. "
2423
"Fix the problem, and restart "
2424
"the slave SQL thread with \"SLAVE START\". "
2425
"We stopped at log '%s' position %s"),
2427
llstr(rli->group_master_log_pos, llbuff));
2433
/* Thread stopped. Print the current replication position to the log */
2434
sql_print_information(_("Slave SQL thread exiting, replication stopped in "
2435
"log '%s' at position %s"),
2437
llstr(rli->group_master_log_pos,llbuff));
2442
Some events set some playgrounds, which won't be cleared because thread
2443
stops. Stopping of this thread may not be known to these events ("stop"
2444
request is detected only by the present function, not by events), so we
2445
must "proactively" clear playgrounds:
2447
rli->cleanup_context(thd, 1);
2448
pthread_mutex_lock(&LOCK_thread_count);
2450
Some extra safety, which should not be needed (normally, event deletion
2451
should already have done these assignments (each event which sets these
2452
variables is supposed to set them to 0 before terminating)).
2454
thd->query= thd->db= thd->catalog= 0;
2455
thd->query_length= thd->db_length= 0;
2456
pthread_mutex_unlock(&LOCK_thread_count);
2457
thd_proc_info(thd, "Waiting for slave mutex on exit");
2458
pthread_mutex_lock(&rli->run_lock);
2459
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
2460
pthread_mutex_lock(&rli->data_lock);
2461
assert(rli->slave_running == 1); // tracking buffer overrun
2462
/* When master_pos_wait() wakes up it will check this and terminate */
2463
rli->slave_running= 0;
2464
/* Forget the relay log's format */
2465
delete rli->relay_log.description_event_for_exec;
2466
rli->relay_log.description_event_for_exec= 0;
2467
/* Wake up master_pos_wait() */
2468
pthread_mutex_unlock(&rli->data_lock);
2469
pthread_cond_broadcast(&rli->data_cond);
2470
rli->ignore_log_space_limit= 0; /* don't need any lock */
2471
/* we die so won't remember charset - re-update them on next thread start */
2472
rli->cached_charset_invalidate();
2473
rli->save_temporary_tables = thd->temporary_tables;
2476
TODO: see if we can do this conditionally in next_event() instead
2477
to avoid unneeded position re-init
2479
thd->temporary_tables = 0; // remove tempation from destructor to close them
2480
assert(thd->net.buff != 0);
2481
net_end(&thd->net); // destructor will not free it, because we are weird
2482
assert(rli->sql_thd == thd);
2483
THD_CHECK_SENTRY(thd);
2485
pthread_mutex_lock(&LOCK_thread_count);
2486
THD_CHECK_SENTRY(thd);
2488
pthread_mutex_unlock(&LOCK_thread_count);
2490
Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
2491
is important. Otherwise a killer_thread can execute between the calls and
2492
delete the mi structure leading to a crash! (see BUG#25306 for details)
2494
pthread_cond_broadcast(&rli->stop_cond);
2495
pthread_mutex_unlock(&rli->run_lock); // tell the world we are done
2499
return(0); // Can't return anything here
2504
process_io_create_file()
2507
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
2511
bool cev_not_written;
2512
THD *thd = mi->io_thd;
2513
NET *net = &mi->drizzle->net;
2515
if (unlikely(!cev->is_valid()))
2518
if (!rpl_filter->db_ok(cev->db))
2520
skip_load_data_infile(net);
2523
assert(cev->inited_from_old);
2524
thd->file_id = cev->file_id = mi->file_id++;
2525
thd->server_id = cev->server_id;
2526
cev_not_written = 1;
2528
if (unlikely(net_request_file(net,cev->fname)))
2530
sql_print_error(_("Slave I/O: failed requesting download of '%s'"),
2536
This dummy block is so we could instantiate Append_block_log_event
2537
once and then modify it slightly instead of doing it multiple times
2541
Append_block_log_event aev(thd,0,0,0,0);
2545
if (unlikely((num_bytes=my_net_read(net)) == packet_error))
2547
sql_print_error(_("Network read error downloading '%s' from master"),
2551
if (unlikely(!num_bytes)) /* eof */
2553
/* 3.23 master wants it */
2554
net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0);
2556
If we wrote Create_file_log_event, then we need to write
2557
Execute_load_log_event. If we did not write Create_file_log_event,
2558
then this is an empty file and we can just do as if the LOAD DATA
2559
INFILE had not existed, i.e. write nothing.
2561
if (unlikely(cev_not_written))
2563
Execute_load_log_event xev(thd,0,0);
2564
xev.log_pos = cev->log_pos;
2565
if (unlikely(mi->rli.relay_log.append(&xev)))
2567
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2568
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2569
_("error writing Exec_load event to relay log"));
2572
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2575
if (unlikely(cev_not_written))
2577
cev->block = net->read_pos;
2578
cev->block_len = num_bytes;
2579
if (unlikely(mi->rli.relay_log.append(cev)))
2581
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2582
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2583
_("error writing Create_file event to relay log"));
2587
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2591
aev.block = net->read_pos;
2592
aev.block_len = num_bytes;
2593
aev.log_pos = cev->log_pos;
2594
if (unlikely(mi->rli.relay_log.append(&aev)))
2596
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2597
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2598
_("error writing Append_block event to relay log"));
2601
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
2612
Start using a new binary log on the master
2616
mi master_info for the slave
2617
rev The rotate log event read from the binary log
2620
Updates the master info with the place in the next binary
2621
log where we should start reading.
2622
Rotate the relay log to avoid mixed-format relay logs.
2625
We assume we already locked mi->data_lock
2629
1 Log event is illegal
2633
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
2635
safe_mutex_assert_owner(&mi->data_lock);
2637
if (unlikely(!rev->is_valid()))
2640
/* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2641
memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
2642
mi->master_log_pos= rev->pos;
2644
If we do not do this, we will be getting the first
2645
rotate event forever, so we need to not disconnect after one.
2647
if (disconnect_slave_event_count)
2648
mi->events_till_disconnect++;
2651
If description_event_for_queue is format <4, there is conversion in the
2652
relay log to the slave's format (4). And Rotate can mean upgrade or
2653
nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
2654
no need to reset description_event_for_queue now. And if it's nothing (same
2655
master version as before), no need (still using the slave's format).
2657
if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
2659
delete mi->rli.relay_log.description_event_for_queue;
2660
/* start from format 3 (DRIZZLE 4.0) again */
2661
mi->rli.relay_log.description_event_for_queue= new
2662
Format_description_log_event(3);
2665
Rotating the relay log makes binlog format detection easier (at next slave
2666
start or mysqlbinlog)
2668
rotate_relay_log(mi); /* will take the right mutexes */
2673
Reads a 3.23 event and converts it to the slave's format. This code was
2674
copied from DRIZZLE 4.0.
2676
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
2679
const char *errmsg = 0;
2681
bool ignore_event= 0;
2683
Relay_log_info *rli= &mi->rli;
2686
If we get Load event, we need to pass a non-reusable buffer
2687
to read_log_event, so we do a trick
2689
if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
2691
if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
2693
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2694
ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
2697
memcpy(tmp_buf,buf,event_len);
2699
Create_file constructor wants a 0 as last char of buffer, this 0 will
2700
serve as the string-termination char for the file's name (which is at the
2702
We must increment event_len, otherwise the event constructor will not see
2703
this end 0, which leads to segfault.
2705
tmp_buf[event_len++]=0;
2706
int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
2707
buf = (const char*)tmp_buf;
2710
This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
2711
send the loaded file, and write it to the relay log in the form of
2712
Append_block/Exec_load (the SQL thread needs the data, as that thread is not
2713
connected to the master).
2715
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2716
mi->rli.relay_log.description_event_for_queue);
2719
sql_print_error(_("Read invalid event from master: '%s', "
2720
"master could be corrupt but a more likely cause "
2721
"of this is a bug"),
2723
free((char*) tmp_buf);
2727
pthread_mutex_lock(&mi->data_lock);
2728
ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
2729
switch (ev->get_type_code()) {
2735
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2738
pthread_mutex_unlock(&mi->data_lock);
2743
case CREATE_FILE_EVENT:
2745
Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
2746
queue_old_event() which is for 3.23 events which don't comprise
2747
CREATE_FILE_EVENT. This is because read_log_event() above has just
2748
transformed LOAD_EVENT into CREATE_FILE_EVENT.
2751
/* We come here when and only when tmp_buf != 0 */
2752
assert(tmp_buf != 0);
2754
ev->log_pos+= inc_pos;
2755
int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2757
mi->master_log_pos += inc_pos;
2758
pthread_mutex_unlock(&mi->data_lock);
2759
free((char*)tmp_buf);
2766
if (likely(!ignore_event))
2770
Don't do it for fake Rotate events (see comment in
2771
Log_event::Log_event(const char* buf...) in log_event.cc).
2773
ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
2774
if (unlikely(rli->relay_log.append(ev)))
2777
pthread_mutex_unlock(&mi->data_lock);
2780
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2783
mi->master_log_pos+= inc_pos;
2784
pthread_mutex_unlock(&mi->data_lock);
2789
Reads a 4.0 event and converts it to the slave's format. This code was copied
2790
from queue_binlog_ver_1_event(), with some affordable simplifications.
2792
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
2795
const char *errmsg = 0;
2798
Relay_log_info *rli= &mi->rli;
2800
/* read_log_event() will adjust log_pos to be end_log_pos */
2801
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2802
mi->rli.relay_log.description_event_for_queue);
2805
sql_print_error(_("Read invalid event from master: '%s', "
2806
"master could be corrupt but a more likely cause of "
2809
free((char*) tmp_buf);
2812
pthread_mutex_lock(&mi->data_lock);
2813
switch (ev->get_type_code()) {
2817
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2820
pthread_mutex_unlock(&mi->data_lock);
2829
if (unlikely(rli->relay_log.append(ev)))
2832
pthread_mutex_unlock(&mi->data_lock);
2835
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2837
mi->master_log_pos+= inc_pos;
2839
pthread_mutex_unlock(&mi->data_lock);
2846
Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
2847
(exactly, slave's) format. To do the conversion, we create a 5.0 event from
2848
the 3.23/4.0 bytes, then write this event to the relay log.
2851
Test this code before release - it has to be tested on a separate
2852
setup with 3.23 master or 4.0 master
2855
static int32_t queue_old_event(Master_info *mi, const char *buf,
2858
switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
2861
return(queue_binlog_ver_1_event(mi,buf,event_len));
2863
return(queue_binlog_ver_3_event(mi,buf,event_len));
2864
default: /* unsupported format; eg version 2 */
2872
If the event is 3.23/4.0, passes it to queue_old_event() which will convert
2873
it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
2874
no format conversion, it's pure read/write of bytes.
2875
So a 5.0.0 slave's relay log can contain events in the slave's format or in
2879
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
2883
uint32_t inc_pos= 0;
2884
Relay_log_info *rli= &mi->rli;
2885
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
2888
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
2889
buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
2890
return(queue_old_event(mi,buf,event_len));
2892
pthread_mutex_lock(&mi->data_lock);
2894
switch (buf[EVENT_TYPE_OFFSET]) {
2897
We needn't write this event to the relay log. Indeed, it just indicates a
2898
master server shutdown. The only thing this does is cleaning. But
2899
cleaning is already done on a per-master-thread basis (as the master
2900
server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
2901
prepared statements' deletion are TODO only when we binlog prep stmts).
2903
We don't even increment mi->master_log_pos, because we may be just after
2904
a Rotate event. Btw, in a few milliseconds we are going to have a Start
2905
event from the next binlog (unless the master is presently running
2911
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
2912
if (unlikely(process_io_rotate(mi,&rev)))
2914
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2918
Now the I/O thread has just changed its mi->master_log_name, so
2919
incrementing mi->master_log_pos is nonsense.
2924
case FORMAT_DESCRIPTION_EVENT:
2927
Create an event, and save it (when we rotate the relay log, we will have
2928
to write this event again).
2931
We are the only thread which reads/writes description_event_for_queue.
2932
The relay_log struct does not move (though some members of it can
2933
change), so we needn't any lock (no rli->data_lock, no log lock).
2935
Format_description_log_event* tmp;
2937
if (!(tmp= (Format_description_log_event*)
2938
Log_event::read_log_event(buf, event_len, &errmsg,
2939
mi->rli.relay_log.description_event_for_queue)))
2941
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2944
delete mi->rli.relay_log.description_event_for_queue;
2945
mi->rli.relay_log.description_event_for_queue= tmp;
2947
Though this does some conversion to the slave's format, this will
2948
preserve the master's binlog format version, and number of event types.
2951
If the event was not requested by the slave (the slave did not ask for
2952
it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
2954
inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
2958
case HEARTBEAT_LOG_EVENT:
2961
HB (heartbeat) cannot come before RL (Relay)
2964
Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
2967
error= ER_SLAVE_HEARTBEAT_FAILURE;
2968
error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2969
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2970
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2971
error_msg.append(STRING_WITH_LEN(" log_pos "));
2972
llstr(hb.log_pos, llbuf);
2973
error_msg.append(llbuf, strlen(llbuf));
2976
mi->received_heartbeats++;
2978
compare local and event's versions of log_file, log_pos.
2980
Heartbeat is sent only after an event corresponding to the coordinates
2981
the heartbeat carries.
2982
Slave can not have a difference in coordinates except in the only
2983
special case when mi->master_log_name, master_log_pos have never
2984
been updated by Rotate event i.e when slave does not have any history
2985
with the master (and thereafter mi->master_log_pos is NULL).
2987
TODO: handling `when' for SHOW SLAVE STATUS' snds behind
2989
if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
2990
&& mi->master_log_name != NULL)
2991
|| mi->master_log_pos != hb.log_pos)
2993
/* missed events of heartbeat from the past */
2994
error= ER_SLAVE_HEARTBEAT_FAILURE;
2995
error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
2996
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2997
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2998
error_msg.append(STRING_WITH_LEN(" log_pos "));
2999
llstr(hb.log_pos, llbuf);
3000
error_msg.append(llbuf, strlen(llbuf));
3003
goto skip_relay_logging;
3013
If this event is originating from this server, don't queue it.
3014
We don't check this for 3.23 events because it's simpler like this; 3.23
3015
will be filtered anyway by the SQL slave thread which also tests the
3016
server id (we must also keep this test in the SQL thread, in case somebody
3017
upgrades a 4.0 slave which has a not-filtered relay log).
3019
ANY event coming from ourselves can be ignored: it is obvious for queries;
3020
for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
3021
(--log-slave-updates would not log that) unless this slave is also its
3022
direct master (an unsupported, useless setup!).
3025
pthread_mutex_lock(log_lock);
3027
if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
3028
!mi->rli.replicate_same_server_id)
3031
Do not write it to the relay log.
3032
a) We still want to increment mi->master_log_pos, so that we won't
3033
re-read this event from the master if the slave IO thread is now
3034
stopped/restarted (more efficient if the events we are ignoring are big
3036
b) We want to record that we are skipping events, for the information of
3037
the slave SQL thread, otherwise that thread may let
3038
rli->group_relay_log_pos stay too small if the last binlog's event is
3040
But events which were generated by this slave and which do not exist in
3041
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
3044
if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
3045
buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3046
buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3048
mi->master_log_pos+= inc_pos;
3049
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
3050
assert(rli->ign_master_log_name_end[0]);
3051
rli->ign_master_log_pos_end= mi->master_log_pos;
3053
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3057
/* write the event to the relay log */
3058
if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3060
mi->master_log_pos+= inc_pos;
3061
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3065
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
3067
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
3069
pthread_mutex_unlock(log_lock);
3074
pthread_mutex_unlock(&mi->data_lock);
3076
mi->report(ERROR_LEVEL, error, ER(error),
3077
(error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
3078
_("could not queue event from master") :
3084
void end_relay_log_info(Relay_log_info* rli)
3088
if (rli->info_fd >= 0)
3090
end_io_cache(&rli->info_file);
3091
(void) my_close(rli->info_fd, MYF(MY_WME));
3094
if (rli->cur_log_fd >= 0)
3096
end_io_cache(&rli->cache_buf);
3097
(void)my_close(rli->cur_log_fd, MYF(MY_WME));
3098
rli->cur_log_fd = -1;
3101
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
3102
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3104
Delete the slave's temporary tables from memory.
3105
In the future there will be other actions than this, to ensure persistence
3106
of slave's temp tables after shutdown.
3108
rli->close_temporary_tables();
3113
Try to connect until successful or slave killed
3117
thd Thread handler for slave
3118
DRIZZLE DRIZZLE connection handle
3119
mi Replication handle
3126
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi)
3128
return(connect_to_master(thd, drizzle, mi, 0, 0));
3137
Try to connect until successful or slave killed or we have retried
3138
master_retry_count times
3141
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3142
bool reconnect, bool suppress_warnings)
3144
int32_t slave_was_killed;
3145
int32_t last_errno= -2; // impossible error
3146
uint32_t err_count=0;
3149
mi->events_till_disconnect = disconnect_slave_event_count;
3150
uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
3151
if (opt_slave_compressed_protocol)
3152
client_flag=CLIENT_COMPRESS; /* We will use compression */
3154
drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3155
drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3157
while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3158
(reconnect ? drizzle_reconnect(drizzle) != 0 :
3159
drizzle_connect(drizzle, mi->host, mi->user, mi->password, 0,
3160
mi->port, 0, client_flag) == 0))
3162
/* Don't repeat last error */
3163
if ((int32_t)drizzle_errno(drizzle) != last_errno)
3165
last_errno=drizzle_errno(drizzle);
3166
suppress_warnings= 0;
3167
mi->report(ERROR_LEVEL, last_errno,
3168
_("error %s to master '%s@%s:%d'"
3169
" - retry-time: %d retries: %u"),
3170
(reconnect ? _("reconnecting") : _("connecting")),
3171
mi->user, mi->host, mi->port,
3172
mi->connect_retry, master_retry_count);
3175
By default we try forever. The reason is that failure will trigger
3176
master election, so if the user did not set master_retry_count we
3177
do not want to have election triggered on the first failure to
3180
if (++err_count == master_retry_count)
3184
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
3187
safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3191
if (!slave_was_killed)
3195
if (!suppress_warnings && global_system_variables.log_warnings)
3196
sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3197
"replication resumed in log '%s' at "
3198
"position %s"), mi->user,
3201
llstr(mi->master_log_pos,llbuff));
3205
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
3208
drizzle->reconnect= 1;
3209
return(slave_was_killed);
3217
Try to connect until successful or slave killed or we have retried
3218
master_retry_count times
3221
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3222
bool suppress_warnings)
3224
return(connect_to_master(thd, drizzle, mi, 1, suppress_warnings));
3229
Store the file and position where the execute-slave thread is in the
3233
flush_relay_log_info()
3234
rli Relay log information
3237
- As this is only called by the slave thread, we don't need to
3238
have a lock on this.
3239
- If there is an active transaction, then we don't update the position
3240
in the relay log. This is to ensure that we re-execute statements
3241
if we die in the middle of a transaction that was rolled back.
3242
- As a transaction never spans binary logs, we don't have to handle the
3243
case where we do a relay-log-rotation in the middle of the transaction.
3244
If this would not be the case, we would have to ensure that we
3245
don't delete the relay log file where the transaction started when
3246
we switch to a new relay log file.
3249
- Change the log file information to a binary format to avoid calling
3257
bool flush_relay_log_info(Relay_log_info* rli)
3261
if (unlikely(rli->no_storage))
3264
IO_CACHE *file = &rli->info_file;
3265
char buff[FN_REFLEN*2+22*2+4], *pos;
3267
my_b_seek(file, 0L);
3268
pos=my_stpcpy(buff, rli->group_relay_log_name);
3270
pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
3272
pos=my_stpcpy(pos, rli->group_master_log_name);
3274
pos=int64_t2str(rli->group_master_log_pos, pos, 10);
3276
if (my_b_write(file, (unsigned char*) buff, (size_t) (pos-buff)+1))
3278
if (flush_io_cache(file))
3281
/* Flushing the relay log is done by the slave I/O thread */
3287
Called when we notice that the current "hot" log got rotated under our feet.
3290
/*
  Re-open the relay log named rli->event_relay_log_name through the
  private cache rli->cache_buf and position it at
  rli->event_relay_log_pos.

  Preconditions (asserted): rli->cur_log does not already point at
  rli->cache_buf and rli->cur_log_fd is -1.

  errmsg is handed to open_binlog(); NOTE(review): the remainder of that
  call, the failure branch and the return statements are missing from this
  excerpt -- presumably the cache pointer is returned on success and NULL
  on failure (next_event() tests the result with !cur_log).
*/
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
3292
assert(rli->cur_log != &rli->cache_buf);
3293
assert(rli->cur_log_fd == -1);
3295
/* From now on the SQL thread reads through its own cache. */
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3296
if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
3300
We want to start exactly where we was before:
3301
relay_log_pos Current log pos
3302
pending Number of bytes already processed from the event
3304
/* Never seek into the file header: clamp to BIN_LOG_HEADER_SIZE. */
rli->event_relay_log_pos= cmax(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3305
my_b_seek(cur_log,rli->event_relay_log_pos);
3310
/*
  Fetch the next event for the slave SQL thread from the relay log
  described by rli.

  The caller must hold rli->data_lock (asserted below).  The visible body
  loops while sql_slave_killed(thd, rli) is false and handles two kinds of
  log:
    - a "hot" log (rli->cur_log is not &rli->cache_buf): read while holding
      the relay log's log_lock; if the relay log's open count changed, the
      old file is reopened through reopen_relay_log();
    - a "cold" log opened by this thread (rli->cur_log_fd).
  On EOF of a hot log it either builds a Rotate_log_event from
  rli->ign_master_log_name_end, or releases data_lock, sets
  rli->ignore_log_space_limit, broadcasts log_space_cond and waits in
  wait_for_update_relay_log().  On EOF of a cold log it closes the file,
  optionally purges (relay_log_purge), moves to the next log with
  find_next_log() and opens it hot or cold depending on is_active().
  A non-EOF read error is logged and ends the loop with an error message.

  NOTE(review): this excerpt has lost lines during extraction (the
  declarations of ev and hot_log, braces, comment delimiters, jumps and all
  return statements).  Presumably the event read is returned on success and
  NULL on failure -- confirm against the complete file.
*/
static Log_event* next_event(Relay_log_info* rli)
3313
IO_CACHE* cur_log = rli->cur_log;
3314
pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3315
const char* errmsg=0;
3316
THD* thd = rli->sql_thd;
3320
/* Counts down rli->events_till_abort when abort_slave_event_count is set
   (presumably a debugging/test hook; the body of this branch is not
   visible here). */
if (abort_slave_event_count && !rli->events_till_abort--)
3324
For most operations we need to protect rli members with data_lock,
3325
so we assume calling function acquired this mutex for us and we will
3326
hold it for the most of the loop below However, we will release it
3327
whenever it is worth the hassle, and in the cases when we go into a
3328
pthread_cond_wait() with the non-data_lock mutex
3330
safe_mutex_assert_owner(&rli->data_lock);
3332
while (!sql_slave_killed(thd,rli))
3335
We can have two kinds of log reading:
3337
rli->cur_log points at the IO_CACHE of relay_log, which
3338
is actively being updated by the I/O thread. We need to be careful
3339
in this case and make sure that we are not looking at a stale log that
3340
has already been rotated. If it has been, we reopen the log.
3342
The other case is much simpler:
3343
We just have a read only log that nobody else will be updating.
3346
/* hot_log is true when we are not reading through our private cache_buf. */
if ((hot_log = (cur_log != &rli->cache_buf)))
3348
assert(rli->cur_log_fd == -1); // foreign descriptor
3349
pthread_mutex_lock(log_lock);
3352
Reading xxx_file_id is safe because the log will only
3353
be rotated when we hold relay_log.LOCK_log
3355
if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
3357
// The master has switched to a new log file; Reopen the old log file
3358
cur_log=reopen_relay_log(rli, &errmsg);
3359
pthread_mutex_unlock(log_lock);
3360
if (!cur_log) // No more log files
3362
hot_log=0; // Using old binary log
3366
As there is no guarantee that the relay is open (for example, an I/O
3367
error during a write by the slave I/O thread may have closed it), we
3370
if (!my_b_inited(cur_log))
3372
assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3373
assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
3376
Relay log is always in new format - if the master is 3.23, the
3377
I/O thread will convert the format for us.
3378
A problem: the description event may be in a previous relay log. So if
3379
the slave has been shutdown meanwhile, we would have to look in old relay
3380
logs, which may even have been deleted. So we need to write this
3381
description event at the beginning of the relay log.
3382
When the relay log is created when the I/O thread starts, easy: the
3383
master will send the description event and we will queue it.
3384
But if the relay log is created by new_file(): then the solution is:
3385
DRIZZLE_BIN_LOG::open() will write the buffered description event.
3387
if ((ev=Log_event::read_log_event(cur_log,0,
3388
rli->relay_log.description_event_for_exec)))
3391
assert(thd==rli->sql_thd);
3393
read it while we have a lock, to avoid a mutex lock in
3394
inc_event_relay_log_pos()
3396
rli->future_event_relay_log_pos= my_b_tell(cur_log);
3398
pthread_mutex_unlock(log_lock);
3401
assert(thd==rli->sql_thd);
3402
if (opt_reckless_slave) // For mysql-test
3404
/* A negative cache error is treated as an I/O failure. */
if (cur_log->error < 0)
3406
errmsg = "slave SQL thread aborted because of I/O error";
3408
pthread_mutex_unlock(log_lock);
3411
if (!cur_log->error) /* EOF */
3414
On a hot log, EOF means that there are no more updates to
3415
process and we must block until I/O thread adds some and
3416
signals us to continue
3421
We say in Seconds_Behind_Master that we have "caught up". Note that
3422
for example if network link is broken but I/O slave thread hasn't
3423
noticed it (slave_net_timeout not elapsed), then we'll say "caught
3424
up" whereas we're not really caught up. Fixing that would require
3425
internally cutting timeout in smaller pieces in network read, no
3426
thanks. Another example: SQL has caught up on I/O, now I/O has read
3427
a new event and is queuing it; the false "0" will exist until SQL
3428
finishes executing the new event; it will be look abnormal only if
3429
the events have old timestamps (then you get "many", 0, "many").
3431
Transient phases like this can be fixed with implemeting
3432
Heartbeat event which provides the slave the status of the
3433
master at time the master does not have any new update to send.
3434
Seconds_Behind_Master would be zero only when master has no
3435
more updates in binlog for slave. The heartbeat can be sent
3436
in a (small) fraction of slave_net_timeout. Until it's done
3437
rli->last_master_timestamp is temporarely (for time of
3438
waiting for the following event) reset whenever EOF is
3441
/* Saved here and restored after wait_for_update_relay_log() below. */
time_t save_timestamp= rli->last_master_timestamp;
3442
rli->last_master_timestamp= 0;
3444
assert(rli->relay_log.get_open_count() ==
3445
rli->cur_log_old_open_count);
3447
if (rli->ign_master_log_name_end[0])
3449
/* We generate and return a Rotate, to make our positions advance */
3450
ev= new Rotate_log_event(rli->ign_master_log_name_end,
3451
0, rli->ign_master_log_pos_end,
3452
Rotate_log_event::DUP_NAME);
3453
/* Clear the pending "ignored" name so the Rotate is generated only once. */
rli->ign_master_log_name_end[0]= 0;
3454
pthread_mutex_unlock(log_lock);
3457
errmsg= "Slave SQL thread failed to create a Rotate event "
3458
"(out of memory?), SHOW SLAVE STATUS may be inaccurate";
3461
ev->server_id= 0; // don't be ignored by slave SQL thread
3466
We can, and should release data_lock while we are waiting for
3467
update. If we do not, show slave status will block
3469
pthread_mutex_unlock(&rli->data_lock);
3473
- the I/O thread has reached log_space_limit
3474
- the SQL thread has read all relay logs, but cannot purge for some
3476
* it has already purged all logs except the current one
3477
* there are other logs than the current one but they're involved in
3478
a transaction that finishes in the current one (or is not finished)
3480
Wake up the possibly waiting I/O thread, and set a boolean asking
3481
the I/O thread to temporarily ignore the log_space_limit
3482
constraint, because we do not want the I/O thread to block because of
3483
space (it's ok if it blocks for any other reason (e.g. because the
3484
master does not send anything). Then the I/O thread stops waiting
3485
and reads more events.
3486
The SQL thread decides when the I/O thread should take log_space_limit
3487
into account again : ignore_log_space_limit is reset to 0
3488
in purge_first_log (when the SQL thread purges the just-read relay
3489
log), and also when the SQL thread starts. We should also reset
3490
ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
3491
fact, no need as RESET SLAVE requires that the slave
3492
be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
3495
pthread_mutex_lock(&rli->log_space_lock);
3496
// prevent the I/O thread from blocking next times
3497
rli->ignore_log_space_limit= 1;
3499
If the I/O thread is blocked, unblock it. Ok to broadcast
3500
after unlock, because the mutex is only destroyed in
3501
~Relay_log_info(), i.e. when rli is destroyed, and rli will
3502
not be destroyed before we exit the present function.
3504
pthread_mutex_unlock(&rli->log_space_lock);
3505
pthread_cond_broadcast(&rli->log_space_cond);
3506
// Note that wait_for_update_relay_log unlocks lock_log !
3507
rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
3508
// re-acquire data lock since we released it earlier
3509
pthread_mutex_lock(&rli->data_lock);
3510
rli->last_master_timestamp= save_timestamp;
3514
If the log was not hot, we need to move to the next log in
3515
sequence. The next log could be hot or cold, we deal with both
3516
cases separately after doing some common initialization
3518
/* Release the cache and the descriptor this thread opened itself. */
end_io_cache(cur_log);
3519
assert(rli->cur_log_fd >= 0);
3520
my_close(rli->cur_log_fd, MYF(MY_WME));
3521
rli->cur_log_fd = -1;
3523
if (relay_log_purge)
3526
purge_first_log will properly set up relay log coordinates in rli.
3527
If the group's coordinates are equal to the event's coordinates
3528
(i.e. the relay log was not rotated in the middle of a group),
3529
we can purge this relay log too.
3530
We do uint64_t and string comparisons, this may be slow but
3531
- purging the last relay log is nice (it can save 1GB of disk), so we
3532
like to detect the case where we can do it, and given this,
3533
- I see no better detection method
3534
- purge_first_log is not called that often
3536
if (rli->relay_log.purge_first_log
3538
rli->group_relay_log_pos == rli->event_relay_log_pos
3539
&& !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
3541
errmsg = "Error purging processed logs";
3548
If hot_log is set, then we already have a lock on
3549
LOCK_log. If not, we have to get the lock.
3551
According to Sasha, the only time this code will ever be executed
3552
is if we are recovering from a bug.
3554
if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
3556
errmsg = "error switching to the next log";
3559
/* Start reading the next log right after its header and persist that. */
rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3560
strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
3561
sizeof(rli->event_relay_log_name)-1);
3562
flush_relay_log_info(rli);
3566
Now we want to open this next log. To know if it's a hot log (the one
3567
being written by the I/O thread now) or a cold log, we can use
3568
is_active(); if it is hot, we use the I/O cache; if it's cold we open
3569
the file normally. But if is_active() reports that the log is hot, this
3570
may change between the test and the consequence of the test. So we may
3571
open the I/O cache whereas the log is now cold, which is nonsense.
3572
To guard against this, we need to have LOCK_log.
3575
if (!hot_log) /* if hot_log, we already have this mutex */
3576
pthread_mutex_lock(log_lock);
3577
if (rli->relay_log.is_active(rli->linfo.log_file_name))
3580
if (global_system_variables.log_warnings)
3581
sql_print_information(_("next log '%s' is currently active"),
3582
rli->linfo.log_file_name);
3584
/* Switch to the relay log's own cache and remember its open count. */
rli->cur_log= cur_log= rli->relay_log.get_log_file();
3585
rli->cur_log_old_open_count= rli->relay_log.get_open_count();
3586
assert(rli->cur_log_fd == -1);
3589
Read pointer has to be at the start since we are the only
3591
We must keep the LOCK_log to read the 4 first bytes, as this is a hot
3592
log (same as when we call read_log_event() above: for a hot log we
3595
if (check_binlog_magic(cur_log,&errmsg))
3597
if (!hot_log) pthread_mutex_unlock(log_lock);
3600
if (!hot_log) pthread_mutex_unlock(log_lock);
3603
if (!hot_log) pthread_mutex_unlock(log_lock);
3605
if we get here, the log was not hot, so we will have to open it
3606
ourselves. We are sure that the log is still not hot now (a log can get
3607
from hot to cold, but not from cold to hot). No need for LOCK_log.
3610
if (global_system_variables.log_warnings)
3611
sql_print_information(_("next log '%s' is not active"),
3612
rli->linfo.log_file_name);
3614
// open_binlog() will check the magic header
3615
if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
3622
Read failed with a non-EOF error.
3623
TODO: come up with something better to handle this error
3626
pthread_mutex_unlock(log_lock);
3627
sql_print_error(_("Slave SQL thread: I/O error reading "
3628
"event(errno: %d cur_log->error: %d)"),
3629
my_errno,cur_log->error);
3630
// set read position to the beginning of the event
3631
my_b_seek(cur_log,rli->event_relay_log_pos);
3632
/* otherwise, we have had a partial read */
3633
errmsg = _("Aborting slave SQL thread because of partial event read");
3634
break; // To end of function
3637
/* No error message at this point means the loop ended because the SQL
   thread was killed; that is reported at information level only. */
if (!errmsg && global_system_variables.log_warnings)
3639
sql_print_information(_("Error reading relay log event: %s"),
3640
_("slave SQL thread was killed"));
3646
sql_print_error(_("Error reading relay log event: %s"), errmsg);
3651
Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
3652
because of size is simpler because when we do it we already have all relevant
3653
locks; here we don't, so this function is mainly taking locks).
3654
Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
3658
/*
  Rotate the relay log belonging to mi.

  While holding mi->run_lock, the visible body asks the relay log to start
  a new file (new_file()) and then folds the bytes written so far into
  rli->log_space_total via harvest_bytes_written().

  NOTE(review): this excerpt has lost lines during extraction; the "inited"
  test that the comment below refers to, and any label/jump around it, are
  not visible here.
*/
void rotate_relay_log(Master_info* mi)
3660
Relay_log_info* rli= &mi->rli;
3662
/* We don't lock rli->run_lock. This would lead to deadlocks. */
3663
pthread_mutex_lock(&mi->run_lock);
3666
We need to test inited because otherwise, new_file() will attempt to lock
3667
LOCK_log, which may not be inited (if we're not a slave).
3674
/* If the relay log is closed, new_file() will do nothing. */
3675
rli->relay_log.new_file();
3678
We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
3679
be counted, so imagine a succession of FLUSH LOGS and assume the slave
3680
threads are started:
3681
relay_log_space decreases by the size of the deleted relay log, but does
3682
not increase, so flush-after-flush we may become negative, which is wrong.
3683
Even if this will be corrected as soon as a query is replicated on the
3684
slave (because the I/O thread will then call harvest_bytes_written() which
3685
will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
3686
output in SHOW SLAVE STATUS meanwhile. So we harvest now.
3687
If the log is closed, then this will just harvest the last writes, probably
3688
0 as they probably have been harvested.
3690
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3692
pthread_mutex_unlock(&mi->run_lock);
3698
Detects, based on master's version (as found in the relay log), if master
3700
@param rli Relay_log_info which tells the master's version
3701
@param bug_id Number of the bug as found in bugs.mysql.com
3702
@param report bool report error message, default true
3703
@return true if master has the bug, false if it does not.
3705
/*
  Decide whether the master whose version is recorded in the relay log's
  description event falls inside a version range known to contain bug_id.

  A static table lists, per bug number, the first version that has the bug
  (introduced_in) and the first version that has the fix (fixed_in), each
  as three bytes.  An entry matches when its bug number equals bug_id and
  introduced_in <= master version < fixed_in, compared bytewise with
  memcmp() over the three bytes.

  On a match the visible code emits a short message through
  my_printf_error() and a detailed one through rli->report().

  NOTE(review): this excerpt has lost lines during extraction (the bug_id
  struct member, the loop header, the handling of the report flag, parts of
  the message literal and all return statements).
*/
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
3707
struct st_version_range_for_one_bug {
3709
const unsigned char introduced_in[3]; // first version with bug
3710
const unsigned char fixed_in[3]; // first version with fix
3712
static struct st_version_range_for_one_bug versions_for_all_bugs[]=
3714
/* Rows are: bug number, introduced_in, fixed_in. */
{24432, { 5, 0, 24 }, { 5, 0, 38 } },
3715
{24432, { 5, 1, 12 }, { 5, 1, 17 } },
3716
{33029, { 5, 0, 0 }, { 5, 0, 58 } },
3717
{33029, { 5, 1, 0 }, { 5, 1, 12 } },
3719
const unsigned char *master_ver=
3720
rli->relay_log.description_event_for_exec->server_version_split;
3722
/* The memcmp() calls below rely on the split version being 3 bytes. */
assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3725
i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3727
const unsigned char *introduced_in= versions_for_all_bugs[i].introduced_in,
3728
*fixed_in= versions_for_all_bugs[i].fixed_in;
3729
/* Match: same bug number and introduced_in <= master_ver < fixed_in. */
if ((versions_for_all_bugs[i].bug_id == bug_id) &&
3730
(memcmp(introduced_in, master_ver, 3) <= 0) &&
3731
(memcmp(fixed_in, master_ver, 3) > 0))
3736
// a short message for SHOW SLAVE STATUS (message length constraints)
3737
my_printf_error(ER_UNKNOWN_ERROR,
3738
_("master may suffer from"
3739
" http://bugs.mysql.com/bug.php?id=%u"
3740
" so slave stops; check error log on slave"
3741
" for more info"), MYF(0), bug_id);
3742
// a verbose message for the error log
3743
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
3744
_("According to the master's version ('%s'),"
3745
" it is probable that master suffers from this bug:"
3746
" http://bugs.mysql.com/bug.php?id=%u"
3747
" and thus replicating the current binary log event"
3748
" may make the slave's data become different from the"
3750
" To take no risk, slave refuses to replicate"
3751
" this event and stops."
3752
" We recommend that all updates be stopped on the"
3753
" master and slave, that the data of both be"
3754
" manually synchronized,"
3755
" that master's binary logs be deleted,"
3756
" that master be upgraded to a version at least"
3757
" equal to '%d.%d.%d'. Then replication can be"
3759
rli->relay_log.description_event_for_exec->server_version,
3761
fixed_in[0], fixed_in[1], fixed_in[2]);
3769
BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
3770
exclusive, if one statement in a SP generated AUTO_INCREMENT value
3771
by the top statement, all statements after it would be considered
3772
generated AUTO_INCREMENT value by the top statement, and an
3773
erroneous INSERT_ID value might be associated with these statements,
3774
which could cause duplicate entry error and stop the slave.
3776
Detect buggy master to work around.
3778
/*
  Check whether the master is in a version range affected by bug 33029.

  Only applies when thd is the SQL thread of active_mi; in that case the
  answer comes from rpl_master_has_bug() with reporting disabled (third
  argument false).

  NOTE(review): the return for the non-matching case is missing from this
  excerpt -- presumably false; confirm against the complete file.
*/
bool rpl_master_erroneous_autoinc(THD *thd)
3780
if (active_mi && active_mi->rli.sql_thd == thd)
3782
Relay_log_info *rli= &active_mi->rli;
3783
return rpl_master_has_bug(rli, 33029, false);
3788
/*
  Explicit instantiations of I_List_iterator for i_string and
  i_string_pair, compiled only when HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
  is defined.  NOTE(review): the matching #endif is missing from this
  excerpt.
*/
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
3789
template class I_List_iterator<i_string>;
3790
template class I_List_iterator<i_string_pair>;
3794
@} (end of group Replication)