242
if (init_master_info(active_mi,master_info_file,relay_log_info_file,
243
1, (SLAVE_IO | SLAVE_SQL)))
242
if (active_mi->init_master_info(master_info_file, relay_log_info_file, (SLAVE_IO | SLAVE_SQL)))
245
244
sql_print_error(_("Failed to initialize the master info structure"));
585
584
once multi-master code is ready.
587
586
terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
588
end_master_info(active_mi);
587
active_mi->end_master_info();
589
588
delete active_mi;
1084
1083
" to the relay log, SHOW SLAVE STATUS may be"
1085
1084
" inaccurate"));
1086
1085
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1087
if (flush_master_info(mi, 1))
1088
1087
sql_print_error(_("Failed to flush master info file"));
1124
1123
pos= net_store_data(pos, (unsigned char*) report_user, report_user_len);
1125
1124
pos= net_store_data(pos, (unsigned char*) report_password, report_password_len);
1126
1125
int2store(pos, (uint16_t) report_port); pos+= 2;
1127
int4store(pos, rpl_recovery_rank); pos+= 4;
1126
int4store(pos, 0); pos+= 4;
1128
1127
/* The master will fill in master_id */
1129
1128
int4store(pos, 0); pos+= 4;
1222
1221
pthread_mutex_lock(&mi->data_lock);
1223
1222
pthread_mutex_lock(&mi->rli.data_lock);
1224
protocol->store(mi->host, &my_charset_bin);
1225
protocol->store(mi->user, &my_charset_bin);
1226
protocol->store((uint32_t) mi->port);
1227
protocol->store((uint32_t) mi->connect_retry);
1228
protocol->store(mi->master_log_name, &my_charset_bin);
1229
protocol->store((uint64_t) mi->master_log_pos);
1230
protocol->store(mi->rli.group_relay_log_name +
1231
dirname_length(mi->rli.group_relay_log_name),
1223
protocol->store(mi->getHostname(), &my_charset_bin);
1224
protocol->store(mi->getUsername(), &my_charset_bin);
1225
protocol->store((uint32_t) mi->getPort());
1226
protocol->store(mi->getConnectionRetry());
1227
protocol->store(mi->getLogName(), &my_charset_bin);
1228
protocol->store((uint64_t) mi->getLogPosition());
1229
protocol->store(mi->rli.group_relay_log_name.c_str() +
1230
dirname_length(mi->rli.group_relay_log_name.c_str()),
1232
1231
&my_charset_bin);
1233
1232
protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1234
protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
1233
protocol->store(mi->rli.group_master_log_name.c_str(), &my_charset_bin);
1235
1234
protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1236
1235
"Yes" : "No", &my_charset_bin);
1237
1236
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1359
1358
thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1360
1359
+ MAX_LOG_EVENT_HEADER; /* note, incr over the global not session var */
1361
1360
thd->slave_thread = 1;
1362
thd->enable_slow_log= opt_log_slow_slave_statements;
1363
1361
set_slave_thread_options(thd);
1364
1362
thd->client_capabilities = CLIENT_LOCAL_FILES;
1365
1363
pthread_mutex_lock(&LOCK_thread_count);
1376
1374
lex_start(thd);
1378
1376
if (thd_type == SLAVE_THD_SQL)
1379
thd_proc_info(thd, "Waiting for the next event in relay log");
1377
thd->set_proc_info("Waiting for the next event in relay log");
1381
thd_proc_info(thd, "Waiting for master update");
1379
thd->set_proc_info("Waiting for master update");
1382
1380
thd->version=refresh_version;
1383
1381
thd->set_time();
1421
1419
unsigned char buf[FN_REFLEN + 10];
1423
1421
int32_t binlog_flags = 0; // for now
1424
char* logname = mi->master_log_name;
1422
const char* logname = mi->getLogName();
1426
1424
*suppress_warnings= false;
1428
1426
// TODO if big log files: Change next to int8store()
1429
int4store(buf, (uint32_t) mi->master_log_pos);
1427
int4store(buf, (uint32_t) mi->getLogPosition());
1430
1428
int2store(buf + 4, binlog_flags);
1431
1429
int4store(buf + 6, server_id);
1432
1430
len = (uint32_t) strlen(logname);
1660
1658
" of the relay log information: the slave may"
1661
1659
" be in an inconsistent state."
1662
1660
" Stopped in %s position %s"),
1663
rli->group_relay_log_name,
1661
rli->group_relay_log_name.c_str(),
1664
1662
llstr(rli->group_relay_log_pos, buf));
1785
1783
if (rli->trans_retries < slave_trans_retries)
1787
if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
1785
if (rli->mi->init_master_info(0, 0, SLAVE_SQL))
1788
1786
sql_print_error(_("Failed to initialize the master info structure"));
1789
1787
else if (init_relay_log_pos(rli,
1790
rli->group_relay_log_name,
1788
rli->group_relay_log_name.c_str(),
1791
1789
rli->group_relay_log_pos,
1792
1790
1, &errmsg, 1))
1793
1791
sql_print_error(_("Error initializing relay log position: %s"),
1891
1889
char buf[256], llbuff[22];
1892
1890
snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1893
IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
1891
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
1895
1893
Raise a warining during registering on master/requesting dump.
1896
1894
Log a message reading event.
1971
thd_proc_info(thd, "Connecting to master");
1969
thd->set_proc_info("Connecting to master");
1972
1970
// we can get killed during safe_connect
1973
1971
if (!safe_connect(thd, drizzle, mi))
1975
1973
sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
1976
1974
"replication started in log '%s' at position %s"),
1977
mi->user, mi->host, mi->port,
1975
mi->getUsername(), mi->getHostname(), mi->getPort(),
1978
1976
IO_RPL_LOG_NAME,
1979
llstr(mi->master_log_pos,llbuff));
1977
llstr(mi->getLogPosition(), llbuff));
1981
1979
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
1982
1980
thread, since a replication event can become this much larger than
1995
1993
// TODO: the assignment below should be under mutex (5.0)
1996
1994
mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
1997
1995
thd->slave_net = &drizzle->net;
1998
thd_proc_info(thd, "Checking master version");
1996
thd->set_proc_info("Checking master version");
1999
1997
if (get_master_version_and_clock(drizzle, mi))
2005
2003
Register ourselves with the master.
2007
thd_proc_info(thd, "Registering slave on master");
2005
thd->set_proc_info("Registering slave on master");
2008
2006
if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2010
2008
if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
2033
2031
while (!io_slave_killed(thd,mi))
2035
thd_proc_info(thd, "Requesting binlog dump");
2033
thd->set_proc_info("Requesting binlog dump");
2036
2034
if (request_dump(drizzle, mi, &suppress_warnings))
2038
2036
sql_print_error(_("Failed on request_dump()"));
2062
2060
important thing is to not confuse users by saying "reading" whereas
2063
2061
we're in fact receiving nothing.
2065
thd_proc_info(thd, _("Waiting for master to send event"));
2063
thd->set_proc_info(_("Waiting for master to send event"));
2066
2064
event_len= read_event(drizzle, mi, &suppress_warnings);
2067
2065
if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while "
2068
2066
"reading event")))
2106
2104
} // if (event_len == packet_error)
2108
2106
retry_count=0; // ok event, reset retry counter
2109
thd_proc_info(thd, _("Queueing master event to the relay log"));
2107
thd->set_proc_info(_("Queueing master event to the relay log"));
2110
2108
if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2114
if (flush_master_info(mi, 1))
2116
2114
sql_print_error(_("Failed to flush master info file"));
2145
2143
// print the current replication position
2146
2144
sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2147
2145
"position %s"),
2148
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
2146
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
2149
2147
pthread_mutex_lock(&LOCK_thread_count);
2150
2148
thd->query = thd->db = 0; // extra safety
2151
2149
thd->query_length= thd->db_length= 0;
2166
2164
write_ignored_events_info_to_relay_log(thd, mi);
2167
thd_proc_info(thd, _("Waiting for slave mutex on exit"));
2165
thd->set_proc_info(_("Waiting for slave mutex on exit"));
2168
2166
pthread_mutex_lock(&mi->run_lock);
2170
2168
/* Forget the relay log's format */
2267
2265
rli->trans_retries= 0; // start from "no error"
2269
2267
if (init_relay_log_pos(rli,
2270
rli->group_relay_log_name,
2268
rli->group_relay_log_name.c_str(),
2271
2269
rli->group_relay_log_pos,
2272
2270
1 /*need data lock*/, &errmsg,
2273
2271
1 /*look for a description_event*/))
2299
2297
"position %s, relay log '%s' position: %s"),
2301
2299
llstr(rli->group_master_log_pos,llbuff),
2302
rli->group_relay_log_name,
2300
rli->group_relay_log_name.c_str(),
2303
2301
llstr(rli->group_relay_log_pos,llbuff1));
2305
2303
/* execute init_slave variable */
2335
2333
while (!sql_slave_killed(thd,rli))
2337
thd_proc_info(thd, _("Reading event from the relay log"));
2335
thd->set_proc_info(_("Reading event from the relay log"));
2338
2336
assert(rli->sql_thd == thd);
2339
2337
THD_CHECK_SENTRY(thd);
2340
2338
if (exec_relay_log_event(thd,rli))
2422
2420
thd->query= thd->db= thd->catalog= 0;
2423
2421
thd->query_length= thd->db_length= 0;
2424
2422
pthread_mutex_unlock(&LOCK_thread_count);
2425
thd_proc_info(thd, "Waiting for slave mutex on exit");
2423
thd->set_proc_info("Waiting for slave mutex on exit");
2426
2424
pthread_mutex_lock(&rli->run_lock);
2427
2425
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
2428
2426
pthread_mutex_lock(&rli->data_lock);
2606
2604
/* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2607
memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
2608
mi->master_log_pos= rev->pos;
2605
mi->setLogName(rev->new_log_ident);
2606
mi->setLogPosition(rev->pos);
2610
2608
If we do not do this, we will be getting the first
2611
2609
rotate event forever, so we need to not disconnect after one.
2693
2691
pthread_mutex_lock(&mi->data_lock);
2694
ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
2692
ev->log_pos= mi->getLogPosition(); /* 3.23 events don't contain log_pos */
2695
2693
switch (ev->get_type_code()) {
2696
2694
case STOP_EVENT:
2697
2695
ignore_event= 1;
2720
2718
ev->log_pos+= inc_pos;
2721
2719
int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2723
mi->master_log_pos += inc_pos;
2721
mi->incrementLogPosition(inc_pos);
2724
2722
pthread_mutex_unlock(&mi->data_lock);
2725
2723
free((char*)tmp_buf);
2953
2951
TODO: handling `when' for SHOW SLAVE STATUS' snds behind
2955
if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
2956
&& mi->master_log_name != NULL)
2957
|| mi->master_log_pos != hb.log_pos)
2953
if ((mi->setLogName(hb.get_log_ident()) && mi->getLogName() != NULL)
2954
|| mi->getLogPosition() != hb.log_pos)
2959
2956
/* missed events of heartbeat from the past */
2960
2957
error= ER_SLAVE_HEARTBEAT_FAILURE;
3011
3008
buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3012
3009
buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3014
mi->master_log_pos+= inc_pos;
3015
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
3011
mi->incrementLogPosition(inc_pos);
3012
memcpy(rli->ign_master_log_name_end, mi->getLogName(), FN_REFLEN);
3016
3013
assert(rli->ign_master_log_name_end[0]);
3017
rli->ign_master_log_pos_end= mi->master_log_pos;
3014
rli->ign_master_log_pos_end= mi->getLogPosition();
3019
3016
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3023
3020
/* write the event to the relay log */
3024
3021
if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3026
mi->master_log_pos+= inc_pos;
3023
mi->incrementLogPosition(inc_pos);
3027
3024
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3123
3120
while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3124
3121
(reconnect ? drizzle_reconnect(drizzle) != 0 :
3125
drizzle_connect(drizzle, mi->host, mi->user, mi->password, 0,
3126
mi->port, 0, client_flag) == 0))
3122
drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
3123
mi->getPort(), 0, client_flag) == 0))
3128
3125
/* Don't repeat last error */
3129
3126
if ((int32_t)drizzle_errno(drizzle) != last_errno)
3134
3131
_("error %s to master '%s@%s:%d'"
3135
3132
" - retry-time: %d retries: %u"),
3136
3133
(reconnect ? _("reconnecting") : _("connecting")),
3137
mi->user, mi->host, mi->port,
3138
mi->connect_retry, master_retry_count);
3134
mi->getUsername(), mi->getHostname(), mi->getPort(),
3135
mi->getConnectionRetry(), master_retry_count);
3141
3138
By default we try forever. The reason is that failure will trigger
3159
3156
if (!suppress_warnings && global_system_variables.log_warnings)
3160
3157
sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3161
3158
"replication resumed in log '%s' at "
3162
"position %s"), mi->user,
3159
"position %s"), mi->getUsername(),
3160
mi->getHostname(), mi->getPort(),
3164
3161
IO_RPL_LOG_NAME,
3165
llstr(mi->master_log_pos,llbuff));
3162
llstr(mi->getLogPosition(),llbuff));
3168
3165
drizzle->reconnect= 1;
3217
3214
bool flush_relay_log_info(Relay_log_info* rli)
3221
3218
if (unlikely(rli->no_storage))
3224
IO_CACHE *file = &rli->info_file;
3225
char buff[FN_REFLEN*2+22*2+4], *pos;
3227
my_b_seek(file, 0L);
3228
pos=my_stpcpy(buff, rli->group_relay_log_name);
3230
pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
3232
pos=my_stpcpy(pos, rli->group_master_log_name);
3234
pos=int64_t2str(rli->group_master_log_pos, pos, 10);
3236
if (my_b_write(file, (unsigned char*) buff, (size_t) (pos-buff)+1))
3238
if (flush_io_cache(file))
3241
/* Flushing the relay log is done by the slave I/O thread */
3253
3232
assert(rli->cur_log_fd == -1);
3255
3234
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3256
if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
3235
if ((rli->cur_log_fd=open_binlog(cur_log, rli->event_relay_log_name.c_str(), errmsg)) < 0)
3260
3238
We want to start exactly where we was before:
3496
3474
if (rli->relay_log.purge_first_log
3498
3476
rli->group_relay_log_pos == rli->event_relay_log_pos
3499
&& !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
3477
&& !strcmp(rli->group_relay_log_name.c_str(), rli->event_relay_log_name.c_str())))
3501
3479
errmsg = "Error purging processed logs";
3519
3497
rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3520
strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
3521
sizeof(rli->event_relay_log_name)-1);
3498
rli->event_relay_log_name.assign(rli->linfo.log_file_name);
3522
3499
flush_relay_log_info(rli);