26
26
#include <drizzled/server_includes.h>
28
28
#include <storage/myisam/myisam.h>
32
#include "rpl_filter.h"
33
#include "repl_failsafe.h"
29
#include <drizzled/replication/mi.h>
30
#include <drizzled/replication/rli.h>
31
#include <drizzled/replication/replication.h>
32
#include <libdrizzle/libdrizzle.h>
33
#include <mysys/hash.h>
34
34
#include <mysys/thr_alarm.h>
35
#include <libdrizzle/sql_common.h>
36
35
#include <libdrizzle/errmsg.h>
37
36
#include <mysys/mysys_err.h>
38
#include <drizzled/drizzled_error_messages.h>
40
#ifdef HAVE_REPLICATION
42
#include "rpl_tblmap.h"
44
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
37
#include <drizzled/error.h>
38
#include <drizzled/sql_parse.h>
39
#include <drizzled/gettext.h>
41
#include <drizzled/session.h>
42
#include <drizzled/log_event.h>
43
#include <drizzled/item/empty_string.h>
44
#include <drizzled/item/return_int.h>
46
#if TIME_WITH_SYS_TIME
47
# include <sys/time.h>
51
# include <sys/time.h>
57
#include <drizzled/tztime.h>
59
#include <drizzled/replication/tblmap.h>
46
61
#define MAX_SLAVE_RETRY_PAUSE 5
47
62
bool use_slave_mask = 0;
48
63
MY_BITMAP slave_error_mask;
50
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
65
typedef bool (*CHECK_KILLED_FUNC)(Session*,void*);
52
67
char* slave_load_tmpdir = 0;
53
68
Master_info *active_mi= 0;
89
104
N_("Waiting to reconnect after a failed registration on master"),
90
N_("Slave I/O thread killed while waitnig to reconnect after a "
105
N_("Slave I/O thread killed while waiting to reconnect after a "
91
106
"failed registration on master"),
92
107
N_("Reconnecting after a failed registration on master"),
93
108
N_("failed registering on master, reconnecting to try again, "
94
"log '%s' at postion %s"),
109
"log '%s' at position %s"),
95
110
"COM_REGISTER_SLAVE",
96
111
N_("Slave I/O thread killed during or after reconnect")
110
125
"after a failed read"),
111
126
N_("Reconnecting after a failed master event read"),
112
127
N_("Slave I/O thread: Failed reading log event, "
113
"reconnecting to retry, log '%s' at postion %s"),
128
"reconnecting to retry, log '%s' at position %s"),
115
130
N_("Slave I/O thread killed during or after a "
116
131
"reconnect done to recover from failed read")
121
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
136
typedef enum { SLAVE_Session_IO, SLAVE_Session_SQL} SLAVE_Session_TYPE;
123
138
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
124
139
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
125
140
static bool wait_for_relay_log_space(Relay_log_info* rli);
126
static inline bool io_slave_killed(THD* thd,Master_info* mi);
127
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
128
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
129
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi);
130
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
141
static inline bool io_slave_killed(Session* session,Master_info* mi);
142
static inline bool sql_slave_killed(Session* session,Relay_log_info* rli);
143
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type);
144
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi);
145
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
131
146
bool suppress_warnings);
132
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
147
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
133
148
bool reconnect, bool suppress_warnings);
134
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
149
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
135
150
void* thread_killed_arg);
136
151
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
137
152
static Log_event* next_event(Relay_log_info* rli);
138
153
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
139
static int32_t terminate_slave_thread(THD *thd,
154
static int32_t terminate_slave_thread(Session *session,
140
155
pthread_mutex_t* term_lock,
141
156
pthread_cond_t* term_cond,
142
157
volatile uint32_t *slave_running,
144
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
159
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info);
147
162
Find out which replication threads are running
394
408
while (*slave_running) // Should always be true
396
pthread_mutex_lock(&thd->LOCK_delete);
410
pthread_mutex_lock(&session->LOCK_delete);
397
411
#ifndef DONT_USE_THR_ALARM
399
413
Error codes from pthread_kill are:
400
414
EINVAL: invalid signal number (can't happen)
401
415
ESRCH: thread already killed (can happen, should be ignored)
403
int32_t err= pthread_kill(thd->real_id, thr_client_alarm);
417
int32_t err= pthread_kill(session->real_id, thr_client_alarm);
404
418
assert(err != EINVAL);
406
thd->awake(THD::NOT_KILLED);
407
pthread_mutex_unlock(&thd->LOCK_delete);
420
session->awake(Session::NOT_KILLED);
421
pthread_mutex_unlock(&session->LOCK_delete);
410
424
There is a small chance that slave thread might miss the first
475
489
if (start_cond && cond_lock) // caller has cond_lock
477
THD* thd = current_thd;
491
Session* session = current_session;
478
492
while (start_id == *slave_run_id)
480
const char* old_msg = thd->enter_cond(start_cond,cond_lock,
494
const char* old_msg = session->enter_cond(start_cond,cond_lock,
481
495
"Waiting for slave thread to start");
482
496
pthread_cond_wait(start_cond,cond_lock);
483
thd->exit_cond(old_msg);
497
session->exit_cond(old_msg);
484
498
pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
486
return(thd->killed_errno());
500
return(session->killed_errno());
504
518
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
506
const char* master_info_fname __attribute__((unused)),
507
const char* slave_info_fname __attribute__((unused)),
519
Master_info* mi, const char*, const char*,
508
520
int32_t thread_mask)
510
522
pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
588
static bool io_slave_killed(THD* thd, Master_info* mi)
600
static bool io_slave_killed(Session* session, Master_info* mi)
590
assert(mi->io_thd == thd);
602
assert(mi->io_session == session);
591
603
assert(mi->slave_running); // tracking buffer overrun
592
return(mi->abort_slave || abort_loop || thd->killed);
604
return(mi->abort_slave || abort_loop || session->killed);
596
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
608
static bool sql_slave_killed(Session* session, Relay_log_info* rli)
598
assert(rli->sql_thd == thd);
610
assert(rli->sql_session == session);
599
611
assert(rli->slave_running == 1);// tracking buffer overrun
600
if (abort_loop || thd->killed || rli->abort_slave)
612
if (abort_loop || session->killed || rli->abort_slave)
603
615
If we are in an unsafe situation (stopping could corrupt replication),
638
650
(void)net_request_file(net, "/dev/null");
639
651
(void)my_net_read(net); // discard response
640
(void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok
652
(void)net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0); // ok
645
657
bool net_request_file(NET* net, const char* fname)
647
return(net_write_command(net, 251, (uchar*) fname, strlen(fname),
659
return(net_write_command(net, 251, (unsigned char*) fname, strlen(fname),
660
(unsigned char*) "", 0));
731
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
743
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info)
733
if (io_slave_killed(thd, mi))
745
if (io_slave_killed(session, mi))
735
747
if (info && global_system_variables.log_warnings)
736
sql_print_information(info);
748
sql_print_information("%s",info);
847
859
(master_row= drizzle_fetch_row(master_res)))
849
861
mi->clock_diff_with_master=
850
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
862
(long) (time(NULL) - strtoul(master_row[0], 0, 10));
852
else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
864
else if (!check_io_slave_killed(mi->io_session, mi, NULL))
854
866
mi->clock_diff_with_master= 0; /* The "most sensible" value */
855
867
sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
979
991
const char query_format[]= "SET @master_heartbeat_period= %s";
980
992
char query[sizeof(query_format) - 2 + sizeof(llbuf)];
982
the period is an uint64_t of nano-secs.
994
the period is an uint64_t of nano-secs.
984
996
llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
985
997
sprintf(query, query_format, llbuf);
987
999
if (drizzle_real_query(drizzle, query, strlen(query))
988
&& !check_io_slave_killed(mi->io_thd, mi, NULL))
1000
&& !check_io_slave_killed(mi->io_session, mi, NULL))
990
1002
err_msg.append("The slave I/O thread stops because querying master with '");
991
1003
err_msg.append(query);
1020
1032
bool slave_killed=0;
1021
1033
Master_info* mi = rli->mi;
1022
1034
const char *save_proc_info;
1023
THD* thd = mi->io_thd;
1035
Session* session = mi->io_session;
1025
1037
pthread_mutex_lock(&rli->log_space_lock);
1026
save_proc_info= thd->enter_cond(&rli->log_space_cond,
1038
save_proc_info= session->enter_cond(&rli->log_space_cond,
1027
1039
&rli->log_space_lock,
1028
1040
_("Waiting for the slave SQL thread "
1029
1041
"to free enough relay log space"));
1030
1042
while (rli->log_space_limit < rli->log_space_total &&
1031
!(slave_killed=io_slave_killed(thd,mi)) &&
1043
!(slave_killed=io_slave_killed(session,mi)) &&
1032
1044
!rli->ignore_log_space_limit)
1033
1045
pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1034
thd->exit_cond(save_proc_info);
1046
session->exit_cond(save_proc_info);
1035
1047
return(slave_killed);
1049
1061
ignored events' end position for the use of the slave SQL thread, by
1050
1062
calling this function. Only that thread can call it (see assertion).
1052
static void write_ignored_events_info_to_relay_log(THD *thd __attribute__((unused)),
1064
static void write_ignored_events_info_to_relay_log(Session *session,
1053
1065
Master_info *mi)
1055
1067
Relay_log_info *rli= &mi->rli;
1056
1068
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1058
assert(thd == mi->io_thd);
1070
assert(session == mi->io_session);
1059
1071
pthread_mutex_lock(log_lock);
1060
1072
if (rli->ign_master_log_name_end[0])
1094
1106
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
1095
1107
bool *suppress_warnings)
1097
uchar buf[1024], *pos= buf;
1109
unsigned char buf[1024], *pos= buf;
1098
1110
uint32_t report_host_len, report_user_len=0, report_password_len=0;
1100
1112
*suppress_warnings= false;
1101
1113
if (!report_host)
1103
1115
report_host_len= strlen(report_host);
1105
report_user_len= strlen(report_user);
1106
if (report_password)
1107
report_password_len= strlen(report_password);
1108
1116
/* 30 is a good safety margin */
1109
1117
if (report_host_len + report_user_len + report_password_len + 30 >
1111
1119
return(0); // safety
1113
1121
int4store(pos, server_id); pos+= 4;
1114
pos= net_store_data(pos, (uchar*) report_host, report_host_len);
1115
pos= net_store_data(pos, (uchar*) report_user, report_user_len);
1116
pos= net_store_data(pos, (uchar*) report_password, report_password_len);
1122
pos= net_store_data(pos, (unsigned char*) report_host, report_host_len);
1123
pos= net_store_data(pos, NULL, report_user_len);
1124
pos= net_store_data(pos, NULL, report_password_len);
1117
1125
int2store(pos, (uint16_t) report_port); pos+= 2;
1118
int4store(pos, rpl_recovery_rank); pos+= 4;
1126
int4store(pos, 0); pos+= 4;
1119
1127
/* The master will fill in master_id */
1120
1128
int4store(pos, 0); pos+= 4;
1126
1134
*suppress_warnings= true; // Suppress reconnect warning
1128
else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
1136
else if (!check_io_slave_killed(mi->io_session, mi, NULL))
1131
snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle),
1139
snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle),
1132
1140
drizzle_errno(drizzle));
1133
1141
mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1134
1142
ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1168
1176
field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
1169
1177
field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1170
field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
1171
field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
1172
field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
1173
field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
1174
field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
1175
field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
1177
1178
field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
1178
1179
field_list.push_back(new Item_empty_string("Last_Error", 20));
1179
1180
field_list.push_back(new Item_return_int("Skip_Counter", 10,
1186
1187
field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1187
1188
field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1188
1189
DRIZZLE_TYPE_LONGLONG));
1189
field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
1190
field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
1191
sizeof(mi->ssl_ca)));
1192
field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
1193
sizeof(mi->ssl_capath)));
1194
field_list.push_back(new Item_empty_string("Master_SSL_Cert",
1195
sizeof(mi->ssl_cert)));
1196
field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
1197
sizeof(mi->ssl_cipher)));
1198
field_list.push_back(new Item_empty_string("Master_SSL_Key",
1199
sizeof(mi->ssl_key)));
1200
1190
field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1201
1191
DRIZZLE_TYPE_LONGLONG));
1202
field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
1204
1192
field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
1205
1193
field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1206
1194
field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
1213
1201
if (mi->host[0])
1215
String *packet= &thd->packet;
1203
String *packet= &session->packet;
1216
1204
protocol->prepare_for_resend();
1219
1207
slave_running can be accessed without run_lock but not other
1220
non-volatile members like mi->io_thd, which is guarded by the mutex.
1208
non-volatile members like mi->io_session, which is guarded by the mutex.
1222
1210
pthread_mutex_lock(&mi->run_lock);
1223
protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
1211
protocol->store(mi->io_session ? mi->io_session->get_proc_info() : "", &my_charset_bin);
1224
1212
pthread_mutex_unlock(&mi->run_lock);
1226
1214
pthread_mutex_lock(&mi->data_lock);
1227
1215
pthread_mutex_lock(&mi->rli.data_lock);
1228
protocol->store(mi->host, &my_charset_bin);
1229
protocol->store(mi->user, &my_charset_bin);
1230
protocol->store((uint32_t) mi->port);
1231
protocol->store((uint32_t) mi->connect_retry);
1232
protocol->store(mi->master_log_name, &my_charset_bin);
1233
protocol->store((uint64_t) mi->master_log_pos);
1234
protocol->store(mi->rli.group_relay_log_name +
1235
dirname_length(mi->rli.group_relay_log_name),
1216
protocol->store(mi->getHostname(), &my_charset_bin);
1217
protocol->store(mi->getUsername(), &my_charset_bin);
1218
protocol->store((uint32_t) mi->getPort());
1219
protocol->store(mi->getConnectionRetry());
1220
protocol->store(mi->getLogName(), &my_charset_bin);
1221
protocol->store((uint64_t) mi->getLogPosition());
1222
protocol->store(mi->rli.group_relay_log_name.c_str() +
1223
dirname_length(mi->rli.group_relay_log_name.c_str()),
1236
1224
&my_charset_bin);
1237
1225
protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1238
protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
1226
protocol->store(mi->rli.group_master_log_name.c_str(), &my_charset_bin);
1239
1227
protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1240
1228
"Yes" : "No", &my_charset_bin);
1241
1229
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1242
protocol->store(rpl_filter->get_do_db());
1243
protocol->store(rpl_filter->get_ignore_db());
1246
String tmp(buf, sizeof(buf), &my_charset_bin);
1247
rpl_filter->get_do_table(&tmp);
1248
protocol->store(&tmp);
1249
rpl_filter->get_ignore_table(&tmp);
1250
protocol->store(&tmp);
1251
rpl_filter->get_wild_do_table(&tmp);
1252
protocol->store(&tmp);
1253
rpl_filter->get_wild_ignore_table(&tmp);
1254
protocol->store(&tmp);
1256
1231
protocol->store(mi->rli.last_error().number);
1257
1232
protocol->store(mi->rli.last_error().message, &my_charset_bin);
1266
1241
protocol->store(mi->rli.until_log_name, &my_charset_bin);
1267
1242
protocol->store((uint64_t) mi->rli.until_log_pos);
1269
protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
1270
protocol->store(mi->ssl_ca, &my_charset_bin);
1271
protocol->store(mi->ssl_capath, &my_charset_bin);
1272
protocol->store(mi->ssl_cert, &my_charset_bin);
1273
protocol->store(mi->ssl_cipher, &my_charset_bin);
1274
protocol->store(mi->ssl_key, &my_charset_bin);
1277
1245
Seconds_Behind_Master: if SQL thread is running and I/O thread is
1278
1246
connected, we can compute it otherwise show NULL (i.e. unknown).
1297
1265
slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1298
1266
between timestamp of slave and rli->last_master_timestamp is 0
1299
1267
(i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1300
This confuses users, so we don't go below 0: hence the max().
1268
This confuses users, so we don't go below 0: hence the cmax().
1302
1270
last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1303
1271
special marker to say "consider we have caught up".
1305
1273
protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1306
max((long)0, time_diff) : 0));
1274
cmax((long)0, time_diff) : 0));
1310
1278
protocol->store_null();
1312
protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
1314
1281
// Last_IO_Errno
1315
1282
protocol->store(mi->last_error().number);
1342
1309
when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1343
1310
only for client threads.
1345
uint64_t options= thd->options | OPTION_BIG_SELECTS;
1312
uint64_t options= session->options | OPTION_BIG_SELECTS;
1346
1313
if (opt_log_slave_updates)
1347
1314
options|= OPTION_BIN_LOG;
1349
1316
options&= ~OPTION_BIN_LOG;
1350
thd->options= options;
1351
thd->variables.completion_type= 0;
1355
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
1357
thd->variables.character_set_client=
1358
global_system_variables.character_set_client;
1359
thd->variables.collation_connection=
1360
global_system_variables.collation_connection;
1361
thd->variables.collation_server=
1362
global_system_variables.collation_server;
1363
thd->update_charset();
1366
We use a const cast here since the conceptual (and externally
1367
visible) behavior of the function is to set the default charset of
1368
the thread. That the cache has to be invalidated is a secondary
1371
const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
1317
session->options= options;
1318
session->variables.completion_type= 0;
1376
1323
init_slave_thread()
1379
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
1326
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type)
1381
1328
int32_t simulate_error= 0;
1382
thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
1329
session->system_thread = (session_type == SLAVE_Session_SQL) ?
1383
1330
SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1384
thd->security_ctx->skip_grants();
1385
my_net_init(&thd->net, 0);
1331
session->security_ctx.skip_grants();
1332
my_net_init(&session->net, 0);
1387
1334
Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1388
1335
slave threads, since a replication event can become this much larger
1389
1336
than the corresponding packet (query) sent from client to master.
1391
thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1338
session->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1392
1339
+ MAX_LOG_EVENT_HEADER; /* note, incr over the global not session var */
1393
thd->slave_thread = 1;
1394
thd->enable_slow_log= opt_log_slow_slave_statements;
1395
set_slave_thread_options(thd);
1396
thd->client_capabilities = CLIENT_LOCAL_FILES;
1340
session->slave_thread = 1;
1341
set_slave_thread_options(session);
1342
session->client_capabilities = CLIENT_LOCAL_FILES;
1397
1343
pthread_mutex_lock(&LOCK_thread_count);
1398
thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
1344
session->thread_id= session->variables.pseudo_thread_id= thread_id++;
1399
1345
pthread_mutex_unlock(&LOCK_thread_count);
1401
simulate_error|= (1 << SLAVE_THD_IO);
1402
simulate_error|= (1 << SLAVE_THD_SQL);
1403
if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
1347
simulate_error|= (1 << SLAVE_Session_IO);
1348
simulate_error|= (1 << SLAVE_Session_SQL);
1349
if (init_thr_lock() || session->store_globals() || simulate_error & (1<< session_type))
1410
if (thd_type == SLAVE_THD_SQL)
1411
thd_proc_info(thd, "Waiting for the next event in relay log");
1356
if (session_type == SLAVE_Session_SQL)
1357
session->set_proc_info("Waiting for the next event in relay log");
1413
thd_proc_info(thd, "Waiting for master update");
1414
thd->version=refresh_version;
1359
session->set_proc_info("Waiting for master update");
1360
session->version=refresh_version;
1361
session->set_time();
1420
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1421
void* thread_killed_arg)
1365
/* Returns non zero on error */
1366
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1367
void* thread_killed_arg)
1423
1369
int32_t nap_time;
1424
1370
thr_alarm_t alarmed;
1426
1372
thr_alarm_init(&alarmed);
1427
time_t start_time= my_time(0);
1428
time_t end_time= start_time+sec;
1373
time_t start_time, end_time;
1375
start_time= time(NULL);
1376
if (start_time == (time_t)-1)
1378
end_time= start_time+sec;
1430
1380
while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1450
1403
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
1451
1404
bool *suppress_warnings)
1453
uchar buf[FN_REFLEN + 10];
1406
unsigned char buf[FN_REFLEN + 10];
1455
1408
int32_t binlog_flags = 0; // for now
1456
char* logname = mi->master_log_name;
1409
const char* logname = mi->getLogName();
1458
1411
*suppress_warnings= false;
1460
1413
// TODO if big log files: Change next to int8store()
1461
int4store(buf, (uint32_t) mi->master_log_pos);
1414
int4store(buf, (uint32_t) mi->getLogPosition());
1462
1415
int2store(buf + 4, binlog_flags);
1463
1416
int4store(buf + 6, server_id);
1464
1417
len = (uint32_t) strlen(logname);
1568
1520
that the error is temporary by pushing a warning with the error code
1569
1521
ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1571
static int32_t has_temporary_error(THD *thd)
1523
static int32_t has_temporary_error(Session *session)
1573
if (thd->is_fatal_error)
1525
if (session->is_fatal_error)
1576
if (thd->main_da.is_error())
1528
if (session->main_da.is_error())
1530
session->clear_error();
1579
1531
my_error(ER_LOCK_DEADLOCK, MYF(0));
1583
If there is no message in THD, we can't say if it's a temporary
1535
If there is no message in Session, we can't say if it's a temporary
1584
1536
error or not. This is currently the case for Incident_log_event,
1585
1537
which sets no message. Return FALSE.
1587
if (!thd->is_error())
1539
if (!session->is_error())
1655
1607
has a Rotate etc).
1658
thd->server_id = ev->server_id; // use the original server id for logging
1659
thd->set_time(); // time the query
1660
thd->lex->current_select= 0;
1610
session->server_id = ev->server_id; // use the original server id for logging
1611
session->set_time(); // time the query
1612
session->lex->current_select= 0;
1662
ev->when= my_time(0);
1663
ev->thd = thd; // because up to this point, ev->thd == 0
1615
ev->when= time(NULL);
1616
if(ev->when == (time_t)-1)
1620
ev->session = session; // because up to this point, ev->session == 0
1817
1774
if (rli->trans_retries < slave_trans_retries)
1819
if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
1776
if (rli->mi->init_master_info(0, 0, SLAVE_SQL))
1820
1777
sql_print_error(_("Failed to initialize the master info structure"));
1821
1778
else if (init_relay_log_pos(rli,
1822
rli->group_relay_log_name,
1779
rli->group_relay_log_name.c_str(),
1823
1780
rli->group_relay_log_pos,
1824
1781
1, &errmsg, 1))
1825
1782
sql_print_error(_("Error initializing relay log position: %s"),
1830
end_trans(thd, ROLLBACK);
1787
end_trans(session, ROLLBACK);
1831
1788
/* chance for concurrent connection to get more locks */
1832
safe_sleep(thd, min(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1789
safe_sleep(session, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1833
1790
(CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1834
1791
pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1835
1792
rli->trans_retries++;
1887
1844
are taken from @c messages array. In case @c master_retry_count is exceeded,
1888
1845
no messages are added to the log.
1890
@param[in] thd Thread context.
1847
@param[in] session Thread context.
1891
1848
@param[in] DRIZZLE DRIZZLE connection.
1892
1849
@param[in] mi Master connection information.
1893
1850
@param[in,out] retry_count Number of attempts to reconnect.
1894
@param[in] suppress_warnings TRUE when a normal net read timeout
1851
@param[in] suppress_warnings TRUE when a normal net read timeout
1895
1852
has caused the reconnect.
1896
@param[in] messages Messages to print/log, see
1853
@param[in] messages Messages to print/log, see
1897
1854
reconnect_messages[] array.
1900
1857
@retval 1 There was an error.
1903
static int32_t try_to_reconnect(THD *thd, DRIZZLE *drizzle, Master_info *mi,
1904
uint32_t *retry_count, bool suppress_warnings,
1905
const char *messages[SLAVE_RECON_MSG_MAX])
1860
static int32_t try_to_reconnect(Session *session, DRIZZLE *drizzle, Master_info *mi,
1861
uint32_t *retry_count, bool suppress_warnings,
1862
const char *messages[SLAVE_RECON_MSG_MAX])
1907
1864
mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1908
thd->proc_info= _(messages[SLAVE_RECON_MSG_WAIT]);
1909
#ifdef SIGNAL_WITH_VIO_CLOSE
1910
thd->clear_active_vio();
1912
end_server(drizzle);
1865
session->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
1866
drizzle_disconnect(drizzle);
1913
1867
if ((*retry_count)++)
1915
1869
if (*retry_count > master_retry_count)
1916
1870
return 1; // Don't retry forever
1917
safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1871
safe_sleep(session, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1920
if (check_io_slave_killed(thd, mi,
1874
if (check_io_slave_killed(session, mi,
1921
1875
_(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1923
thd->proc_info = _(messages[SLAVE_RECON_MSG_AFTER]);
1877
session->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
1924
1878
if (!suppress_warnings)
1926
1880
char buf[256], llbuff[22];
1927
1881
snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1928
IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
1882
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
1930
1884
Raise a warning while registering on master/requesting dump.
1931
1885
Log a message reading event.
1941
sql_print_information(buf);
1895
sql_print_information("%s",buf);
1944
if (safe_reconnect(thd, drizzle, mi, 1) || io_slave_killed(thd, mi))
1898
if (safe_reconnect(session, drizzle, mi, 1) || io_slave_killed(session, mi))
1946
1900
if (global_system_variables.log_warnings)
1947
sql_print_information(_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1901
sql_print_information("%s",_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1976
1930
mi->events_till_disconnect = disconnect_slave_event_count;
1979
THD_CHECK_SENTRY(thd);
1932
session= new Session;
1933
Session_CHECK_SENTRY(session);
1934
mi->io_session = session;
1982
1936
pthread_detach_this_thread();
1983
thd->thread_stack= (char*) &thd; // remember where our stack is
1984
if (init_slave_thread(thd, SLAVE_THD_IO))
1937
session->thread_stack= (char*) &session; // remember where our stack is
1938
if (init_slave_thread(session, SLAVE_Session_IO))
1986
1940
pthread_cond_broadcast(&mi->start_cond);
1987
1941
pthread_mutex_unlock(&mi->run_lock);
2006
thd_proc_info(thd, "Connecting to master");
1960
session->set_proc_info("Connecting to master");
2007
1961
// we can get killed during safe_connect
2008
if (!safe_connect(thd, drizzle, mi))
1962
if (!safe_connect(session, drizzle, mi))
2010
1964
sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
2011
1965
"replication started in log '%s' at position %s"),
2012
mi->user, mi->host, mi->port,
1966
mi->getUsername(), mi->getHostname(), mi->getPort(),
2013
1967
IO_RPL_LOG_NAME,
2014
llstr(mi->master_log_pos,llbuff));
1968
llstr(mi->getLogPosition(), llbuff));
2016
1970
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
2017
1971
thread, since a replication event can become this much larger than
2018
1972
the corresponding packet (query) sent from client to master.
2020
drizzle->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
1974
drizzle->net.max_packet_size= session->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
2030
1984
// TODO: the assignment below should be under mutex (5.0)
2031
1985
mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
2032
thd->slave_net = &drizzle->net;
2033
thd_proc_info(thd, "Checking master version");
1986
session->slave_net = &drizzle->net;
1987
session->set_proc_info("Checking master version");
2034
1988
if (get_master_version_and_clock(drizzle, mi))
2037
1991
if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
2040
1994
Register ourselves with the master.
2042
thd_proc_info(thd, "Registering slave on master");
1996
session->set_proc_info("Registering slave on master");
2043
1997
if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2045
if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
1999
if (!check_io_slave_killed(session, mi, "Slave I/O thread killed "
2046
2000
"while registering slave on master"))
2048
2002
sql_print_error(_("Slave I/O thread couldn't register on master"));
2049
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2003
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2050
2004
reconnect_messages[SLAVE_RECON_ACT_REG]))
2059
2013
retry_count_reg++;
2060
2014
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2061
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2015
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2062
2016
reconnect_messages[SLAVE_RECON_ACT_REG]))
2064
2018
goto connected;
2068
while (!io_slave_killed(thd,mi))
2022
while (!io_slave_killed(session,mi))
2070
thd_proc_info(thd, "Requesting binlog dump");
2024
session->set_proc_info("Requesting binlog dump");
2071
2025
if (request_dump(drizzle, mi, &suppress_warnings))
2073
2027
sql_print_error(_("Failed on request_dump()"));
2074
if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while \
2028
if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while \
2075
2029
requesting master dump")) ||
2076
try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2030
try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2077
2031
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2079
2033
goto connected;
2083
2037
retry_count_dump++;
2084
2038
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2085
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2039
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2086
2040
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2088
2042
goto connected;
2091
while (!io_slave_killed(thd,mi))
2045
while (!io_slave_killed(session,mi))
2093
2047
uint32_t event_len;
2097
2051
important thing is to not confuse users by saying "reading" whereas
2098
2052
we're in fact receiving nothing.
2100
thd_proc_info(thd, _("Waiting for master to send event"));
2054
session->set_proc_info(_("Waiting for master to send event"));
2101
2055
event_len= read_event(drizzle, mi, &suppress_warnings);
2102
if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while "
2056
if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while "
2103
2057
"reading event")))
2105
2059
if (!retry_count_event)
2107
2061
retry_count_event++;
2108
2062
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2109
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2063
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2110
2064
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2112
2066
goto connected;
2118
2072
switch (drizzle_error_number) {
2119
2073
case CR_NET_PACKET_TOO_LARGE:
2120
2074
sql_print_error(_("Log entry on master is longer than "
2121
"max_allowed_packet (%ld) on "
2075
"max_allowed_packet (%u) on "
2122
2076
"slave. If the entry is correct, restart the "
2123
2077
"server with a higher value of "
2124
2078
"max_allowed_packet"),
2125
thd->variables.max_allowed_packet);
2079
session->variables.max_allowed_packet);
2127
2081
case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2128
2082
sql_print_error(ER(drizzle_error_number), drizzle_error_number,
2134
2088
_("Stopping slave I/O thread due to out-of-memory error from master"));
2137
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2091
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2138
2092
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2140
2094
goto connected;
2141
2095
} // if (event_len == packet_error)
2143
2097
retry_count=0; // ok event, reset retry counter
2144
thd_proc_info(thd, _("Queueing master event to the relay log"));
2098
session->set_proc_info(_("Queuing master event to the relay log"));
2145
2099
if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2149
if (flush_master_info(mi, 1))
2151
2105
sql_print_error(_("Failed to flush master info file"));
2180
2134
// print the current replication position
2181
2135
sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2182
2136
"position %s"),
2183
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
2184
VOID(pthread_mutex_lock(&LOCK_thread_count));
2185
thd->query = thd->db = 0; // extra safety
2186
thd->query_length= thd->db_length= 0;
2187
VOID(pthread_mutex_unlock(&LOCK_thread_count));
2137
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
2138
pthread_mutex_lock(&LOCK_thread_count);
2139
session->query = session->db = 0; // extra safety
2140
session->query_length= session->db_length= 0;
2141
pthread_mutex_unlock(&LOCK_thread_count);
2191
2145
Here we need to clear the active VIO before closing the
2192
connection with the master. The reason is that THD::awake()
2146
connection with the master. The reason is that Session::awake()
2193
2147
might be called from terminate_slave_thread() because somebody
2194
2148
issued a STOP SLAVE. If that happens, the close_active_vio()
2195
2149
can be called in the middle of closing the VIO associated with
2196
2150
the 'mysql' object, causing a crash.
2198
#ifdef SIGNAL_WITH_VIO_CLOSE
2199
thd->clear_active_vio();
2201
2152
drizzle_close(drizzle);
2204
write_ignored_events_info_to_relay_log(thd, mi);
2205
thd_proc_info(thd, _("Waiting for slave mutex on exit"));
2155
write_ignored_events_info_to_relay_log(session, mi);
2156
session->set_proc_info(_("Waiting for slave mutex on exit"));
2206
2157
pthread_mutex_lock(&mi->run_lock);
2208
2159
/* Forget the relay log's format */
2209
2160
delete mi->rli.relay_log.description_event_for_queue;
2210
2161
mi->rli.relay_log.description_event_for_queue= 0;
2211
// TODO: make rpl_status part of Master_info
2212
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2213
assert(thd->net.buff != 0);
2214
net_end(&thd->net); // destructor will not free it, because net.vio is 0
2215
close_thread_tables(thd);
2162
assert(session->net.buff != 0);
2163
net_end(&session->net); // destructor will not free it, because net.vio is 0
2164
close_thread_tables(session);
2216
2165
pthread_mutex_lock(&LOCK_thread_count);
2217
THD_CHECK_SENTRY(thd);
2166
Session_CHECK_SENTRY(session);
2219
2168
pthread_mutex_unlock(&LOCK_thread_count);
2220
2169
mi->abort_slave= 0;
2221
2170
mi->slave_running= 0;
2224
2173
Note: the order of the two following calls (first broadcast, then unlock)
2225
2174
is important. Otherwise a killer_thread can execute between the calls and
2226
2175
delete the mi structure leading to a crash! (see BUG#25306 for details)
2228
2177
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
2229
2178
pthread_mutex_unlock(&mi->run_lock);
2230
2179
my_thread_end();
2252
2201
rli->events_till_abort = abort_slave_event_count;
2255
thd->thread_stack = (char*)&thd; // remember where our stack is
2203
session = new Session;
2204
session->thread_stack = (char*)&session; // remember where our stack is
2205
rli->sql_session= session;
2258
2207
/* Inform waiting threads that slave has started */
2259
2208
rli->slave_run_id++;
2260
2209
rli->slave_running = 1;
2262
2211
pthread_detach_this_thread();
2263
if (init_slave_thread(thd, SLAVE_THD_SQL))
2212
if (init_slave_thread(session, SLAVE_Session_SQL))
2266
2215
TODO: this is currently broken - slave start and change master
2307
2256
rli->trans_retries= 0; // start from "no error"
2309
2258
if (init_relay_log_pos(rli,
2310
rli->group_relay_log_name,
2259
rli->group_relay_log_name.c_str(),
2311
2260
rli->group_relay_log_pos,
2312
2261
1 /*need data lock*/, &errmsg,
2313
2262
1 /*look for a description_event*/))
2339
2288
"position %s, relay log '%s' position: %s"),
2341
2290
llstr(rli->group_master_log_pos,llbuff),
2342
rli->group_relay_log_name,
2291
rli->group_relay_log_name.c_str(),
2343
2292
llstr(rli->group_relay_log_pos,llbuff1));
2345
2294
/* execute init_slave variable */
2346
2295
if (sys_init_slave.value_length)
2348
execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
2349
if (thd->is_slave_error)
2297
execute_init_command(session, &sys_init_slave, &LOCK_sys_init_slave);
2298
if (session->is_slave_error)
2351
2300
sql_print_error(_("Slave SQL thread aborted. "
2352
2301
"Can't execute init_slave query"));
2373
2322
/* Read queries from the IO/THREAD until this thread is killed */
2375
while (!sql_slave_killed(thd,rli))
2324
while (!sql_slave_killed(session,rli))
2377
thd_proc_info(thd, _("Reading event from the relay log"));
2378
assert(rli->sql_thd == thd);
2379
THD_CHECK_SENTRY(thd);
2380
if (exec_relay_log_event(thd,rli))
2326
session->set_proc_info(_("Reading event from the relay log"));
2327
assert(rli->sql_session == session);
2328
Session_CHECK_SENTRY(session);
2329
if (exec_relay_log_event(session,rli))
2382
2331
// do not scare the user if SQL thread was simply killed or stopped
2383
if (!sql_slave_killed(thd,rli))
2332
if (!sql_slave_killed(session,rli))
2386
retrieve as much info as possible from the thd and, error
2335
retrieve as much info as possible from the session and, error
2387
2336
codes and warnings and print this to the error log as to
2388
2337
allow the user to locate the error
2390
2339
uint32_t const last_errno= rli->last_error().number;
2392
if (thd->is_error())
2341
if (session->is_error())
2394
char const *const errmsg= thd->main_da.message();
2343
char const *const errmsg= session->main_da.message();
2396
2345
if (last_errno == 0)
2398
rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), errmsg);
2347
rli->report(ERROR_LEVEL, session->main_da.sql_errno(), "%s", errmsg);
2400
else if (last_errno != thd->main_da.sql_errno())
2349
else if (last_errno != session->main_da.sql_errno())
2402
2351
sql_print_error(_("Slave (additional info): %s Error_code: %d"),
2403
errmsg, thd->main_da.sql_errno());
2352
errmsg, session->main_da.sql_errno());
2407
2356
/* Print any warnings issued */
2408
List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
2357
List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
2409
2358
DRIZZLE_ERROR *err;
2411
2360
Added controlled slave thread cancel for replication
2452
2401
request is detected only by the present function, not by events), so we
2453
2402
must "proactively" clear playgrounds:
2455
rli->cleanup_context(thd, 1);
2456
VOID(pthread_mutex_lock(&LOCK_thread_count));
2404
rli->cleanup_context(session, 1);
2405
pthread_mutex_lock(&LOCK_thread_count);
2458
2407
Some extra safety, which should not be needed (normally, event deletion
2459
2408
should already have done these assignments (each event which sets these
2460
2409
variables is supposed to set them to 0 before terminating)).
2462
thd->query= thd->db= thd->catalog= 0;
2463
thd->query_length= thd->db_length= 0;
2464
VOID(pthread_mutex_unlock(&LOCK_thread_count));
2465
thd_proc_info(thd, "Waiting for slave mutex on exit");
2411
session->query= session->db= session->catalog= 0;
2412
session->query_length= session->db_length= 0;
2413
pthread_mutex_unlock(&LOCK_thread_count);
2414
session->set_proc_info("Waiting for slave mutex on exit");
2466
2415
pthread_mutex_lock(&rli->run_lock);
2467
2416
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
2468
2417
pthread_mutex_lock(&rli->data_lock);
2476
2425
pthread_mutex_unlock(&rli->data_lock);
2477
2426
pthread_cond_broadcast(&rli->data_cond);
2478
2427
rli->ignore_log_space_limit= 0; /* don't need any lock */
2479
/* we die so won't remember charset - re-update them on next thread start */
2480
rli->cached_charset_invalidate();
2481
rli->save_temporary_tables = thd->temporary_tables;
2428
rli->save_temporary_tables = session->temporary_tables;
2484
2431
TODO: see if we can do this conditionally in next_event() instead
2485
2432
to avoid unneeded position re-init
2487
thd->temporary_tables = 0; // remove tempation from destructor to close them
2488
assert(thd->net.buff != 0);
2489
net_end(&thd->net); // destructor will not free it, because we are weird
2490
assert(rli->sql_thd == thd);
2491
THD_CHECK_SENTRY(thd);
2434
session->temporary_tables = 0; // remove tempation from destructor to close them
2435
assert(session->net.buff != 0);
2436
net_end(&session->net); // destructor will not free it, because we are weird
2437
assert(rli->sql_session == session);
2438
Session_CHECK_SENTRY(session);
2439
rli->sql_session= 0;
2493
2440
pthread_mutex_lock(&LOCK_thread_count);
2494
THD_CHECK_SENTRY(thd);
2441
Session_CHECK_SENTRY(session);
2496
2443
pthread_mutex_unlock(&LOCK_thread_count);
2498
2445
Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
2499
2446
is important. Otherwise a killer_thread can execute between the calls and
2500
2447
delete the mi structure leading to a crash! (see BUG#25306 for details)
2502
2449
pthread_cond_broadcast(&rli->stop_cond);
2503
2450
pthread_mutex_unlock(&rli->run_lock); // tell the world we are done
2505
2452
my_thread_end();
2506
2453
pthread_exit(0);
2507
2454
return(0); // Can't return anything here
2517
2464
int32_t error = 1;
2518
2465
uint32_t num_bytes;
2519
2466
bool cev_not_written;
2520
THD *thd = mi->io_thd;
2467
Session *session = mi->io_session;
2521
2468
NET *net = &mi->drizzle->net;
2523
2470
if (unlikely(!cev->is_valid()))
2526
if (!rpl_filter->db_ok(cev->db))
2528
skip_load_data_infile(net);
2531
2473
assert(cev->inited_from_old);
2532
thd->file_id = cev->file_id = mi->file_id++;
2533
thd->server_id = cev->server_id;
2474
session->file_id = cev->file_id = mi->file_id++;
2475
session->server_id = cev->server_id;
2534
2476
cev_not_written = 1;
2536
2478
if (unlikely(net_request_file(net,cev->fname)))
2559
2501
if (unlikely(!num_bytes)) /* eof */
2561
2503
/* 3.23 master wants it */
2562
net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0);
2504
net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0);
2564
2506
If we wrote Create_file_log_event, then we need to write
2565
2507
Execute_load_log_event. If we did not write Create_file_log_event,
3053
2994
buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3054
2995
buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3056
mi->master_log_pos+= inc_pos;
3057
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
2997
mi->incrementLogPosition(inc_pos);
2998
memcpy(rli->ign_master_log_name_end, mi->getLogName(), FN_REFLEN);
3058
2999
assert(rli->ign_master_log_name_end[0]);
3059
rli->ign_master_log_pos_end= mi->master_log_pos;
3000
rli->ign_master_log_pos_end= mi->getLogPosition();
3061
3002
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3134
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi)
3075
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi)
3136
return(connect_to_master(thd, drizzle, mi, 0, 0));
3077
return(connect_to_master(session, drizzle, mi, 0, 0));
3146
3087
master_retry_count times
3149
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3150
bool reconnect, bool suppress_warnings)
3090
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
3091
bool reconnect, bool suppress_warnings)
3152
3093
int32_t slave_was_killed;
3153
3094
int32_t last_errno= -2; // impossible error
3162
3103
drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3163
3104
drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3165
drizzle_options(drizzle, DRIZZLE_SET_CHARSET_NAME, default_charset_info->csname);
3166
/* This one is not strictly needed but we have it here for completeness */
3167
drizzle_options(drizzle, DRIZZLE_SET_CHARSET_DIR, (char *) charsets_dir);
3169
while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3106
while (!(slave_was_killed = io_slave_killed(session,mi)) &&
3170
3107
(reconnect ? drizzle_reconnect(drizzle) != 0 :
3171
drizzle_connect(drizzle, mi->host, mi->user, mi->password, 0,
3172
mi->port, 0, client_flag) == 0))
3108
drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
3109
mi->getPort(), 0, client_flag) == 0))
3174
3111
/* Don't repeat last error */
3175
3112
if ((int32_t)drizzle_errno(drizzle) != last_errno)
3180
3117
_("error %s to master '%s@%s:%d'"
3181
3118
" - retry-time: %d retries: %u"),
3182
3119
(reconnect ? _("reconnecting") : _("connecting")),
3183
mi->user, mi->host, mi->port,
3184
mi->connect_retry, master_retry_count);
3120
mi->getUsername(), mi->getHostname(), mi->getPort(),
3121
mi->getConnectionRetry(), master_retry_count);
3187
3124
By default we try forever. The reason is that failure will trigger
3207
3142
if (!suppress_warnings && global_system_variables.log_warnings)
3208
3143
sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3209
3144
"replication resumed in log '%s' at "
3210
"position %s"), mi->user,
3145
"position %s"), mi->getUsername(),
3146
mi->getHostname(), mi->getPort(),
3212
3147
IO_RPL_LOG_NAME,
3213
llstr(mi->master_log_pos,llbuff));
3217
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
3218
general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d",
3219
mi->user, mi->host, mi->port);
3221
#ifdef SIGNAL_WITH_VIO_CLOSE
3222
thd->set_active_vio(drizzle->net.vio);
3148
llstr(mi->getLogPosition(),llbuff));
3225
3151
drizzle->reconnect= 1;
3226
3152
return(slave_was_killed);
3235
3161
master_retry_count times
3238
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3164
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
3239
3165
bool suppress_warnings)
3241
return(connect_to_master(thd, drizzle, mi, 1, suppress_warnings));
3167
return(connect_to_master(session, drizzle, mi, 1, suppress_warnings));
3274
3200
bool flush_relay_log_info(Relay_log_info* rli)
3278
3204
if (unlikely(rli->no_storage))
3281
IO_CACHE *file = &rli->info_file;
3282
char buff[FN_REFLEN*2+22*2+4], *pos;
3284
my_b_seek(file, 0L);
3285
pos=stpcpy(buff, rli->group_relay_log_name);
3287
pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
3289
pos=stpcpy(pos, rli->group_master_log_name);
3291
pos=int64_t2str(rli->group_master_log_pos, pos, 10);
3293
if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
3295
if (flush_io_cache(file))
3298
/* Flushing the relay log is done by the slave I/O thread */
3310
3218
assert(rli->cur_log_fd == -1);
3312
3220
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3313
if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
3221
if ((rli->cur_log_fd=open_binlog(cur_log, rli->event_relay_log_name.c_str(), errmsg)) < 0)
3317
3224
We want to start exactly where we were before:
3318
3225
relay_log_pos Current log pos
3319
3226
pending Number of bytes already processed from the event
3321
rli->event_relay_log_pos= max(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3228
rli->event_relay_log_pos= cmax(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3322
3229
my_b_seek(cur_log,rli->event_relay_log_pos);
3323
3230
return(cur_log);
3733
3639
{33029, { 5, 0, 0 }, { 5, 0, 58 } },
3734
3640
{33029, { 5, 1, 0 }, { 5, 1, 12 } },
3736
const uchar *master_ver=
3642
const unsigned char *master_ver=
3737
3643
rli->relay_log.description_event_for_exec->server_version_split;
3739
3645
assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);