185
184
void lock_slave_threads(Master_info* mi)
187
DBUG_ENTER("lock_slave_threads");
189
186
//TODO: see if we can do this without dual mutex
190
187
pthread_mutex_lock(&mi->run_lock);
191
188
pthread_mutex_lock(&mi->rli.run_lock);
200
197
void unlock_slave_threads(Master_info* mi)
202
DBUG_ENTER("unlock_slave_threads");
204
199
//TODO: see if we can do this without dual mutex
205
200
pthread_mutex_unlock(&mi->rli.run_lock);
206
201
pthread_mutex_unlock(&mi->run_lock);
307
299
while (!my_isdigit(system_charset_info,*p) && *p)
314
306
int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
316
DBUG_ENTER("terminate_slave_threads");
319
DBUG_RETURN(0); /* successfully do nothing */
309
return(0); /* successfully do nothing */
320
310
int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
321
311
pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
323
313
if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
325
DBUG_PRINT("info",("Terminating IO thread"));
326
315
mi->abort_slave=1;
327
316
if ((error=terminate_slave_thread(mi->io_thd,io_lock,
329
318
&mi->slave_running,
334
323
if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
336
DBUG_PRINT("info",("Terminating SQL thread"));
337
325
mi->rli.abort_slave=1;
338
326
if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
339
327
&mi->rli.stop_cond,
340
328
&mi->rli.slave_running,
416
400
EINVAL: invalid signal number (can't happen)
417
401
ESRCH: thread already killed (can happen, should be ignored)
419
IF_DBUG(int err= ) pthread_kill(thd->real_id, thr_client_alarm);
420
DBUG_ASSERT(err != EINVAL);
403
int err= pthread_kill(thd->real_id, thr_client_alarm);
404
assert(err != EINVAL);
422
406
thd->awake(THD::NOT_KILLED);
423
407
pthread_mutex_unlock(&thd->LOCK_delete);
429
413
struct timespec abstime;
430
414
set_timespec(abstime,2);
431
415
error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
432
DBUG_ASSERT(error == ETIMEDOUT || error == 0);
416
assert(error == ETIMEDOUT || error == 0);
435
DBUG_ASSERT(*slave_running == 0);
419
assert(*slave_running == 0);
438
422
pthread_mutex_unlock(term_lock);
472
455
pthread_cond_broadcast(start_cond);
474
457
pthread_mutex_unlock(start_lock);
475
DBUG_RETURN(ER_SLAVE_MUST_STOP);
458
return(ER_SLAVE_MUST_STOP);
477
460
start_id= *slave_run_id;
478
DBUG_PRINT("info",("Creating new slave thread"));
479
461
if (high_priority)
481
463
struct sched_param tmp_sched_param;
490
472
pthread_mutex_unlock(start_lock);
491
DBUG_RETURN(ER_SLAVE_THREAD);
473
return(ER_SLAVE_THREAD);
493
475
if (start_cond && cond_lock) // caller has cond_lock
495
477
THD* thd = current_thd;
496
478
while (start_id == *slave_run_id)
498
DBUG_PRINT("sleep",("Waiting for slave thread to start"));
499
480
const char* old_msg = thd->enter_cond(start_cond,cond_lock,
500
481
"Waiting for slave thread to start");
501
482
pthread_cond_wait(start_cond,cond_lock);
502
483
thd->exit_cond(old_msg);
503
484
pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
505
DBUG_RETURN(thd->killed_errno());
486
return(thd->killed_errno());
509
490
pthread_mutex_unlock(start_lock);
529
510
pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
530
511
pthread_cond_t* cond_io=0,*cond_sql=0;
532
DBUG_ENTER("start_slave_threads");
534
514
if (need_slave_mutex)
607
583
pthread_mutex_unlock(&LOCK_active_mi);
612
588
static bool io_slave_killed(THD* thd, Master_info* mi)
614
DBUG_ENTER("io_slave_killed");
616
DBUG_ASSERT(mi->io_thd == thd);
617
DBUG_ASSERT(mi->slave_running); // tracking buffer overrun
618
DBUG_RETURN(mi->abort_slave || abort_loop || thd->killed);
590
assert(mi->io_thd == thd);
591
assert(mi->slave_running); // tracking buffer overrun
592
return(mi->abort_slave || abort_loop || thd->killed);
622
596
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
624
DBUG_ENTER("sql_slave_killed");
626
DBUG_ASSERT(rli->sql_thd == thd);
627
DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
598
assert(rli->sql_thd == thd);
599
assert(rli->slave_running == 1);// tracking buffer overrun
628
600
if (abort_loop || thd->killed || rli->abort_slave)
637
609
is actively working.
639
611
if (rli->last_event_start_time == 0)
641
DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving "
642
"it some grace period"));
643
613
if (difftime(time(0), rli->last_event_start_time) > 60)
645
615
rli->report(ERROR_LEVEL, 0,
666
636
void skip_load_data_infile(NET *net)
668
DBUG_ENTER("skip_load_data_infile");
670
638
(void)net_request_file(net, "/dev/null");
671
639
(void)my_net_read(net); // discard response
672
640
(void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok
677
645
bool net_request_file(NET* net, const char* fname)
679
DBUG_ENTER("net_request_file");
680
DBUG_RETURN(net_write_command(net, 251, (uchar*) fname, strlen(fname),
647
return(net_write_command(net, 251, (uchar*) fname, strlen(fname),
681
648
(uchar*) "", 0));
691
658
const char *print_slave_db_safe(const char* db)
693
DBUG_ENTER("*print_slave_db_safe");
695
DBUG_RETURN((db ? db : ""));
660
return((db ? db : ""));
698
663
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
699
664
const char *default_val)
702
DBUG_ENTER("init_strvar_from_file");
704
668
if ((length=my_b_gets(f,var, max_size)))
716
680
while (((c=my_b_get(f)) != '\n' && c != my_b_EOF));
720
684
else if (default_val)
722
686
strmake(var, default_val, max_size-1);
729
693
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
732
DBUG_ENTER("init_intvar_from_file");
735
698
if (my_b_gets(f, buf, sizeof(buf)))
737
700
*var = atoi(buf);
740
703
else if (default_val)
742
705
*var = default_val;
748
711
int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
751
DBUG_ENTER("init_floatvar_from_file");
754
716
if (my_b_gets(f, buf, sizeof(buf)))
756
718
if (sscanf(buf, "%f", var) != 1)
761
723
else if (default_val != 0.0)
763
725
*var = default_val;
769
731
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
1058
1019
Master_info* mi = rli->mi;
1059
1020
const char *save_proc_info;
1060
1021
THD* thd = mi->io_thd;
1061
DBUG_ENTER("wait_for_relay_log_space");
1063
1023
pthread_mutex_lock(&rli->log_space_lock);
1064
1024
save_proc_info= thd->enter_cond(&rli->log_space_cond,
1093
1053
Relay_log_info *rli= &mi->rli;
1094
1054
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1095
DBUG_ENTER("write_ignored_events_info_to_relay_log");
1097
DBUG_ASSERT(thd == mi->io_thd);
1056
assert(thd == mi->io_thd);
1098
1057
pthread_mutex_lock(log_lock);
1099
1058
if (rli->ign_master_log_name_end[0])
1101
DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
1102
1060
Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
1103
1061
0, rli->ign_master_log_pos_end,
1104
1062
Rotate_log_event::DUP_NAME);
1137
1095
uchar buf[1024], *pos= buf;
1138
1096
uint report_host_len, report_user_len=0, report_password_len=0;
1139
DBUG_ENTER("register_slave_on_master");
1141
*suppress_warnings= FALSE;
1098
*suppress_warnings= false;
1142
1099
if (!report_host)
1144
1101
report_host_len= strlen(report_host);
1145
1102
if (report_user)
1146
1103
report_user_len= strlen(report_user);
1149
1106
/* 30 is a good safety margin */
1150
1107
if (report_host_len + report_user_len + report_password_len + 30 >
1152
DBUG_RETURN(0); // safety
1109
return(0); // safety
1154
1111
int4store(pos, server_id); pos+= 4;
1155
1112
pos= net_store_data(pos, (uchar*) report_host, report_host_len);
1165
1122
if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
1167
*suppress_warnings= TRUE; // Suppress reconnect warning
1124
*suppress_warnings= true; // Suppress reconnect warning
1169
1126
else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
1251
1207
if (protocol->send_fields(&field_list,
1252
1208
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1255
1211
if (mi->host[0])
1257
DBUG_PRINT("info",("host is set: '%s'", mi->host));
1258
1213
String *packet= &thd->packet;
1259
1214
protocol->prepare_for_resend();
1367
1322
pthread_mutex_unlock(&mi->data_lock);
1369
1324
if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
1377
1332
void set_slave_thread_options(THD* thd)
1379
DBUG_ENTER("set_slave_thread_options");
1381
1335
It's nonsense to constrain the slave threads with max_join_size; if a
1382
1336
query succeeded on master, we HAVE to execute it. So set
1393
1347
options&= ~OPTION_BIN_LOG;
1394
1348
thd->options= options;
1395
1349
thd->variables.completion_type= 0;
1399
1353
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
1401
DBUG_ENTER("set_slave_thread_default_charset");
1403
1355
thd->variables.character_set_client=
1404
1356
global_system_variables.character_set_client;
1405
1357
thd->variables.collation_connection=
1425
1377
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
1427
DBUG_ENTER("init_slave_thread");
1428
#if !defined(DBUG_OFF)
1429
1379
int simulate_error= 0;
1431
1380
thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
1432
1381
SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1433
1382
thd->security_ctx->skip_grants();
1447
1396
thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
1448
1397
pthread_mutex_unlock(&LOCK_thread_count);
1450
DBUG_EXECUTE_IF("simulate_io_slave_error_on_init",
1451
simulate_error|= (1 << SLAVE_THD_IO););
1452
DBUG_EXECUTE_IF("simulate_sql_slave_error_on_init",
1453
simulate_error|= (1 << SLAVE_THD_SQL););
1454
#if !defined(DBUG_OFF)
1399
simulate_error|= (1 << SLAVE_THD_IO);
1400
simulate_error|= (1 << SLAVE_THD_SQL);
1455
1401
if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
1457
if (init_thr_lock() || thd->store_globals())
1460
1403
thd->cleanup();
1463
1406
lex_start(thd);
1511
1453
int binlog_flags = 0; // for now
1512
1454
char* logname = mi->master_log_name;
1513
DBUG_ENTER("request_dump");
1515
*suppress_warnings= FALSE;
1456
*suppress_warnings= false;
1517
1458
// TODO if big log files: Change next to int8store()
1518
1459
int4store(buf, (ulong) mi->master_log_pos);
1528
1469
now we just fill up the error log :-)
1530
1471
if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
1531
*suppress_warnings= TRUE; // Suppress reconnect warning
1472
*suppress_warnings= true; // Suppress reconnect warning
1533
1474
sql_print_error("Error on COM_BINLOG_DUMP: %d %s, will retry in %d secs",
1534
1475
mysql_errno(mysql), mysql_error(mysql),
1535
1476
mi->connect_retry);
1561
1502
bool* suppress_warnings)
1564
DBUG_ENTER("read_event");
1566
*suppress_warnings= FALSE;
1506
*suppress_warnings= false;
1568
1508
my_real_read() will time us out
1569
1509
We check if we were told to die, and if not, try reading again
1572
1511
if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
1573
DBUG_RETURN(packet_error);
1512
return(packet_error);
1576
1514
len = cli_safe_read(mysql);
1577
1515
if (len == packet_error || (long) len < 1)
1583
1521
we suppress prints to .err file as long as the reconnect
1584
1522
happens without problems
1586
*suppress_warnings= TRUE;
1524
*suppress_warnings= true;
1589
1527
sql_print_error("Error reading packet from server: %s ( server_errno=%d)",
1590
1528
mysql_error(mysql), mysql_errno(mysql));
1591
DBUG_RETURN(packet_error);
1529
return(packet_error);
1594
1532
/* Check if eof packet */
1597
1535
sql_print_information("Slave: received end packet from server, apparent "
1598
1536
"master shutdown: %s",
1599
1537
mysql_error(mysql));
1600
DBUG_RETURN(packet_error);
1538
return(packet_error);
1603
DBUG_PRINT("exit", ("len: %lu net->read_pos[4]: %d",
1604
len, mysql->net.read_pos[4]));
1605
DBUG_RETURN(len - 1);
1610
1546
Relay_log_info const *rli __attribute__((__unused__)),
1611
1547
int expected_error)
1613
DBUG_ENTER("check_expected_error");
1615
1549
switch (expected_error) {
1616
1550
case ER_NET_READ_ERROR:
1617
1551
case ER_NET_ERROR_ON_WRITE:
1618
1552
case ER_QUERY_INTERRUPTED:
1619
1553
case ER_SERVER_SHUTDOWN:
1620
1554
case ER_NEW_ABORTING_CONNECTION:
1635
1569
static int has_temporary_error(THD *thd)
1637
DBUG_ENTER("has_temporary_error");
1639
1571
if (thd->is_fatal_error)
1642
DBUG_EXECUTE_IF("all_errors_are_temporary_errors",
1643
if (thd->main_da.is_error())
1646
my_error(ER_LOCK_DEADLOCK, MYF(0));
1574
if (thd->main_da.is_error())
1577
my_error(ER_LOCK_DEADLOCK, MYF(0));
1650
1581
If there is no message in THD, we can't say if it's a temporary
1699
1630
int exec_res= 0;
1701
DBUG_ENTER("apply_event_and_update_pos");
1703
DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)",
1704
ev->get_type_str(), ev->get_type_code(),
1706
DBUG_PRINT("info", ("thd->options: %s%s; rli->last_event_start_time: %lu",
1707
FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
1708
FLAGSTR(thd->options, OPTION_BEGIN),
1709
rli->last_event_start_time));
1712
1633
Execute the event to change the database and update the binary
1713
1634
log coordinates, but first we set some data that is needed for
1747
1668
pthread_mutex_unlock(&rli->data_lock);
1748
1669
if (reason == Log_event::EVENT_SKIP_NOT)
1749
1670
exec_res= ev->apply_event(rli);
1752
This only prints information to the debug trace.
1754
TODO: Print an informational message to the error log?
1756
static const char *const explain[] = {
1759
// EVENT_SKIP_IGNORE,
1760
"skipped because event should be ignored",
1762
"skipped because event skip counter was non-zero"
1764
DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
1765
thd->options & OPTION_BEGIN ? 1 : 0,
1766
rli->get_flag(Relay_log_info::IN_STMT)));
1767
DBUG_PRINT("skip_event", ("%s event was %s",
1768
ev->get_type_str(), explain[reason]));
1772
1673
exec_res= ev->apply_event(rli);
1774
DBUG_PRINT("info", ("apply_event error = %d", exec_res));
1775
1675
if (exec_res == 0)
1777
1677
int error= ev->update_pos(rli);
1785
DBUG_PRINT("info", ("update_pos error = %d", error));
1786
DBUG_PRINT("info", ("group %s %s",
1787
llstr(rli->group_relay_log_pos, buf),
1788
rli->group_relay_log_name));
1789
DBUG_PRINT("info", ("event %s %s",
1790
llstr(rli->event_relay_log_pos, buf),
1791
rli->event_relay_log_name));
1794
1679
The update should not fail, so print an error message and
1795
1680
return an error code.
1807
1692
" Stopped in %s position %s",
1808
1693
rli->group_relay_log_name,
1809
1694
llstr(rli->group_relay_log_pos, buf));
1814
DBUG_RETURN(exec_res ? 1 : 0);
1699
return(exec_res ? 1 : 0);
1857
1740
Log_event * ev = next_event(rli);
1859
DBUG_ASSERT(rli->sql_thd==thd);
1742
assert(rli->sql_thd==thd);
1861
1744
if (sql_slave_killed(thd,rli))
1863
1746
pthread_mutex_unlock(&rli->data_lock);
1887
1770
rli->abort_slave= 1;
1888
1771
pthread_mutex_unlock(&rli->data_lock);
1892
exec_res= apply_event_and_update_pos(ev, thd, rli, TRUE);
1775
exec_res= apply_event_and_update_pos(ev, thd, rli, true);
1895
1778
Format_description_log_event should not be deleted because it will be
1971
1851
non-transient error, the slave will stop with an error.
1973
1853
rli->trans_retries= 0; // restart from fresh
1974
DBUG_PRINT("info", ("Resetting retry counter, rli->trans_retries: %lu",
1975
rli->trans_retries));
1978
DBUG_RETURN(exec_res);
1980
1858
pthread_mutex_unlock(&rli->data_lock);
1981
1859
rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
2119
1992
pthread_mutex_unlock(&mi->run_lock);
2120
1993
pthread_cond_broadcast(&mi->start_cond);
2122
DBUG_PRINT("master_info",("log_file_name: '%s' position: %s",
2123
mi->master_log_name,
2124
llstr(mi->master_log_pos,llbuff)));
2126
1995
if (!(mi->mysql = mysql = mysql_init(NULL)))
2128
1997
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2182
2051
goto connected;
2184
DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_REG",
2185
if (!retry_count_reg)
2188
sql_print_information("Forcing to reconnect slave I/O thread");
2189
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2190
reconnect_messages[SLAVE_RECON_ACT_REG]))
2053
if (!retry_count_reg)
2056
sql_print_information("Forcing to reconnect slave I/O thread");
2057
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2058
reconnect_messages[SLAVE_RECON_ACT_REG]))
2196
DBUG_PRINT("info",("Starting reading binary log from master"));
2197
2064
while (!io_slave_killed(thd,mi))
2199
2066
thd_proc_info(thd, "Requesting binlog dump");
2208
2075
goto connected;
2210
DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_DUMP",
2211
if (!retry_count_dump)
2214
sql_print_information("Forcing to reconnect slave I/O thread");
2215
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2216
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2077
if (!retry_count_dump)
2080
sql_print_information("Forcing to reconnect slave I/O thread");
2081
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2082
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2221
2087
while (!io_slave_killed(thd,mi))
2232
2098
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
2233
2099
reading event"))
2235
DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_EVENT",
2236
if (!retry_count_event)
2238
retry_count_event++;
2239
sql_print_information("Forcing to reconnect slave I/O thread");
2240
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2241
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2101
if (!retry_count_event)
2103
retry_count_event++;
2104
sql_print_information("Forcing to reconnect slave I/O thread");
2105
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2106
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2246
2111
if (event_len == packet_error)
2293
2158
for no reason, but this function will do a clean read, notice the clean
2294
2159
value and exit immediately.
2298
char llbuf1[22], llbuf2[22];
2299
DBUG_PRINT("info", ("log_space_limit=%s log_space_total=%s \
2300
ignore_log_space_limit=%d",
2301
llstr(rli->log_space_limit,llbuf1),
2302
llstr(rli->log_space_total,llbuf2),
2303
(int) rli->ignore_log_space_limit));
2307
2161
if (rli->log_space_limit && rli->log_space_limit <
2308
2162
rli->log_space_total &&
2309
2163
!rli->ignore_log_space_limit)
2350
2204
mi->rli.relay_log.description_event_for_queue= 0;
2351
2205
// TODO: make rpl_status part of Master_info
2352
2206
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2353
DBUG_ASSERT(thd->net.buff != 0);
2207
assert(thd->net.buff != 0);
2354
2208
net_end(&thd->net); // destructor will not free it, because net.vio is 0
2355
2209
close_thread_tables(thd);
2356
2210
pthread_mutex_lock(&LOCK_thread_count);
2386
2240
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
2387
2241
my_thread_init();
2388
DBUG_ENTER("handle_slave_sql");
2390
DBUG_ASSERT(rli->inited);
2243
assert(rli->inited);
2391
2244
pthread_mutex_lock(&rli->run_lock);
2392
DBUG_ASSERT(!rli->slave_running);
2245
assert(!rli->slave_running);
2395
2247
rli->events_till_abort = abort_slave_event_count;
2398
2249
thd = new THD; // note that contructor of THD uses DBUG_ !
2399
2250
thd->thread_stack = (char*)&thd; // remember where our stack is
2449
2300
rli->ignore_log_space_limit= 0;
2450
2301
pthread_mutex_unlock(&rli->log_space_lock);
2451
2302
rli->trans_retries= 0; // start from "no error"
2452
DBUG_PRINT("info", ("rli->trans_retries: %lu", rli->trans_retries));
2454
2304
if (init_relay_log_pos(rli,
2455
2305
rli->group_relay_log_name,
2464
2314
THD_CHECK_SENTRY(thd);
2467
char llbuf1[22], llbuf2[22];
2468
DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
2469
llstr(my_b_tell(rli->cur_log),llbuf1),
2470
llstr(rli->event_relay_log_pos,llbuf2)));
2471
DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2473
Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2474
correct position when it's called just after my_b_seek() (the questionable
2475
stuff is those "seek is done on next read" comments in the my_b_seek()
2477
The crude reality is that this assertion randomly fails whereas
2478
replication seems to work fine. And there is no easy explanation why it
2479
fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
2480
init_relay_log_pos() called above). Maybe the assertion would be
2481
meaningful if we held rli->data_lock between the my_b_seek() and the
2484
#ifdef SHOULD_BE_CHECKED
2485
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2489
DBUG_ASSERT(rli->sql_thd == thd);
2315
assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2317
Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2318
correct position when it's called just after my_b_seek() (the questionable
2319
stuff is those "seek is done on next read" comments in the my_b_seek()
2321
The crude reality is that this assertion randomly fails whereas
2322
replication seems to work fine. And there is no easy explanation why it
2323
fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
2324
init_relay_log_pos() called above). Maybe the assertion would be
2325
meaningful if we held rli->data_lock between the my_b_seek() and the
2328
assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2329
assert(rli->sql_thd == thd);
2491
DBUG_PRINT("master_info",("log_file_name: %s position: %s",
2492
rli->group_master_log_name,
2493
llstr(rli->group_master_log_pos,llbuff)));
2494
2331
if (global_system_variables.log_warnings)
2495
2332
sql_print_information("Slave SQL thread initialized, starting replication in \
2496
2333
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
2530
2367
while (!sql_slave_killed(thd,rli))
2532
2369
thd_proc_info(thd, "Reading event from the relay log");
2533
DBUG_ASSERT(rli->sql_thd == thd);
2370
assert(rli->sql_thd == thd);
2534
2371
THD_CHECK_SENTRY(thd);
2535
2372
if (exec_relay_log_event(thd,rli))
2537
DBUG_PRINT("info", ("exec_relay_log_event() failed"));
2538
2374
// do not scare the user if SQL thread was simply killed or stopped
2539
2375
if (!sql_slave_killed(thd,rli))
2550
2386
char const *const errmsg= thd->main_da.message();
2553
("thd->main_da.sql_errno()=%d; rli->last_error.number=%d",
2554
thd->main_da.sql_errno(), last_errno));
2555
2388
if (last_errno == 0)
2557
2390
rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), errmsg);
2620
2453
pthread_mutex_lock(&rli->run_lock);
2621
2454
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
2622
2455
pthread_mutex_lock(&rli->data_lock);
2623
DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
2456
assert(rli->slave_running == 1); // tracking buffer overrun
2624
2457
/* When master_pos_wait() wakes up it will check this and terminate */
2625
2458
rli->slave_running= 0;
2626
2459
/* Forget the relay log's format */
2628
2461
rli->relay_log.description_event_for_exec= 0;
2629
2462
/* Wake up master_pos_wait() */
2630
2463
pthread_mutex_unlock(&rli->data_lock);
2631
DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
2632
2464
pthread_cond_broadcast(&rli->data_cond);
2633
2465
rli->ignore_log_space_limit= 0; /* don't need any lock */
2634
2466
/* we die so won't remember charset - re-update them on next thread start */
2640
2472
to avoid unneeded position re-init
2642
2474
thd->temporary_tables = 0; // remove tempation from destructor to close them
2643
DBUG_ASSERT(thd->net.buff != 0);
2475
assert(thd->net.buff != 0);
2644
2476
net_end(&thd->net); // destructor will not free it, because we are weird
2645
DBUG_ASSERT(rli->sql_thd == thd);
2477
assert(rli->sql_thd == thd);
2646
2478
THD_CHECK_SENTRY(thd);
2647
2479
rli->sql_thd= 0;
2648
2480
pthread_mutex_lock(&LOCK_thread_count);
2674
2506
bool cev_not_written;
2675
2507
THD *thd = mi->io_thd;
2676
2508
NET *net = &mi->mysql->net;
2677
DBUG_ENTER("process_io_create_file");
2679
2510
if (unlikely(!cev->is_valid()))
2682
2513
if (!rpl_filter->db_ok(cev->db))
2684
2515
skip_load_data_infile(net);
2687
DBUG_ASSERT(cev->inited_from_old);
2518
assert(cev->inited_from_old);
2688
2519
thd->file_id = cev->file_id = mi->file_id++;
2689
2520
thd->server_id = cev->server_id;
2690
2521
cev_not_written = 1;
2797
2628
static int process_io_rotate(Master_info *mi, Rotate_log_event *rev)
2799
DBUG_ENTER("process_io_rotate");
2800
2630
safe_mutex_assert_owner(&mi->data_lock);
2802
2632
if (unlikely(!rev->is_valid()))
2805
2635
/* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2806
2636
memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
2807
2637
mi->master_log_pos= rev->pos;
2808
DBUG_PRINT("info", ("master_log_pos: '%s' %lu",
2809
mi->master_log_name, (ulong) mi->master_log_pos));
2812
2639
If we do not do this, we will be getting the first
2813
2640
rotate event forever, so we need to not disconnect after one.
2815
2642
if (disconnect_slave_event_count)
2816
2643
mi->events_till_disconnect++;
2820
2646
If description_event_for_queue is format <4, there is conversion in the
2863
2688
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2864
2689
ER(ER_SLAVE_FATAL_ERROR), "Memory allocation failed");
2867
2692
memcpy(tmp_buf,buf,event_len);
2920
2745
/* We come here when and only when tmp_buf != 0 */
2921
DBUG_ASSERT(tmp_buf != 0);
2746
assert(tmp_buf != 0);
2922
2747
inc_pos=event_len;
2923
2748
ev->log_pos+= inc_pos;
2924
2749
int error = process_io_create_file(mi,(Create_file_log_event*)ev);
2926
2751
mi->master_log_pos += inc_pos;
2927
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
2928
2752
pthread_mutex_unlock(&mi->data_lock);
2929
2753
my_free((char*)tmp_buf, MYF(0));
2933
2757
inc_pos= event_len;
2947
2771
pthread_mutex_unlock(&mi->data_lock);
2950
2774
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2953
2777
mi->master_log_pos+= inc_pos;
2954
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
2955
2778
pthread_mutex_unlock(&mi->data_lock);
2968
2791
char *tmp_buf = 0;
2969
2792
Relay_log_info *rli= &mi->rli;
2970
DBUG_ENTER("queue_binlog_ver_3_event");
2972
2794
/* read_log_event() will adjust log_pos to be end_log_pos */
2973
2795
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2978
2800
master could be corrupt but a more likely cause of this is a bug",
2980
2802
my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
2983
2805
pthread_mutex_lock(&mi->data_lock);
2984
2806
switch (ev->get_type_code()) {
3003
2825
pthread_mutex_unlock(&mi->data_lock);
3006
2828
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3008
2830
mi->master_log_pos+= inc_pos;
3010
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
3011
2832
pthread_mutex_unlock(&mi->data_lock);
3027
2848
static int queue_old_event(Master_info *mi, const char *buf,
3028
2849
ulong event_len)
3030
DBUG_ENTER("queue_old_event");
3032
2851
switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
3035
DBUG_RETURN(queue_binlog_ver_1_event(mi,buf,event_len));
2854
return(queue_binlog_ver_1_event(mi,buf,event_len));
3037
DBUG_RETURN(queue_binlog_ver_3_event(mi,buf,event_len));
2856
return(queue_binlog_ver_3_event(mi,buf,event_len));
3038
2857
default: /* unsupported format; eg version 2 */
3039
DBUG_PRINT("info",("unsupported binlog format %d in queue_old_event()",
3040
mi->rli.relay_log.description_event_for_queue->binlog_version));
3060
2877
Relay_log_info *rli= &mi->rli;
3061
2878
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
3062
DBUG_ENTER("queue_event");
3065
2881
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
3066
2882
buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
3067
DBUG_RETURN(queue_old_event(mi,buf,event_len));
2883
return(queue_old_event(mi,buf,event_len));
3069
2885
pthread_mutex_lock(&mi->data_lock);
3129
2945
it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
3131
2947
inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
3132
DBUG_PRINT("info",("binlog format is now %d",
3133
mi->rli.relay_log.description_event_for_queue->binlog_version));
3228
3041
mi->master_log_pos+= inc_pos;
3229
3042
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
3230
DBUG_ASSERT(rli->ign_master_log_name_end[0]);
3043
assert(rli->ign_master_log_name_end[0]);
3231
3044
rli->ign_master_log_pos_end= mi->master_log_pos;
3233
3046
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3234
DBUG_PRINT("info", ("master_log_pos: %lu, event originating from the same server, ignored",
3235
(ulong) mi->master_log_pos));
3240
3051
if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3242
3053
mi->master_log_pos+= inc_pos;
3243
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
3244
3054
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3257
3067
pthread_mutex_unlock(&mi->data_lock);
3258
DBUG_PRINT("info", ("error: %d", error));
3260
3069
mi->report(ERROR_LEVEL, error, ER(error),
3261
3070
(error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
3262
3071
"could not queue event from master" :
3263
3072
error_msg.ptr());
3268
3077
void end_relay_log_info(Relay_log_info* rli)
3270
DBUG_ENTER("end_relay_log_info");
3272
3079
if (!rli->inited)
3274
3081
if (rli->info_fd >= 0)
3276
3083
end_io_cache(&rli->info_file);
3312
3119
static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi)
3314
DBUG_ENTER("safe_connect");
3316
DBUG_RETURN(connect_to_master(thd, mysql, mi, 0, 0));
3121
return(connect_to_master(thd, mysql, mi, 0, 0));
3333
3138
int last_errno= -2; // impossible error
3334
3139
ulong err_count=0;
3335
3140
char llbuff[22];
3336
DBUG_ENTER("connect_to_master");
3339
3142
mi->events_till_disconnect = disconnect_slave_event_count;
3341
3143
ulong client_flag= CLIENT_REMEMBER_OPTIONS;
3342
3144
if (opt_slave_compressed_protocol)
3343
3145
client_flag=CLIENT_COMPRESS; /* We will use compression */
3421
3222
static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
3422
3223
bool suppress_warnings)
3424
DBUG_ENTER("safe_reconnect");
3425
DBUG_RETURN(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
3225
return(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
3458
3258
bool flush_relay_log_info(Relay_log_info* rli)
3461
DBUG_ENTER("flush_relay_log_info");
3463
3262
if (unlikely(rli->no_storage))
3466
3265
IO_CACHE *file = &rli->info_file;
3467
3266
char buff[FN_REFLEN*2+22*2+4], *pos;
3492
3291
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
3494
DBUG_ENTER("reopen_relay_log");
3495
DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
3496
DBUG_ASSERT(rli->cur_log_fd == -1);
3293
assert(rli->cur_log != &rli->cache_buf);
3294
assert(rli->cur_log_fd == -1);
3498
3296
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3499
3297
if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
3503
3301
We want to start exactly where we was before:
3504
3302
relay_log_pos Current log pos
3576
3371
if (!my_b_inited(cur_log))
3580
/* This is an assertion which sometimes fails, let's try to track it */
3581
char llbuf1[22], llbuf2[22];
3582
DBUG_PRINT("info", ("my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
3583
llstr(my_b_tell(cur_log),llbuf1),
3584
llstr(rli->event_relay_log_pos,llbuf2)));
3585
DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3586
DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
3373
assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3374
assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
3590
3377
Relay log is always in new format - if the master is 3.23, the
3591
3378
I/O thread will convert the format for us.
3602
3389
rli->relay_log.description_event_for_exec)))
3605
DBUG_ASSERT(thd==rli->sql_thd);
3392
assert(thd==rli->sql_thd);
3607
3394
read it while we have a lock, to avoid a mutex lock in
3608
3395
inc_event_relay_log_pos()
3610
3397
rli->future_event_relay_log_pos= my_b_tell(cur_log);
3612
3399
pthread_mutex_unlock(log_lock);
3615
DBUG_ASSERT(thd==rli->sql_thd);
3402
assert(thd==rli->sql_thd);
3616
3403
if (opt_reckless_slave) // For mysql-test
3617
3404
cur_log->error = 0;
3618
3405
if (cur_log->error < 0)
3655
3442
time_t save_timestamp= rli->last_master_timestamp;
3656
3443
rli->last_master_timestamp= 0;
3658
DBUG_ASSERT(rli->relay_log.get_open_count() ==
3445
assert(rli->relay_log.get_open_count() ==
3659
3446
rli->cur_log_old_open_count);
3661
3448
if (rli->ign_master_log_name_end[0])
3663
3450
/* We generate and return a Rotate, to make our positions advance */
3664
DBUG_PRINT("info",("seeing an ignored end segment"));
3665
3451
ev= new Rotate_log_event(rli->ign_master_log_name_end,
3666
3452
0, rli->ign_master_log_pos_end,
3667
3453
Rotate_log_event::DUP_NAME);
3787
3573
To guard against this, we need to have LOCK_log.
3790
DBUG_PRINT("info",("hot_log: %d",hot_log));
3791
3576
if (!hot_log) /* if hot_log, we already have this mutex */
3792
3577
pthread_mutex_lock(log_lock);
3793
3578
if (rli->relay_log.is_active(rli->linfo.log_file_name))
3800
3585
rli->cur_log= cur_log= rli->relay_log.get_log_file();
3801
3586
rli->cur_log_old_open_count= rli->relay_log.get_open_count();
3802
DBUG_ASSERT(rli->cur_log_fd == -1);
3587
assert(rli->cur_log_fd == -1);
3805
3590
Read pointer has to be at the start since we are the only
3918
3701
@param rli Relay_log_info which tells the master's version
3919
3702
@param bug_id Number of the bug as found in bugs.mysql.com
3920
3703
@param report bool report error message, default TRUE
3921
@return TRUE if master has the bug, FALSE if it does not.
3704
@return true if master has the bug, FALSE if it does not.
3923
3706
bool rpl_master_has_bug(Relay_log_info *rli, uint bug_id, bool report)
3997
3780
if (active_mi && active_mi->rli.sql_thd == thd)
3999
3782
Relay_log_info *rli= &active_mi->rli;
4000
return rpl_master_has_bug(rli, 33029, FALSE);
3783
return rpl_master_has_bug(rli, 33029, false);
4005
3788
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION