1
/* Copyright (C) 2000-2003 DRIZZLE AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
18
@addtogroup Replication
23
@brief Code to run the io thread and the sql thread on the
26
#include <drizzled/server_includes.h>
28
#include <storage/myisam/myisam.h>
32
#include "rpl_filter.h"
33
#include "repl_failsafe.h"
34
#include <mysys/thr_alarm.h>
35
#include <libdrizzle/sql_common.h>
36
#include <libdrizzle/errmsg.h>
37
#include <mysys/mysys_err.h>
38
#include <drizzled/drizzled_error_messages.h>
40
#ifdef HAVE_REPLICATION
42
#include "rpl_tblmap.h"
44
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
46
#define MAX_SLAVE_RETRY_PAUSE 5
47
bool use_slave_mask = 0;
48
MY_BITMAP slave_error_mask;
50
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
52
char* slave_load_tmpdir = 0;
53
Master_info *active_mi= 0;
54
bool replicate_same_server_id;
55
uint64_t relay_log_space_limit = 0;
58
When slave thread exits, we need to remember the temporary tables so we
59
can re-use them on slave start.
61
TODO: move the vars below under Master_info
64
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
65
int32_t events_till_abort = -1;
67
enum enum_slave_reconnect_actions
69
SLAVE_RECON_ACT_REG= 0,
70
SLAVE_RECON_ACT_DUMP= 1,
71
SLAVE_RECON_ACT_EVENT= 2,
75
enum enum_slave_reconnect_messages
77
SLAVE_RECON_MSG_WAIT= 0,
78
SLAVE_RECON_MSG_KILLED_WAITING= 1,
79
SLAVE_RECON_MSG_AFTER= 2,
80
SLAVE_RECON_MSG_FAILED= 3,
81
SLAVE_RECON_MSG_COMMAND= 4,
82
SLAVE_RECON_MSG_KILLED_AFTER= 5,
86
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
89
N_("Waiting to reconnect after a failed registration on master"),
90
N_("Slave I/O thread killed while waitnig to reconnect after a "
91
"failed registration on master"),
92
N_("Reconnecting after a failed registration on master"),
93
N_("failed registering on master, reconnecting to try again, "
94
"log '%s' at postion %s"),
96
N_("Slave I/O thread killed during or after reconnect")
99
N_("Waiting to reconnect after a failed binlog dump request"),
100
N_("Slave I/O thread killed while retrying master dump"),
101
N_("Reconnecting after a failed binlog dump request"),
102
N_("failed dump request, reconnecting to try again, "
103
"log '%s' at postion %s"),
105
N_("Slave I/O thread killed during or after reconnect")
108
N_("Waiting to reconnect after a failed master event read"),
109
N_("Slave I/O thread killed while waiting to reconnect "
110
"after a failed read"),
111
N_("Reconnecting after a failed master event read"),
112
N_("Slave I/O thread: Failed reading log event, "
113
"reconnecting to retry, log '%s' at postion %s"),
115
N_("Slave I/O thread killed during or after a "
116
"reconnect done to recover from failed read")
121
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
123
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
124
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
125
static bool wait_for_relay_log_space(Relay_log_info* rli);
126
static inline bool io_slave_killed(THD* thd,Master_info* mi);
127
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
128
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
129
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi);
130
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
131
bool suppress_warnings);
132
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
133
bool reconnect, bool suppress_warnings);
134
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
135
void* thread_killed_arg);
136
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
137
static Log_event* next_event(Relay_log_info* rli);
138
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
139
static int32_t terminate_slave_thread(THD *thd,
140
pthread_mutex_t* term_lock,
141
pthread_cond_t* term_cond,
142
volatile uint32_t *slave_running,
144
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
147
Find out which replications threads are running
151
mask Return value here
152
mi master_info for slave
153
inverse If set, returns which threads are not running
156
Get a bit mask for which threads are running so that we can later restart
160
mask If inverse == 0, running threads
161
If inverse == 1, stopped threads
164
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
166
bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
167
register int32_t tmp_mask=0;
170
tmp_mask |= SLAVE_IO;
172
tmp_mask |= SLAVE_SQL;
174
tmp_mask^= (SLAVE_IO | SLAVE_SQL);
184
void lock_slave_threads(Master_info* mi)
186
//TODO: see if we can do this without dual mutex
187
pthread_mutex_lock(&mi->run_lock);
188
pthread_mutex_lock(&mi->rli.run_lock);
194
unlock_slave_threads()
197
void unlock_slave_threads(Master_info* mi)
199
//TODO: see if we can do this without dual mutex
200
pthread_mutex_unlock(&mi->rli.run_lock);
201
pthread_mutex_unlock(&mi->run_lock);
206
/* Initialize slave structures */
211
This is called when mysqld starts. Before client connections are
212
accepted. However bootstrap may conflict with us if it does START SLAVE.
213
So it's safer to take the lock.
215
pthread_mutex_lock(&LOCK_active_mi);
217
TODO: re-write this to interate through the list of files
220
active_mi= new Master_info;
223
If master_host is not specified, try to read it from the master_info file.
224
If master_host is specified, create the master_info file if it doesn't
229
sql_print_error(_("Failed to allocate memory for the master info structure"));
233
if (init_master_info(active_mi,master_info_file,relay_log_info_file,
234
1, (SLAVE_IO | SLAVE_SQL)))
236
sql_print_error(_("Failed to initialize the master info structure"));
240
/* If server id is not set, start_slave_thread() will say it */
242
if (active_mi->host[0] && !opt_skip_slave_start)
244
if (start_slave_threads(1 /* need mutex */,
245
0 /* no wait for start*/,
249
SLAVE_IO | SLAVE_SQL))
251
sql_print_error(_("Failed to create slave threads"));
255
pthread_mutex_unlock(&LOCK_active_mi);
259
pthread_mutex_unlock(&LOCK_active_mi);
265
Init function to set up array for errors that should be skipped for slave
268
init_slave_skip_errors()
269
arg List of errors numbers to skip, separated with ','
272
Called from get_options() in mysqld.cc on start-up
275
void init_slave_skip_errors(const char* arg)
279
if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
281
fprintf(stderr, "Badly out of memory, please check your system status\n");
285
for (;my_isspace(system_charset_info,*arg);++arg)
287
if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
289
bitmap_set_all(&slave_error_mask);
295
if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
297
if (err_code < MAX_SLAVE_ERROR)
298
bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
299
while (!my_isdigit(system_charset_info,*p) && *p)
306
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
309
return(0); /* successfully do nothing */
310
int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
311
pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
313
if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
316
if ((error=terminate_slave_thread(mi->io_thd,io_lock,
323
if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
325
mi->rli.abort_slave=1;
326
if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
328
&mi->rli.slave_running,
338
Wait for a slave thread to terminate.
340
This function is called after requesting the thread to terminate
341
(by setting @c abort_slave member of @c Relay_log_info or @c
342
Master_info structure to 1). Termination of the thread is
343
controlled with the the predicate <code>*slave_running</code>.
345
Function will acquire @c term_lock before waiting on the condition
346
unless @c skip_lock is true in which case the mutex should be owned
347
by the caller of this function and will remain acquired after
348
return from the function.
351
Associated lock to use when waiting for @c term_cond
354
Condition that is signalled when the thread has terminated
357
Pointer to predicate to check for slave thread termination
360
If @c true the lock will not be acquired before waiting on
361
the condition. In this case, it is assumed that the calling
362
function acquires the lock before calling this function.
367
terminate_slave_thread(THD *thd,
368
pthread_mutex_t* term_lock,
369
pthread_cond_t* term_cond,
370
volatile uint32_t *slave_running,
376
pthread_mutex_lock(term_lock);
378
safe_mutex_assert_owner(term_lock);
383
pthread_mutex_unlock(term_lock);
384
return(ER_SLAVE_NOT_RUNNING);
387
THD_CHECK_SENTRY(thd);
390
Is is critical to test if the slave is running. Otherwise, we might
391
be referening freed memory trying to kick it
394
while (*slave_running) // Should always be true
396
pthread_mutex_lock(&thd->LOCK_delete);
397
#ifndef DONT_USE_THR_ALARM
399
Error codes from pthread_kill are:
400
EINVAL: invalid signal number (can't happen)
401
ESRCH: thread already killed (can happen, should be ignored)
403
int32_t err= pthread_kill(thd->real_id, thr_client_alarm);
404
assert(err != EINVAL);
406
thd->awake(THD::NOT_KILLED);
407
pthread_mutex_unlock(&thd->LOCK_delete);
410
There is a small chance that slave thread might miss the first
411
alarm. To protect againts it, resend the signal until it reacts
413
struct timespec abstime;
414
set_timespec(abstime,2);
415
error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
416
assert(error == ETIMEDOUT || error == 0);
419
assert(*slave_running == 0);
422
pthread_mutex_unlock(term_lock);
427
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
428
pthread_mutex_t *cond_lock,
429
pthread_cond_t *start_cond,
430
volatile uint32_t *slave_running,
431
volatile uint32_t *slave_run_id,
441
pthread_mutex_lock(start_lock);
445
pthread_cond_broadcast(start_cond);
447
pthread_mutex_unlock(start_lock);
448
sql_print_error(_("Server id not set, will not start slave"));
449
return(ER_BAD_SLAVE);
455
pthread_cond_broadcast(start_cond);
457
pthread_mutex_unlock(start_lock);
458
return(ER_SLAVE_MUST_STOP);
460
start_id= *slave_run_id;
463
struct sched_param tmp_sched_param;
465
memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
466
tmp_sched_param.sched_priority= CONNECT_PRIOR;
467
(void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
469
if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
472
pthread_mutex_unlock(start_lock);
473
return(ER_SLAVE_THREAD);
475
if (start_cond && cond_lock) // caller has cond_lock
477
THD* thd = current_thd;
478
while (start_id == *slave_run_id)
480
const char* old_msg = thd->enter_cond(start_cond,cond_lock,
481
"Waiting for slave thread to start");
482
pthread_cond_wait(start_cond,cond_lock);
483
thd->exit_cond(old_msg);
484
pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
486
return(thd->killed_errno());
490
pthread_mutex_unlock(start_lock);
496
start_slave_threads()
499
SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
500
sense to do that for starting a slave--we always care if it actually
501
started the threads that were not previously running
504
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
506
const char* master_info_fname __attribute__((unused)),
507
const char* slave_info_fname __attribute__((unused)),
510
pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
511
pthread_cond_t* cond_io=0,*cond_sql=0;
514
if (need_slave_mutex)
516
lock_io = &mi->run_lock;
517
lock_sql = &mi->rli.run_lock;
521
cond_io = &mi->start_cond;
522
cond_sql = &mi->rli.start_cond;
523
lock_cond_io = &mi->run_lock;
524
lock_cond_sql = &mi->rli.run_lock;
527
if (thread_mask & SLAVE_IO)
528
error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
530
&mi->slave_running, &mi->slave_run_id,
531
mi, 1); //high priority, to read the most possible
532
if (!error && (thread_mask & SLAVE_SQL))
534
error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
536
&mi->rli.slave_running, &mi->rli.slave_run_id,
539
terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
546
static int32_t end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
555
Free all resources used by slave
564
This is called when the server terminates, in close_connections().
565
It terminates slave threads. However, some CHANGE MASTER etc may still be
566
running presently. If a START SLAVE was in progress, the mutex lock below
567
will make us wait until slave threads have started, and START SLAVE
568
returns, then we terminate them here.
570
pthread_mutex_lock(&LOCK_active_mi);
574
TODO: replace the line below with
575
list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
576
once multi-master code is ready.
578
terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
579
end_master_info(active_mi);
583
pthread_mutex_unlock(&LOCK_active_mi);
588
static bool io_slave_killed(THD* thd, Master_info* mi)
590
assert(mi->io_thd == thd);
591
assert(mi->slave_running); // tracking buffer overrun
592
return(mi->abort_slave || abort_loop || thd->killed);
596
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
598
assert(rli->sql_thd == thd);
599
assert(rli->slave_running == 1);// tracking buffer overrun
600
if (abort_loop || thd->killed || rli->abort_slave)
603
If we are in an unsafe situation (stopping could corrupt replication),
604
we give one minute to the slave SQL thread of grace before really
605
terminating, in the hope that it will be able to read more events and
606
the unsafe situation will soon be left. Note that this one minute starts
607
from the last time anything happened in the slave SQL thread. So it's
608
really one minute of idleness, we don't timeout if the slave SQL thread
611
if (rli->last_event_start_time == 0)
613
if (difftime(time(0), rli->last_event_start_time) > 60)
615
rli->report(ERROR_LEVEL, 0,
616
_("SQL thread had to stop in an unsafe situation, in "
617
"the middle of applying updates to a "
618
"non-transactional table without any primary key. "
619
"There is a risk of duplicate updates when the slave "
620
"SQL thread is restarted. Please check your tables' "
621
"contents after restart."));
630
skip_load_data_infile()
633
This is used to tell a 3.23 master to break send_file()
636
void skip_load_data_infile(NET *net)
638
(void)net_request_file(net, "/dev/null");
639
(void)my_net_read(net); // discard response
640
(void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok
645
bool net_request_file(NET* net, const char* fname)
647
return(net_write_command(net, 251, (uchar*) fname, strlen(fname),
652
From other comments and tests in code, it looks like
653
sometimes Query_log_event and Load_log_event can have db == 0
654
(see rewrite_db() above for example)
655
(cases where this happens are unclear; it may be when the master is 3.23).
658
const char *print_slave_db_safe(const char* db)
660
return((db ? db : ""));
663
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
664
const char *default_val)
668
if ((length=my_b_gets(f,var, max_size)))
670
char* last_p = var + length -1;
672
*last_p = 0; // if we stopped on newline, kill it
676
If we truncated a line or stopped on last char, remove all chars
677
up to and including newline.
680
while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
684
else if (default_val)
686
strmake(var, default_val, max_size-1);
693
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
698
if (my_b_gets(f, buf, sizeof(buf)))
703
else if (default_val)
711
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
716
if (my_b_gets(f, buf, sizeof(buf)))
718
if (sscanf(buf, "%f", var) != 1)
723
else if (default_val != 0.0)
731
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
733
if (io_slave_killed(thd, mi))
735
if (info && global_system_variables.log_warnings)
736
sql_print_information(info);
744
Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
745
relying on the binlog's version. This is not perfect: imagine an upgrade
746
of the master without waiting that all slaves are in sync with the master;
747
then a slave could be fooled about the binlog's format. This is what happens
748
when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
749
slaves are fooled. So we do this only to distinguish between 3.23 and more
750
recent masters (it's too late to change things for 3.23).
757
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
760
String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
761
char err_buff[MAX_SLAVE_ERRMSG];
762
const char* errmsg= 0;
764
DRIZZLE_RES *master_res= 0;
765
DRIZZLE_ROW master_row;
769
Free old description_event_for_queue (that is needed if we are in
772
delete mi->rli.relay_log.description_event_for_queue;
773
mi->rli.relay_log.description_event_for_queue= 0;
775
if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
777
errmsg = _("Master reported unrecognized DRIZZLE version");
778
err_code= ER_SLAVE_FATAL_ERROR;
779
sprintf(err_buff, ER(err_code), errmsg);
780
err_msg.append(err_buff);
785
Note the following switch will bug when we have DRIZZLE branch 30 ;)
787
switch (*drizzle->server_version)
792
errmsg = _("Master reported unrecognized DRIZZLE version");
793
err_code= ER_SLAVE_FATAL_ERROR;
794
sprintf(err_buff, ER(err_code), errmsg);
795
err_msg.append(err_buff);
798
mi->rli.relay_log.description_event_for_queue= new
799
Format_description_log_event(1, drizzle->server_version);
802
mi->rli.relay_log.description_event_for_queue= new
803
Format_description_log_event(3, drizzle->server_version);
807
Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
808
take the early steps (like tests for "is this a 3.23 master") which we
809
have to take before we receive the real master's Format_desc which will
810
override this one. Note that the Format_desc we create below is garbage
811
(it has the format of the *slave*); it's only good to help know if the
812
master is 3.23, 4.0, etc.
814
mi->rli.relay_log.description_event_for_queue= new
815
Format_description_log_event(4, drizzle->server_version);
821
This does not mean that a 5.0 slave will be able to read a 6.0 master; but
822
as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
823
can't read a 6.0 master, this will show up when the slave can't read some
824
events sent by the master, and there will be error messages.
827
if (err_msg.length() != 0)
830
/* as we are here, we tried to allocate the event */
831
if (!mi->rli.relay_log.description_event_for_queue)
833
errmsg= _("default Format_description_log_event");
834
err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
835
sprintf(err_buff, ER(err_code), errmsg);
836
err_msg.append(err_buff);
841
Compare the master and slave's clock. Do not die if master's clock is
842
unavailable (very old master not supporting UNIX_TIMESTAMP()?).
845
if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
846
(master_res= drizzle_store_result(drizzle)) &&
847
(master_row= drizzle_fetch_row(master_res)))
849
mi->clock_diff_with_master=
850
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
852
else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
854
mi->clock_diff_with_master= 0; /* The "most sensible" value */
855
sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
856
"do not trust column Seconds_Behind_Master of SHOW "
857
"SLAVE STATUS. Error: %s (%d)"),
858
drizzle_error(drizzle), drizzle_errno(drizzle));
861
drizzle_free_result(master_res);
864
Check that the master's server id and ours are different. Because if they
865
are equal (which can result from a simple copy of master's datadir to slave,
866
thus copying some my.cnf), replication will work but all events will be
868
Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
870
Note: we could have put a @@SERVER_ID in the previous SELECT
871
UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
873
if (!drizzle_real_query(drizzle,
874
STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
875
(master_res= drizzle_store_result(drizzle)))
877
if ((master_row= drizzle_fetch_row(master_res)) &&
878
(::server_id == strtoul(master_row[1], 0, 10)) &&
879
!mi->rli.replicate_same_server_id)
882
_("The slave I/O thread stops because master and slave have equal "
883
"DRIZZLE server ids; these ids must be different "
884
"for replication to work (or "
885
"the --replicate-same-server-id option must be used "
886
"on slave but this does"
887
"not always make sense; please check the manual before using it).");
888
err_code= ER_SLAVE_FATAL_ERROR;
889
sprintf(err_buff, ER(err_code), errmsg);
890
err_msg.append(err_buff);
892
drizzle_free_result(master_res);
898
Check that the master's global character_set_server and ours are the same.
899
Not fatal if query fails (old master?).
900
Note that we don't check for equality of global character_set_client and
901
collation_connection (neither do we prevent their setting in
902
set_var.cc). That's because from what I (Guilhem) have tested, the global
903
values of these 2 are never used (new connections don't use them).
904
We don't test equality of global collation_database either as it's is
905
going to be deprecated (made read-only) in 4.1 very soon.
906
The test is only relevant if master < 5.0.3 (we'll test only if it's older
907
than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
908
charset info in each binlog event.
909
We don't do it for 3.23 because masters <3.23.50 hang on
910
SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
911
test only if master is 4.x.
914
/* redundant with rest of code but safer against later additions */
915
if (*drizzle->server_version == '3')
918
if ((*drizzle->server_version == '4') &&
919
!drizzle_real_query(drizzle,
920
STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
921
(master_res= drizzle_store_result(drizzle)))
923
if ((master_row= drizzle_fetch_row(master_res)) &&
924
strcmp(master_row[0], global_system_variables.collation_server->name))
927
_("The slave I/O thread stops because master and slave have"
928
" different values for the COLLATION_SERVER global variable."
929
" The values must be equal for replication to work");
930
err_code= ER_SLAVE_FATAL_ERROR;
931
sprintf(err_buff, ER(err_code), errmsg);
932
err_msg.append(err_buff);
934
drizzle_free_result(master_res);
940
Perform analogous check for time zone. Theoretically we also should
941
perform check here to verify that SYSTEM time zones are the same on
942
slave and master, but we can't rely on value of @@system_time_zone
943
variable (it is time zone abbreviation) since it determined at start
944
time and so could differ for slave and master even if they are really
945
in the same system time zone. So we are omiting this check and just
946
relying on documentation. Also according to Monty there are many users
947
who are using replication between servers in various time zones. Hence
948
such check will broke everything for them. (And now everything will
949
work for them because by default both their master and slave will have
951
This check is only necessary for 4.x masters (and < 5.0.4 masters but
954
if ((*drizzle->server_version == '4') &&
955
!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
956
(master_res= drizzle_store_result(drizzle)))
958
if ((master_row= drizzle_fetch_row(master_res)) &&
959
strcmp(master_row[0],
960
global_system_variables.time_zone->get_name()->ptr()))
963
_("The slave I/O thread stops because master and slave have"
964
" different values for the TIME_ZONE global variable."
965
" The values must be equal for replication to work");
966
err_code= ER_SLAVE_FATAL_ERROR;
967
sprintf(err_buff, ER(err_code), errmsg);
968
err_msg.append(err_buff);
970
drizzle_free_result(master_res);
976
if (mi->heartbeat_period != 0.0)
979
const char query_format[]= "SET @master_heartbeat_period= %s";
980
char query[sizeof(query_format) - 2 + sizeof(llbuf)];
982
the period is an uint64_t of nano-secs.
984
llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
985
sprintf(query, query_format, llbuf);
987
if (drizzle_real_query(drizzle, query, strlen(query))
988
&& !check_io_slave_killed(mi->io_thd, mi, NULL))
990
err_msg.append("The slave I/O thread stops because querying master with '");
991
err_msg.append(query);
992
err_msg.append("' failed;");
993
err_msg.append(" error: ");
994
err_code= drizzle_errno(drizzle);
995
err_msg.qs_append(err_code);
996
err_msg.append(" '");
997
err_msg.append(drizzle_error(drizzle));
999
drizzle_free_result(drizzle_store_result(drizzle));
1002
drizzle_free_result(drizzle_store_result(drizzle));
1006
if (err_msg.length() != 0)
1008
sql_print_error(err_msg.ptr());
1009
assert(err_code != 0);
1010
mi->report(ERROR_LEVEL, err_code, err_msg.ptr());
1018
static bool wait_for_relay_log_space(Relay_log_info* rli)
1020
bool slave_killed=0;
1021
Master_info* mi = rli->mi;
1022
const char *save_proc_info;
1023
THD* thd = mi->io_thd;
1025
pthread_mutex_lock(&rli->log_space_lock);
1026
save_proc_info= thd->enter_cond(&rli->log_space_cond,
1027
&rli->log_space_lock,
1028
_("Waiting for the slave SQL thread "
1029
"to free enough relay log space"));
1030
while (rli->log_space_limit < rli->log_space_total &&
1031
!(slave_killed=io_slave_killed(thd,mi)) &&
1032
!rli->ignore_log_space_limit)
1033
pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1034
thd->exit_cond(save_proc_info);
1035
return(slave_killed);
1040
Builds a Rotate from the ignored events' info and writes it to relay log.
1043
write_ignored_events_info_to_relay_log()
1044
thd pointer to I/O thread's thd
1048
Slave I/O thread, going to die, must leave a durable trace of the
1049
ignored events' end position for the use of the slave SQL thread, by
1050
calling this function. Only that thread can call it (see assertion).
1052
static void write_ignored_events_info_to_relay_log(THD *thd __attribute__((unused)),
1055
Relay_log_info *rli= &mi->rli;
1056
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1058
assert(thd == mi->io_thd);
1059
pthread_mutex_lock(log_lock);
1060
if (rli->ign_master_log_name_end[0])
1062
Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
1063
0, rli->ign_master_log_pos_end,
1064
Rotate_log_event::DUP_NAME);
1065
rli->ign_master_log_name_end[0]= 0;
1066
/* can unlock before writing as slave SQL thd will soon see our Rotate */
1067
pthread_mutex_unlock(log_lock);
1068
if (likely((bool)ev))
1070
ev->server_id= 0; // don't be ignored by slave SQL thread
1071
if (unlikely(rli->relay_log.append(ev)))
1072
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
1073
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
1074
_("failed to write a Rotate event"
1075
" to the relay log, SHOW SLAVE STATUS may be"
1077
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1078
if (flush_master_info(mi, 1))
1079
sql_print_error(_("Failed to flush master info file"));
1083
mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
1084
ER(ER_SLAVE_CREATE_EVENT_FAILURE),
1085
_("Rotate_event (out of memory?),"
1086
" SHOW SLAVE STATUS may be inaccurate"));
1089
pthread_mutex_unlock(log_lock);
1094
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
1095
bool *suppress_warnings)
1097
uchar buf[1024], *pos= buf;
1098
uint32_t report_host_len, report_user_len=0, report_password_len=0;
1100
*suppress_warnings= false;
1103
report_host_len= strlen(report_host);
1105
report_user_len= strlen(report_user);
1106
if (report_password)
1107
report_password_len= strlen(report_password);
1108
/* 30 is a good safety margin */
1109
if (report_host_len + report_user_len + report_password_len + 30 >
1111
return(0); // safety
1113
int4store(pos, server_id); pos+= 4;
1114
pos= net_store_data(pos, (uchar*) report_host, report_host_len);
1115
pos= net_store_data(pos, (uchar*) report_user, report_user_len);
1116
pos= net_store_data(pos, (uchar*) report_password, report_password_len);
1117
int2store(pos, (uint16_t) report_port); pos+= 2;
1118
int4store(pos, rpl_recovery_rank); pos+= 4;
1119
/* The master will fill in master_id */
1120
int4store(pos, 0); pos+= 4;
1122
if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
1124
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1126
*suppress_warnings= true; // Suppress reconnect warning
1128
else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
1131
snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle),
1132
drizzle_errno(drizzle));
1133
mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1134
ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1142
bool show_master_info(THD* thd, Master_info* mi)
1144
// TODO: fix this for multi-master
1145
List<Item> field_list;
1146
Protocol *protocol= thd->protocol;
1148
field_list.push_back(new Item_empty_string("Slave_IO_State",
1150
field_list.push_back(new Item_empty_string("Master_Host",
1152
field_list.push_back(new Item_empty_string("Master_User",
1154
field_list.push_back(new Item_return_int("Master_Port", 7,
1155
DRIZZLE_TYPE_LONG));
1156
field_list.push_back(new Item_return_int("Connect_Retry", 10,
1157
DRIZZLE_TYPE_LONG));
1158
field_list.push_back(new Item_empty_string("Master_Log_File",
1160
field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
1161
DRIZZLE_TYPE_LONGLONG));
1162
field_list.push_back(new Item_empty_string("Relay_Log_File",
1164
field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
1165
DRIZZLE_TYPE_LONGLONG));
1166
field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
1168
field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
1169
field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1170
field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
1171
field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
1172
field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
1173
field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
1174
field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
1175
field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
1177
field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
1178
field_list.push_back(new Item_empty_string("Last_Error", 20));
1179
field_list.push_back(new Item_return_int("Skip_Counter", 10,
1180
DRIZZLE_TYPE_LONG));
1181
field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
1182
DRIZZLE_TYPE_LONGLONG));
1183
field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
1184
DRIZZLE_TYPE_LONGLONG));
1185
field_list.push_back(new Item_empty_string("Until_Condition", 6));
1186
field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1187
field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1188
DRIZZLE_TYPE_LONGLONG));
1189
field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
1190
field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
1191
sizeof(mi->ssl_ca)));
1192
field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
1193
sizeof(mi->ssl_capath)));
1194
field_list.push_back(new Item_empty_string("Master_SSL_Cert",
1195
sizeof(mi->ssl_cert)));
1196
field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
1197
sizeof(mi->ssl_cipher)));
1198
field_list.push_back(new Item_empty_string("Master_SSL_Key",
1199
sizeof(mi->ssl_key)));
1200
field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1201
DRIZZLE_TYPE_LONGLONG));
1202
field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
1204
field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
1205
field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1206
field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
1207
field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
1209
if (protocol->send_fields(&field_list,
1210
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1215
String *packet= &thd->packet;
1216
protocol->prepare_for_resend();
1219
slave_running can be accessed without run_lock but not other
1220
non-volotile members like mi->io_thd, which is guarded by the mutex.
1222
pthread_mutex_lock(&mi->run_lock);
1223
protocol->store(mi->io_thd ? mi->io_thd->get_proc_info() : "", &my_charset_bin);
1224
pthread_mutex_unlock(&mi->run_lock);
1226
pthread_mutex_lock(&mi->data_lock);
1227
pthread_mutex_lock(&mi->rli.data_lock);
1228
protocol->store(mi->host, &my_charset_bin);
1229
protocol->store(mi->user, &my_charset_bin);
1230
protocol->store((uint32_t) mi->port);
1231
protocol->store((uint32_t) mi->connect_retry);
1232
protocol->store(mi->master_log_name, &my_charset_bin);
1233
protocol->store((uint64_t) mi->master_log_pos);
1234
protocol->store(mi->rli.group_relay_log_name +
1235
dirname_length(mi->rli.group_relay_log_name),
1237
protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1238
protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
1239
protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1240
"Yes" : "No", &my_charset_bin);
1241
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1242
protocol->store(rpl_filter->get_do_db());
1243
protocol->store(rpl_filter->get_ignore_db());
1246
String tmp(buf, sizeof(buf), &my_charset_bin);
1247
rpl_filter->get_do_table(&tmp);
1248
protocol->store(&tmp);
1249
rpl_filter->get_ignore_table(&tmp);
1250
protocol->store(&tmp);
1251
rpl_filter->get_wild_do_table(&tmp);
1252
protocol->store(&tmp);
1253
rpl_filter->get_wild_ignore_table(&tmp);
1254
protocol->store(&tmp);
1256
protocol->store(mi->rli.last_error().number);
1257
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1258
protocol->store((uint32_t) mi->rli.slave_skip_counter);
1259
protocol->store((uint64_t) mi->rli.group_master_log_pos);
1260
protocol->store((uint64_t) mi->rli.log_space_total);
1263
mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
1264
( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
1265
"Relay"), &my_charset_bin);
1266
protocol->store(mi->rli.until_log_name, &my_charset_bin);
1267
protocol->store((uint64_t) mi->rli.until_log_pos);
1269
protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
1270
protocol->store(mi->ssl_ca, &my_charset_bin);
1271
protocol->store(mi->ssl_capath, &my_charset_bin);
1272
protocol->store(mi->ssl_cert, &my_charset_bin);
1273
protocol->store(mi->ssl_cipher, &my_charset_bin);
1274
protocol->store(mi->ssl_key, &my_charset_bin);
1277
Seconds_Behind_Master: if SQL thread is running and I/O thread is
1278
connected, we can compute it otherwise show NULL (i.e. unknown).
1280
if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
1281
mi->rli.slave_running)
1283
long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
1284
- mi->clock_diff_with_master);
1286
Apparently on some systems time_diff can be <0. Here are possible
1287
reasons related to MySQL:
1288
- the master is itself a slave of another master whose time is ahead.
1289
- somebody used an explicit SET TIMESTAMP on the master.
1290
Possible reason related to granularity-to-second of time functions
1291
(nothing to do with MySQL), which can explain a value of -1:
1292
assume the master's and slave's time are perfectly synchronized, and
1293
that at slave's connection time, when the master's timestamp is read,
1294
it is at the very end of second 1, and (a very short time later) when
1295
the slave's timestamp is read it is at the very beginning of second
1296
2. Then the recorded value for master is 1 and the recorded value for
1297
slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1298
between timestamp of slave and rli->last_master_timestamp is 0
1299
(i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1300
This confuses users, so we don't go below 0: hence the max().
1302
last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1303
special marker to say "consider we have caught up".
1305
protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1306
max((long)0, time_diff) : 0));
1310
protocol->store_null();
1312
protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
1315
protocol->store(mi->last_error().number);
1317
protocol->store(mi->last_error().message, &my_charset_bin);
1319
protocol->store(mi->rli.last_error().number);
1321
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1323
pthread_mutex_unlock(&mi->rli.data_lock);
1324
pthread_mutex_unlock(&mi->data_lock);
1326
if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
1334
void set_slave_thread_options(THD* thd)
1337
It's nonsense to constrain the slave threads with max_join_size; if a
1338
query succeeded on master, we HAVE to execute it. So set
1339
OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
1340
(and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
1341
SELECT examining more than 4 billion rows would still fail (yes, because
1342
when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1343
only for client threads.
1345
uint64_t options= thd->options | OPTION_BIG_SELECTS;
1346
if (opt_log_slave_updates)
1347
options|= OPTION_BIN_LOG;
1349
options&= ~OPTION_BIN_LOG;
1350
thd->options= options;
1351
thd->variables.completion_type= 0;
1355
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
1357
thd->variables.character_set_client=
1358
global_system_variables.character_set_client;
1359
thd->variables.collation_connection=
1360
global_system_variables.collation_connection;
1361
thd->variables.collation_server=
1362
global_system_variables.collation_server;
1363
thd->update_charset();
1366
We use a const cast here since the conceptual (and externally
1367
visible) behavior of the function is to set the default charset of
1368
the thread. That the cache has to be invalidated is a secondary
1371
const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
1379
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
1381
int32_t simulate_error= 0;
1382
thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
1383
SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1384
thd->security_ctx->skip_grants();
1385
my_net_init(&thd->net, 0);
1387
Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1388
slave threads, since a replication event can become this much larger
1389
than the corresponding packet (query) sent from client to master.
1391
thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1392
+ MAX_LOG_EVENT_HEADER; /* note, incr over the global not session var */
1393
thd->slave_thread = 1;
1394
thd->enable_slow_log= opt_log_slow_slave_statements;
1395
set_slave_thread_options(thd);
1396
thd->client_capabilities = CLIENT_LOCAL_FILES;
1397
pthread_mutex_lock(&LOCK_thread_count);
1398
thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
1399
pthread_mutex_unlock(&LOCK_thread_count);
1401
simulate_error|= (1 << SLAVE_THD_IO);
1402
simulate_error|= (1 << SLAVE_THD_SQL);
1403
if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
1410
if (thd_type == SLAVE_THD_SQL)
1411
thd_proc_info(thd, "Waiting for the next event in relay log");
1413
thd_proc_info(thd, "Waiting for master update");
1414
thd->version=refresh_version;
1420
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1421
void* thread_killed_arg)
1424
thr_alarm_t alarmed;
1426
thr_alarm_init(&alarmed);
1427
time_t start_time= my_time(0);
1428
time_t end_time= start_time+sec;
1430
while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1434
The only reason we are asking for alarm is so that
1435
we will be woken up in case of murder, so if we do not get killed,
1436
set the alarm so it goes off after we wake up naturally
1438
thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
1440
thr_end_alarm(&alarmed);
1442
if ((*thread_killed)(thd,thread_killed_arg))
1444
start_time= my_time(0);
1450
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
1451
bool *suppress_warnings)
1453
uchar buf[FN_REFLEN + 10];
1455
int32_t binlog_flags = 0; // for now
1456
char* logname = mi->master_log_name;
1458
*suppress_warnings= false;
1460
// TODO if big log files: Change next to int8store()
1461
int4store(buf, (uint32_t) mi->master_log_pos);
1462
int2store(buf + 4, binlog_flags);
1463
int4store(buf + 6, server_id);
1464
len = (uint32_t) strlen(logname);
1465
memcpy(buf + 10, logname,len);
1466
if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
1469
Something went wrong, so we will just reconnect and retry later
1470
in the future, we should do a better error analysis, but for
1471
now we just fill up the error log :-)
1473
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1474
*suppress_warnings= true; // Suppress reconnect warning
1476
sql_print_error(_("Error on COM_BINLOG_DUMP: %d %s, will retry in %d secs"),
1477
drizzle_errno(drizzle), drizzle_error(drizzle),
1486
Read one event from the master
1490
DRIZZLE DRIZZLE connection
1491
mi Master connection information
1492
suppress_warnings TRUE when a normal net read timeout has caused us to
1493
try a reconnect. We do not want to print anything to
1494
the error log in this case because this a anormal
1495
event in an idle server.
1498
'packet_error' Error
1499
number Length of packet
1502
static uint32_t read_event(DRIZZLE *drizzle,
1503
Master_info *mi __attribute__((unused)),
1504
bool* suppress_warnings)
1508
*suppress_warnings= false;
1510
my_real_read() will time us out
1511
We check if we were told to die, and if not, try reading again
1513
if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
1514
return(packet_error);
1516
len = cli_safe_read(drizzle);
1517
if (len == packet_error || (int32_t) len < 1)
1519
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1522
We are trying a normal reconnect after a read timeout;
1523
we suppress prints to .err file as long as the reconnect
1524
happens without problems
1526
*suppress_warnings= true;
1529
sql_print_error(_("Error reading packet from server: %s ( server_errno=%d)"),
1530
drizzle_error(drizzle), drizzle_errno(drizzle));
1531
return(packet_error);
1534
/* Check if eof packet */
1535
if (len < 8 && drizzle->net.read_pos[0] == 254)
1537
sql_print_information(_("Slave: received end packet from server, apparent "
1538
"master shutdown: %s"),
1539
drizzle_error(drizzle));
1540
return(packet_error);
1547
int32_t check_expected_error(THD* thd __attribute__((unused)),
1548
Relay_log_info const *rli __attribute__((unused)),
1549
int32_t expected_error)
1551
switch (expected_error) {
1552
case ER_NET_READ_ERROR:
1553
case ER_NET_ERROR_ON_WRITE:
1554
case ER_QUERY_INTERRUPTED:
1555
case ER_SERVER_SHUTDOWN:
1556
case ER_NEW_ABORTING_CONNECTION:
1565
Check if the current error is of temporary nature of not.
1566
Some errors are temporary in nature, such as
1567
ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. Ndb also signals
1568
that the error is temporary by pushing a warning with the error code
1569
ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1571
static int32_t has_temporary_error(THD *thd)
1573
if (thd->is_fatal_error)
1576
if (thd->main_da.is_error())
1579
my_error(ER_LOCK_DEADLOCK, MYF(0));
1583
If there is no message in THD, we can't say if it's a temporary
1584
error or not. This is currently the case for Incident_log_event,
1585
which sets no message. Return FALSE.
1587
if (!thd->is_error())
1591
Temporary error codes:
1592
currently, InnoDB deadlock detected by InnoDB or lock
1593
wait timeout (innodb_lock_wait_timeout exceeded
1595
if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
1596
thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
1604
Applies the given event and advances the relay log position.
1606
In essence, this function does:
1609
ev->apply_event(rli);
1610
ev->update_pos(rli);
1613
But it also does some maintainance, such as skipping events if
1614
needed and reporting errors.
1616
If the @c skip flag is set, then it is tested whether the event
1617
should be skipped, by looking at the slave_skip_counter and the
1618
server id. The skip flag should be set when calling this from a
1619
replication thread but not set when executing an explicit BINLOG
1624
@retval 1 Error calling ev->apply_event().
1626
@retval 2 No error calling ev->apply_event(), but error calling
1629
int32_t apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
1632
int32_t exec_res= 0;
1635
Execute the event to change the database and update the binary
1636
log coordinates, but first we set some data that is needed for
1639
The event will be executed unless it is supposed to be skipped.
1641
Queries originating from this server must be skipped. Low-level
1642
events (Format_description_log_event, Rotate_log_event,
1643
Stop_log_event) from this server must also be skipped. But for
1644
those we don't want to modify 'group_master_log_pos', because
1645
these events did not exist on the master.
1646
Format_description_log_event is not completely skipped.
1648
Skip queries specified by the user in 'slave_skip_counter'. We
1649
can't however skip events that has something to do with the log
1652
Filtering on own server id is extremely important, to ignore
1653
execution of events created by the creation/rotation of the relay
1654
log (remember that now the relay log starts with its Format_desc,
1658
thd->server_id = ev->server_id; // use the original server id for logging
1659
thd->set_time(); // time the query
1660
thd->lex->current_select= 0;
1662
ev->when= my_time(0);
1663
ev->thd = thd; // because up to this point, ev->thd == 0
1667
int32_t reason= ev->shall_skip(rli);
1668
if (reason == Log_event::EVENT_SKIP_COUNT)
1669
--rli->slave_skip_counter;
1670
pthread_mutex_unlock(&rli->data_lock);
1671
if (reason == Log_event::EVENT_SKIP_NOT)
1672
exec_res= ev->apply_event(rli);
1675
exec_res= ev->apply_event(rli);
1679
int32_t error= ev->update_pos(rli);
1681
The update should not fail, so print an error message and
1682
return an error code.
1684
TODO: Replace this with a decent error message when merged
1685
with BUG#24954 (which adds several new error message).
1690
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
1691
_("It was not possible to update the positions"
1692
" of the relay log information: the slave may"
1693
" be in an inconsistent state."
1694
" Stopped in %s position %s"),
1695
rli->group_relay_log_name,
1696
llstr(rli->group_relay_log_pos, buf));
1701
return(exec_res ? 1 : 0);
1706
Top-level function for executing the next event from the relay log.
1708
This function reads the event from the relay log, executes it, and
1709
advances the relay log position. It also handles errors, etc.
1711
This function may fail to apply the event for the following reasons:
1713
- The position specfied by the UNTIL condition of the START SLAVE
1716
- It was not possible to read the event from the log.
1718
- The slave is killed.
1720
- An error occurred when applying the event, and the event has been
1721
tried slave_trans_retries times. If the event has been retried
1722
fewer times, 0 is returned.
1724
- init_master_info or init_relay_log_pos failed. (These are called
1725
if a failure occurs when applying the event.)</li>
1727
- An error occurred when updating the binlog position.
1729
@retval 0 The event was applied.
1731
@retval 1 The event was not applied.
1733
static int32_t exec_relay_log_event(THD* thd, Relay_log_info* rli)
1736
We acquire this mutex since we need it for all operations except
1737
event execution. But we will release it in places where we will
1738
wait for something for example inside of next_event().
1740
pthread_mutex_lock(&rli->data_lock);
1742
Log_event * ev = next_event(rli);
1744
assert(rli->sql_thd==thd);
1746
if (sql_slave_killed(thd,rli))
1748
pthread_mutex_unlock(&rli->data_lock);
1757
This tests if the position of the beginning of the current event
1758
hits the UNTIL barrier.
1760
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
1761
rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
1762
rli->group_master_log_pos :
1763
ev->log_pos - ev->data_written))
1766
sql_print_information(_("Slave SQL thread stopped because it reached its"
1767
" UNTIL position %s"),
1768
llstr(rli->until_pos(), buf));
1770
Setting abort_slave flag because we do not want additional message about
1771
error in query execution to be printed.
1773
rli->abort_slave= 1;
1774
pthread_mutex_unlock(&rli->data_lock);
1778
exec_res= apply_event_and_update_pos(ev, thd, rli, true);
1781
Format_description_log_event should not be deleted because it will be
1782
used to read info about the relay log's format; it will be deleted when
1783
the SQL thread does not need it, i.e. when this thread terminates.
1785
if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
1791
update_log_pos failed: this should not happen, so we don't
1797
if (slave_trans_retries)
1799
int32_t temp_err= 0;
1800
if (exec_res && (temp_err= has_temporary_error(thd)))
1804
We were in a transaction which has been rolled back because of a
1806
let's seek back to BEGIN log event and retry it all again.
1807
Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
1808
there is no rollback since 5.0.13 (ref: manual).
1809
We have to not only seek but also
1810
a) init_master_info(), to seek back to hot relay log's start for later
1811
(for when we will come back to this hot log after re-processing the
1812
possibly existing old logs where BEGIN is: check_binlog_magic() will
1813
then need the cache to be at position 0 (see comments at beginning of
1814
init_master_info()).
1815
b) init_relay_log_pos(), because the BEGIN may be an older relay log.
1817
if (rli->trans_retries < slave_trans_retries)
1819
if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
1820
sql_print_error(_("Failed to initialize the master info structure"));
1821
else if (init_relay_log_pos(rli,
1822
rli->group_relay_log_name,
1823
rli->group_relay_log_pos,
1825
sql_print_error(_("Error initializing relay log position: %s"),
1830
end_trans(thd, ROLLBACK);
1831
/* chance for concurrent connection to get more locks */
1832
safe_sleep(thd, min(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1833
(CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1834
pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1835
rli->trans_retries++;
1836
rli->retried_trans++;
1837
pthread_mutex_unlock(&rli->data_lock);
1841
sql_print_error(_("Slave SQL thread retried transaction %lu time(s) "
1842
"in vain, giving up. Consider raising the value of "
1843
"the slave_transaction_retries variable."),
1844
slave_trans_retries);
1846
else if ((exec_res && !temp_err) ||
1847
(opt_using_transactions &&
1848
rli->group_relay_log_pos == rli->event_relay_log_pos))
1851
Only reset the retry counter if the entire group succeeded
1852
or failed with a non-transient error. On a successful
1853
event, the execution will proceed as usual; in the case of a
1854
non-transient error, the slave will stop with an error.
1856
rli->trans_retries= 0; // restart from fresh
1861
pthread_mutex_unlock(&rli->data_lock);
1862
rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
1863
ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
1864
_("Could not parse relay log event entry. The possible reasons "
1865
"are: the master's binary log is corrupted (you can check this "
1866
"by running 'mysqlbinlog' on the binary log), the slave's "
1867
"relay log is corrupted (you can check this by running "
1868
"'mysqlbinlog' on the relay log), a network problem, or a bug "
1869
"in the master's or slave's DRIZZLE code. If you want to check "
1870
"the master's binary log or slave's relay log, you will be "
1871
"able to know their names by issuing 'SHOW SLAVE STATUS' "
1878
@brief Try to reconnect slave IO thread.
1880
@details Terminates current connection to master, sleeps for
1881
@c mi->connect_retry msecs and initiates new connection with
1882
@c safe_reconnect(). Variable pointed by @c retry_count is increased -
1883
if it exceeds @c master_retry_count then connection is not re-established
1884
and function signals error.
1885
Unless @c suppres_warnings is TRUE, a warning is put in the server error log
1886
when reconnecting. The warning message and messages used to report errors
1887
are taken from @c messages array. In case @c master_retry_count is exceeded,
1888
no messages are added to the log.
1890
@param[in] thd Thread context.
1891
@param[in] DRIZZLE DRIZZLE connection.
1892
@param[in] mi Master connection information.
1893
@param[in,out] retry_count Number of attempts to reconnect.
1894
@param[in] suppress_warnings TRUE when a normal net read timeout
1895
has caused to reconnecting.
1896
@param[in] messages Messages to print/log, see
1897
reconnect_messages[] array.
1900
@retval 1 There was an error.
1903
static int32_t try_to_reconnect(THD *thd, DRIZZLE *drizzle, Master_info *mi,
1904
uint32_t *retry_count, bool suppress_warnings,
1905
const char *messages[SLAVE_RECON_MSG_MAX])
1907
mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1908
thd->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
1909
end_server(drizzle);
1910
if ((*retry_count)++)
1912
if (*retry_count > master_retry_count)
1913
return 1; // Don't retry forever
1914
safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1917
if (check_io_slave_killed(thd, mi,
1918
_(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1920
thd->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
1921
if (!suppress_warnings)
1923
char buf[256], llbuff[22];
1924
snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1925
IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
1927
Raise a warining during registering on master/requesting dump.
1928
Log a message reading event.
1930
if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
1932
mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1933
ER(ER_SLAVE_MASTER_COM_FAILURE),
1934
_(messages[SLAVE_RECON_MSG_COMMAND]), buf);
1938
sql_print_information(buf);
1941
if (safe_reconnect(thd, drizzle, mi, 1) || io_slave_killed(thd, mi))
1943
if (global_system_variables.log_warnings)
1944
sql_print_information(_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1951
/* Slave I/O Thread entry point */
1953
pthread_handler_t handle_slave_io(void *arg)
1955
THD *thd; // needs to be first for thread_stack
1957
Master_info *mi = (Master_info*)arg;
1958
Relay_log_info *rli= &mi->rli;
1960
uint32_t retry_count;
1961
bool suppress_warnings;
1962
uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
1969
pthread_mutex_lock(&mi->run_lock);
1970
/* Inform waiting threads that slave has started */
1973
mi->events_till_disconnect = disconnect_slave_event_count;
1976
THD_CHECK_SENTRY(thd);
1979
pthread_detach_this_thread();
1980
thd->thread_stack= (char*) &thd; // remember where our stack is
1981
if (init_slave_thread(thd, SLAVE_THD_IO))
1983
pthread_cond_broadcast(&mi->start_cond);
1984
pthread_mutex_unlock(&mi->run_lock);
1985
sql_print_error(_("Failed during slave I/O thread initialization"));
1988
pthread_mutex_lock(&LOCK_thread_count);
1989
threads.append(thd);
1990
pthread_mutex_unlock(&LOCK_thread_count);
1991
mi->slave_running = 1;
1992
mi->abort_slave = 0;
1993
pthread_mutex_unlock(&mi->run_lock);
1994
pthread_cond_broadcast(&mi->start_cond);
1996
if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
1998
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
1999
ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
2003
thd_proc_info(thd, "Connecting to master");
2004
// we can get killed during safe_connect
2005
if (!safe_connect(thd, drizzle, mi))
2007
sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
2008
"replication started in log '%s' at position %s"),
2009
mi->user, mi->host, mi->port,
2011
llstr(mi->master_log_pos,llbuff));
2013
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
2014
thread, since a replication event can become this much larger than
2015
the corresponding packet (query) sent from client to master.
2017
drizzle->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
2021
sql_print_information(_("Slave I/O thread killed while connecting to master"));
2027
// TODO: the assignment below should be under mutex (5.0)
2028
mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
2029
thd->slave_net = &drizzle->net;
2030
thd_proc_info(thd, "Checking master version");
2031
if (get_master_version_and_clock(drizzle, mi))
2034
if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
2037
Register ourselves with the master.
2039
thd_proc_info(thd, "Registering slave on master");
2040
if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2042
if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
2043
"while registering slave on master"))
2045
sql_print_error(_("Slave I/O thread couldn't register on master"));
2046
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2047
reconnect_messages[SLAVE_RECON_ACT_REG]))
2054
if (!retry_count_reg)
2057
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2058
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2059
reconnect_messages[SLAVE_RECON_ACT_REG]))
2065
while (!io_slave_killed(thd,mi))
2067
thd_proc_info(thd, "Requesting binlog dump");
2068
if (request_dump(drizzle, mi, &suppress_warnings))
2070
sql_print_error(_("Failed on request_dump()"));
2071
if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while \
2072
requesting master dump")) ||
2073
try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2074
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2078
if (!retry_count_dump)
2081
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2082
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2083
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2088
while (!io_slave_killed(thd,mi))
2092
We say "waiting" because read_event() will wait if there's nothing to
2093
read. But if there's something to read, it will not wait. The
2094
important thing is to not confuse users by saying "reading" whereas
2095
we're in fact receiving nothing.
2097
thd_proc_info(thd, _("Waiting for master to send event"));
2098
event_len= read_event(drizzle, mi, &suppress_warnings);
2099
if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while "
2102
if (!retry_count_event)
2104
retry_count_event++;
2105
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2106
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2107
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2112
if (event_len == packet_error)
2114
uint32_t drizzle_error_number= drizzle_errno(drizzle);
2115
switch (drizzle_error_number) {
2116
case CR_NET_PACKET_TOO_LARGE:
2117
sql_print_error(_("Log entry on master is longer than "
2118
"max_allowed_packet (%ld) on "
2119
"slave. If the entry is correct, restart the "
2120
"server with a higher value of "
2121
"max_allowed_packet"),
2122
thd->variables.max_allowed_packet);
2124
case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2125
sql_print_error(ER(drizzle_error_number), drizzle_error_number,
2126
drizzle_error(drizzle));
2128
case EE_OUTOFMEMORY:
2129
case ER_OUTOFMEMORY:
2131
_("Stopping slave I/O thread due to out-of-memory error from master"));
2134
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2135
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2138
} // if (event_len == packet_error)
2140
retry_count=0; // ok event, reset retry counter
2141
thd_proc_info(thd, _("Queueing master event to the relay log"));
2142
if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2146
if (flush_master_info(mi, 1))
2148
sql_print_error(_("Failed to flush master info file"));
2152
See if the relay logs take too much space.
2153
We don't lock mi->rli.log_space_lock here; this dirty read saves time
2154
and does not introduce any problem:
2155
- if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
2156
the clean value is 0), then we are reading only one more event as we
2157
should, and we'll block only at the next event. No big deal.
2158
- if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
2159
the clean value is 1), then we are going into wait_for_relay_log_space()
2160
for no reason, but this function will do a clean read, notice the clean
2161
value and exit immediately.
2163
if (rli->log_space_limit && rli->log_space_limit <
2164
rli->log_space_total &&
2165
!rli->ignore_log_space_limit)
2166
if (wait_for_relay_log_space(rli))
2168
sql_print_error(_("Slave I/O thread aborted while waiting for "
2169
"relay log space"));
2177
// print the current replication position
2178
sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2180
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
2181
VOID(pthread_mutex_lock(&LOCK_thread_count));
2182
thd->query = thd->db = 0; // extra safety
2183
thd->query_length= thd->db_length= 0;
2184
VOID(pthread_mutex_unlock(&LOCK_thread_count));
2188
Here we need to clear the active VIO before closing the
2189
connection with the master. The reason is that THD::awake()
2190
might be called from terminate_slave_thread() because somebody
2191
issued a STOP SLAVE. If that happends, the close_active_vio()
2192
can be called in the middle of closing the VIO associated with
2193
the 'mysql' object, causing a crash.
2195
drizzle_close(drizzle);
2198
write_ignored_events_info_to_relay_log(thd, mi);
2199
thd_proc_info(thd, _("Waiting for slave mutex on exit"));
2200
pthread_mutex_lock(&mi->run_lock);
2202
/* Forget the relay log's format */
2203
delete mi->rli.relay_log.description_event_for_queue;
2204
mi->rli.relay_log.description_event_for_queue= 0;
2205
// TODO: make rpl_status part of Master_info
2206
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2207
assert(thd->net.buff != 0);
2208
net_end(&thd->net); // destructor will not free it, because net.vio is 0
2209
close_thread_tables(thd);
2210
pthread_mutex_lock(&LOCK_thread_count);
2211
THD_CHECK_SENTRY(thd);
2213
pthread_mutex_unlock(&LOCK_thread_count);
2215
mi->slave_running= 0;
2218
Note: the order of the two following calls (first broadcast, then unlock)
2219
is important. Otherwise a killer_thread can execute between the calls and
2220
delete the mi structure leading to a crash! (see BUG#25306 for details)
2222
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
2223
pthread_mutex_unlock(&mi->run_lock);
2226
return(0); // Can't return anything here
2230
/* Slave SQL Thread entry point */
2232
pthread_handler_t handle_slave_sql(void *arg)
2234
THD *thd; /* needs to be first for thread_stack */
2235
char llbuff[22],llbuff1[22];
2237
Relay_log_info* rli = &((Master_info*)arg)->rli;
2242
assert(rli->inited);
2243
pthread_mutex_lock(&rli->run_lock);
2244
assert(!rli->slave_running);
2246
rli->events_till_abort = abort_slave_event_count;
2249
thd->thread_stack = (char*)&thd; // remember where our stack is
2252
/* Inform waiting threads that slave has started */
2253
rli->slave_run_id++;
2254
rli->slave_running = 1;
2256
pthread_detach_this_thread();
2257
if (init_slave_thread(thd, SLAVE_THD_SQL))
2260
TODO: this is currently broken - slave start and change master
2261
will be stuck if we fail here
2263
pthread_cond_broadcast(&rli->start_cond);
2264
pthread_mutex_unlock(&rli->run_lock);
2265
sql_print_error(_("Failed during slave thread initialization"));
2268
thd->init_for_queries();
2269
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
2270
pthread_mutex_lock(&LOCK_thread_count);
2271
threads.append(thd);
2272
pthread_mutex_unlock(&LOCK_thread_count);
2274
We are going to set slave_running to 1. Assuming slave I/O thread is
2275
alive and connected, this is going to make Seconds_Behind_Master be 0
2276
i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
2277
the moment we start we can think we are caught up, and the next second we
2278
start receiving data so we realize we are not caught up and
2279
Seconds_Behind_Master grows. No big deal.
2281
rli->abort_slave = 0;
2282
pthread_mutex_unlock(&rli->run_lock);
2283
pthread_cond_broadcast(&rli->start_cond);
2286
Reset errors for a clean start (otherwise, if the master is idle, the SQL
2287
thread may execute no Query_log_event, so the error will remain even
2288
though there's no problem anymore). Do not reset the master timestamp
2289
(imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
2290
as we are not sure that we are going to receive a query, we want to
2291
remember the last master timestamp (to say how many seconds behind we are
2293
But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
2297
//tell the I/O thread to take relay_log_space_limit into account from now on
2298
pthread_mutex_lock(&rli->log_space_lock);
2299
rli->ignore_log_space_limit= 0;
2300
pthread_mutex_unlock(&rli->log_space_lock);
2301
rli->trans_retries= 0; // start from "no error"
2303
if (init_relay_log_pos(rli,
2304
rli->group_relay_log_name,
2305
rli->group_relay_log_pos,
2306
1 /*need data lock*/, &errmsg,
2307
1 /*look for a description_event*/))
2309
sql_print_error(_("Error initializing relay log position: %s"),
2313
THD_CHECK_SENTRY(thd);
2314
assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2316
Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2317
correct position when it's called just after my_b_seek() (the questionable
2318
stuff is those "seek is done on next read" comments in the my_b_seek()
2320
The crude reality is that this assertion randomly fails whereas
2321
replication seems to work fine. And there is no easy explanation why it
2322
fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
2323
init_relay_log_pos() called above). Maybe the assertion would be
2324
meaningful if we held rli->data_lock between the my_b_seek() and the
2327
assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2328
assert(rli->sql_thd == thd);
2330
if (global_system_variables.log_warnings)
2331
sql_print_information(_("Slave SQL thread initialized, "
2332
"starting replication in log '%s' at "
2333
"position %s, relay log '%s' position: %s"),
2335
llstr(rli->group_master_log_pos,llbuff),
2336
rli->group_relay_log_name,
2337
llstr(rli->group_relay_log_pos,llbuff1));
2339
/* execute init_slave variable */
2340
if (sys_init_slave.value_length)
2342
execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
2343
if (thd->is_slave_error)
2345
sql_print_error(_("Slave SQL thread aborted. "
2346
"Can't execute init_slave query"));
2352
First check until condition - probably there is nothing to execute. We
2353
do not want to wait for next event in this case.
2355
pthread_mutex_lock(&rli->data_lock);
2356
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
2357
rli->is_until_satisfied(rli->group_master_log_pos))
2360
sql_print_information(_("Slave SQL thread stopped because it reached its"
2361
" UNTIL position %s"), llstr(rli->until_pos(), buf));
2362
pthread_mutex_unlock(&rli->data_lock);
2365
pthread_mutex_unlock(&rli->data_lock);
2367
/* Read queries from the IO/THREAD until this thread is killed */
2369
while (!sql_slave_killed(thd,rli))
2371
thd_proc_info(thd, _("Reading event from the relay log"));
2372
assert(rli->sql_thd == thd);
2373
THD_CHECK_SENTRY(thd);
2374
if (exec_relay_log_event(thd,rli))
2376
// do not scare the user if SQL thread was simply killed or stopped
2377
if (!sql_slave_killed(thd,rli))
2380
retrieve as much info as possible from the thd and, error
2381
codes and warnings and print this to the error log as to
2382
allow the user to locate the error
2384
uint32_t const last_errno= rli->last_error().number;
2386
if (thd->is_error())
2388
char const *const errmsg= thd->main_da.message();
2390
if (last_errno == 0)
2392
rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), errmsg);
2394
else if (last_errno != thd->main_da.sql_errno())
2396
sql_print_error(_("Slave (additional info): %s Error_code: %d"),
2397
errmsg, thd->main_da.sql_errno());
2401
/* Print any warnings issued */
2402
List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
2405
Added controlled slave thread cancel for replication
2406
of user-defined variables.
2408
bool udf_error = false;
2411
if (err->code == ER_CANT_OPEN_LIBRARY)
2413
sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
2416
sql_print_error(_("Error loading user-defined library, slave SQL "
2417
"thread aborted. Install the missing library, "
2418
"and restart the slave SQL thread with "
2419
"\"SLAVE START\". We stopped at log '%s' "
2421
RPL_LOG_NAME, llstr(rli->group_master_log_pos,
2424
sql_print_error(_("Error running query, slave SQL thread aborted. "
2425
"Fix the problem, and restart "
2426
"the slave SQL thread with \"SLAVE START\". "
2427
"We stopped at log '%s' position %s"),
2429
llstr(rli->group_master_log_pos, llbuff));
2435
/* Thread stopped. Print the current replication position to the log */
2436
sql_print_information(_("Slave SQL thread exiting, replication stopped in "
2437
"log '%s' at position %s"),
2439
llstr(rli->group_master_log_pos,llbuff));
2444
Some events set some playgrounds, which won't be cleared because thread
2445
stops. Stopping of this thread may not be known to these events ("stop"
2446
request is detected only by the present function, not by events), so we
2447
must "proactively" clear playgrounds:
2449
rli->cleanup_context(thd, 1);
2450
VOID(pthread_mutex_lock(&LOCK_thread_count));
2452
Some extra safety, which should not been needed (normally, event deletion
2453
should already have done these assignments (each event which sets these
2454
variables is supposed to set them to 0 before terminating)).
2456
thd->query= thd->db= thd->catalog= 0;
2457
thd->query_length= thd->db_length= 0;
2458
VOID(pthread_mutex_unlock(&LOCK_thread_count));
2459
thd_proc_info(thd, "Waiting for slave mutex on exit");
2460
pthread_mutex_lock(&rli->run_lock);
2461
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
2462
pthread_mutex_lock(&rli->data_lock);
2463
assert(rli->slave_running == 1); // tracking buffer overrun
2464
/* When master_pos_wait() wakes up it will check this and terminate */
2465
rli->slave_running= 0;
2466
/* Forget the relay log's format */
2467
delete rli->relay_log.description_event_for_exec;
2468
rli->relay_log.description_event_for_exec= 0;
2469
/* Wake up master_pos_wait() */
2470
pthread_mutex_unlock(&rli->data_lock);
2471
pthread_cond_broadcast(&rli->data_cond);
2472
rli->ignore_log_space_limit= 0; /* don't need any lock */
2473
/* we die so won't remember charset - re-update them on next thread start */
2474
rli->cached_charset_invalidate();
2475
rli->save_temporary_tables = thd->temporary_tables;
2478
TODO: see if we can do this conditionally in next_event() instead
2479
to avoid unneeded position re-init
2481
thd->temporary_tables = 0; // remove tempation from destructor to close them
2482
assert(thd->net.buff != 0);
2483
net_end(&thd->net); // destructor will not free it, because we are weird
2484
assert(rli->sql_thd == thd);
2485
THD_CHECK_SENTRY(thd);
2487
pthread_mutex_lock(&LOCK_thread_count);
2488
THD_CHECK_SENTRY(thd);
2490
pthread_mutex_unlock(&LOCK_thread_count);
2492
Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
2493
is important. Otherwise a killer_thread can execute between the calls and
2494
delete the mi structure leading to a crash! (see BUG#25306 for details)
2496
pthread_cond_broadcast(&rli->stop_cond);
2497
pthread_mutex_unlock(&rli->run_lock); // tell the world we are done
2501
return(0); // Can't return anything here
2506
process_io_create_file()
2509
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
2513
bool cev_not_written;
2514
THD *thd = mi->io_thd;
2515
NET *net = &mi->drizzle->net;
2517
if (unlikely(!cev->is_valid()))
2520
if (!rpl_filter->db_ok(cev->db))
2522
skip_load_data_infile(net);
2525
assert(cev->inited_from_old);
2526
thd->file_id = cev->file_id = mi->file_id++;
2527
thd->server_id = cev->server_id;
2528
cev_not_written = 1;
2530
if (unlikely(net_request_file(net,cev->fname)))
2532
sql_print_error(_("Slave I/O: failed requesting download of '%s'"),
2538
This dummy block is so we could instantiate Append_block_log_event
2539
once and then modify it slightly instead of doing it multiple times
2543
Append_block_log_event aev(thd,0,0,0,0);
2547
if (unlikely((num_bytes=my_net_read(net)) == packet_error))
2549
sql_print_error(_("Network read error downloading '%s' from master"),
2553
if (unlikely(!num_bytes)) /* eof */
2555
/* 3.23 master wants it */
2556
net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0);
2558
If we wrote Create_file_log_event, then we need to write
2559
Execute_load_log_event. If we did not write Create_file_log_event,
2560
then this is an empty file and we can just do as if the LOAD DATA
2561
INFILE had not existed, i.e. write nothing.
2563
if (unlikely(cev_not_written))
2565
Execute_load_log_event xev(thd,0,0);
2566
xev.log_pos = cev->log_pos;
2567
if (unlikely(mi->rli.relay_log.append(&xev)))
2569
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2570
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2571
_("error writing Exec_load event to relay log"));
2574
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2577
if (unlikely(cev_not_written))
2579
cev->block = net->read_pos;
2580
cev->block_len = num_bytes;
2581
if (unlikely(mi->rli.relay_log.append(cev)))
2583
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2584
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2585
_("error writing Create_file event to relay log"));
2589
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2593
aev.block = net->read_pos;
2594
aev.block_len = num_bytes;
2595
aev.log_pos = cev->log_pos;
2596
if (unlikely(mi->rli.relay_log.append(&aev)))
2598
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2599
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2600
_("error writing Append_block event to relay log"));
2603
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
2614
Start using a new binary log on the master
2618
mi master_info for the slave
2619
rev The rotate log event read from the binary log
2622
Updates the master info with the place in the next binary
2623
log where we should start reading.
2624
Rotate the relay log to avoid mixed-format relay logs.
2627
We assume we already locked mi->data_lock
2631
1 Log event is illegal
2635
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
2637
safe_mutex_assert_owner(&mi->data_lock);
2639
if (unlikely(!rev->is_valid()))
2642
/* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2643
memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
2644
mi->master_log_pos= rev->pos;
2646
If we do not do this, we will be getting the first
2647
rotate event forever, so we need to not disconnect after one.
2649
if (disconnect_slave_event_count)
2650
mi->events_till_disconnect++;
2653
If description_event_for_queue is format <4, there is conversion in the
2654
relay log to the slave's format (4). And Rotate can mean upgrade or
2655
nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
2656
no need to reset description_event_for_queue now. And if it's nothing (same
2657
master version as before), no need (still using the slave's format).
2659
if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
2661
delete mi->rli.relay_log.description_event_for_queue;
2662
/* start from format 3 (DRIZZLE 4.0) again */
2663
mi->rli.relay_log.description_event_for_queue= new
2664
Format_description_log_event(3);
2667
Rotate the relay log makes binlog format detection easier (at next slave
2668
start or mysqlbinlog)
2670
rotate_relay_log(mi); /* will take the right mutexes */
2675
Reads a 3.23 event and converts it to the slave's format. This code was
2676
copied from DRIZZLE 4.0.
2678
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
2681
const char *errmsg = 0;
2683
bool ignore_event= 0;
2685
Relay_log_info *rli= &mi->rli;
2688
If we get Load event, we need to pass a non-reusable buffer
2689
to read_log_event, so we do a trick
2691
if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
2693
if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
2695
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2696
ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
2699
memcpy(tmp_buf,buf,event_len);
2701
Create_file constructor wants a 0 as last char of buffer, this 0 will
2702
serve as the string-termination char for the file's name (which is at the
2704
We must increment event_len, otherwise the event constructor will not see
2705
this end 0, which leads to segfault.
2707
tmp_buf[event_len++]=0;
2708
int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
2709
buf = (const char*)tmp_buf;
2712
This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
2713
send the loaded file, and write it to the relay log in the form of
2714
Append_block/Exec_load (the SQL thread needs the data, as that thread is not
2715
connected to the master).
2717
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2718
mi->rli.relay_log.description_event_for_queue);
2721
sql_print_error(_("Read invalid event from master: '%s', "
2722
"master could be corrupt but a more likely cause "
2723
"of this is a bug"),
2725
my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
2729
pthread_mutex_lock(&mi->data_lock);
2730
ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
2731
switch (ev->get_type_code()) {
2737
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2740
pthread_mutex_unlock(&mi->data_lock);
2745
case CREATE_FILE_EVENT:
2747
Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
2748
queue_old_event() which is for 3.23 events which don't comprise
2749
CREATE_FILE_EVENT. This is because read_log_event() above has just
2750
transformed LOAD_EVENT into CREATE_FILE_EVENT.
2753
/* We come here when and only when tmp_buf != 0 */
2754
assert(tmp_buf != 0);
2756
ev->log_pos+= inc_pos;
2757
int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2759
mi->master_log_pos += inc_pos;
2760
pthread_mutex_unlock(&mi->data_lock);
2761
my_free((char*)tmp_buf, MYF(0));
2768
if (likely(!ignore_event))
2772
Don't do it for fake Rotate events (see comment in
2773
Log_event::Log_event(const char* buf...) in log_event.cc).
2775
ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
2776
if (unlikely(rli->relay_log.append(ev)))
2779
pthread_mutex_unlock(&mi->data_lock);
2782
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2785
mi->master_log_pos+= inc_pos;
2786
pthread_mutex_unlock(&mi->data_lock);
2791
Reads a 4.0 event and converts it to the slave's format. This code was copied
2792
from queue_binlog_ver_1_event(), with some affordable simplifications.
2794
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
2797
const char *errmsg = 0;
2800
Relay_log_info *rli= &mi->rli;
2802
/* read_log_event() will adjust log_pos to be end_log_pos */
2803
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2804
mi->rli.relay_log.description_event_for_queue);
2807
sql_print_error(_("Read invalid event from master: '%s', "
2808
"master could be corrupt but a more likely cause of "
2811
my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
2814
pthread_mutex_lock(&mi->data_lock);
2815
switch (ev->get_type_code()) {
2819
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2822
pthread_mutex_unlock(&mi->data_lock);
2831
if (unlikely(rli->relay_log.append(ev)))
2834
pthread_mutex_unlock(&mi->data_lock);
2837
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2839
mi->master_log_pos+= inc_pos;
2841
pthread_mutex_unlock(&mi->data_lock);
2848
Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
2849
(exactly, slave's) format. To do the conversion, we create a 5.0 event from
2850
the 3.23/4.0 bytes, then write this event to the relay log.
2853
Test this code before release - it has to be tested on a separate
2854
setup with 3.23 master or 4.0 master
2857
static int32_t queue_old_event(Master_info *mi, const char *buf,
2860
switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
2863
return(queue_binlog_ver_1_event(mi,buf,event_len));
2865
return(queue_binlog_ver_3_event(mi,buf,event_len));
2866
default: /* unsupported format; eg version 2 */
2874
If the event is 3.23/4.0, passes it to queue_old_event() which will convert
2875
it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
2876
no format conversion, it's pure read/write of bytes.
2877
So a 5.0.0 slave's relay log can contain events in the slave's format or in
2881
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
2885
uint32_t inc_pos= 0;
2886
Relay_log_info *rli= &mi->rli;
2887
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
2890
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
2891
buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
2892
return(queue_old_event(mi,buf,event_len));
2894
pthread_mutex_lock(&mi->data_lock);
2896
switch (buf[EVENT_TYPE_OFFSET]) {
2899
We needn't write this event to the relay log. Indeed, it just indicates a
2900
master server shutdown. The only thing this does is cleaning. But
2901
cleaning is already done on a per-master-thread basis (as the master
2902
server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
2903
prepared statements' deletion are TODO only when we binlog prep stmts).
2905
We don't even increment mi->master_log_pos, because we may be just after
2906
a Rotate event. Btw, in a few milliseconds we are going to have a Start
2907
event from the next binlog (unless the master is presently running
2913
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
2914
if (unlikely(process_io_rotate(mi,&rev)))
2916
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2920
Now the I/O thread has just changed its mi->master_log_name, so
2921
incrementing mi->master_log_pos is nonsense.
2926
case FORMAT_DESCRIPTION_EVENT:
2929
Create an event, and save it (when we rotate the relay log, we will have
2930
to write this event again).
2933
We are the only thread which reads/writes description_event_for_queue.
2934
The relay_log struct does not move (though some members of it can
2935
change), so we needn't any lock (no rli->data_lock, no log lock).
2937
Format_description_log_event* tmp;
2939
if (!(tmp= (Format_description_log_event*)
2940
Log_event::read_log_event(buf, event_len, &errmsg,
2941
mi->rli.relay_log.description_event_for_queue)))
2943
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2946
delete mi->rli.relay_log.description_event_for_queue;
2947
mi->rli.relay_log.description_event_for_queue= tmp;
2949
Though this does some conversion to the slave's format, this will
2950
preserve the master's binlog format version, and number of event types.
2953
If the event was not requested by the slave (the slave did not ask for
2954
it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
2956
inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
2960
case HEARTBEAT_LOG_EVENT:
2963
HB (heartbeat) cannot come before RL (Relay)
2966
Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
2969
error= ER_SLAVE_HEARTBEAT_FAILURE;
2970
error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2971
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2972
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2973
error_msg.append(STRING_WITH_LEN(" log_pos "));
2974
llstr(hb.log_pos, llbuf);
2975
error_msg.append(llbuf, strlen(llbuf));
2978
mi->received_heartbeats++;
2980
compare local and event's versions of log_file, log_pos.
2982
Heartbeat is sent only after an event corresponding to the corrdinates
2983
the heartbeat carries.
2984
Slave can not have a difference in coordinates except in the only
2985
special case when mi->master_log_name, master_log_pos have never
2986
been updated by Rotate event i.e when slave does not have any history
2987
with the master (and thereafter mi->master_log_pos is NULL).
2989
TODO: handling `when' for SHOW SLAVE STATUS' snds behind
2991
if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
2992
&& mi->master_log_name != NULL)
2993
|| mi->master_log_pos != hb.log_pos)
2995
/* missed events of heartbeat from the past */
2996
error= ER_SLAVE_HEARTBEAT_FAILURE;
2997
error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
2998
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2999
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
3000
error_msg.append(STRING_WITH_LEN(" log_pos "));
3001
llstr(hb.log_pos, llbuf);
3002
error_msg.append(llbuf, strlen(llbuf));
3005
goto skip_relay_logging;
3015
If this event is originating from this server, don't queue it.
3016
We don't check this for 3.23 events because it's simpler like this; 3.23
3017
will be filtered anyway by the SQL slave thread which also tests the
3018
server id (we must also keep this test in the SQL thread, in case somebody
3019
upgrades a 4.0 slave which has a not-filtered relay log).
3021
ANY event coming from ourselves can be ignored: it is obvious for queries;
3022
for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
3023
(--log-slave-updates would not log that) unless this slave is also its
3024
direct master (an unsupported, useless setup!).
3027
pthread_mutex_lock(log_lock);
3029
if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
3030
!mi->rli.replicate_same_server_id)
3033
Do not write it to the relay log.
3034
a) We still want to increment mi->master_log_pos, so that we won't
3035
re-read this event from the master if the slave IO thread is now
3036
stopped/restarted (more efficient if the events we are ignoring are big
3038
b) We want to record that we are skipping events, for the information of
3039
the slave SQL thread, otherwise that thread may let
3040
rli->group_relay_log_pos stay too small if the last binlog's event is
3042
But events which were generated by this slave and which do not exist in
3043
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
3046
if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
3047
buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3048
buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3050
mi->master_log_pos+= inc_pos;
3051
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
3052
assert(rli->ign_master_log_name_end[0]);
3053
rli->ign_master_log_pos_end= mi->master_log_pos;
3055
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3059
/* write the event to the relay log */
3060
if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3062
mi->master_log_pos+= inc_pos;
3063
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3067
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
3069
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
3071
pthread_mutex_unlock(log_lock);
3076
pthread_mutex_unlock(&mi->data_lock);
3078
mi->report(ERROR_LEVEL, error, ER(error),
3079
(error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
3080
_("could not queue event from master") :
3086
void end_relay_log_info(Relay_log_info* rli)
3090
if (rli->info_fd >= 0)
3092
end_io_cache(&rli->info_file);
3093
(void) my_close(rli->info_fd, MYF(MY_WME));
3096
if (rli->cur_log_fd >= 0)
3098
end_io_cache(&rli->cache_buf);
3099
(void)my_close(rli->cur_log_fd, MYF(MY_WME));
3100
rli->cur_log_fd = -1;
3103
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
3104
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3106
Delete the slave's temporary tables from memory.
3107
In the future there will be other actions than this, to ensure persistance
3108
of slave's temp tables after shutdown.
3110
rli->close_temporary_tables();
3115
Try to connect until successful or slave killed
3119
thd Thread handler for slave
3120
DRIZZLE DRIZZLE connection handle
3121
mi Replication handle
3128
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi)
3130
return(connect_to_master(thd, drizzle, mi, 0, 0));
3139
Try to connect until successful or slave killed or we have retried
3140
master_retry_count times
3143
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3144
bool reconnect, bool suppress_warnings)
3146
int32_t slave_was_killed;
3147
int32_t last_errno= -2; // impossible error
3148
uint32_t err_count=0;
3151
mi->events_till_disconnect = disconnect_slave_event_count;
3152
uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
3153
if (opt_slave_compressed_protocol)
3154
client_flag=CLIENT_COMPRESS; /* We will use compression */
3156
drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3157
drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3159
while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3160
(reconnect ? drizzle_reconnect(drizzle) != 0 :
3161
drizzle_connect(drizzle, mi->host, mi->user, mi->password, 0,
3162
mi->port, 0, client_flag) == 0))
3164
/* Don't repeat last error */
3165
if ((int32_t)drizzle_errno(drizzle) != last_errno)
3167
last_errno=drizzle_errno(drizzle);
3168
suppress_warnings= 0;
3169
mi->report(ERROR_LEVEL, last_errno,
3170
_("error %s to master '%s@%s:%d'"
3171
" - retry-time: %d retries: %u"),
3172
(reconnect ? _("reconnecting") : _("connecting")),
3173
mi->user, mi->host, mi->port,
3174
mi->connect_retry, master_retry_count);
3177
By default we try forever. The reason is that failure will trigger
3178
master election, so if the user did not set master_retry_count we
3179
do not want to have election triggered on the first failure to
3182
if (++err_count == master_retry_count)
3186
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
3189
safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3193
if (!slave_was_killed)
3197
if (!suppress_warnings && global_system_variables.log_warnings)
3198
sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3199
"replication resumed in log '%s' at "
3200
"position %s"), mi->user,
3203
llstr(mi->master_log_pos,llbuff));
3207
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
3208
general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d",
3209
mi->user, mi->host, mi->port);
3212
drizzle->reconnect= 1;
3213
return(slave_was_killed);
3221
Try to connect until successful or slave killed or we have retried
3222
master_retry_count times
3225
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3226
bool suppress_warnings)
3228
return(connect_to_master(thd, drizzle, mi, 1, suppress_warnings));
3233
Store the file and position where the execute-slave thread are in the
3237
flush_relay_log_info()
3238
rli Relay log information
3241
- As this is only called by the slave thread, we don't need to
3242
have a lock on this.
3243
- If there is an active transaction, then we don't update the position
3244
in the relay log. This is to ensure that we re-execute statements
3245
if we die in the middle of an transaction that was rolled back.
3246
- As a transaction never spans binary logs, we don't have to handle the
3247
case where we do a relay-log-rotation in the middle of the transaction.
3248
If this would not be the case, we would have to ensure that we
3249
don't delete the relay log file where the transaction started when
3250
we switch to a new relay log file.
3253
- Change the log file information to a binary format to avoid calling
3261
bool flush_relay_log_info(Relay_log_info* rli)
3265
if (unlikely(rli->no_storage))
3268
IO_CACHE *file = &rli->info_file;
3269
char buff[FN_REFLEN*2+22*2+4], *pos;
3271
my_b_seek(file, 0L);
3272
pos=stpcpy(buff, rli->group_relay_log_name);
3274
pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
3276
pos=stpcpy(pos, rli->group_master_log_name);
3278
pos=int64_t2str(rli->group_master_log_pos, pos, 10);
3280
if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
3282
if (flush_io_cache(file))
3285
/* Flushing the relay log is done by the slave I/O thread */
3291
Called when we notice that the current "hot" log got rotated under our feet.
3294
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
3296
assert(rli->cur_log != &rli->cache_buf);
3297
assert(rli->cur_log_fd == -1);
3299
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3300
if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
3304
We want to start exactly where we was before:
3305
relay_log_pos Current log pos
3306
pending Number of bytes already processed from the event
3308
rli->event_relay_log_pos= max(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3309
my_b_seek(cur_log,rli->event_relay_log_pos);
3314
static Log_event* next_event(Relay_log_info* rli)
3317
IO_CACHE* cur_log = rli->cur_log;
3318
pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3319
const char* errmsg=0;
3320
THD* thd = rli->sql_thd;
3324
if (abort_slave_event_count && !rli->events_till_abort--)
3328
For most operations we need to protect rli members with data_lock,
3329
so we assume calling function acquired this mutex for us and we will
3330
hold it for the most of the loop below However, we will release it
3331
whenever it is worth the hassle, and in the cases when we go into a
3332
pthread_cond_wait() with the non-data_lock mutex
3334
safe_mutex_assert_owner(&rli->data_lock);
3336
while (!sql_slave_killed(thd,rli))
3339
We can have two kinds of log reading:
3341
rli->cur_log points at the IO_CACHE of relay_log, which
3342
is actively being updated by the I/O thread. We need to be careful
3343
in this case and make sure that we are not looking at a stale log that
3344
has already been rotated. If it has been, we reopen the log.
3346
The other case is much simpler:
3347
We just have a read only log that nobody else will be updating.
3350
if ((hot_log = (cur_log != &rli->cache_buf)))
3352
assert(rli->cur_log_fd == -1); // foreign descriptor
3353
pthread_mutex_lock(log_lock);
3356
Reading xxx_file_id is safe because the log will only
3357
be rotated when we hold relay_log.LOCK_log
3359
if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
3361
// The master has switched to a new log file; Reopen the old log file
3362
cur_log=reopen_relay_log(rli, &errmsg);
3363
pthread_mutex_unlock(log_lock);
3364
if (!cur_log) // No more log files
3366
hot_log=0; // Using old binary log
3370
As there is no guarantee that the relay is open (for example, an I/O
3371
error during a write by the slave I/O thread may have closed it), we
3374
if (!my_b_inited(cur_log))
3376
assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3377
assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
3380
Relay log is always in new format - if the master is 3.23, the
3381
I/O thread will convert the format for us.
3382
A problem: the description event may be in a previous relay log. So if
3383
the slave has been shutdown meanwhile, we would have to look in old relay
3384
logs, which may even have been deleted. So we need to write this
3385
description event at the beginning of the relay log.
3386
When the relay log is created when the I/O thread starts, easy: the
3387
master will send the description event and we will queue it.
3388
But if the relay log is created by new_file(): then the solution is:
3389
DRIZZLE_BIN_LOG::open() will write the buffered description event.
3391
if ((ev=Log_event::read_log_event(cur_log,0,
3392
rli->relay_log.description_event_for_exec)))
3395
assert(thd==rli->sql_thd);
3397
read it while we have a lock, to avoid a mutex lock in
3398
inc_event_relay_log_pos()
3400
rli->future_event_relay_log_pos= my_b_tell(cur_log);
3402
pthread_mutex_unlock(log_lock);
3405
assert(thd==rli->sql_thd);
3406
if (opt_reckless_slave) // For mysql-test
3408
if (cur_log->error < 0)
3410
errmsg = "slave SQL thread aborted because of I/O error";
3412
pthread_mutex_unlock(log_lock);
3415
if (!cur_log->error) /* EOF */
3418
On a hot log, EOF means that there are no more updates to
3419
process and we must block until I/O thread adds some and
3420
signals us to continue
3425
We say in Seconds_Behind_Master that we have "caught up". Note that
3426
for example if network link is broken but I/O slave thread hasn't
3427
noticed it (slave_net_timeout not elapsed), then we'll say "caught
3428
up" whereas we're not really caught up. Fixing that would require
3429
internally cutting timeout in smaller pieces in network read, no
3430
thanks. Another example: SQL has caught up on I/O, now I/O has read
3431
a new event and is queuing it; the false "0" will exist until SQL
3432
finishes executing the new event; it will be look abnormal only if
3433
the events have old timestamps (then you get "many", 0, "many").
3435
Transient phases like this can be fixed with implemeting
3436
Heartbeat event which provides the slave the status of the
3437
master at time the master does not have any new update to send.
3438
Seconds_Behind_Master would be zero only when master has no
3439
more updates in binlog for slave. The heartbeat can be sent
3440
in a (small) fraction of slave_net_timeout. Until it's done
3441
rli->last_master_timestamp is temporarely (for time of
3442
waiting for the following event) reset whenever EOF is
3445
time_t save_timestamp= rli->last_master_timestamp;
3446
rli->last_master_timestamp= 0;
3448
assert(rli->relay_log.get_open_count() ==
3449
rli->cur_log_old_open_count);
3451
if (rli->ign_master_log_name_end[0])
3453
/* We generate and return a Rotate, to make our positions advance */
3454
ev= new Rotate_log_event(rli->ign_master_log_name_end,
3455
0, rli->ign_master_log_pos_end,
3456
Rotate_log_event::DUP_NAME);
3457
rli->ign_master_log_name_end[0]= 0;
3458
pthread_mutex_unlock(log_lock);
3461
errmsg= "Slave SQL thread failed to create a Rotate event "
3462
"(out of memory?), SHOW SLAVE STATUS may be inaccurate";
3465
ev->server_id= 0; // don't be ignored by slave SQL thread
3470
We can, and should release data_lock while we are waiting for
3471
update. If we do not, show slave status will block
3473
pthread_mutex_unlock(&rli->data_lock);
3477
- the I/O thread has reached log_space_limit
3478
- the SQL thread has read all relay logs, but cannot purge for some
3480
* it has already purged all logs except the current one
3481
* there are other logs than the current one but they're involved in
3482
a transaction that finishes in the current one (or is not finished)
3484
Wake up the possibly waiting I/O thread, and set a boolean asking
3485
the I/O thread to temporarily ignore the log_space_limit
3486
constraint, because we do not want the I/O thread to block because of
3487
space (it's ok if it blocks for any other reason (e.g. because the
3488
master does not send anything). Then the I/O thread stops waiting
3489
and reads more events.
3490
The SQL thread decides when the I/O thread should take log_space_limit
3491
into account again : ignore_log_space_limit is reset to 0
3492
in purge_first_log (when the SQL thread purges the just-read relay
3493
log), and also when the SQL thread starts. We should also reset
3494
ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
3495
fact, no need as RESET SLAVE requires that the slave
3496
be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
3499
pthread_mutex_lock(&rli->log_space_lock);
3500
// prevent the I/O thread from blocking next times
3501
rli->ignore_log_space_limit= 1;
3503
If the I/O thread is blocked, unblock it. Ok to broadcast
3504
after unlock, because the mutex is only destroyed in
3505
~Relay_log_info(), i.e. when rli is destroyed, and rli will
3506
not be destroyed before we exit the present function.
3508
pthread_mutex_unlock(&rli->log_space_lock);
3509
pthread_cond_broadcast(&rli->log_space_cond);
3510
// Note that wait_for_update_relay_log unlocks lock_log !
3511
rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
3512
// re-acquire data lock since we released it earlier
3513
pthread_mutex_lock(&rli->data_lock);
3514
rli->last_master_timestamp= save_timestamp;
3518
If the log was not hot, we need to move to the next log in
3519
sequence. The next log could be hot or cold, we deal with both
3520
cases separately after doing some common initialization
3522
end_io_cache(cur_log);
3523
assert(rli->cur_log_fd >= 0);
3524
my_close(rli->cur_log_fd, MYF(MY_WME));
3525
rli->cur_log_fd = -1;
3527
if (relay_log_purge)
3530
purge_first_log will properly set up relay log coordinates in rli.
3531
If the group's coordinates are equal to the event's coordinates
3532
(i.e. the relay log was not rotated in the middle of a group),
3533
we can purge this relay log too.
3534
We do uint64_t and string comparisons, this may be slow but
3535
- purging the last relay log is nice (it can save 1GB of disk), so we
3536
like to detect the case where we can do it, and given this,
3537
- I see no better detection method
3538
- purge_first_log is not called that often
3540
if (rli->relay_log.purge_first_log
3542
rli->group_relay_log_pos == rli->event_relay_log_pos
3543
&& !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
3545
errmsg = "Error purging processed logs";
3552
If hot_log is set, then we already have a lock on
3553
LOCK_log. If not, we have to get the lock.
3555
According to Sasha, the only time this code will ever be executed
3556
is if we are recovering from a bug.
3558
if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
3560
errmsg = "error switching to the next log";
3563
rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3564
strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
3565
sizeof(rli->event_relay_log_name)-1);
3566
flush_relay_log_info(rli);
3570
Now we want to open this next log. To know if it's a hot log (the one
3571
being written by the I/O thread now) or a cold log, we can use
3572
is_active(); if it is hot, we use the I/O cache; if it's cold we open
3573
the file normally. But if is_active() reports that the log is hot, this
3574
may change between the test and the consequence of the test. So we may
3575
open the I/O cache whereas the log is now cold, which is nonsense.
3576
To guard against this, we need to have LOCK_log.
3579
if (!hot_log) /* if hot_log, we already have this mutex */
3580
pthread_mutex_lock(log_lock);
3581
if (rli->relay_log.is_active(rli->linfo.log_file_name))
3584
if (global_system_variables.log_warnings)
3585
sql_print_information(_("next log '%s' is currently active"),
3586
rli->linfo.log_file_name);
3588
rli->cur_log= cur_log= rli->relay_log.get_log_file();
3589
rli->cur_log_old_open_count= rli->relay_log.get_open_count();
3590
assert(rli->cur_log_fd == -1);
3593
Read pointer has to be at the start since we are the only
3595
We must keep the LOCK_log to read the 4 first bytes, as this is a hot
3596
log (same as when we call read_log_event() above: for a hot log we
3599
if (check_binlog_magic(cur_log,&errmsg))
3601
if (!hot_log) pthread_mutex_unlock(log_lock);
3604
if (!hot_log) pthread_mutex_unlock(log_lock);
3607
if (!hot_log) pthread_mutex_unlock(log_lock);
3609
if we get here, the log was not hot, so we will have to open it
3610
ourselves. We are sure that the log is still not hot now (a log can get
3611
from hot to cold, but not from cold to hot). No need for LOCK_log.
3614
if (global_system_variables.log_warnings)
3615
sql_print_information(_("next log '%s' is not active"),
3616
rli->linfo.log_file_name);
3618
// open_binlog() will check the magic header
3619
if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
3626
Read failed with a non-EOF error.
3627
TODO: come up with something better to handle this error
3630
pthread_mutex_unlock(log_lock);
3631
sql_print_error(_("Slave SQL thread: I/O error reading "
3632
"event(errno: %d cur_log->error: %d)"),
3633
my_errno,cur_log->error);
3634
// set read position to the beginning of the event
3635
my_b_seek(cur_log,rli->event_relay_log_pos);
3636
/* otherwise, we have had a partial read */
3637
errmsg = _("Aborting slave SQL thread because of partial event read");
3638
break; // To end of function
3641
if (!errmsg && global_system_variables.log_warnings)
3643
sql_print_information(_("Error reading relay log event: %s"),
3644
_("slave SQL thread was killed"));
3650
sql_print_error(_("Error reading relay log event: %s"), errmsg);
3655
Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
3656
because of size is simpler because when we do it we already have all relevant
3657
locks; here we don't, so this function is mainly taking locks).
3658
Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
3662
void rotate_relay_log(Master_info* mi)
3664
Relay_log_info* rli= &mi->rli;
3666
/* We don't lock rli->run_lock. This would lead to deadlocks. */
3667
pthread_mutex_lock(&mi->run_lock);
3670
We need to test inited because otherwise, new_file() will attempt to lock
3671
LOCK_log, which may not be inited (if we're not a slave).
3678
/* If the relay log is closed, new_file() will do nothing. */
3679
rli->relay_log.new_file();
3682
We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
3683
be counted, so imagine a succession of FLUSH LOGS and assume the slave
3684
threads are started:
3685
relay_log_space decreases by the size of the deleted relay log, but does
3686
not increase, so flush-after-flush we may become negative, which is wrong.
3687
Even if this will be corrected as soon as a query is replicated on the
3688
slave (because the I/O thread will then call harvest_bytes_written() which
3689
will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
3690
output in SHOW SLAVE STATUS meanwhile. So we harvest now.
3691
If the log is closed, then this will just harvest the last writes, probably
3692
0 as they probably have been harvested.
3694
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3696
pthread_mutex_unlock(&mi->run_lock);
3702
Detects, based on master's version (as found in the relay log), if master
3704
@param rli Relay_log_info which tells the master's version
3705
@param bug_id Number of the bug as found in bugs.mysql.com
3706
@param report bool report error message, default TRUE
3707
@return true if master has the bug, FALSE if it does not.
3709
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
3711
struct st_version_range_for_one_bug {
3713
const uchar introduced_in[3]; // first version with bug
3714
const uchar fixed_in[3]; // first version with fix
3716
static struct st_version_range_for_one_bug versions_for_all_bugs[]=
3718
{24432, { 5, 0, 24 }, { 5, 0, 38 } },
3719
{24432, { 5, 1, 12 }, { 5, 1, 17 } },
3720
{33029, { 5, 0, 0 }, { 5, 0, 58 } },
3721
{33029, { 5, 1, 0 }, { 5, 1, 12 } },
3723
const uchar *master_ver=
3724
rli->relay_log.description_event_for_exec->server_version_split;
3726
assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3729
i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3731
const uchar *introduced_in= versions_for_all_bugs[i].introduced_in,
3732
*fixed_in= versions_for_all_bugs[i].fixed_in;
3733
if ((versions_for_all_bugs[i].bug_id == bug_id) &&
3734
(memcmp(introduced_in, master_ver, 3) <= 0) &&
3735
(memcmp(fixed_in, master_ver, 3) > 0))
3740
// a short message for SHOW SLAVE STATUS (message length constraints)
3741
my_printf_error(ER_UNKNOWN_ERROR,
3742
_("master may suffer from"
3743
" http://bugs.mysql.com/bug.php?id=%u"
3744
" so slave stops; check error log on slave"
3745
" for more info"), MYF(0), bug_id);
3746
// a verbose message for the error log
3747
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
3748
_("According to the master's version ('%s'),"
3749
" it is probable that master suffers from this bug:"
3750
" http://bugs.mysql.com/bug.php?id=%u"
3751
" and thus replicating the current binary log event"
3752
" may make the slave's data become different from the"
3754
" To take no risk, slave refuses to replicate"
3755
" this event and stops."
3756
" We recommend that all updates be stopped on the"
3757
" master and slave, that the data of both be"
3758
" manually synchronized,"
3759
" that master's binary logs be deleted,"
3760
" that master be upgraded to a version at least"
3761
" equal to '%d.%d.%d'. Then replication can be"
3763
rli->relay_log.description_event_for_exec->server_version,
3765
fixed_in[0], fixed_in[1], fixed_in[2]);
3773
BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
3774
exclusive, if one statement in a SP generated AUTO_INCREMENT value
3775
by the top statement, all statements after it would be considered
3776
generated AUTO_INCREMENT value by the top statement, and a
3777
erroneous INSERT_ID value might be associated with these statement,
3778
which could cause duplicate entry error and stop the slave.
3780
Detect buggy master to work around.
3782
bool rpl_master_erroneous_autoinc(THD *thd)
3784
if (active_mi && active_mi->rli.sql_thd == thd)
3786
Relay_log_info *rli= &active_mi->rli;
3787
return rpl_master_has_bug(rli, 33029, false);
3792
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
3793
template class I_List_iterator<i_string>;
3794
template class I_List_iterator<i_string_pair>;
3798
@} (end of group Replication)
3801
#endif /* HAVE_REPLICATION */