30
30
#include "rpl_rli.h"
31
31
#include "sql_repl.h"
32
32
#include "rpl_filter.h"
33
#include "repl_failsafe.h"
33
34
#include <mysys/thr_alarm.h>
34
35
#include <libdrizzle/sql_common.h>
35
36
#include <libdrizzle/errmsg.h>
36
37
#include <mysys/mysys_err.h>
37
38
#include <drizzled/drizzled_error_messages.h>
39
#if TIME_WITH_SYS_TIME
40
# include <sys/time.h>
44
# include <sys/time.h>
40
#ifdef HAVE_REPLICATION
51
42
#include "rpl_tblmap.h"
242
if (active_mi->init_master_info(master_info_file, relay_log_info_file, (SLAVE_IO | SLAVE_SQL)))
233
if (init_master_info(active_mi,master_info_file,relay_log_info_file,
234
1, (SLAVE_IO | SLAVE_SQL)))
244
236
sql_print_error(_("Failed to initialize the master info structure"));
292
284
use_slave_mask = 1;
293
285
for (;my_isspace(system_charset_info,*arg);++arg)
295
if (!my_strnncoll(system_charset_info,(unsigned char*)arg,4,(const unsigned char*)"all",4))
287
if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
297
289
bitmap_set_all(&slave_error_mask);
584
576
once multi-master code is ready.
586
578
terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
587
active_mi->end_master_info();
579
end_master_info(active_mi);
588
580
delete active_mi;
646
638
(void)net_request_file(net, "/dev/null");
647
639
(void)my_net_read(net); // discard response
648
(void)net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0); // ok
640
(void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok
653
645
bool net_request_file(NET* net, const char* fname)
655
return(net_write_command(net, 251, (unsigned char*) fname, strlen(fname),
656
(unsigned char*) "", 0));
647
return(net_write_command(net, 251, (uchar*) fname, strlen(fname),
1083
1075
" to the relay log, SHOW SLAVE STATUS may be"
1084
1076
" inaccurate"));
1085
1077
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1078
if (flush_master_info(mi, 1))
1087
1079
sql_print_error(_("Failed to flush master info file"));
1102
1094
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
1103
1095
bool *suppress_warnings)
1105
unsigned char buf[1024], *pos= buf;
1097
uchar buf[1024], *pos= buf;
1106
1098
uint32_t report_host_len, report_user_len=0, report_password_len=0;
1108
1100
*suppress_warnings= false;
1119
1111
return(0); // safety
1121
1113
int4store(pos, server_id); pos+= 4;
1122
pos= net_store_data(pos, (unsigned char*) report_host, report_host_len);
1123
pos= net_store_data(pos, (unsigned char*) report_user, report_user_len);
1124
pos= net_store_data(pos, (unsigned char*) report_password, report_password_len);
1114
pos= net_store_data(pos, (uchar*) report_host, report_host_len);
1115
pos= net_store_data(pos, (uchar*) report_user, report_user_len);
1116
pos= net_store_data(pos, (uchar*) report_password, report_password_len);
1125
1117
int2store(pos, (uint16_t) report_port); pos+= 2;
1126
int4store(pos, 0); pos+= 4;
1118
int4store(pos, rpl_recovery_rank); pos+= 4;
1127
1119
/* The master will fill in master_id */
1128
1120
int4store(pos, 0); pos+= 4;
1194
1186
field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1195
1187
field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1196
1188
DRIZZLE_TYPE_LONGLONG));
1189
field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
1190
field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
1191
sizeof(mi->ssl_ca)));
1192
field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
1193
sizeof(mi->ssl_capath)));
1194
field_list.push_back(new Item_empty_string("Master_SSL_Cert",
1195
sizeof(mi->ssl_cert)));
1196
field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
1197
sizeof(mi->ssl_cipher)));
1198
field_list.push_back(new Item_empty_string("Master_SSL_Key",
1199
sizeof(mi->ssl_key)));
1197
1200
field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1198
1201
DRIZZLE_TYPE_LONGLONG));
1202
field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
1199
1204
field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
1200
1205
field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1201
1206
field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
1221
1226
pthread_mutex_lock(&mi->data_lock);
1222
1227
pthread_mutex_lock(&mi->rli.data_lock);
1223
protocol->store(mi->getHostname(), &my_charset_bin);
1224
protocol->store(mi->getUsername(), &my_charset_bin);
1225
protocol->store((uint32_t) mi->getPort());
1226
protocol->store(mi->getConnectionRetry());
1227
protocol->store(mi->getLogName(), &my_charset_bin);
1228
protocol->store((uint64_t) mi->getLogPosition());
1229
protocol->store(mi->rli.group_relay_log_name.c_str() +
1230
dirname_length(mi->rli.group_relay_log_name.c_str()),
1228
protocol->store(mi->host, &my_charset_bin);
1229
protocol->store(mi->user, &my_charset_bin);
1230
protocol->store((uint32_t) mi->port);
1231
protocol->store((uint32_t) mi->connect_retry);
1232
protocol->store(mi->master_log_name, &my_charset_bin);
1233
protocol->store((uint64_t) mi->master_log_pos);
1234
protocol->store(mi->rli.group_relay_log_name +
1235
dirname_length(mi->rli.group_relay_log_name),
1231
1236
&my_charset_bin);
1232
1237
protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1233
protocol->store(mi->rli.group_master_log_name.c_str(), &my_charset_bin);
1238
protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
1234
1239
protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1235
1240
"Yes" : "No", &my_charset_bin);
1236
1241
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1261
1266
protocol->store(mi->rli.until_log_name, &my_charset_bin);
1262
1267
protocol->store((uint64_t) mi->rli.until_log_pos);
1269
protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
1270
protocol->store(mi->ssl_ca, &my_charset_bin);
1271
protocol->store(mi->ssl_capath, &my_charset_bin);
1272
protocol->store(mi->ssl_cert, &my_charset_bin);
1273
protocol->store(mi->ssl_cipher, &my_charset_bin);
1274
protocol->store(mi->ssl_key, &my_charset_bin);
1265
1277
Seconds_Behind_Master: if SQL thread is running and I/O thread is
1266
1278
connected, we can compute it; otherwise show NULL (i.e. unknown).
1285
1297
slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1286
1298
between timestamp of slave and rli->last_master_timestamp is 0
1287
1299
(i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1288
This confuses users, so we don't go below 0: hence the cmax().
1300
This confuses users, so we don't go below 0: hence the max().
1290
1302
last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1291
1303
special marker to say "consider we have caught up".
1293
1305
protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1294
cmax((long)0, time_diff) : 0));
1306
max((long)0, time_diff) : 0));
1298
1310
protocol->store_null();
1312
protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
1301
1314
// Last_IO_Errno
1302
1315
protocol->store(mi->last_error().number);
1310
1323
pthread_mutex_unlock(&mi->rli.data_lock);
1311
1324
pthread_mutex_unlock(&mi->data_lock);
1313
if (my_net_write(&thd->net, (unsigned char*) thd->packet.ptr(), packet->length()))
1326
if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
1355
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
1357
thd->variables.character_set_client=
1358
global_system_variables.character_set_client;
1359
thd->variables.collation_connection=
1360
global_system_variables.collation_connection;
1361
thd->variables.collation_server=
1362
global_system_variables.collation_server;
1363
thd->update_charset();
1366
We use a const cast here since the conceptual (and externally
1367
visible) behavior of the function is to set the default charset of
1368
the thread. That the cache has to be invalidated is a secondary
1371
const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
1343
1376
init_slave_thread()
1358
1391
thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1359
1392
+ MAX_LOG_EVENT_HEADER; /* note, incr over the global not session var */
1360
1393
thd->slave_thread = 1;
1394
thd->enable_slow_log= opt_log_slow_slave_statements;
1361
1395
set_slave_thread_options(thd);
1362
1396
thd->client_capabilities = CLIENT_LOCAL_FILES;
1363
1397
pthread_mutex_lock(&LOCK_thread_count);
1374
1408
lex_start(thd);
1376
1410
if (thd_type == SLAVE_THD_SQL)
1377
thd->set_proc_info("Waiting for the next event in relay log");
1411
thd_proc_info(thd, "Waiting for the next event in relay log");
1379
thd->set_proc_info("Waiting for master update");
1413
thd_proc_info(thd, "Waiting for master update");
1380
1414
thd->version=refresh_version;
1381
1415
thd->set_time();
1416
1450
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
1417
1451
bool *suppress_warnings)
1419
unsigned char buf[FN_REFLEN + 10];
1453
uchar buf[FN_REFLEN + 10];
1421
1455
int32_t binlog_flags = 0; // for now
1422
const char* logname = mi->getLogName();
1456
char* logname = mi->master_log_name;
1424
1458
*suppress_warnings= false;
1426
1460
// TODO if big log files: Change next to int8store()
1427
int4store(buf, (uint32_t) mi->getLogPosition());
1461
int4store(buf, (uint32_t) mi->master_log_pos);
1428
1462
int2store(buf + 4, binlog_flags);
1429
1463
int4store(buf + 6, server_id);
1430
1464
len = (uint32_t) strlen(logname);
1658
1692
" of the relay log information: the slave may"
1659
1693
" be in an inconsistent state."
1660
1694
" Stopped in %s position %s"),
1661
rli->group_relay_log_name.c_str(),
1695
rli->group_relay_log_name,
1662
1696
llstr(rli->group_relay_log_pos, buf));
1783
1817
if (rli->trans_retries < slave_trans_retries)
1785
if (rli->mi->init_master_info(0, 0, SLAVE_SQL))
1819
if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
1786
1820
sql_print_error(_("Failed to initialize the master info structure"));
1787
1821
else if (init_relay_log_pos(rli,
1788
rli->group_relay_log_name.c_str(),
1822
rli->group_relay_log_name,
1789
1823
rli->group_relay_log_pos,
1790
1824
1, &errmsg, 1))
1791
1825
sql_print_error(_("Error initializing relay log position: %s"),
1796
1830
end_trans(thd, ROLLBACK);
1797
1831
/* chance for concurrent connection to get more locks */
1798
safe_sleep(thd, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1832
safe_sleep(thd, min(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1799
1833
(CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1800
1834
pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1801
1835
rli->trans_retries++;
1869
1903
static int32_t try_to_reconnect(THD *thd, DRIZZLE *drizzle, Master_info *mi,
1870
uint32_t *retry_count, bool suppress_warnings,
1871
const char *messages[SLAVE_RECON_MSG_MAX])
1904
uint32_t *retry_count, bool suppress_warnings,
1905
const char *messages[SLAVE_RECON_MSG_MAX])
1873
1907
mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1874
1908
thd->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
1875
drizzle_disconnect(drizzle);
1909
#ifdef SIGNAL_WITH_VIO_CLOSE
1910
thd->clear_active_vio();
1912
end_server(drizzle);
1876
1913
if ((*retry_count)++)
1878
1915
if (*retry_count > master_retry_count)
1889
1926
char buf[256], llbuff[22];
1890
1927
snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1891
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
1928
IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
1893
1930
Raise a warning during registering on master/requesting dump.
1894
1931
Log a message reading event.
1969
thd->set_proc_info("Connecting to master");
2006
thd_proc_info(thd, "Connecting to master");
1970
2007
// we can get killed during safe_connect
1971
2008
if (!safe_connect(thd, drizzle, mi))
1973
2010
sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
1974
2011
"replication started in log '%s' at position %s"),
1975
mi->getUsername(), mi->getHostname(), mi->getPort(),
2012
mi->user, mi->host, mi->port,
1976
2013
IO_RPL_LOG_NAME,
1977
llstr(mi->getLogPosition(), llbuff));
2014
llstr(mi->master_log_pos,llbuff));
1979
2016
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
1980
2017
thread, since a replication event can become this much larger than
1993
2030
// TODO: the assignment below should be under mutex (5.0)
1994
2031
mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
1995
2032
thd->slave_net = &drizzle->net;
1996
thd->set_proc_info("Checking master version");
2033
thd_proc_info(thd, "Checking master version");
1997
2034
if (get_master_version_and_clock(drizzle, mi))
2003
2040
Register ourselves with the master.
2005
thd->set_proc_info("Registering slave on master");
2042
thd_proc_info(thd, "Registering slave on master");
2006
2043
if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2008
2045
if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
2031
2068
while (!io_slave_killed(thd,mi))
2033
thd->set_proc_info("Requesting binlog dump");
2070
thd_proc_info(thd, "Requesting binlog dump");
2034
2071
if (request_dump(drizzle, mi, &suppress_warnings))
2036
2073
sql_print_error(_("Failed on request_dump()"));
2060
2097
important thing is to not confuse users by saying "reading" whereas
2061
2098
we're in fact receiving nothing.
2063
thd->set_proc_info(_("Waiting for master to send event"));
2100
thd_proc_info(thd, _("Waiting for master to send event"));
2064
2101
event_len= read_event(drizzle, mi, &suppress_warnings);
2065
2102
if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while "
2066
2103
"reading event")))
2104
2141
} // if (event_len == packet_error)
2106
2143
retry_count=0; // ok event, reset retry counter
2107
thd->set_proc_info(_("Queueing master event to the relay log"));
2144
thd_proc_info(thd, _("Queueing master event to the relay log"));
2108
2145
if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2149
if (flush_master_info(mi, 1))
2114
2151
sql_print_error(_("Failed to flush master info file"));
2143
2180
// print the current replication position
2144
2181
sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2145
2182
"position %s"),
2146
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
2147
pthread_mutex_lock(&LOCK_thread_count);
2183
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
2184
VOID(pthread_mutex_lock(&LOCK_thread_count));
2148
2185
thd->query = thd->db = 0; // extra safety
2149
2186
thd->query_length= thd->db_length= 0;
2150
pthread_mutex_unlock(&LOCK_thread_count);
2187
VOID(pthread_mutex_unlock(&LOCK_thread_count));
2158
2195
can be called in the middle of closing the VIO associated with
2159
2196
the 'mysql' object, causing a crash.
2198
#ifdef SIGNAL_WITH_VIO_CLOSE
2199
thd->clear_active_vio();
2161
2201
drizzle_close(drizzle);
2164
2204
write_ignored_events_info_to_relay_log(thd, mi);
2165
thd->set_proc_info(_("Waiting for slave mutex on exit"));
2205
thd_proc_info(thd, _("Waiting for slave mutex on exit"));
2166
2206
pthread_mutex_lock(&mi->run_lock);
2168
2208
/* Forget the relay log's format */
2169
2209
delete mi->rli.relay_log.description_event_for_queue;
2170
2210
mi->rli.relay_log.description_event_for_queue= 0;
2211
// TODO: make rpl_status part of Master_info
2212
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2171
2213
assert(thd->net.buff != 0);
2172
2214
net_end(&thd->net); // destructor will not free it, because net.vio is 0
2173
2215
close_thread_tables(thd);
2265
2307
rli->trans_retries= 0; // start from "no error"
2267
2309
if (init_relay_log_pos(rli,
2268
rli->group_relay_log_name.c_str(),
2310
rli->group_relay_log_name,
2269
2311
rli->group_relay_log_pos,
2270
2312
1 /*need data lock*/, &errmsg,
2271
2313
1 /*look for a description_event*/))
2297
2339
"position %s, relay log '%s' position: %s"),
2299
2341
llstr(rli->group_master_log_pos,llbuff),
2300
rli->group_relay_log_name.c_str(),
2342
rli->group_relay_log_name,
2301
2343
llstr(rli->group_relay_log_pos,llbuff1));
2303
2345
/* execute init_slave variable */
2333
2375
while (!sql_slave_killed(thd,rli))
2335
thd->set_proc_info(_("Reading event from the relay log"));
2377
thd_proc_info(thd, _("Reading event from the relay log"));
2336
2378
assert(rli->sql_thd == thd);
2337
2379
THD_CHECK_SENTRY(thd);
2338
2380
if (exec_relay_log_event(thd,rli))
2411
2453
must "proactively" clear playgrounds:
2413
2455
rli->cleanup_context(thd, 1);
2414
pthread_mutex_lock(&LOCK_thread_count);
2456
VOID(pthread_mutex_lock(&LOCK_thread_count));
2416
2458
Some extra safety, which should not be needed (normally, event deletion
2417
2459
should already have done these assignments (each event which sets these
2420
2462
thd->query= thd->db= thd->catalog= 0;
2421
2463
thd->query_length= thd->db_length= 0;
2422
pthread_mutex_unlock(&LOCK_thread_count);
2423
thd->set_proc_info("Waiting for slave mutex on exit");
2464
VOID(pthread_mutex_unlock(&LOCK_thread_count));
2465
thd_proc_info(thd, "Waiting for slave mutex on exit");
2424
2466
pthread_mutex_lock(&rli->run_lock);
2425
2467
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
2426
2468
pthread_mutex_lock(&rli->data_lock);
2434
2476
pthread_mutex_unlock(&rli->data_lock);
2435
2477
pthread_cond_broadcast(&rli->data_cond);
2436
2478
rli->ignore_log_space_limit= 0; /* don't need any lock */
2479
/* we die so won't remember charset - re-update them on next thread start */
2480
rli->cached_charset_invalidate();
2437
2481
rli->save_temporary_tables = thd->temporary_tables;
2515
2559
if (unlikely(!num_bytes)) /* eof */
2517
2561
/* 3.23 master wants it */
2518
net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0);
2562
net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0);
2520
2564
If we wrote Create_file_log_event, then we need to write
2521
2565
Execute_load_log_event. If we did not write Create_file_log_event,
2604
2648
/* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2605
mi->setLogName(rev->new_log_ident);
2606
mi->setLogPosition(rev->pos);
2649
memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
2650
mi->master_log_pos= rev->pos;
2608
2652
If we do not do this, we will be getting the first
2609
2653
rotate event forever, so we need to not disconnect after one.
2684
2728
"master could be corrupt but a more likely cause "
2685
2729
"of this is a bug"),
2687
free((char*) tmp_buf);
2731
my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
2691
2735
pthread_mutex_lock(&mi->data_lock);
2692
ev->log_pos= mi->getLogPosition(); /* 3.23 events don't contain log_pos */
2736
ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
2693
2737
switch (ev->get_type_code()) {
2694
2738
case STOP_EVENT:
2695
2739
ignore_event= 1;
2718
2762
ev->log_pos+= inc_pos;
2719
2763
int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2721
mi->incrementLogPosition(inc_pos);
2765
mi->master_log_pos += inc_pos;
2722
2766
pthread_mutex_unlock(&mi->data_lock);
2723
free((char*)tmp_buf);
2767
my_free((char*)tmp_buf, MYF(0));
2951
2995
TODO: handling `when' for SHOW SLAVE STATUS' seconds behind
2953
if ((mi->setLogName(hb.get_log_ident()) && mi->getLogName() != NULL)
2954
|| mi->getLogPosition() != hb.log_pos)
2997
if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
2998
&& mi->master_log_name != NULL)
2999
|| mi->master_log_pos != hb.log_pos)
2956
3001
/* missed events of heartbeat from the past */
2957
3002
error= ER_SLAVE_HEARTBEAT_FAILURE;
3008
3053
buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3009
3054
buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3011
mi->incrementLogPosition(inc_pos);
3012
memcpy(rli->ign_master_log_name_end, mi->getLogName(), FN_REFLEN);
3056
mi->master_log_pos+= inc_pos;
3057
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
3013
3058
assert(rli->ign_master_log_name_end[0]);
3014
rli->ign_master_log_pos_end= mi->getLogPosition();
3059
rli->ign_master_log_pos_end= mi->master_log_pos;
3016
3061
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3020
3065
/* write the event to the relay log */
3021
3066
if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3023
mi->incrementLogPosition(inc_pos);
3068
mi->master_log_pos+= inc_pos;
3024
3069
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3104
3149
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3105
bool reconnect, bool suppress_warnings)
3150
bool reconnect, bool suppress_warnings)
3107
3152
int32_t slave_was_killed;
3108
3153
int32_t last_errno= -2; // impossible error
3117
3162
drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3118
3163
drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3165
drizzle_options(drizzle, DRIZZLE_SET_CHARSET_NAME, default_charset_info->csname);
3166
/* This one is not strictly needed but we have it here for completeness */
3167
drizzle_options(drizzle, DRIZZLE_SET_CHARSET_DIR, (char *) charsets_dir);
3120
3169
while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3121
3170
(reconnect ? drizzle_reconnect(drizzle) != 0 :
3122
drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
3123
mi->getPort(), 0, client_flag) == 0))
3171
drizzle_connect(drizzle, mi->host, mi->user, mi->password, 0,
3172
mi->port, 0, client_flag) == 0))
3125
3174
/* Don't repeat last error */
3126
3175
if ((int32_t)drizzle_errno(drizzle) != last_errno)
3131
3180
_("error %s to master '%s@%s:%d'"
3132
3181
" - retry-time: %d retries: %u"),
3133
3182
(reconnect ? _("reconnecting") : _("connecting")),
3134
mi->getUsername(), mi->getHostname(), mi->getPort(),
3135
mi->getConnectionRetry(), master_retry_count);
3183
mi->user, mi->host, mi->port,
3184
mi->connect_retry, master_retry_count);
3138
3187
By default we try forever. The reason is that failure will trigger
3143
3192
if (++err_count == master_retry_count)
3145
3194
slave_was_killed=1;
3196
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
3148
3199
safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3156
3207
if (!suppress_warnings && global_system_variables.log_warnings)
3157
3208
sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3158
3209
"replication resumed in log '%s' at "
3159
"position %s"), mi->getUsername(),
3160
mi->getHostname(), mi->getPort(),
3210
"position %s"), mi->user,
3161
3212
IO_RPL_LOG_NAME,
3162
llstr(mi->getLogPosition(),llbuff));
3213
llstr(mi->master_log_pos,llbuff));
3217
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
3218
general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d",
3219
mi->user, mi->host, mi->port);
3221
#ifdef SIGNAL_WITH_VIO_CLOSE
3222
thd->set_active_vio(drizzle->net.vio);
3165
3225
drizzle->reconnect= 1;
3166
3226
return(slave_was_killed);
3214
3274
bool flush_relay_log_info(Relay_log_info* rli)
3218
3278
if (unlikely(rli->no_storage))
3281
IO_CACHE *file = &rli->info_file;
3282
char buff[FN_REFLEN*2+22*2+4], *pos;
3284
my_b_seek(file, 0L);
3285
pos=stpcpy(buff, rli->group_relay_log_name);
3287
pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
3289
pos=stpcpy(pos, rli->group_master_log_name);
3291
pos=int64_t2str(rli->group_master_log_pos, pos, 10);
3293
if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
3295
if (flush_io_cache(file))
3298
/* Flushing the relay log is done by the slave I/O thread */
3232
3310
assert(rli->cur_log_fd == -1);
3234
3312
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3235
if ((rli->cur_log_fd=open_binlog(cur_log, rli->event_relay_log_name.c_str(), errmsg)) < 0)
3313
if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
3238
3317
We want to start exactly where we were before:
3239
3318
relay_log_pos Current log pos
3240
3319
pending Number of bytes already processed from the event
3242
rli->event_relay_log_pos= cmax(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3321
rli->event_relay_log_pos= max(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3243
3322
my_b_seek(cur_log,rli->event_relay_log_pos);
3244
3323
return(cur_log);
3474
3553
if (rli->relay_log.purge_first_log
3476
3555
rli->group_relay_log_pos == rli->event_relay_log_pos
3477
&& !strcmp(rli->group_relay_log_name.c_str(), rli->event_relay_log_name.c_str())))
3556
&& !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
3479
3558
errmsg = "Error purging processed logs";
3497
3576
rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3498
rli->event_relay_log_name.assign(rli->linfo.log_file_name);
3577
strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
3578
sizeof(rli->event_relay_log_name)-1);
3499
3579
flush_relay_log_info(rli);
3644
3724
struct st_version_range_for_one_bug {
3645
3725
uint32_t bug_id;
3646
const unsigned char introduced_in[3]; // first version with bug
3647
const unsigned char fixed_in[3]; // first version with fix
3726
const uchar introduced_in[3]; // first version with bug
3727
const uchar fixed_in[3]; // first version with fix
3649
3729
static struct st_version_range_for_one_bug versions_for_all_bugs[]=
3653
3733
{33029, { 5, 0, 0 }, { 5, 0, 58 } },
3654
3734
{33029, { 5, 1, 0 }, { 5, 1, 12 } },
3656
const unsigned char *master_ver=
3736
const uchar *master_ver=
3657
3737
rli->relay_log.description_event_for_exec->server_version_split;
3659
3739
assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3661
3741
for (uint32_t i= 0;
3662
3742
i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3664
const unsigned char *introduced_in= versions_for_all_bugs[i].introduced_in,
3744
const uchar *introduced_in= versions_for_all_bugs[i].introduced_in,
3665
3745
*fixed_in= versions_for_all_bugs[i].fixed_in;
3666
3746
if ((versions_for_all_bugs[i].bug_id == bug_id) &&
3667
3747
(memcmp(introduced_in, master_ver, 3) <= 0) &&