1
/* Copyright (C) 2000-2003 DRIZZLE AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
18
@addtogroup Replication
23
@brief Code to run the io thread and the sql thread on the
26
#include <drizzled/server_includes.h>
28
#include <storage/myisam/myisam.h>
32
#include "rpl_filter.h"
33
#include "repl_failsafe.h"
34
#include <mysys/thr_alarm.h>
35
#include <libdrizzle/sql_common.h>
36
#include <libdrizzle/errmsg.h>
37
#include <mysys/mysys_err.h>
38
#include <drizzled/drizzled_error_messages.h>
40
#ifdef HAVE_REPLICATION
42
#include "rpl_tblmap.h"
44
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
46
#define MAX_SLAVE_RETRY_PAUSE 5
47
bool use_slave_mask = 0;
48
MY_BITMAP slave_error_mask;
50
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
52
char* slave_load_tmpdir = 0;
53
Master_info *active_mi= 0;
54
bool replicate_same_server_id;
55
uint64_t relay_log_space_limit = 0;
58
When slave thread exits, we need to remember the temporary tables so we
59
can re-use them on slave start.
61
TODO: move the vars below under Master_info
64
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
65
int32_t events_till_abort = -1;
67
enum enum_slave_reconnect_actions
69
SLAVE_RECON_ACT_REG= 0,
70
SLAVE_RECON_ACT_DUMP= 1,
71
SLAVE_RECON_ACT_EVENT= 2,
75
enum enum_slave_reconnect_messages
77
SLAVE_RECON_MSG_WAIT= 0,
78
SLAVE_RECON_MSG_KILLED_WAITING= 1,
79
SLAVE_RECON_MSG_AFTER= 2,
80
SLAVE_RECON_MSG_FAILED= 3,
81
SLAVE_RECON_MSG_COMMAND= 4,
82
SLAVE_RECON_MSG_KILLED_AFTER= 5,
86
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
89
N_("Waiting to reconnect after a failed registration on master"),
90
N_("Slave I/O thread killed while waitnig to reconnect after a "
91
"failed registration on master"),
92
N_("Reconnecting after a failed registration on master"),
93
N_("failed registering on master, reconnecting to try again, "
94
"log '%s' at postion %s"),
96
N_("Slave I/O thread killed during or after reconnect")
99
N_("Waiting to reconnect after a failed binlog dump request"),
100
N_("Slave I/O thread killed while retrying master dump"),
101
N_("Reconnecting after a failed binlog dump request"),
102
N_("failed dump request, reconnecting to try again, "
103
"log '%s' at postion %s"),
105
N_("Slave I/O thread killed during or after reconnect")
108
N_("Waiting to reconnect after a failed master event read"),
109
N_("Slave I/O thread killed while waiting to reconnect "
110
"after a failed read"),
111
N_("Reconnecting after a failed master event read"),
112
N_("Slave I/O thread: Failed reading log event, "
113
"reconnecting to retry, log '%s' at postion %s"),
115
N_("Slave I/O thread killed during or after a "
116
"reconnect done to recover from failed read")
121
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
123
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
124
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
125
static bool wait_for_relay_log_space(Relay_log_info* rli);
126
static inline bool io_slave_killed(THD* thd,Master_info* mi);
127
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
128
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
129
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi);
130
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
131
bool suppress_warnings);
132
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
133
bool reconnect, bool suppress_warnings);
134
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
135
void* thread_killed_arg);
136
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
137
static Log_event* next_event(Relay_log_info* rli);
138
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
139
static int32_t terminate_slave_thread(THD *thd,
140
pthread_mutex_t* term_lock,
141
pthread_cond_t* term_cond,
142
volatile uint32_t *slave_running,
144
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
147
Find out which replication threads are running
151
mask Return value here
152
mi master_info for slave
153
inverse If set, returns which threads are not running
156
Get a bit mask for which threads are running so that we can later restart
160
mask If inverse == 0, running threads
161
If inverse == 1, stopped threads
164
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
166
bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
167
register int32_t tmp_mask=0;
170
tmp_mask |= SLAVE_IO;
172
tmp_mask |= SLAVE_SQL;
174
tmp_mask^= (SLAVE_IO | SLAVE_SQL);
184
void lock_slave_threads(Master_info* mi)
186
//TODO: see if we can do this without dual mutex
187
pthread_mutex_lock(&mi->run_lock);
188
pthread_mutex_lock(&mi->rli.run_lock);
194
unlock_slave_threads()
197
void unlock_slave_threads(Master_info* mi)
199
//TODO: see if we can do this without dual mutex
200
pthread_mutex_unlock(&mi->rli.run_lock);
201
pthread_mutex_unlock(&mi->run_lock);
206
/* Initialize slave structures */
211
This is called when mysqld starts. Before client connections are
212
accepted. However bootstrap may conflict with us if it does START SLAVE.
213
So it's safer to take the lock.
215
pthread_mutex_lock(&LOCK_active_mi);
217
TODO: re-write this to iterate through the list of files
220
active_mi= new Master_info;
223
If master_host is not specified, try to read it from the master_info file.
224
If master_host is specified, create the master_info file if it doesn't
229
sql_print_error(_("Failed to allocate memory for the master info structure"));
233
if (init_master_info(active_mi,master_info_file,relay_log_info_file,
234
1, (SLAVE_IO | SLAVE_SQL)))
236
sql_print_error(_("Failed to initialize the master info structure"));
240
/* If server id is not set, start_slave_thread() will say it */
242
if (active_mi->host[0] && !opt_skip_slave_start)
244
if (start_slave_threads(1 /* need mutex */,
245
0 /* no wait for start*/,
249
SLAVE_IO | SLAVE_SQL))
251
sql_print_error(_("Failed to create slave threads"));
255
pthread_mutex_unlock(&LOCK_active_mi);
259
pthread_mutex_unlock(&LOCK_active_mi);
265
Init function to set up array for errors that should be skipped for slave
268
init_slave_skip_errors()
269
arg List of errors numbers to skip, separated with ','
272
Called from get_options() in mysqld.cc on start-up
275
void init_slave_skip_errors(const char* arg)
279
if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
281
fprintf(stderr, "Badly out of memory, please check your system status\n");
285
for (;my_isspace(system_charset_info,*arg);++arg)
287
if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
289
bitmap_set_all(&slave_error_mask);
295
if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
297
if (err_code < MAX_SLAVE_ERROR)
298
bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
299
while (!my_isdigit(system_charset_info,*p) && *p)
306
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
309
return(0); /* successfully do nothing */
310
int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
311
pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
313
if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
316
if ((error=terminate_slave_thread(mi->io_thd,io_lock,
323
if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
325
mi->rli.abort_slave=1;
326
if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
328
&mi->rli.slave_running,
338
Wait for a slave thread to terminate.
340
This function is called after requesting the thread to terminate
341
(by setting @c abort_slave member of @c Relay_log_info or @c
342
Master_info structure to 1). Termination of the thread is
343
controlled with the predicate <code>*slave_running</code>.
345
Function will acquire @c term_lock before waiting on the condition
346
unless @c skip_lock is true in which case the mutex should be owned
347
by the caller of this function and will remain acquired after
348
return from the function.
351
Associated lock to use when waiting for @c term_cond
354
Condition that is signalled when the thread has terminated
357
Pointer to predicate to check for slave thread termination
360
If @c true the lock will not be acquired before waiting on
361
the condition. In this case, it is assumed that the calling
362
function acquires the lock before calling this function.
367
terminate_slave_thread(THD *thd,
368
pthread_mutex_t* term_lock,
369
pthread_cond_t* term_cond,
370
volatile uint32_t *slave_running,
376
pthread_mutex_lock(term_lock);
378
safe_mutex_assert_owner(term_lock);
383
pthread_mutex_unlock(term_lock);
384
return(ER_SLAVE_NOT_RUNNING);
387
THD_CHECK_SENTRY(thd);
390
It is critical to test if the slave is running. Otherwise, we might
391
be referencing freed memory when trying to kick it
394
while (*slave_running) // Should always be true
396
pthread_mutex_lock(&thd->LOCK_delete);
397
#ifndef DONT_USE_THR_ALARM
399
Error codes from pthread_kill are:
400
EINVAL: invalid signal number (can't happen)
401
ESRCH: thread already killed (can happen, should be ignored)
403
int32_t err= pthread_kill(thd->real_id, thr_client_alarm);
404
assert(err != EINVAL);
406
thd->awake(THD::NOT_KILLED);
407
pthread_mutex_unlock(&thd->LOCK_delete);
410
There is a small chance that the slave thread might miss the first
411
alarm. To protect against it, resend the signal until it reacts
413
struct timespec abstime;
414
set_timespec(abstime,2);
415
error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
416
assert(error == ETIMEDOUT || error == 0);
419
assert(*slave_running == 0);
422
pthread_mutex_unlock(term_lock);
427
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
428
pthread_mutex_t *cond_lock,
429
pthread_cond_t *start_cond,
430
volatile uint32_t *slave_running,
431
volatile uint32_t *slave_run_id,
441
pthread_mutex_lock(start_lock);
445
pthread_cond_broadcast(start_cond);
447
pthread_mutex_unlock(start_lock);
448
sql_print_error(_("Server id not set, will not start slave"));
449
return(ER_BAD_SLAVE);
455
pthread_cond_broadcast(start_cond);
457
pthread_mutex_unlock(start_lock);
458
return(ER_SLAVE_MUST_STOP);
460
start_id= *slave_run_id;
463
struct sched_param tmp_sched_param;
465
memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
466
tmp_sched_param.sched_priority= CONNECT_PRIOR;
467
(void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
469
if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
472
pthread_mutex_unlock(start_lock);
473
return(ER_SLAVE_THREAD);
475
if (start_cond && cond_lock) // caller has cond_lock
477
THD* thd = current_thd;
478
while (start_id == *slave_run_id)
480
const char* old_msg = thd->enter_cond(start_cond,cond_lock,
481
"Waiting for slave thread to start");
482
pthread_cond_wait(start_cond,cond_lock);
483
thd->exit_cond(old_msg);
484
pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
486
return(thd->killed_errno());
490
pthread_mutex_unlock(start_lock);
496
start_slave_threads()
499
SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
500
sense to do that for starting a slave--we always care if it actually
501
started the threads that were not previously running
504
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
506
const char* master_info_fname __attribute__((unused)),
507
const char* slave_info_fname __attribute__((unused)),
510
pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
511
pthread_cond_t* cond_io=0,*cond_sql=0;
514
if (need_slave_mutex)
516
lock_io = &mi->run_lock;
517
lock_sql = &mi->rli.run_lock;
521
cond_io = &mi->start_cond;
522
cond_sql = &mi->rli.start_cond;
523
lock_cond_io = &mi->run_lock;
524
lock_cond_sql = &mi->rli.run_lock;
527
if (thread_mask & SLAVE_IO)
528
error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
530
&mi->slave_running, &mi->slave_run_id,
531
mi, 1); //high priority, to read the most possible
532
if (!error && (thread_mask & SLAVE_SQL))
534
error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
536
&mi->rli.slave_running, &mi->rli.slave_run_id,
539
terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
546
static int32_t end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
555
Free all resources used by slave
564
This is called when the server terminates, in close_connections().
565
It terminates slave threads. However, some CHANGE MASTER etc may still be
566
running presently. If a START SLAVE was in progress, the mutex lock below
567
will make us wait until slave threads have started, and START SLAVE
568
returns, then we terminate them here.
570
pthread_mutex_lock(&LOCK_active_mi);
574
TODO: replace the line below with
575
list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
576
once multi-master code is ready.
578
terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
579
end_master_info(active_mi);
583
pthread_mutex_unlock(&LOCK_active_mi);
588
static bool io_slave_killed(THD* thd, Master_info* mi)
590
assert(mi->io_thd == thd);
591
assert(mi->slave_running); // tracking buffer overrun
592
return(mi->abort_slave || abort_loop || thd->killed);
596
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
598
assert(rli->sql_thd == thd);
599
assert(rli->slave_running == 1);// tracking buffer overrun
600
if (abort_loop || thd->killed || rli->abort_slave)
603
If we are in an unsafe situation (stopping could corrupt replication),
604
we give one minute to the slave SQL thread of grace before really
605
terminating, in the hope that it will be able to read more events and
606
the unsafe situation will soon be left. Note that this one minute starts
607
from the last time anything happened in the slave SQL thread. So it's
608
really one minute of idleness, we don't timeout if the slave SQL thread
611
if (rli->last_event_start_time == 0)
613
if (difftime(time(0), rli->last_event_start_time) > 60)
615
rli->report(ERROR_LEVEL, 0,
616
_("SQL thread had to stop in an unsafe situation, in "
617
"the middle of applying updates to a "
618
"non-transactional table without any primary key. "
619
"There is a risk of duplicate updates when the slave "
620
"SQL thread is restarted. Please check your tables' "
621
"contents after restart."));
630
skip_load_data_infile()
633
This is used to tell a 3.23 master to break send_file()
636
void skip_load_data_infile(NET *net)
638
(void)net_request_file(net, "/dev/null");
639
(void)my_net_read(net); // discard response
640
(void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok
645
bool net_request_file(NET* net, const char* fname)
647
return(net_write_command(net, 251, (uchar*) fname, strlen(fname),
652
From other comments and tests in code, it looks like
653
sometimes Query_log_event and Load_log_event can have db == 0
654
(see rewrite_db() above for example)
655
(cases where this happens are unclear; it may be when the master is 3.23).
658
const char *print_slave_db_safe(const char* db)
660
return((db ? db : ""));
663
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
664
const char *default_val)
668
if ((length=my_b_gets(f,var, max_size)))
670
char* last_p = var + length -1;
672
*last_p = 0; // if we stopped on newline, kill it
676
If we truncated a line or stopped on last char, remove all chars
677
up to and including newline.
680
while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
684
else if (default_val)
686
strmake(var, default_val, max_size-1);
693
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
698
if (my_b_gets(f, buf, sizeof(buf)))
703
else if (default_val)
711
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
716
if (my_b_gets(f, buf, sizeof(buf)))
718
if (sscanf(buf, "%f", var) != 1)
723
else if (default_val != 0.0)
731
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
733
if (io_slave_killed(thd, mi))
735
if (info && global_system_variables.log_warnings)
736
sql_print_information(info);
744
Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
745
relying on the binlog's version. This is not perfect: imagine an upgrade
746
of the master without waiting for all slaves to be in sync with the master;
747
then a slave could be fooled about the binlog's format. This is what happens
748
when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
749
slaves are fooled. So we do this only to distinguish between 3.23 and more
750
recent masters (it's too late to change things for 3.23).
757
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
760
String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
761
char err_buff[MAX_SLAVE_ERRMSG];
762
const char* errmsg= 0;
764
DRIZZLE_RES *master_res= 0;
765
DRIZZLE_ROW master_row;
769
Free old description_event_for_queue (that is needed if we are in
772
delete mi->rli.relay_log.description_event_for_queue;
773
mi->rli.relay_log.description_event_for_queue= 0;
775
if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
777
errmsg = _("Master reported unrecognized DRIZZLE version");
778
err_code= ER_SLAVE_FATAL_ERROR;
779
sprintf(err_buff, ER(err_code), errmsg);
780
err_msg.append(err_buff);
785
Note the following switch will bug when we have DRIZZLE branch 30 ;)
787
switch (*drizzle->server_version)
792
errmsg = _("Master reported unrecognized DRIZZLE version");
793
err_code= ER_SLAVE_FATAL_ERROR;
794
sprintf(err_buff, ER(err_code), errmsg);
795
err_msg.append(err_buff);
798
mi->rli.relay_log.description_event_for_queue= new
799
Format_description_log_event(1, drizzle->server_version);
802
mi->rli.relay_log.description_event_for_queue= new
803
Format_description_log_event(3, drizzle->server_version);
807
Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
808
take the early steps (like tests for "is this a 3.23 master") which we
809
have to take before we receive the real master's Format_desc which will
810
override this one. Note that the Format_desc we create below is garbage
811
(it has the format of the *slave*); it's only good to help know if the
812
master is 3.23, 4.0, etc.
814
mi->rli.relay_log.description_event_for_queue= new
815
Format_description_log_event(4, drizzle->server_version);
821
This does not mean that a 5.0 slave will be able to read a 6.0 master; but
822
as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
823
can't read a 6.0 master, this will show up when the slave can't read some
824
events sent by the master, and there will be error messages.
827
if (err_msg.length() != 0)
830
/* as we are here, we tried to allocate the event */
831
if (!mi->rli.relay_log.description_event_for_queue)
833
errmsg= _("default Format_description_log_event");
834
err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
835
sprintf(err_buff, ER(err_code), errmsg);
836
err_msg.append(err_buff);
841
Compare the master and slave's clock. Do not die if master's clock is
842
unavailable (very old master not supporting UNIX_TIMESTAMP()?).
845
if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
846
(master_res= drizzle_store_result(drizzle)) &&
847
(master_row= drizzle_fetch_row(master_res)))
849
mi->clock_diff_with_master=
850
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
852
else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
854
mi->clock_diff_with_master= 0; /* The "most sensible" value */
855
sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
856
"do not trust column Seconds_Behind_Master of SHOW "
857
"SLAVE STATUS. Error: %s (%d)"),
858
drizzle_error(drizzle), drizzle_errno(drizzle));
861
drizzle_free_result(master_res);
864
Check that the master's server id and ours are different. Because if they
865
are equal (which can result from a simple copy of master's datadir to slave,
866
thus copying some my.cnf), replication will work but all events will be
868
Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
870
Note: we could have put a @@SERVER_ID in the previous SELECT
871
UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
873
if (!drizzle_real_query(drizzle,
874
STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
875
(master_res= drizzle_store_result(drizzle)))
877
if ((master_row= drizzle_fetch_row(master_res)) &&
878
(::server_id == strtoul(master_row[1], 0, 10)) &&
879
!mi->rli.replicate_same_server_id)
882
_("The slave I/O thread stops because master and slave have equal "
883
"DRIZZLE server ids; these ids must be different "
884
"for replication to work (or "
885
"the --replicate-same-server-id option must be used "
886
"on slave but this does"
887
"not always make sense; please check the manual before using it).");
888
err_code= ER_SLAVE_FATAL_ERROR;
889
sprintf(err_buff, ER(err_code), errmsg);
890
err_msg.append(err_buff);
892
drizzle_free_result(master_res);
898
Check that the master's global character_set_server and ours are the same.
899
Not fatal if query fails (old master?).
900
Note that we don't check for equality of global character_set_client and
901
collation_connection (neither do we prevent their setting in
902
set_var.cc). That's because from what I (Guilhem) have tested, the global
903
values of these 2 are never used (new connections don't use them).
904
We don't test equality of global collation_database either, as it is
905
going to be deprecated (made read-only) in 4.1 very soon.
906
The test is only relevant if master < 5.0.3 (we'll test only if it's older
907
than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
908
charset info in each binlog event.
909
We don't do it for 3.23 because masters <3.23.50 hang on
910
SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
911
test only if master is 4.x.
914
/* redundant with rest of code but safer against later additions */
915
if (*drizzle->server_version == '3')
918
if ((*drizzle->server_version == '4') &&
919
!drizzle_real_query(drizzle,
920
STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
921
(master_res= drizzle_store_result(drizzle)))
923
if ((master_row= drizzle_fetch_row(master_res)) &&
924
strcmp(master_row[0], global_system_variables.collation_server->name))
927
_("The slave I/O thread stops because master and slave have"
928
" different values for the COLLATION_SERVER global variable."
929
" The values must be equal for replication to work");
930
err_code= ER_SLAVE_FATAL_ERROR;
931
sprintf(err_buff, ER(err_code), errmsg);
932
err_msg.append(err_buff);
934
drizzle_free_result(master_res);
940
Perform analogous check for time zone. Theoretically we also should
941
perform check here to verify that SYSTEM time zones are the same on
942
slave and master, but we can't rely on value of @@system_time_zone
943
variable (it is time zone abbreviation) since it determined at start
944
time and so could differ for slave and master even if they are really
945
in the same system time zone. So we are omitting this check and just
946
relying on documentation. Also according to Monty there are many users
947
who are using replication between servers in various time zones. Hence
948
such a check would break everything for them. (And now everything will
949
work for them because by default both their master and slave will have
951
This check is only necessary for 4.x masters (and < 5.0.4 masters but
954
if ((*drizzle->server_version == '4') &&
955
!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
956
(master_res= drizzle_store_result(drizzle)))
958
if ((master_row= drizzle_fetch_row(master_res)) &&
959
strcmp(master_row[0],
960
global_system_variables.time_zone->get_name()->ptr()))
963
_("The slave I/O thread stops because master and slave have"
964
" different values for the TIME_ZONE global variable."
965
" The values must be equal for replication to work");
966
err_code= ER_SLAVE_FATAL_ERROR;
967
sprintf(err_buff, ER(err_code), errmsg);
968
err_msg.append(err_buff);
970
drizzle_free_result(master_res);
976
if (mi->heartbeat_period != 0.0)
979
const char query_format[]= "SET @master_heartbeat_period= %s";
980
char query[sizeof(query_format) - 2 + sizeof(llbuf)];
982
the period is a uint64_t of nanoseconds.
984
llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
985
sprintf(query, query_format, llbuf);
987
if (drizzle_real_query(drizzle, query, strlen(query))
988
&& !check_io_slave_killed(mi->io_thd, mi, NULL))
990
err_msg.append("The slave I/O thread stops because querying master with '");
991
err_msg.append(query);
992
err_msg.append("' failed;");
993
err_msg.append(" error: ");
994
err_code= drizzle_errno(drizzle);
995
err_msg.qs_append(err_code);
996
err_msg.append(" '");
997
err_msg.append(drizzle_error(drizzle));
999
drizzle_free_result(drizzle_store_result(drizzle));
1002
drizzle_free_result(drizzle_store_result(drizzle));
1006
if (err_msg.length() != 0)
1008
sql_print_error(err_msg.ptr());
1009
assert(err_code != 0);
1010
mi->report(ERROR_LEVEL, err_code, err_msg.ptr());
1018
static bool wait_for_relay_log_space(Relay_log_info* rli)
1020
bool slave_killed=0;
1021
Master_info* mi = rli->mi;
1022
const char *save_proc_info;
1023
THD* thd = mi->io_thd;
1025
pthread_mutex_lock(&rli->log_space_lock);
1026
save_proc_info= thd->enter_cond(&rli->log_space_cond,
1027
&rli->log_space_lock,
1028
_("Waiting for the slave SQL thread "
1029
"to free enough relay log space"));
1030
while (rli->log_space_limit < rli->log_space_total &&
1031
!(slave_killed=io_slave_killed(thd,mi)) &&
1032
!rli->ignore_log_space_limit)
1033
pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1034
thd->exit_cond(save_proc_info);
1035
return(slave_killed);
1040
Builds a Rotate from the ignored events' info and writes it to relay log.
1043
write_ignored_events_info_to_relay_log()
1044
thd pointer to I/O thread's thd
1048
Slave I/O thread, going to die, must leave a durable trace of the
1049
ignored events' end position for the use of the slave SQL thread, by
1050
calling this function. Only that thread can call it (see assertion).
1052
static void write_ignored_events_info_to_relay_log(THD *thd __attribute__((unused)),
1055
Relay_log_info *rli= &mi->rli;
1056
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1058
assert(thd == mi->io_thd);
1059
pthread_mutex_lock(log_lock);
1060
if (rli->ign_master_log_name_end[0])
1062
Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
1063
0, rli->ign_master_log_pos_end,
1064
Rotate_log_event::DUP_NAME);
1065
rli->ign_master_log_name_end[0]= 0;
1066
/* can unlock before writing as slave SQL thd will soon see our Rotate */
1067
pthread_mutex_unlock(log_lock);
1068
if (likely((bool)ev))
1070
ev->server_id= 0; // don't be ignored by slave SQL thread
1071
if (unlikely(rli->relay_log.append(ev)))
1072
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
1073
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
1074
_("failed to write a Rotate event"
1075
" to the relay log, SHOW SLAVE STATUS may be"
1077
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1078
if (flush_master_info(mi, 1))
1079
sql_print_error(_("Failed to flush master info file"));
1083
mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
1084
ER(ER_SLAVE_CREATE_EVENT_FAILURE),
1085
_("Rotate_event (out of memory?),"
1086
" SHOW SLAVE STATUS may be inaccurate"));
1089
pthread_mutex_unlock(log_lock);
1094
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
1095
bool *suppress_warnings)
1097
uchar buf[1024], *pos= buf;
1098
uint32_t report_host_len, report_user_len=0, report_password_len=0;
1100
*suppress_warnings= false;
1103
report_host_len= strlen(report_host);
1105
report_user_len= strlen(report_user);
1106
if (report_password)
1107
report_password_len= strlen(report_password);
1108
/* 30 is a good safety margin */
1109
if (report_host_len + report_user_len + report_password_len + 30 >
1111
return(0); // safety
1113
int4store(pos, server_id); pos+= 4;
1114
pos= net_store_data(pos, (uchar*) report_host, report_host_len);
1115
pos= net_store_data(pos, (uchar*) report_user, report_user_len);
1116
pos= net_store_data(pos, (uchar*) report_password, report_password_len);
1117
int2store(pos, (uint16_t) report_port); pos+= 2;
1118
int4store(pos, rpl_recovery_rank); pos+= 4;
1119
/* The master will fill in master_id */
1120
int4store(pos, 0); pos+= 4;
1122
if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
1124
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1126
*suppress_warnings= true; // Suppress reconnect warning
1128
else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
1131
snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle),
1132
drizzle_errno(drizzle));
1133
mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1134
ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1142
bool show_master_info(THD* thd, Master_info* mi)
1144
// TODO: fix this for multi-master
1145
List<Item> field_list;
1146
Protocol *protocol= thd->protocol;
1148
field_list.push_back(new Item_empty_string("Slave_IO_State",
1150
field_list.push_back(new Item_empty_string("Master_Host",
1152
field_list.push_back(new Item_empty_string("Master_User",
1154
field_list.push_back(new Item_return_int("Master_Port", 7,
1155
DRIZZLE_TYPE_LONG));
1156
field_list.push_back(new Item_return_int("Connect_Retry", 10,
1157
DRIZZLE_TYPE_LONG));
1158
field_list.push_back(new Item_empty_string("Master_Log_File",
1160
field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
1161
DRIZZLE_TYPE_LONGLONG));
1162
field_list.push_back(new Item_empty_string("Relay_Log_File",
1164
field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
1165
DRIZZLE_TYPE_LONGLONG));
1166
field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
1168
field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
1169
field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1170
field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
1171
field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
1172
field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
1173
field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
1174
field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
1175
field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
1177
field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
1178
field_list.push_back(new Item_empty_string("Last_Error", 20));
1179
field_list.push_back(new Item_return_int("Skip_Counter", 10,
1180
DRIZZLE_TYPE_LONG));
1181
field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
1182
DRIZZLE_TYPE_LONGLONG));
1183
field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
1184
DRIZZLE_TYPE_LONGLONG));
1185
field_list.push_back(new Item_empty_string("Until_Condition", 6));
1186
field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1187
field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1188
DRIZZLE_TYPE_LONGLONG));
1189
field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
1190
field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
1191
sizeof(mi->ssl_ca)));
1192
field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
1193
sizeof(mi->ssl_capath)));
1194
field_list.push_back(new Item_empty_string("Master_SSL_Cert",
1195
sizeof(mi->ssl_cert)));
1196
field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
1197
sizeof(mi->ssl_cipher)));
1198
field_list.push_back(new Item_empty_string("Master_SSL_Key",
1199
sizeof(mi->ssl_key)));
1200
field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1201
DRIZZLE_TYPE_LONGLONG));
1202
field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
1204
field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
1205
field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1206
field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
1207
field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
1209
if (protocol->send_fields(&field_list,
1210
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1215
String *packet= &thd->packet;
1216
protocol->prepare_for_resend();
1219
slave_running can be accessed without run_lock but not other
1220
non-volotile members like mi->io_thd, which is guarded by the mutex.
1222
pthread_mutex_lock(&mi->run_lock);
1223
protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
1224
pthread_mutex_unlock(&mi->run_lock);
1226
pthread_mutex_lock(&mi->data_lock);
1227
pthread_mutex_lock(&mi->rli.data_lock);
1228
protocol->store(mi->host, &my_charset_bin);
1229
protocol->store(mi->user, &my_charset_bin);
1230
protocol->store((uint32_t) mi->port);
1231
protocol->store((uint32_t) mi->connect_retry);
1232
protocol->store(mi->master_log_name, &my_charset_bin);
1233
protocol->store((uint64_t) mi->master_log_pos);
1234
protocol->store(mi->rli.group_relay_log_name +
1235
dirname_length(mi->rli.group_relay_log_name),
1237
protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1238
protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
1239
protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1240
"Yes" : "No", &my_charset_bin);
1241
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1242
protocol->store(rpl_filter->get_do_db());
1243
protocol->store(rpl_filter->get_ignore_db());
1246
String tmp(buf, sizeof(buf), &my_charset_bin);
1247
rpl_filter->get_do_table(&tmp);
1248
protocol->store(&tmp);
1249
rpl_filter->get_ignore_table(&tmp);
1250
protocol->store(&tmp);
1251
rpl_filter->get_wild_do_table(&tmp);
1252
protocol->store(&tmp);
1253
rpl_filter->get_wild_ignore_table(&tmp);
1254
protocol->store(&tmp);
1256
protocol->store(mi->rli.last_error().number);
1257
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1258
protocol->store((uint32_t) mi->rli.slave_skip_counter);
1259
protocol->store((uint64_t) mi->rli.group_master_log_pos);
1260
protocol->store((uint64_t) mi->rli.log_space_total);
1263
mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
1264
( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
1265
"Relay"), &my_charset_bin);
1266
protocol->store(mi->rli.until_log_name, &my_charset_bin);
1267
protocol->store((uint64_t) mi->rli.until_log_pos);
1269
protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
1270
protocol->store(mi->ssl_ca, &my_charset_bin);
1271
protocol->store(mi->ssl_capath, &my_charset_bin);
1272
protocol->store(mi->ssl_cert, &my_charset_bin);
1273
protocol->store(mi->ssl_cipher, &my_charset_bin);
1274
protocol->store(mi->ssl_key, &my_charset_bin);
1277
Seconds_Behind_Master: if SQL thread is running and I/O thread is
1278
connected, we can compute it otherwise show NULL (i.e. unknown).
1280
if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
1281
mi->rli.slave_running)
1283
long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
1284
- mi->clock_diff_with_master);
1286
Apparently on some systems time_diff can be <0. Here are possible
1287
reasons related to MySQL:
1288
- the master is itself a slave of another master whose time is ahead.
1289
- somebody used an explicit SET TIMESTAMP on the master.
1290
Possible reason related to granularity-to-second of time functions
1291
(nothing to do with MySQL), which can explain a value of -1:
1292
assume the master's and slave's time are perfectly synchronized, and
1293
that at slave's connection time, when the master's timestamp is read,
1294
it is at the very end of second 1, and (a very short time later) when
1295
the slave's timestamp is read it is at the very beginning of second
1296
2. Then the recorded value for master is 1 and the recorded value for
1297
slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1298
between timestamp of slave and rli->last_master_timestamp is 0
1299
(i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1300
This confuses users, so we don't go below 0: hence the max().
1302
last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1303
special marker to say "consider we have caught up".
1305
protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1306
max((long)0, time_diff) : 0));
1310
protocol->store_null();
1312
protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
1315
protocol->store(mi->last_error().number);
1317
protocol->store(mi->last_error().message, &my_charset_bin);
1319
protocol->store(mi->rli.last_error().number);
1321
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1323
pthread_mutex_unlock(&mi->rli.data_lock);
1324
pthread_mutex_unlock(&mi->data_lock);
1326
if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
1334
void set_slave_thread_options(THD* thd)
1337
It's nonsense to constrain the slave threads with max_join_size; if a
1338
query succeeded on master, we HAVE to execute it. So set
1339
OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
1340
(and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
1341
SELECT examining more than 4 billion rows would still fail (yes, because
1342
when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1343
only for client threads.
1345
uint64_t options= thd->options | OPTION_BIG_SELECTS;
1346
if (opt_log_slave_updates)
1347
options|= OPTION_BIN_LOG;
1349
options&= ~OPTION_BIN_LOG;
1350
thd->options= options;
1351
thd->variables.completion_type= 0;
1355
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
1357
thd->variables.character_set_client=
1358
global_system_variables.character_set_client;
1359
thd->variables.collation_connection=
1360
global_system_variables.collation_connection;
1361
thd->variables.collation_server=
1362
global_system_variables.collation_server;
1363
thd->update_charset();
1366
We use a const cast here since the conceptual (and externally
1367
visible) behavior of the function is to set the default charset of
1368
the thread. That the cache has to be invalidated is a secondary
1371
const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
1379
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
1381
int32_t simulate_error= 0;
1382
thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
1383
SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1384
thd->security_ctx->skip_grants();
1385
my_net_init(&thd->net, 0);
1387
Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1388
slave threads, since a replication event can become this much larger
1389
than the corresponding packet (query) sent from client to master.
1391
thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1392
+ MAX_LOG_EVENT_HEADER; /* note, incr over the global not session var */
1393
thd->slave_thread = 1;
1394
thd->enable_slow_log= opt_log_slow_slave_statements;
1395
set_slave_thread_options(thd);
1396
thd->client_capabilities = CLIENT_LOCAL_FILES;
1397
pthread_mutex_lock(&LOCK_thread_count);
1398
thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
1399
pthread_mutex_unlock(&LOCK_thread_count);
1401
simulate_error|= (1 << SLAVE_THD_IO);
1402
simulate_error|= (1 << SLAVE_THD_SQL);
1403
if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
1410
if (thd_type == SLAVE_THD_SQL)
1411
thd_proc_info(thd, "Waiting for the next event in relay log");
1413
thd_proc_info(thd, "Waiting for master update");
1414
thd->version=refresh_version;
1420
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1421
void* thread_killed_arg)
1424
thr_alarm_t alarmed;
1426
thr_alarm_init(&alarmed);
1427
time_t start_time= my_time(0);
1428
time_t end_time= start_time+sec;
1430
while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1434
The only reason we are asking for alarm is so that
1435
we will be woken up in case of murder, so if we do not get killed,
1436
set the alarm so it goes off after we wake up naturally
1438
thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
1440
thr_end_alarm(&alarmed);
1442
if ((*thread_killed)(thd,thread_killed_arg))
1444
start_time= my_time(0);
1450
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
1451
bool *suppress_warnings)
1453
uchar buf[FN_REFLEN + 10];
1455
int32_t binlog_flags = 0; // for now
1456
char* logname = mi->master_log_name;
1458
*suppress_warnings= false;
1460
// TODO if big log files: Change next to int8store()
1461
int4store(buf, (uint32_t) mi->master_log_pos);
1462
int2store(buf + 4, binlog_flags);
1463
int4store(buf + 6, server_id);
1464
len = (uint32_t) strlen(logname);
1465
memcpy(buf + 10, logname,len);
1466
if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
1469
Something went wrong, so we will just reconnect and retry later
1470
in the future, we should do a better error analysis, but for
1471
now we just fill up the error log :-)
1473
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1474
*suppress_warnings= true; // Suppress reconnect warning
1476
sql_print_error(_("Error on COM_BINLOG_DUMP: %d %s, will retry in %d secs"),
1477
drizzle_errno(drizzle), drizzle_error(drizzle),
1486
Read one event from the master
1490
DRIZZLE DRIZZLE connection
1491
mi Master connection information
1492
suppress_warnings TRUE when a normal net read timeout has caused us to
1493
try a reconnect. We do not want to print anything to
1494
the error log in this case because this a anormal
1495
event in an idle server.
1498
'packet_error' Error
1499
number Length of packet
1502
static uint32_t read_event(DRIZZLE *drizzle,
1503
Master_info *mi __attribute__((unused)),
1504
bool* suppress_warnings)
1508
*suppress_warnings= false;
1510
my_real_read() will time us out
1511
We check if we were told to die, and if not, try reading again
1513
if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
1514
return(packet_error);
1516
len = cli_safe_read(drizzle);
1517
if (len == packet_error || (int32_t) len < 1)
1519
if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1522
We are trying a normal reconnect after a read timeout;
1523
we suppress prints to .err file as long as the reconnect
1524
happens without problems
1526
*suppress_warnings= true;
1529
sql_print_error(_("Error reading packet from server: %s ( server_errno=%d)"),
1530
drizzle_error(drizzle), drizzle_errno(drizzle));
1531
return(packet_error);
1534
/* Check if eof packet */
1535
if (len < 8 && drizzle->net.read_pos[0] == 254)
1537
sql_print_information(_("Slave: received end packet from server, apparent "
1538
"master shutdown: %s"),
1539
drizzle_error(drizzle));
1540
return(packet_error);
1547
int32_t check_expected_error(THD* thd __attribute__((unused)),
1548
Relay_log_info const *rli __attribute__((unused)),
1549
int32_t expected_error)
1551
switch (expected_error) {
1552
case ER_NET_READ_ERROR:
1553
case ER_NET_ERROR_ON_WRITE:
1554
case ER_QUERY_INTERRUPTED:
1555
case ER_SERVER_SHUTDOWN:
1556
case ER_NEW_ABORTING_CONNECTION:
1565
Check if the current error is of temporary nature of not.
1566
Some errors are temporary in nature, such as
1567
ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. Ndb also signals
1568
that the error is temporary by pushing a warning with the error code
1569
ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1571
static int32_t has_temporary_error(THD *thd)
1573
if (thd->is_fatal_error)
1576
if (thd->main_da.is_error())
1579
my_error(ER_LOCK_DEADLOCK, MYF(0));
1583
If there is no message in THD, we can't say if it's a temporary
1584
error or not. This is currently the case for Incident_log_event,
1585
which sets no message. Return FALSE.
1587
if (!thd->is_error())
1591
Temporary error codes:
1592
currently, InnoDB deadlock detected by InnoDB or lock
1593
wait timeout (innodb_lock_wait_timeout exceeded
1595
if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
1596
thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
1604
Applies the given event and advances the relay log position.
1606
In essence, this function does:
1609
ev->apply_event(rli);
1610
ev->update_pos(rli);
1613
But it also does some maintainance, such as skipping events if
1614
needed and reporting errors.
1616
If the @c skip flag is set, then it is tested whether the event
1617
should be skipped, by looking at the slave_skip_counter and the
1618
server id. The skip flag should be set when calling this from a
1619
replication thread but not set when executing an explicit BINLOG
1624
@retval 1 Error calling ev->apply_event().
1626
@retval 2 No error calling ev->apply_event(), but error calling
1629
int32_t apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
1632
int32_t exec_res= 0;
1635
Execute the event to change the database and update the binary
1636
log coordinates, but first we set some data that is needed for
1639
The event will be executed unless it is supposed to be skipped.
1641
Queries originating from this server must be skipped. Low-level
1642
events (Format_description_log_event, Rotate_log_event,
1643
Stop_log_event) from this server must also be skipped. But for
1644
those we don't want to modify 'group_master_log_pos', because
1645
these events did not exist on the master.
1646
Format_description_log_event is not completely skipped.
1648
Skip queries specified by the user in 'slave_skip_counter'. We
1649
can't however skip events that has something to do with the log
1652
Filtering on own server id is extremely important, to ignore
1653
execution of events created by the creation/rotation of the relay
1654
log (remember that now the relay log starts with its Format_desc,
1658
thd->server_id = ev->server_id; // use the original server id for logging
1659
thd->set_time(); // time the query
1660
thd->lex->current_select= 0;
1662
ev->when= my_time(0);
1663
ev->thd = thd; // because up to this point, ev->thd == 0
1667
int32_t reason= ev->shall_skip(rli);
1668
if (reason == Log_event::EVENT_SKIP_COUNT)
1669
--rli->slave_skip_counter;
1670
pthread_mutex_unlock(&rli->data_lock);
1671
if (reason == Log_event::EVENT_SKIP_NOT)
1672
exec_res= ev->apply_event(rli);
1675
exec_res= ev->apply_event(rli);
1679
int32_t error= ev->update_pos(rli);
1681
The update should not fail, so print an error message and
1682
return an error code.
1684
TODO: Replace this with a decent error message when merged
1685
with BUG#24954 (which adds several new error message).
1690
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
1691
_("It was not possible to update the positions"
1692
" of the relay log information: the slave may"
1693
" be in an inconsistent state."
1694
" Stopped in %s position %s"),
1695
rli->group_relay_log_name,
1696
llstr(rli->group_relay_log_pos, buf));
1701
return(exec_res ? 1 : 0);
1706
Top-level function for executing the next event from the relay log.
1708
This function reads the event from the relay log, executes it, and
1709
advances the relay log position. It also handles errors, etc.
1711
This function may fail to apply the event for the following reasons:
1713
- The position specfied by the UNTIL condition of the START SLAVE
1716
- It was not possible to read the event from the log.
1718
- The slave is killed.
1720
- An error occurred when applying the event, and the event has been
1721
tried slave_trans_retries times. If the event has been retried
1722
fewer times, 0 is returned.
1724
- init_master_info or init_relay_log_pos failed. (These are called
1725
if a failure occurs when applying the event.)</li>
1727
- An error occurred when updating the binlog position.
1729
@retval 0 The event was applied.
1731
@retval 1 The event was not applied.
1733
static int32_t exec_relay_log_event(THD* thd, Relay_log_info* rli)
1736
We acquire this mutex since we need it for all operations except
1737
event execution. But we will release it in places where we will
1738
wait for something for example inside of next_event().
1740
pthread_mutex_lock(&rli->data_lock);
1742
Log_event * ev = next_event(rli);
1744
assert(rli->sql_thd==thd);
1746
if (sql_slave_killed(thd,rli))
1748
pthread_mutex_unlock(&rli->data_lock);
1757
This tests if the position of the beginning of the current event
1758
hits the UNTIL barrier.
1760
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
1761
rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
1762
rli->group_master_log_pos :
1763
ev->log_pos - ev->data_written))
1766
sql_print_information(_("Slave SQL thread stopped because it reached its"
1767
" UNTIL position %s"),
1768
llstr(rli->until_pos(), buf));
1770
Setting abort_slave flag because we do not want additional message about
1771
error in query execution to be printed.
1773
rli->abort_slave= 1;
1774
pthread_mutex_unlock(&rli->data_lock);
1778
exec_res= apply_event_and_update_pos(ev, thd, rli, true);
1781
Format_description_log_event should not be deleted because it will be
1782
used to read info about the relay log's format; it will be deleted when
1783
the SQL thread does not need it, i.e. when this thread terminates.
1785
if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
1791
update_log_pos failed: this should not happen, so we don't
1797
if (slave_trans_retries)
1799
int32_t temp_err= 0;
1800
if (exec_res && (temp_err= has_temporary_error(thd)))
1804
We were in a transaction which has been rolled back because of a
1806
let's seek back to BEGIN log event and retry it all again.
1807
Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
1808
there is no rollback since 5.0.13 (ref: manual).
1809
We have to not only seek but also
1810
a) init_master_info(), to seek back to hot relay log's start for later
1811
(for when we will come back to this hot log after re-processing the
1812
possibly existing old logs where BEGIN is: check_binlog_magic() will
1813
then need the cache to be at position 0 (see comments at beginning of
1814
init_master_info()).
1815
b) init_relay_log_pos(), because the BEGIN may be an older relay log.
1817
if (rli->trans_retries < slave_trans_retries)
1819
if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
1820
sql_print_error(_("Failed to initialize the master info structure"));
1821
else if (init_relay_log_pos(rli,
1822
rli->group_relay_log_name,
1823
rli->group_relay_log_pos,
1825
sql_print_error(_("Error initializing relay log position: %s"),
1830
end_trans(thd, ROLLBACK);
1831
/* chance for concurrent connection to get more locks */
1832
safe_sleep(thd, min(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1833
(CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1834
pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1835
rli->trans_retries++;
1836
rli->retried_trans++;
1837
pthread_mutex_unlock(&rli->data_lock);
1841
sql_print_error(_("Slave SQL thread retried transaction %lu time(s) "
1842
"in vain, giving up. Consider raising the value of "
1843
"the slave_transaction_retries variable."),
1844
slave_trans_retries);
1846
else if ((exec_res && !temp_err) ||
1847
(opt_using_transactions &&
1848
rli->group_relay_log_pos == rli->event_relay_log_pos))
1851
Only reset the retry counter if the entire group succeeded
1852
or failed with a non-transient error. On a successful
1853
event, the execution will proceed as usual; in the case of a
1854
non-transient error, the slave will stop with an error.
1856
rli->trans_retries= 0; // restart from fresh
1861
pthread_mutex_unlock(&rli->data_lock);
1862
rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
1863
ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
1864
_("Could not parse relay log event entry. The possible reasons "
1865
"are: the master's binary log is corrupted (you can check this "
1866
"by running 'mysqlbinlog' on the binary log), the slave's "
1867
"relay log is corrupted (you can check this by running "
1868
"'mysqlbinlog' on the relay log), a network problem, or a bug "
1869
"in the master's or slave's DRIZZLE code. If you want to check "
1870
"the master's binary log or slave's relay log, you will be "
1871
"able to know their names by issuing 'SHOW SLAVE STATUS' "
1878
@brief Try to reconnect slave IO thread.
1880
@details Terminates current connection to master, sleeps for
1881
@c mi->connect_retry msecs and initiates new connection with
1882
@c safe_reconnect(). Variable pointed by @c retry_count is increased -
1883
if it exceeds @c master_retry_count then connection is not re-established
1884
and function signals error.
1885
Unless @c suppres_warnings is TRUE, a warning is put in the server error log
1886
when reconnecting. The warning message and messages used to report errors
1887
are taken from @c messages array. In case @c master_retry_count is exceeded,
1888
no messages are added to the log.
1890
@param[in] thd Thread context.
1891
@param[in] DRIZZLE DRIZZLE connection.
1892
@param[in] mi Master connection information.
1893
@param[in,out] retry_count Number of attempts to reconnect.
1894
@param[in] suppress_warnings TRUE when a normal net read timeout
1895
has caused to reconnecting.
1896
@param[in] messages Messages to print/log, see
1897
reconnect_messages[] array.
1900
@retval 1 There was an error.
1903
static int32_t try_to_reconnect(THD *thd, DRIZZLE *drizzle, Master_info *mi,
1904
uint32_t *retry_count, bool suppress_warnings,
1905
const char *messages[SLAVE_RECON_MSG_MAX])
1907
mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1908
thd->proc_info= _(messages[SLAVE_RECON_MSG_WAIT]);
1909
#ifdef SIGNAL_WITH_VIO_CLOSE
1910
thd->clear_active_vio();
1912
end_server(drizzle);
1913
if ((*retry_count)++)
1915
if (*retry_count > master_retry_count)
1916
return 1; // Don't retry forever
1917
safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1920
if (check_io_slave_killed(thd, mi,
1921
_(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1923
thd->proc_info = _(messages[SLAVE_RECON_MSG_AFTER]);
1924
if (!suppress_warnings)
1926
char buf[256], llbuff[22];
1927
snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1928
IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
1930
Raise a warining during registering on master/requesting dump.
1931
Log a message reading event.
1933
if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
1935
mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1936
ER(ER_SLAVE_MASTER_COM_FAILURE),
1937
_(messages[SLAVE_RECON_MSG_COMMAND]), buf);
1941
sql_print_information(buf);
1944
if (safe_reconnect(thd, drizzle, mi, 1) || io_slave_killed(thd, mi))
1946
if (global_system_variables.log_warnings)
1947
sql_print_information(_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1954
/* Slave I/O Thread entry point */
1956
pthread_handler_t handle_slave_io(void *arg)
1958
THD *thd; // needs to be first for thread_stack
1960
Master_info *mi = (Master_info*)arg;
1961
Relay_log_info *rli= &mi->rli;
1963
uint32_t retry_count;
1964
bool suppress_warnings;
1965
uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
1972
pthread_mutex_lock(&mi->run_lock);
1973
/* Inform waiting threads that slave has started */
1976
mi->events_till_disconnect = disconnect_slave_event_count;
1979
THD_CHECK_SENTRY(thd);
1982
pthread_detach_this_thread();
1983
thd->thread_stack= (char*) &thd; // remember where our stack is
1984
if (init_slave_thread(thd, SLAVE_THD_IO))
1986
pthread_cond_broadcast(&mi->start_cond);
1987
pthread_mutex_unlock(&mi->run_lock);
1988
sql_print_error(_("Failed during slave I/O thread initialization"));
1991
pthread_mutex_lock(&LOCK_thread_count);
1992
threads.append(thd);
1993
pthread_mutex_unlock(&LOCK_thread_count);
1994
mi->slave_running = 1;
1995
mi->abort_slave = 0;
1996
pthread_mutex_unlock(&mi->run_lock);
1997
pthread_cond_broadcast(&mi->start_cond);
1999
if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
2001
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2002
ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
2006
thd_proc_info(thd, "Connecting to master");
2007
// we can get killed during safe_connect
2008
if (!safe_connect(thd, drizzle, mi))
2010
sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
2011
"replication started in log '%s' at position %s"),
2012
mi->user, mi->host, mi->port,
2014
llstr(mi->master_log_pos,llbuff));
2016
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
2017
thread, since a replication event can become this much larger than
2018
the corresponding packet (query) sent from client to master.
2020
drizzle->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
2024
sql_print_information(_("Slave I/O thread killed while connecting to master"));
2030
// TODO: the assignment below should be under mutex (5.0)
2031
mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
2032
thd->slave_net = &drizzle->net;
2033
thd_proc_info(thd, "Checking master version");
2034
if (get_master_version_and_clock(drizzle, mi))
2037
if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
2040
Register ourselves with the master.
2042
thd_proc_info(thd, "Registering slave on master");
2043
if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2045
if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
2046
"while registering slave on master"))
2048
sql_print_error(_("Slave I/O thread couldn't register on master"));
2049
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2050
reconnect_messages[SLAVE_RECON_ACT_REG]))
2057
if (!retry_count_reg)
2060
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2061
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2062
reconnect_messages[SLAVE_RECON_ACT_REG]))
2068
while (!io_slave_killed(thd,mi))
2070
thd_proc_info(thd, "Requesting binlog dump");
2071
if (request_dump(drizzle, mi, &suppress_warnings))
2073
sql_print_error(_("Failed on request_dump()"));
2074
if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while \
2075
requesting master dump")) ||
2076
try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2077
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2081
if (!retry_count_dump)
2084
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2085
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2086
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2091
while (!io_slave_killed(thd,mi))
2095
We say "waiting" because read_event() will wait if there's nothing to
2096
read. But if there's something to read, it will not wait. The
2097
important thing is to not confuse users by saying "reading" whereas
2098
we're in fact receiving nothing.
2100
thd_proc_info(thd, _("Waiting for master to send event"));
2101
event_len= read_event(drizzle, mi, &suppress_warnings);
2102
if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while "
2105
if (!retry_count_event)
2107
retry_count_event++;
2108
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2109
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2110
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2115
if (event_len == packet_error)
2117
uint32_t drizzle_error_number= drizzle_errno(drizzle);
2118
switch (drizzle_error_number) {
2119
case CR_NET_PACKET_TOO_LARGE:
2120
sql_print_error(_("Log entry on master is longer than "
2121
"max_allowed_packet (%ld) on "
2122
"slave. If the entry is correct, restart the "
2123
"server with a higher value of "
2124
"max_allowed_packet"),
2125
thd->variables.max_allowed_packet);
2127
case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2128
sql_print_error(ER(drizzle_error_number), drizzle_error_number,
2129
drizzle_error(drizzle));
2131
case EE_OUTOFMEMORY:
2132
case ER_OUTOFMEMORY:
2134
_("Stopping slave I/O thread due to out-of-memory error from master"));
2137
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2138
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2141
} // if (event_len == packet_error)
2143
retry_count=0; // ok event, reset retry counter
2144
thd_proc_info(thd, _("Queueing master event to the relay log"));
2145
if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2149
if (flush_master_info(mi, 1))
2151
sql_print_error(_("Failed to flush master info file"));
2155
See if the relay logs take too much space.
2156
We don't lock mi->rli.log_space_lock here; this dirty read saves time
2157
and does not introduce any problem:
2158
- if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
2159
the clean value is 0), then we are reading only one more event as we
2160
should, and we'll block only at the next event. No big deal.
2161
- if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
2162
the clean value is 1), then we are going into wait_for_relay_log_space()
2163
for no reason, but this function will do a clean read, notice the clean
2164
value and exit immediately.
2166
if (rli->log_space_limit && rli->log_space_limit <
2167
rli->log_space_total &&
2168
!rli->ignore_log_space_limit)
2169
if (wait_for_relay_log_space(rli))
2171
sql_print_error(_("Slave I/O thread aborted while waiting for "
2172
"relay log space"));
2180
// print the current replication position
2181
sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2183
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
2184
VOID(pthread_mutex_lock(&LOCK_thread_count));
2185
thd->query = thd->db = 0; // extra safety
2186
thd->query_length= thd->db_length= 0;
2187
VOID(pthread_mutex_unlock(&LOCK_thread_count));
2191
Here we need to clear the active VIO before closing the
2192
connection with the master. The reason is that THD::awake()
2193
might be called from terminate_slave_thread() because somebody
2194
issued a STOP SLAVE. If that happends, the close_active_vio()
2195
can be called in the middle of closing the VIO associated with
2196
the 'mysql' object, causing a crash.
2198
#ifdef SIGNAL_WITH_VIO_CLOSE
2199
thd->clear_active_vio();
2201
drizzle_close(drizzle);
2204
write_ignored_events_info_to_relay_log(thd, mi);
2205
thd_proc_info(thd, _("Waiting for slave mutex on exit"));
2206
pthread_mutex_lock(&mi->run_lock);
2208
/* Forget the relay log's format */
2209
delete mi->rli.relay_log.description_event_for_queue;
2210
mi->rli.relay_log.description_event_for_queue= 0;
2211
// TODO: make rpl_status part of Master_info
2212
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2213
assert(thd->net.buff != 0);
2214
net_end(&thd->net); // destructor will not free it, because net.vio is 0
2215
close_thread_tables(thd);
2216
pthread_mutex_lock(&LOCK_thread_count);
2217
THD_CHECK_SENTRY(thd);
2219
pthread_mutex_unlock(&LOCK_thread_count);
2221
mi->slave_running= 0;
2224
Note: the order of the two following calls (first broadcast, then unlock)
2225
is important. Otherwise a killer_thread can execute between the calls and
2226
delete the mi structure leading to a crash! (see BUG#25306 for details)
2228
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
2229
pthread_mutex_unlock(&mi->run_lock);
2232
return(0); // Can't return anything here
2236
/* Slave SQL Thread entry point */
2238
pthread_handler_t handle_slave_sql(void *arg)
2240
THD *thd; /* needs to be first for thread_stack */
2241
char llbuff[22],llbuff1[22];
2243
Relay_log_info* rli = &((Master_info*)arg)->rli;
2248
assert(rli->inited);
2249
pthread_mutex_lock(&rli->run_lock);
2250
assert(!rli->slave_running);
2252
rli->events_till_abort = abort_slave_event_count;
2255
thd->thread_stack = (char*)&thd; // remember where our stack is
2258
/* Inform waiting threads that slave has started */
2259
rli->slave_run_id++;
2260
rli->slave_running = 1;
2262
pthread_detach_this_thread();
2263
if (init_slave_thread(thd, SLAVE_THD_SQL))
2266
TODO: this is currently broken - slave start and change master
2267
will be stuck if we fail here
2269
pthread_cond_broadcast(&rli->start_cond);
2270
pthread_mutex_unlock(&rli->run_lock);
2271
sql_print_error(_("Failed during slave thread initialization"));
2274
thd->init_for_queries();
2275
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
2276
pthread_mutex_lock(&LOCK_thread_count);
2277
threads.append(thd);
2278
pthread_mutex_unlock(&LOCK_thread_count);
2280
We are going to set slave_running to 1. Assuming slave I/O thread is
2281
alive and connected, this is going to make Seconds_Behind_Master be 0
2282
i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
2283
the moment we start we can think we are caught up, and the next second we
2284
start receiving data so we realize we are not caught up and
2285
Seconds_Behind_Master grows. No big deal.
2287
rli->abort_slave = 0;
2288
pthread_mutex_unlock(&rli->run_lock);
2289
pthread_cond_broadcast(&rli->start_cond);
2292
Reset errors for a clean start (otherwise, if the master is idle, the SQL
2293
thread may execute no Query_log_event, so the error will remain even
2294
though there's no problem anymore). Do not reset the master timestamp
2295
(imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
2296
as we are not sure that we are going to receive a query, we want to
2297
remember the last master timestamp (to say how many seconds behind we are
2299
But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
2303
//tell the I/O thread to take relay_log_space_limit into account from now on
2304
pthread_mutex_lock(&rli->log_space_lock);
2305
rli->ignore_log_space_limit= 0;
2306
pthread_mutex_unlock(&rli->log_space_lock);
2307
rli->trans_retries= 0; // start from "no error"
2309
if (init_relay_log_pos(rli,
2310
rli->group_relay_log_name,
2311
rli->group_relay_log_pos,
2312
1 /*need data lock*/, &errmsg,
2313
1 /*look for a description_event*/))
2315
sql_print_error(_("Error initializing relay log position: %s"),
2319
THD_CHECK_SENTRY(thd);
2320
assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2322
Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2323
correct position when it's called just after my_b_seek() (the questionable
2324
stuff is those "seek is done on next read" comments in the my_b_seek()
2326
The crude reality is that this assertion randomly fails whereas
2327
replication seems to work fine. And there is no easy explanation why it
2328
fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
2329
init_relay_log_pos() called above). Maybe the assertion would be
2330
meaningful if we held rli->data_lock between the my_b_seek() and the
2333
assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2334
assert(rli->sql_thd == thd);
2336
if (global_system_variables.log_warnings)
2337
sql_print_information(_("Slave SQL thread initialized, "
2338
"starting replication in log '%s' at "
2339
"position %s, relay log '%s' position: %s"),
2341
llstr(rli->group_master_log_pos,llbuff),
2342
rli->group_relay_log_name,
2343
llstr(rli->group_relay_log_pos,llbuff1));
2345
/* execute init_slave variable */
2346
if (sys_init_slave.value_length)
2348
execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
2349
if (thd->is_slave_error)
2351
sql_print_error(_("Slave SQL thread aborted. "
2352
"Can't execute init_slave query"));
2358
First check until condition - probably there is nothing to execute. We
2359
do not want to wait for next event in this case.
2361
pthread_mutex_lock(&rli->data_lock);
2362
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
2363
rli->is_until_satisfied(rli->group_master_log_pos))
2366
sql_print_information(_("Slave SQL thread stopped because it reached its"
2367
" UNTIL position %s"), llstr(rli->until_pos(), buf));
2368
pthread_mutex_unlock(&rli->data_lock);
2371
pthread_mutex_unlock(&rli->data_lock);
2373
/* Read queries from the IO/THREAD until this thread is killed */
2375
while (!sql_slave_killed(thd,rli))
2377
thd_proc_info(thd, _("Reading event from the relay log"));
2378
assert(rli->sql_thd == thd);
2379
THD_CHECK_SENTRY(thd);
2380
if (exec_relay_log_event(thd,rli))
2382
// do not scare the user if SQL thread was simply killed or stopped
2383
if (!sql_slave_killed(thd,rli))
2386
retrieve as much info as possible from the thd and, error
2387
codes and warnings and print this to the error log as to
2388
allow the user to locate the error
2390
uint32_t const last_errno= rli->last_error().number;
2392
if (thd->is_error())
2394
char const *const errmsg= thd->main_da.message();
2396
if (last_errno == 0)
2398
rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), errmsg);
2400
else if (last_errno != thd->main_da.sql_errno())
2402
sql_print_error(_("Slave (additional info): %s Error_code: %d"),
2403
errmsg, thd->main_da.sql_errno());
2407
/* Print any warnings issued */
2408
List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
2411
Added controlled slave thread cancel for replication
2412
of user-defined variables.
2414
bool udf_error = false;
2417
if (err->code == ER_CANT_OPEN_LIBRARY)
2419
sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
2422
sql_print_error(_("Error loading user-defined library, slave SQL "
2423
"thread aborted. Install the missing library, "
2424
"and restart the slave SQL thread with "
2425
"\"SLAVE START\". We stopped at log '%s' "
2427
RPL_LOG_NAME, llstr(rli->group_master_log_pos,
2430
sql_print_error(_("Error running query, slave SQL thread aborted. "
2431
"Fix the problem, and restart "
2432
"the slave SQL thread with \"SLAVE START\". "
2433
"We stopped at log '%s' position %s"),
2435
llstr(rli->group_master_log_pos, llbuff));
2441
/* Thread stopped. Print the current replication position to the log */
2442
sql_print_information(_("Slave SQL thread exiting, replication stopped in "
2443
"log '%s' at position %s"),
2445
llstr(rli->group_master_log_pos,llbuff));
2450
Some events set some playgrounds, which won't be cleared because thread
2451
stops. Stopping of this thread may not be known to these events ("stop"
2452
request is detected only by the present function, not by events), so we
2453
must "proactively" clear playgrounds:
2455
rli->cleanup_context(thd, 1);
2456
VOID(pthread_mutex_lock(&LOCK_thread_count));
2458
Some extra safety, which should not been needed (normally, event deletion
2459
should already have done these assignments (each event which sets these
2460
variables is supposed to set them to 0 before terminating)).
2462
thd->query= thd->db= thd->catalog= 0;
2463
thd->query_length= thd->db_length= 0;
2464
VOID(pthread_mutex_unlock(&LOCK_thread_count));
2465
thd_proc_info(thd, "Waiting for slave mutex on exit");
2466
pthread_mutex_lock(&rli->run_lock);
2467
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
2468
pthread_mutex_lock(&rli->data_lock);
2469
assert(rli->slave_running == 1); // tracking buffer overrun
2470
/* When master_pos_wait() wakes up it will check this and terminate */
2471
rli->slave_running= 0;
2472
/* Forget the relay log's format */
2473
delete rli->relay_log.description_event_for_exec;
2474
rli->relay_log.description_event_for_exec= 0;
2475
/* Wake up master_pos_wait() */
2476
pthread_mutex_unlock(&rli->data_lock);
2477
pthread_cond_broadcast(&rli->data_cond);
2478
rli->ignore_log_space_limit= 0; /* don't need any lock */
2479
/* we die so won't remember charset - re-update them on next thread start */
2480
rli->cached_charset_invalidate();
2481
rli->save_temporary_tables = thd->temporary_tables;
2484
TODO: see if we can do this conditionally in next_event() instead
2485
to avoid unneeded position re-init
2487
thd->temporary_tables = 0; // remove tempation from destructor to close them
2488
assert(thd->net.buff != 0);
2489
net_end(&thd->net); // destructor will not free it, because we are weird
2490
assert(rli->sql_thd == thd);
2491
THD_CHECK_SENTRY(thd);
2493
pthread_mutex_lock(&LOCK_thread_count);
2494
THD_CHECK_SENTRY(thd);
2496
pthread_mutex_unlock(&LOCK_thread_count);
2498
Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
2499
is important. Otherwise a killer_thread can execute between the calls and
2500
delete the mi structure leading to a crash! (see BUG#25306 for details)
2502
pthread_cond_broadcast(&rli->stop_cond);
2503
pthread_mutex_unlock(&rli->run_lock); // tell the world we are done
2507
return(0); // Can't return anything here
2512
process_io_create_file()
2515
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
2519
bool cev_not_written;
2520
THD *thd = mi->io_thd;
2521
NET *net = &mi->drizzle->net;
2523
if (unlikely(!cev->is_valid()))
2526
if (!rpl_filter->db_ok(cev->db))
2528
skip_load_data_infile(net);
2531
assert(cev->inited_from_old);
2532
thd->file_id = cev->file_id = mi->file_id++;
2533
thd->server_id = cev->server_id;
2534
cev_not_written = 1;
2536
if (unlikely(net_request_file(net,cev->fname)))
2538
sql_print_error(_("Slave I/O: failed requesting download of '%s'"),
2544
This dummy block is so we could instantiate Append_block_log_event
2545
once and then modify it slightly instead of doing it multiple times
2549
Append_block_log_event aev(thd,0,0,0,0);
2553
if (unlikely((num_bytes=my_net_read(net)) == packet_error))
2555
sql_print_error(_("Network read error downloading '%s' from master"),
2559
if (unlikely(!num_bytes)) /* eof */
2561
/* 3.23 master wants it */
2562
net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0);
2564
If we wrote Create_file_log_event, then we need to write
2565
Execute_load_log_event. If we did not write Create_file_log_event,
2566
then this is an empty file and we can just do as if the LOAD DATA
2567
INFILE had not existed, i.e. write nothing.
2569
if (unlikely(cev_not_written))
2571
Execute_load_log_event xev(thd,0,0);
2572
xev.log_pos = cev->log_pos;
2573
if (unlikely(mi->rli.relay_log.append(&xev)))
2575
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2576
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2577
_("error writing Exec_load event to relay log"));
2580
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2583
if (unlikely(cev_not_written))
2585
cev->block = net->read_pos;
2586
cev->block_len = num_bytes;
2587
if (unlikely(mi->rli.relay_log.append(cev)))
2589
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2590
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2591
_("error writing Create_file event to relay log"));
2595
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2599
aev.block = net->read_pos;
2600
aev.block_len = num_bytes;
2601
aev.log_pos = cev->log_pos;
2602
if (unlikely(mi->rli.relay_log.append(&aev)))
2604
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2605
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2606
_("error writing Append_block event to relay log"));
2609
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
2620
Start using a new binary log on the master
2624
mi master_info for the slave
2625
rev The rotate log event read from the binary log
2628
Updates the master info with the place in the next binary
2629
log where we should start reading.
2630
Rotate the relay log to avoid mixed-format relay logs.
2633
We assume we already locked mi->data_lock
2637
1 Log event is illegal
2641
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
2643
safe_mutex_assert_owner(&mi->data_lock);
2645
if (unlikely(!rev->is_valid()))
2648
/* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2649
memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
2650
mi->master_log_pos= rev->pos;
2652
If we do not do this, we will be getting the first
2653
rotate event forever, so we need to not disconnect after one.
2655
if (disconnect_slave_event_count)
2656
mi->events_till_disconnect++;
2659
If description_event_for_queue is format <4, there is conversion in the
2660
relay log to the slave's format (4). And Rotate can mean upgrade or
2661
nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
2662
no need to reset description_event_for_queue now. And if it's nothing (same
2663
master version as before), no need (still using the slave's format).
2665
if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
2667
delete mi->rli.relay_log.description_event_for_queue;
2668
/* start from format 3 (DRIZZLE 4.0) again */
2669
mi->rli.relay_log.description_event_for_queue= new
2670
Format_description_log_event(3);
2673
Rotate the relay log makes binlog format detection easier (at next slave
2674
start or mysqlbinlog)
2676
rotate_relay_log(mi); /* will take the right mutexes */
2681
Reads a 3.23 event and converts it to the slave's format. This code was
2682
copied from DRIZZLE 4.0.
2684
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
2687
const char *errmsg = 0;
2689
bool ignore_event= 0;
2691
Relay_log_info *rli= &mi->rli;
2694
If we get Load event, we need to pass a non-reusable buffer
2695
to read_log_event, so we do a trick
2697
if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
2699
if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
2701
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2702
ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
2705
memcpy(tmp_buf,buf,event_len);
2707
Create_file constructor wants a 0 as last char of buffer, this 0 will
2708
serve as the string-termination char for the file's name (which is at the
2710
We must increment event_len, otherwise the event constructor will not see
2711
this end 0, which leads to segfault.
2713
tmp_buf[event_len++]=0;
2714
int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
2715
buf = (const char*)tmp_buf;
2718
This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
2719
send the loaded file, and write it to the relay log in the form of
2720
Append_block/Exec_load (the SQL thread needs the data, as that thread is not
2721
connected to the master).
2723
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2724
mi->rli.relay_log.description_event_for_queue);
2727
sql_print_error(_("Read invalid event from master: '%s', "
2728
"master could be corrupt but a more likely cause "
2729
"of this is a bug"),
2731
my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
2735
pthread_mutex_lock(&mi->data_lock);
2736
ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
2737
switch (ev->get_type_code()) {
2743
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2746
pthread_mutex_unlock(&mi->data_lock);
2751
case CREATE_FILE_EVENT:
2753
Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
2754
queue_old_event() which is for 3.23 events which don't comprise
2755
CREATE_FILE_EVENT. This is because read_log_event() above has just
2756
transformed LOAD_EVENT into CREATE_FILE_EVENT.
2759
/* We come here when and only when tmp_buf != 0 */
2760
assert(tmp_buf != 0);
2762
ev->log_pos+= inc_pos;
2763
int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2765
mi->master_log_pos += inc_pos;
2766
pthread_mutex_unlock(&mi->data_lock);
2767
my_free((char*)tmp_buf, MYF(0));
2774
if (likely(!ignore_event))
2778
Don't do it for fake Rotate events (see comment in
2779
Log_event::Log_event(const char* buf...) in log_event.cc).
2781
ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
2782
if (unlikely(rli->relay_log.append(ev)))
2785
pthread_mutex_unlock(&mi->data_lock);
2788
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2791
mi->master_log_pos+= inc_pos;
2792
pthread_mutex_unlock(&mi->data_lock);
2797
Reads a 4.0 event and converts it to the slave's format. This code was copied
2798
from queue_binlog_ver_1_event(), with some affordable simplifications.
2800
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
2803
const char *errmsg = 0;
2806
Relay_log_info *rli= &mi->rli;
2808
/* read_log_event() will adjust log_pos to be end_log_pos */
2809
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2810
mi->rli.relay_log.description_event_for_queue);
2813
sql_print_error(_("Read invalid event from master: '%s', "
2814
"master could be corrupt but a more likely cause of "
2817
my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
2820
pthread_mutex_lock(&mi->data_lock);
2821
switch (ev->get_type_code()) {
2825
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2828
pthread_mutex_unlock(&mi->data_lock);
2837
if (unlikely(rli->relay_log.append(ev)))
2840
pthread_mutex_unlock(&mi->data_lock);
2843
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2845
mi->master_log_pos+= inc_pos;
2847
pthread_mutex_unlock(&mi->data_lock);
2854
Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
2855
(exactly, slave's) format. To do the conversion, we create a 5.0 event from
2856
the 3.23/4.0 bytes, then write this event to the relay log.
2859
Test this code before release - it has to be tested on a separate
2860
setup with 3.23 master or 4.0 master
2863
static int32_t queue_old_event(Master_info *mi, const char *buf,
2866
switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
2869
return(queue_binlog_ver_1_event(mi,buf,event_len));
2871
return(queue_binlog_ver_3_event(mi,buf,event_len));
2872
default: /* unsupported format; eg version 2 */
2880
If the event is 3.23/4.0, passes it to queue_old_event() which will convert
2881
it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
2882
no format conversion, it's pure read/write of bytes.
2883
So a 5.0.0 slave's relay log can contain events in the slave's format or in
2887
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
2891
uint32_t inc_pos= 0;
2892
Relay_log_info *rli= &mi->rli;
2893
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
2896
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
2897
buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
2898
return(queue_old_event(mi,buf,event_len));
2900
pthread_mutex_lock(&mi->data_lock);
2902
switch (buf[EVENT_TYPE_OFFSET]) {
2905
We needn't write this event to the relay log. Indeed, it just indicates a
2906
master server shutdown. The only thing this does is cleaning. But
2907
cleaning is already done on a per-master-thread basis (as the master
2908
server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
2909
prepared statements' deletion are TODO only when we binlog prep stmts).
2911
We don't even increment mi->master_log_pos, because we may be just after
2912
a Rotate event. Btw, in a few milliseconds we are going to have a Start
2913
event from the next binlog (unless the master is presently running
2919
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
2920
if (unlikely(process_io_rotate(mi,&rev)))
2922
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2926
Now the I/O thread has just changed its mi->master_log_name, so
2927
incrementing mi->master_log_pos is nonsense.
2932
case FORMAT_DESCRIPTION_EVENT:
2935
Create an event, and save it (when we rotate the relay log, we will have
2936
to write this event again).
2939
We are the only thread which reads/writes description_event_for_queue.
2940
The relay_log struct does not move (though some members of it can
2941
change), so we needn't any lock (no rli->data_lock, no log lock).
2943
Format_description_log_event* tmp;
2945
if (!(tmp= (Format_description_log_event*)
2946
Log_event::read_log_event(buf, event_len, &errmsg,
2947
mi->rli.relay_log.description_event_for_queue)))
2949
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2952
delete mi->rli.relay_log.description_event_for_queue;
2953
mi->rli.relay_log.description_event_for_queue= tmp;
2955
Though this does some conversion to the slave's format, this will
2956
preserve the master's binlog format version, and number of event types.
2959
If the event was not requested by the slave (the slave did not ask for
2960
it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
2962
inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
2966
case HEARTBEAT_LOG_EVENT:
2969
HB (heartbeat) cannot come before RL (Relay)
2972
Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
2975
error= ER_SLAVE_HEARTBEAT_FAILURE;
2976
error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2977
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2978
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2979
error_msg.append(STRING_WITH_LEN(" log_pos "));
2980
llstr(hb.log_pos, llbuf);
2981
error_msg.append(llbuf, strlen(llbuf));
2984
mi->received_heartbeats++;
2986
compare local and event's versions of log_file, log_pos.
2988
Heartbeat is sent only after an event corresponding to the corrdinates
2989
the heartbeat carries.
2990
Slave can not have a difference in coordinates except in the only
2991
special case when mi->master_log_name, master_log_pos have never
2992
been updated by Rotate event i.e when slave does not have any history
2993
with the master (and thereafter mi->master_log_pos is NULL).
2995
TODO: handling `when' for SHOW SLAVE STATUS' snds behind
2997
if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
2998
&& mi->master_log_name != NULL)
2999
|| mi->master_log_pos != hb.log_pos)
3001
/* missed events of heartbeat from the past */
3002
error= ER_SLAVE_HEARTBEAT_FAILURE;
3003
error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
3004
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
3005
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
3006
error_msg.append(STRING_WITH_LEN(" log_pos "));
3007
llstr(hb.log_pos, llbuf);
3008
error_msg.append(llbuf, strlen(llbuf));
3011
goto skip_relay_logging;
3021
If this event is originating from this server, don't queue it.
3022
We don't check this for 3.23 events because it's simpler like this; 3.23
3023
will be filtered anyway by the SQL slave thread which also tests the
3024
server id (we must also keep this test in the SQL thread, in case somebody
3025
upgrades a 4.0 slave which has a not-filtered relay log).
3027
ANY event coming from ourselves can be ignored: it is obvious for queries;
3028
for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
3029
(--log-slave-updates would not log that) unless this slave is also its
3030
direct master (an unsupported, useless setup!).
3033
pthread_mutex_lock(log_lock);
3035
if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
3036
!mi->rli.replicate_same_server_id)
3039
Do not write it to the relay log.
3040
a) We still want to increment mi->master_log_pos, so that we won't
3041
re-read this event from the master if the slave IO thread is now
3042
stopped/restarted (more efficient if the events we are ignoring are big
3044
b) We want to record that we are skipping events, for the information of
3045
the slave SQL thread, otherwise that thread may let
3046
rli->group_relay_log_pos stay too small if the last binlog's event is
3048
But events which were generated by this slave and which do not exist in
3049
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
3052
if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
3053
buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3054
buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3056
mi->master_log_pos+= inc_pos;
3057
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
3058
assert(rli->ign_master_log_name_end[0]);
3059
rli->ign_master_log_pos_end= mi->master_log_pos;
3061
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3065
/* write the event to the relay log */
3066
if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3068
mi->master_log_pos+= inc_pos;
3069
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3073
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
3075
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
3077
pthread_mutex_unlock(log_lock);
3082
pthread_mutex_unlock(&mi->data_lock);
3084
mi->report(ERROR_LEVEL, error, ER(error),
3085
(error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
3086
_("could not queue event from master") :
3092
void end_relay_log_info(Relay_log_info* rli)
3096
if (rli->info_fd >= 0)
3098
end_io_cache(&rli->info_file);
3099
(void) my_close(rli->info_fd, MYF(MY_WME));
3102
if (rli->cur_log_fd >= 0)
3104
end_io_cache(&rli->cache_buf);
3105
(void)my_close(rli->cur_log_fd, MYF(MY_WME));
3106
rli->cur_log_fd = -1;
3109
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
3110
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3112
Delete the slave's temporary tables from memory.
3113
In the future there will be other actions than this, to ensure persistance
3114
of slave's temp tables after shutdown.
3116
rli->close_temporary_tables();
3121
Try to connect until successful or slave killed
3125
thd Thread handler for slave
3126
DRIZZLE DRIZZLE connection handle
3127
mi Replication handle
3134
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi)
3136
return(connect_to_master(thd, drizzle, mi, 0, 0));
3145
Try to connect until successful or slave killed or we have retried
3146
master_retry_count times
3149
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3150
bool reconnect, bool suppress_warnings)
3152
int32_t slave_was_killed;
3153
int32_t last_errno= -2; // impossible error
3154
uint32_t err_count=0;
3157
mi->events_till_disconnect = disconnect_slave_event_count;
3158
uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
3159
if (opt_slave_compressed_protocol)
3160
client_flag=CLIENT_COMPRESS; /* We will use compression */
3162
drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3163
drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3165
drizzle_options(drizzle, DRIZZLE_SET_CHARSET_NAME, default_charset_info->csname);
3166
/* This one is not strictly needed but we have it here for completeness */
3167
drizzle_options(drizzle, DRIZZLE_SET_CHARSET_DIR, (char *) charsets_dir);
3169
while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3170
(reconnect ? drizzle_reconnect(drizzle) != 0 :
3171
drizzle_connect(drizzle, mi->host, mi->user, mi->password, 0,
3172
mi->port, 0, client_flag) == 0))
3174
/* Don't repeat last error */
3175
if ((int32_t)drizzle_errno(drizzle) != last_errno)
3177
last_errno=drizzle_errno(drizzle);
3178
suppress_warnings= 0;
3179
mi->report(ERROR_LEVEL, last_errno,
3180
_("error %s to master '%s@%s:%d'"
3181
" - retry-time: %d retries: %u"),
3182
(reconnect ? _("reconnecting") : _("connecting")),
3183
mi->user, mi->host, mi->port,
3184
mi->connect_retry, master_retry_count);
3187
By default we try forever. The reason is that failure will trigger
3188
master election, so if the user did not set master_retry_count we
3189
do not want to have election triggered on the first failure to
3192
if (++err_count == master_retry_count)
3196
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
3199
safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3203
if (!slave_was_killed)
3207
if (!suppress_warnings && global_system_variables.log_warnings)
3208
sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3209
"replication resumed in log '%s' at "
3210
"position %s"), mi->user,
3213
llstr(mi->master_log_pos,llbuff));
3217
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
3218
general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d",
3219
mi->user, mi->host, mi->port);
3221
#ifdef SIGNAL_WITH_VIO_CLOSE
3222
thd->set_active_vio(drizzle->net.vio);
3225
drizzle->reconnect= 1;
3226
return(slave_was_killed);
3234
Try to connect until successful or slave killed or we have retried
3235
master_retry_count times
3238
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3239
bool suppress_warnings)
3241
return(connect_to_master(thd, drizzle, mi, 1, suppress_warnings));
3246
Store the file and position where the execute-slave thread are in the
3250
flush_relay_log_info()
3251
rli Relay log information
3254
- As this is only called by the slave thread, we don't need to
3255
have a lock on this.
3256
- If there is an active transaction, then we don't update the position
3257
in the relay log. This is to ensure that we re-execute statements
3258
if we die in the middle of an transaction that was rolled back.
3259
- As a transaction never spans binary logs, we don't have to handle the
3260
case where we do a relay-log-rotation in the middle of the transaction.
3261
If this would not be the case, we would have to ensure that we
3262
don't delete the relay log file where the transaction started when
3263
we switch to a new relay log file.
3266
- Change the log file information to a binary format to avoid calling
3274
bool flush_relay_log_info(Relay_log_info* rli)
3278
if (unlikely(rli->no_storage))
3281
IO_CACHE *file = &rli->info_file;
3282
char buff[FN_REFLEN*2+22*2+4], *pos;
3284
my_b_seek(file, 0L);
3285
pos=stpcpy(buff, rli->group_relay_log_name);
3287
pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
3289
pos=stpcpy(pos, rli->group_master_log_name);
3291
pos=int64_t2str(rli->group_master_log_pos, pos, 10);
3293
if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
3295
if (flush_io_cache(file))
3298
/* Flushing the relay log is done by the slave I/O thread */
3304
Called when we notice that the current "hot" log got rotated under our feet.
3307
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
3309
assert(rli->cur_log != &rli->cache_buf);
3310
assert(rli->cur_log_fd == -1);
3312
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3313
if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
3317
We want to start exactly where we was before:
3318
relay_log_pos Current log pos
3319
pending Number of bytes already processed from the event
3321
rli->event_relay_log_pos= max(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3322
my_b_seek(cur_log,rli->event_relay_log_pos);
3327
static Log_event* next_event(Relay_log_info* rli)
3330
IO_CACHE* cur_log = rli->cur_log;
3331
pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3332
const char* errmsg=0;
3333
THD* thd = rli->sql_thd;
3337
if (abort_slave_event_count && !rli->events_till_abort--)
3341
For most operations we need to protect rli members with data_lock,
3342
so we assume calling function acquired this mutex for us and we will
3343
hold it for the most of the loop below However, we will release it
3344
whenever it is worth the hassle, and in the cases when we go into a
3345
pthread_cond_wait() with the non-data_lock mutex
3347
safe_mutex_assert_owner(&rli->data_lock);
3349
while (!sql_slave_killed(thd,rli))
3352
We can have two kinds of log reading:
3354
rli->cur_log points at the IO_CACHE of relay_log, which
3355
is actively being updated by the I/O thread. We need to be careful
3356
in this case and make sure that we are not looking at a stale log that
3357
has already been rotated. If it has been, we reopen the log.
3359
The other case is much simpler:
3360
We just have a read only log that nobody else will be updating.
3363
if ((hot_log = (cur_log != &rli->cache_buf)))
3365
assert(rli->cur_log_fd == -1); // foreign descriptor
3366
pthread_mutex_lock(log_lock);
3369
Reading xxx_file_id is safe because the log will only
3370
be rotated when we hold relay_log.LOCK_log
3372
if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
3374
// The master has switched to a new log file; Reopen the old log file
3375
cur_log=reopen_relay_log(rli, &errmsg);
3376
pthread_mutex_unlock(log_lock);
3377
if (!cur_log) // No more log files
3379
hot_log=0; // Using old binary log
3383
As there is no guarantee that the relay is open (for example, an I/O
3384
error during a write by the slave I/O thread may have closed it), we
3387
if (!my_b_inited(cur_log))
3389
assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3390
assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
3393
Relay log is always in new format - if the master is 3.23, the
3394
I/O thread will convert the format for us.
3395
A problem: the description event may be in a previous relay log. So if
3396
the slave has been shutdown meanwhile, we would have to look in old relay
3397
logs, which may even have been deleted. So we need to write this
3398
description event at the beginning of the relay log.
3399
When the relay log is created when the I/O thread starts, easy: the
3400
master will send the description event and we will queue it.
3401
But if the relay log is created by new_file(): then the solution is:
3402
DRIZZLE_BIN_LOG::open() will write the buffered description event.
3404
if ((ev=Log_event::read_log_event(cur_log,0,
3405
rli->relay_log.description_event_for_exec)))
3408
assert(thd==rli->sql_thd);
3410
read it while we have a lock, to avoid a mutex lock in
3411
inc_event_relay_log_pos()
3413
rli->future_event_relay_log_pos= my_b_tell(cur_log);
3415
pthread_mutex_unlock(log_lock);
3418
assert(thd==rli->sql_thd);
3419
if (opt_reckless_slave) // For mysql-test
3421
if (cur_log->error < 0)
3423
errmsg = "slave SQL thread aborted because of I/O error";
3425
pthread_mutex_unlock(log_lock);
3428
if (!cur_log->error) /* EOF */
3431
On a hot log, EOF means that there are no more updates to
3432
process and we must block until I/O thread adds some and
3433
signals us to continue
3438
We say in Seconds_Behind_Master that we have "caught up". Note that
3439
for example if network link is broken but I/O slave thread hasn't
3440
noticed it (slave_net_timeout not elapsed), then we'll say "caught
3441
up" whereas we're not really caught up. Fixing that would require
3442
internally cutting timeout in smaller pieces in network read, no
3443
thanks. Another example: SQL has caught up on I/O, now I/O has read
3444
a new event and is queuing it; the false "0" will exist until SQL
3445
finishes executing the new event; it will be look abnormal only if
3446
the events have old timestamps (then you get "many", 0, "many").
3448
Transient phases like this can be fixed with implemeting
3449
Heartbeat event which provides the slave the status of the
3450
master at time the master does not have any new update to send.
3451
Seconds_Behind_Master would be zero only when master has no
3452
more updates in binlog for slave. The heartbeat can be sent
3453
in a (small) fraction of slave_net_timeout. Until it's done
3454
rli->last_master_timestamp is temporarely (for time of
3455
waiting for the following event) reset whenever EOF is
3458
time_t save_timestamp= rli->last_master_timestamp;
3459
rli->last_master_timestamp= 0;
3461
assert(rli->relay_log.get_open_count() ==
3462
rli->cur_log_old_open_count);
3464
if (rli->ign_master_log_name_end[0])
3466
/* We generate and return a Rotate, to make our positions advance */
3467
ev= new Rotate_log_event(rli->ign_master_log_name_end,
3468
0, rli->ign_master_log_pos_end,
3469
Rotate_log_event::DUP_NAME);
3470
rli->ign_master_log_name_end[0]= 0;
3471
pthread_mutex_unlock(log_lock);
3474
errmsg= "Slave SQL thread failed to create a Rotate event "
3475
"(out of memory?), SHOW SLAVE STATUS may be inaccurate";
3478
ev->server_id= 0; // don't be ignored by slave SQL thread
3483
We can, and should release data_lock while we are waiting for
3484
update. If we do not, show slave status will block
3486
pthread_mutex_unlock(&rli->data_lock);
3490
- the I/O thread has reached log_space_limit
3491
- the SQL thread has read all relay logs, but cannot purge for some
3493
* it has already purged all logs except the current one
3494
* there are other logs than the current one but they're involved in
3495
a transaction that finishes in the current one (or is not finished)
3497
Wake up the possibly waiting I/O thread, and set a boolean asking
3498
the I/O thread to temporarily ignore the log_space_limit
3499
constraint, because we do not want the I/O thread to block because of
3500
space (it's ok if it blocks for any other reason (e.g. because the
3501
master does not send anything). Then the I/O thread stops waiting
3502
and reads more events.
3503
The SQL thread decides when the I/O thread should take log_space_limit
3504
into account again : ignore_log_space_limit is reset to 0
3505
in purge_first_log (when the SQL thread purges the just-read relay
3506
log), and also when the SQL thread starts. We should also reset
3507
ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
3508
fact, no need as RESET SLAVE requires that the slave
3509
be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
3512
pthread_mutex_lock(&rli->log_space_lock);
3513
// prevent the I/O thread from blocking next times
3514
rli->ignore_log_space_limit= 1;
3516
If the I/O thread is blocked, unblock it. Ok to broadcast
3517
after unlock, because the mutex is only destroyed in
3518
~Relay_log_info(), i.e. when rli is destroyed, and rli will
3519
not be destroyed before we exit the present function.
3521
pthread_mutex_unlock(&rli->log_space_lock);
3522
pthread_cond_broadcast(&rli->log_space_cond);
3523
// Note that wait_for_update_relay_log unlocks lock_log !
3524
rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
3525
// re-acquire data lock since we released it earlier
3526
pthread_mutex_lock(&rli->data_lock);
3527
rli->last_master_timestamp= save_timestamp;
3531
If the log was not hot, we need to move to the next log in
3532
sequence. The next log could be hot or cold, we deal with both
3533
cases separately after doing some common initialization
3535
end_io_cache(cur_log);
3536
assert(rli->cur_log_fd >= 0);
3537
my_close(rli->cur_log_fd, MYF(MY_WME));
3538
rli->cur_log_fd = -1;
3540
if (relay_log_purge)
3543
purge_first_log will properly set up relay log coordinates in rli.
3544
If the group's coordinates are equal to the event's coordinates
3545
(i.e. the relay log was not rotated in the middle of a group),
3546
we can purge this relay log too.
3547
We do uint64_t and string comparisons, this may be slow but
3548
- purging the last relay log is nice (it can save 1GB of disk), so we
3549
like to detect the case where we can do it, and given this,
3550
- I see no better detection method
3551
- purge_first_log is not called that often
3553
if (rli->relay_log.purge_first_log
3555
rli->group_relay_log_pos == rli->event_relay_log_pos
3556
&& !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
3558
errmsg = "Error purging processed logs";
3565
If hot_log is set, then we already have a lock on
3566
LOCK_log. If not, we have to get the lock.
3568
According to Sasha, the only time this code will ever be executed
3569
is if we are recovering from a bug.
3571
if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
3573
errmsg = "error switching to the next log";
3576
rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3577
strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
3578
sizeof(rli->event_relay_log_name)-1);
3579
flush_relay_log_info(rli);
3583
Now we want to open this next log. To know if it's a hot log (the one
3584
being written by the I/O thread now) or a cold log, we can use
3585
is_active(); if it is hot, we use the I/O cache; if it's cold we open
3586
the file normally. But if is_active() reports that the log is hot, this
3587
may change between the test and the consequence of the test. So we may
3588
open the I/O cache whereas the log is now cold, which is nonsense.
3589
To guard against this, we need to have LOCK_log.
3592
if (!hot_log) /* if hot_log, we already have this mutex */
3593
pthread_mutex_lock(log_lock);
3594
if (rli->relay_log.is_active(rli->linfo.log_file_name))
3597
if (global_system_variables.log_warnings)
3598
sql_print_information(_("next log '%s' is currently active"),
3599
rli->linfo.log_file_name);
3601
rli->cur_log= cur_log= rli->relay_log.get_log_file();
3602
rli->cur_log_old_open_count= rli->relay_log.get_open_count();
3603
assert(rli->cur_log_fd == -1);
3606
Read pointer has to be at the start since we are the only
3608
We must keep the LOCK_log to read the 4 first bytes, as this is a hot
3609
log (same as when we call read_log_event() above: for a hot log we
3612
if (check_binlog_magic(cur_log,&errmsg))
3614
if (!hot_log) pthread_mutex_unlock(log_lock);
3617
if (!hot_log) pthread_mutex_unlock(log_lock);
3620
if (!hot_log) pthread_mutex_unlock(log_lock);
3622
if we get here, the log was not hot, so we will have to open it
3623
ourselves. We are sure that the log is still not hot now (a log can get
3624
from hot to cold, but not from cold to hot). No need for LOCK_log.
3627
if (global_system_variables.log_warnings)
3628
sql_print_information(_("next log '%s' is not active"),
3629
rli->linfo.log_file_name);
3631
// open_binlog() will check the magic header
3632
if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
3639
Read failed with a non-EOF error.
3640
TODO: come up with something better to handle this error
3643
pthread_mutex_unlock(log_lock);
3644
sql_print_error(_("Slave SQL thread: I/O error reading "
3645
"event(errno: %d cur_log->error: %d)"),
3646
my_errno,cur_log->error);
3647
// set read position to the beginning of the event
3648
my_b_seek(cur_log,rli->event_relay_log_pos);
3649
/* otherwise, we have had a partial read */
3650
errmsg = _("Aborting slave SQL thread because of partial event read");
3651
break; // To end of function
3654
if (!errmsg && global_system_variables.log_warnings)
3656
sql_print_information(_("Error reading relay log event: %s"),
3657
_("slave SQL thread was killed"));
3663
sql_print_error(_("Error reading relay log event: %s"), errmsg);
3668
Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
3669
because of size is simpler because when we do it we already have all relevant
3670
locks; here we don't, so this function is mainly taking locks).
3671
Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
3675
void rotate_relay_log(Master_info* mi)
3677
Relay_log_info* rli= &mi->rli;
3679
/* We don't lock rli->run_lock. This would lead to deadlocks. */
3680
pthread_mutex_lock(&mi->run_lock);
3683
We need to test inited because otherwise, new_file() will attempt to lock
3684
LOCK_log, which may not be inited (if we're not a slave).
3691
/* If the relay log is closed, new_file() will do nothing. */
3692
rli->relay_log.new_file();
3695
We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
3696
be counted, so imagine a succession of FLUSH LOGS and assume the slave
3697
threads are started:
3698
relay_log_space decreases by the size of the deleted relay log, but does
3699
not increase, so flush-after-flush we may become negative, which is wrong.
3700
Even if this will be corrected as soon as a query is replicated on the
3701
slave (because the I/O thread will then call harvest_bytes_written() which
3702
will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
3703
output in SHOW SLAVE STATUS meanwhile. So we harvest now.
3704
If the log is closed, then this will just harvest the last writes, probably
3705
0 as they probably have been harvested.
3707
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3709
pthread_mutex_unlock(&mi->run_lock);
3715
Detects, based on master's version (as found in the relay log), if master
3717
@param rli Relay_log_info which tells the master's version
3718
@param bug_id Number of the bug as found in bugs.mysql.com
3719
@param report bool report error message, default TRUE
3720
@return true if master has the bug, FALSE if it does not.
3722
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
3724
struct st_version_range_for_one_bug {
3726
const uchar introduced_in[3]; // first version with bug
3727
const uchar fixed_in[3]; // first version with fix
3729
static struct st_version_range_for_one_bug versions_for_all_bugs[]=
3731
{24432, { 5, 0, 24 }, { 5, 0, 38 } },
3732
{24432, { 5, 1, 12 }, { 5, 1, 17 } },
3733
{33029, { 5, 0, 0 }, { 5, 0, 58 } },
3734
{33029, { 5, 1, 0 }, { 5, 1, 12 } },
3736
const uchar *master_ver=
3737
rli->relay_log.description_event_for_exec->server_version_split;
3739
assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3742
i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3744
const uchar *introduced_in= versions_for_all_bugs[i].introduced_in,
3745
*fixed_in= versions_for_all_bugs[i].fixed_in;
3746
if ((versions_for_all_bugs[i].bug_id == bug_id) &&
3747
(memcmp(introduced_in, master_ver, 3) <= 0) &&
3748
(memcmp(fixed_in, master_ver, 3) > 0))
3753
// a short message for SHOW SLAVE STATUS (message length constraints)
3754
my_printf_error(ER_UNKNOWN_ERROR,
3755
_("master may suffer from"
3756
" http://bugs.mysql.com/bug.php?id=%u"
3757
" so slave stops; check error log on slave"
3758
" for more info"), MYF(0), bug_id);
3759
// a verbose message for the error log
3760
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
3761
_("According to the master's version ('%s'),"
3762
" it is probable that master suffers from this bug:"
3763
" http://bugs.mysql.com/bug.php?id=%u"
3764
" and thus replicating the current binary log event"
3765
" may make the slave's data become different from the"
3767
" To take no risk, slave refuses to replicate"
3768
" this event and stops."
3769
" We recommend that all updates be stopped on the"
3770
" master and slave, that the data of both be"
3771
" manually synchronized,"
3772
" that master's binary logs be deleted,"
3773
" that master be upgraded to a version at least"
3774
" equal to '%d.%d.%d'. Then replication can be"
3776
rli->relay_log.description_event_for_exec->server_version,
3778
fixed_in[0], fixed_in[1], fixed_in[2]);
3786
BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
3787
exclusive, if one statement in a SP generated AUTO_INCREMENT value
3788
by the top statement, all statements after it would be considered
3789
generated AUTO_INCREMENT value by the top statement, and a
3790
erroneous INSERT_ID value might be associated with these statement,
3791
which could cause duplicate entry error and stop the slave.
3793
Detect buggy master to work around.
3795
bool rpl_master_erroneous_autoinc(THD *thd)
3797
if (active_mi && active_mi->rli.sql_thd == thd)
3799
Relay_log_info *rli= &active_mi->rli;
3800
return rpl_master_has_bug(rli, 33029, false);
3805
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
3806
template class I_List_iterator<i_string>;
3807
template class I_List_iterator<i_string_pair>;
3811
@} (end of group Replication)
3814
#endif /* HAVE_REPLICATION */