388
399
const char *errmsg = "Unknown error";
389
400
NET* net = &thd->net;
390
401
pthread_mutex_t *log_lock;
391
bool binlog_can_be_corrupted= false;
402
bool binlog_can_be_corrupted= FALSE;
404
int left_events = max_binlog_dump_events;
406
DBUG_ENTER("mysql_binlog_send");
407
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
393
memset(&log, 0, sizeof(log));
409
bzero((char*) &log,sizeof(log));
395
411
heartbeat_period from @master_heartbeat_period user variable
397
uint64_t heartbeat_period= get_heartbeat_period(thd);
413
ulonglong heartbeat_period= get_heartbeat_period(thd);
398
414
struct timespec heartbeat_buf;
399
415
struct event_coordinates coord_buf;
400
416
struct timespec *heartbeat_ts= NULL;
401
417
struct event_coordinates *coord= NULL;
402
if (heartbeat_period != 0L)
418
if (heartbeat_period != LL(0))
404
420
heartbeat_ts= &heartbeat_buf;
405
421
set_timespec_nsec(*heartbeat_ts, 0);
676
723
case LOG_READ_EOF:
726
DBUG_PRINT("wait",("waiting for data in binary log"));
679
727
if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
681
729
pthread_mutex_unlock(log_lock);
734
ulong hb_info_counter= 0;
689
assert(heartbeat_ts && heartbeat_period != 0L);
740
DBUG_ASSERT(heartbeat_ts && heartbeat_period != LL(0));
690
741
set_timespec_nsec(*heartbeat_ts, heartbeat_period);
692
743
ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
693
assert(ret == 0 || (heartbeat_period != 0L && coord != NULL));
744
DBUG_ASSERT(ret == 0 || heartbeat_period != LL(0) && coord != NULL);
694
745
if (ret == ETIMEDOUT || ret == ETIME)
748
if (hb_info_counter < 3)
750
sql_print_information("master sends heartbeat message");
752
if (hb_info_counter == 3)
753
sql_print_information("the rest of heartbeat info skipped ...");
696
756
if (send_heartbeat_event(net, packet, coord))
698
758
errmsg = "Failed on my_net_write()";
1131
1200
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1132
1201
unlock_slave_threads(mi);
1136
thd->set_proc_info("Changing master");
1205
thd_proc_info(thd, "Changing master");
1137
1206
LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1138
1207
// TODO: see if needs re-write
1139
if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
1208
if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
1141
1211
my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1142
1212
unlock_slave_threads(mi);
1157
1227
if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1229
mi->master_log_name[0] = 0;
1230
mi->master_log_pos= BIN_LOG_HEADER_SIZE;
1160
1233
if (lex_mi->log_file_name)
1161
mi->setLogName(lex_mi->log_file_name);
1234
strmake(mi->master_log_name, lex_mi->log_file_name,
1235
sizeof(mi->master_log_name)-1);
1162
1236
if (lex_mi->pos)
1164
mi->setLogPosition(lex_mi->pos);
1238
mi->master_log_pos= lex_mi->pos;
1240
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
1167
1242
if (lex_mi->host)
1168
mi->setHost(lex_mi->host, lex_mi->port);
1243
strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1169
1244
if (lex_mi->user)
1170
mi->setUsername(lex_mi->user);
1245
strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
1171
1246
if (lex_mi->password)
1172
mi->setPassword(lex_mi->password);
1247
strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
1249
mi->port = lex_mi->port;
1173
1250
if (lex_mi->connect_retry)
1174
1251
mi->connect_retry = lex_mi->connect_retry;
1175
1252
if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1176
1253
mi->heartbeat_period = lex_mi->heartbeat_period;
1178
mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
1255
mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
1179
1256
(slave_net_timeout/2.0));
1180
mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
1257
mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd
1258
if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1259
mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
1261
if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1262
mi->ssl_verify_server_cert=
1263
(lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
1266
strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
1267
if (lex_mi->ssl_capath)
1268
strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
1269
if (lex_mi->ssl_cert)
1270
strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
1271
if (lex_mi->ssl_cipher)
1272
strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
1273
if (lex_mi->ssl_key)
1274
strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
1275
if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
1276
lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
1277
lex_mi->ssl_verify_server_cert )
1278
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
1279
ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1182
1281
if (lex_mi->relay_log_name)
1184
1283
need_relay_log_purge= 0;
1185
mi->rli.event_relay_log_name.assign(lex_mi->relay_log_name);
1284
strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
1285
sizeof(mi->rli.group_relay_log_name)-1);
1286
strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
1287
sizeof(mi->rli.event_relay_log_name)-1);
1188
1290
if (lex_mi->relay_log_pos)
1213
1315
Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1214
not initialized), so we use a cmax().
1316
not initialized), so we use a max().
1215
1317
What happens to mi->rli.master_log_pos during the initialization stages
1216
1318
of replication is not 100% clear, so we guard against problems using
1219
mi->setLogPosition(((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1220
? BIN_LOG_HEADER_SIZE
1221
: mi->rli.group_master_log_pos));
1222
mi->setLogName(mi->rli.group_master_log_name.c_str());
1321
mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
1322
mi->rli.group_master_log_pos);
1323
strmake(mi->master_log_name, mi->rli.group_master_log_name,
1324
sizeof(mi->master_log_name)-1);
1225
1327
Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1226
1328
a slave before).
1330
if (flush_master_info(mi, 0))
1230
1332
my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1231
1333
unlock_slave_threads(mi);
1234
1336
if (need_relay_log_purge)
1236
1338
relay_log_purge= 1;
1237
thd->set_proc_info("Purging old relay logs");
1339
thd_proc_info(thd, "Purging old relay logs");
1238
1340
if (purge_relay_logs(&mi->rli, thd,
1239
1341
0 /* not only reset, but also reinit */,
1242
1344
my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
1243
1345
unlock_slave_threads(mi);
1270
1372
''/0: we have lost all copies of the original good coordinates.
1271
1373
That's why we always save good coords in rli.
1273
mi->rli.group_master_log_pos= mi->getLogPosition();
1274
mi->rli.group_master_log_name.assign(mi->getLogName());
1375
mi->rli.group_master_log_pos= mi->master_log_pos;
1376
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
1377
strmake(mi->rli.group_master_log_name,mi->master_log_name,
1378
sizeof(mi->rli.group_master_log_name)-1);
1276
if (mi->rli.group_master_log_name.size() == 0) // uninitialized case
1277
mi->rli.group_master_log_pos= 0;
1380
if (!mi->rli.group_master_log_name[0]) // uninitialized case
1381
mi->rli.group_master_log_pos=0;
1279
1383
pthread_mutex_lock(&mi->rli.data_lock);
1280
1384
mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1309
1413
return mysql_bin_log.reset_logs(thd);
1312
int cmp_master_pos(const char* log_file_name1, uint64_t log_pos1,
1313
const char* log_file_name2, uint64_t log_pos2)
1416
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
1417
const char* log_file_name2, ulonglong log_pos2)
1316
uint32_t log_file_name1_len= strlen(log_file_name1);
1317
uint32_t log_file_name2_len= strlen(log_file_name2);
1420
uint log_file_name1_len= strlen(log_file_name1);
1421
uint log_file_name2_len= strlen(log_file_name2);
1319
1423
// We assume that both log names match up to '.'
1320
1424
if (log_file_name1_len == log_file_name2_len)
1434
bool mysql_show_binlog_events(THD* thd)
1436
Protocol *protocol= thd->protocol;
1437
List<Item> field_list;
1438
const char *errmsg = 0;
1442
DBUG_ENTER("mysql_show_binlog_events");
1444
Log_event::init_show_field_list(&field_list);
1445
if (protocol->send_fields(&field_list,
1446
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1449
Format_description_log_event *description_event= new
1450
Format_description_log_event(3); /* MySQL 4.0 by default */
1453
Wait for handlers to insert any pending information
1454
into the binlog. For e.g. ndb which updates the binlog asynchronously
1455
this is needed so that the uses sees all its own commands in the binlog
1457
ha_binlog_wait(thd);
1459
if (mysql_bin_log.is_open())
1461
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1462
SELECT_LEX_UNIT *unit= &thd->lex->unit;
1463
ha_rows event_count, limit_start, limit_end;
1464
my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1465
char search_file_name[FN_REFLEN], *name;
1466
const char *log_file_name = lex_mi->log_file_name;
1467
pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1471
unit->set_limit(thd->lex->current_select);
1472
limit_start= unit->offset_limit_cnt;
1473
limit_end= unit->select_limit_cnt;
1475
name= search_file_name;
1477
mysql_bin_log.make_log_name(search_file_name, log_file_name);
1479
name=0; // Find first log
1481
linfo.index_file_offset = 0;
1483
if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1485
errmsg = "Could not find target log";
1489
pthread_mutex_lock(&LOCK_thread_count);
1490
thd->current_linfo = &linfo;
1491
pthread_mutex_unlock(&LOCK_thread_count);
1493
if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
1497
to account binlog event header size
1499
thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
1501
pthread_mutex_lock(log_lock);
1504
open_binlog() sought to position 4.
1505
Read the first event in case it's a Format_description_log_event, to
1506
know the format. If there's no such event, we are 3.23 or 4.x. This
1507
code, like before, can't read 3.23 binlogs.
1508
This code will fail on a mixed relay log (one which has Format_desc then
1509
Rotate then Format_desc).
1511
ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
1514
if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
1516
delete description_event;
1517
description_event= (Format_description_log_event*) ev;
1523
my_b_seek(&log, pos);
1525
if (!description_event->is_valid())
1527
errmsg="Invalid Format_description event; could be out of memory";
1531
for (event_count = 0;
1532
(ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
1533
description_event)); )
1535
if (event_count >= limit_start &&
1536
ev->net_send(protocol, linfo.log_file_name, pos))
1538
errmsg = "Net error";
1540
pthread_mutex_unlock(log_lock);
1544
pos = my_b_tell(&log);
1547
if (++event_count >= limit_end)
1551
if (event_count < limit_end && log.error)
1553
errmsg = "Wrong offset or I/O error";
1554
pthread_mutex_unlock(log_lock);
1558
pthread_mutex_unlock(log_lock);
1564
delete description_event;
1568
(void) my_close(file, MYF(MY_WME));
1572
my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
1573
"SHOW BINLOG EVENTS", errmsg);
1577
pthread_mutex_lock(&LOCK_thread_count);
1578
thd->current_linfo = 0;
1579
pthread_mutex_unlock(&LOCK_thread_count);
1330
1584
bool show_binlog_info(THD* thd)
1332
1586
Protocol *protocol= thd->protocol;
1587
DBUG_ENTER("show_binlog_info");
1333
1588
List<Item> field_list;
1334
1589
field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1335
1590
field_list.push_back(new Item_return_int("Position",20,
1336
DRIZZLE_TYPE_LONGLONG));
1591
MYSQL_TYPE_LONGLONG));
1337
1592
field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
1338
1593
field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
1340
1595
if (protocol->send_fields(&field_list,
1341
1596
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1343
1598
protocol->prepare_for_resend();
1345
1600
if (mysql_bin_log.is_open())
1455
1711
int log_loaded_block(IO_CACHE* file)
1713
DBUG_ENTER("log_loaded_block");
1457
1714
LOAD_FILE_INFO *lf_info;
1459
1716
/* buffer contains position where we started last read */
1460
unsigned char* buffer= (unsigned char*) my_b_get_buffer_start(file);
1461
uint32_t max_event_size= current_thd->variables.max_allowed_packet;
1717
uchar* buffer= (uchar*) my_b_get_buffer_start(file);
1718
uint max_event_size= current_thd->variables.max_allowed_packet;
1462
1719
lf_info= (LOAD_FILE_INFO*) file->arg;
1463
1720
if (lf_info->thd->current_stmt_binlog_row_based)
1465
1722
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1466
1723
lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1469
1726
for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1470
buffer += cmin(block_len, max_event_size),
1471
block_len -= cmin(block_len, max_event_size))
1727
buffer += min(block_len, max_event_size),
1728
block_len -= min(block_len, max_event_size))
1473
1730
lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1474
1731
if (lf_info->wrote_create_file)
1476
1733
Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
1477
cmin(block_len, max_event_size),
1734
min(block_len, max_event_size),
1478
1735
lf_info->log_delayed);
1479
1736
mysql_bin_log.write(&a);
1513
1771
class sys_var_sync_binlog_period :public sys_var_long_ptr
1516
sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
1774
sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
1517
1775
ulong *value_ptr)
1518
1776
:sys_var_long_ptr(chain, name_arg,value_ptr) {}
1519
1777
bool update(THD *thd, set_var *var);
1522
static void fix_slave_net_timeout(THD *thd,
1523
enum_var_type type __attribute__((unused)))
1780
static void fix_slave_net_timeout(THD *thd, enum_var_type type)
1782
DBUG_ENTER("fix_slave_net_timeout");
1783
#ifdef HAVE_REPLICATION
1525
1784
pthread_mutex_lock(&LOCK_active_mi);
1785
DBUG_PRINT("info",("slave_net_timeout=%lu mi->heartbeat_period=%.3f",
1787
(active_mi? active_mi->heartbeat_period : 0.0)));
1526
1788
if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1527
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1789
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
1528
1790
ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1529
1791
"The currect value for master_heartbeat_period"
1530
1792
" exceeds the new value of `slave_net_timeout' sec."
1531
1793
" A sensible value for the period should be"
1532
1794
" less than the timeout.");
1533
1795
pthread_mutex_unlock(&LOCK_active_mi);
1537
1800
static sys_var_chain vars = { NULL, NULL };
1549
1812
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
1552
static int show_slave_skip_errors(THD *thd __attribute__((unused)),
1553
SHOW_VAR *var, char *buff)
1815
static SHOW_VAR fixed_vars[]= {
1816
{"log_slave_updates", (char*) &opt_log_slave_updates, SHOW_MY_BOOL},
1817
{"relay_log" , (char*) &opt_relay_logname, SHOW_CHAR_PTR},
1818
{"relay_log_index", (char*) &opt_relaylog_index_name, SHOW_CHAR_PTR},
1819
{"relay_log_info_file", (char*) &relay_log_info_file, SHOW_CHAR_PTR},
1820
{"relay_log_space_limit", (char*) &relay_log_space_limit, SHOW_LONGLONG},
1821
{"slave_load_tmpdir", (char*) &slave_load_tmpdir, SHOW_CHAR_PTR},
1822
{"slave_skip_errors", (char*) &show_slave_skip_errors, SHOW_FUNC},
1825
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff)
1555
1827
var->type=SHOW_CHAR;
1556
1828
var->value= buff;
1581
1853
if (var->value != buff)
1582
1854
buff--; // Remove last ','
1583
1855
if (i < MAX_SLAVE_ERROR)
1584
buff= my_stpcpy(buff, "..."); // Couldn't show all errors
1856
buff= strmov(buff, "..."); // Couldn't show all errors
1590
static st_show_var_func_container
1591
show_slave_skip_errors_cont = { &show_slave_skip_errors };
1594
static SHOW_VAR fixed_vars[]= {
1595
{"log_slave_updates", (char*) &opt_log_slave_updates, SHOW_MY_BOOL},
1596
{"relay_log" , (char*) &opt_relay_logname, SHOW_CHAR_PTR},
1597
{"relay_log_index", (char*) &opt_relaylog_index_name, SHOW_CHAR_PTR},
1598
{"relay_log_info_file", (char*) &relay_log_info_file, SHOW_CHAR_PTR},
1599
{"relay_log_space_limit", (char*) &relay_log_space_limit, SHOW_LONGLONG},
1600
{"slave_load_tmpdir", (char*) &slave_load_tmpdir, SHOW_CHAR_PTR},
1601
{"slave_skip_errors", (char*) &show_slave_skip_errors_cont, SHOW_FUNC},
1604
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((unused)),
1862
bool sys_var_slave_skip_counter::check(THD *thd, set_var *var)
1608
1865
pthread_mutex_lock(&LOCK_active_mi);