130
130
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
131
131
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
132
132
static bool wait_for_relay_log_space(Relay_log_info* rli);
133
static inline bool io_slave_killed(Session* thd,Master_info* mi);
134
static inline bool sql_slave_killed(Session* thd,Relay_log_info* rli);
135
static int32_t init_slave_thread(Session* thd, SLAVE_Session_TYPE thd_type);
136
static int32_t safe_connect(Session* thd, DRIZZLE *drizzle, Master_info* mi);
137
static int32_t safe_reconnect(Session* thd, DRIZZLE *drizzle, Master_info* mi,
133
static inline bool io_slave_killed(Session* session,Master_info* mi);
134
static inline bool sql_slave_killed(Session* session,Relay_log_info* rli);
135
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type);
136
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi);
137
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
138
138
bool suppress_warnings);
139
static int32_t connect_to_master(Session* thd, DRIZZLE *drizzle, Master_info* mi,
139
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
140
140
bool reconnect, bool suppress_warnings);
141
static int32_t safe_sleep(Session* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
141
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
142
142
void* thread_killed_arg);
143
143
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
144
144
static Log_event* next_event(Relay_log_info* rli);
145
145
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
146
static int32_t terminate_slave_thread(Session *thd,
146
static int32_t terminate_slave_thread(Session *session,
147
147
pthread_mutex_t* term_lock,
148
148
pthread_cond_t* term_cond,
149
149
volatile uint32_t *slave_running,
151
static bool check_io_slave_killed(Session *thd, Master_info *mi, const char *info);
151
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info);
154
154
Find out which replications threads are running
373
terminate_slave_thread(Session *thd,
373
terminate_slave_thread(Session *session,
374
374
pthread_mutex_t* term_lock,
375
375
pthread_cond_t* term_cond,
376
376
volatile uint32_t *slave_running,
400
400
while (*slave_running) // Should always be true
402
pthread_mutex_lock(&thd->LOCK_delete);
402
pthread_mutex_lock(&session->LOCK_delete);
403
403
#ifndef DONT_USE_THR_ALARM
405
405
Error codes from pthread_kill are:
406
406
EINVAL: invalid signal number (can't happen)
407
407
ESRCH: thread already killed (can happen, should be ignored)
409
int32_t err= pthread_kill(thd->real_id, thr_client_alarm);
409
int32_t err= pthread_kill(session->real_id, thr_client_alarm);
410
410
assert(err != EINVAL);
412
thd->awake(Session::NOT_KILLED);
413
pthread_mutex_unlock(&thd->LOCK_delete);
412
session->awake(Session::NOT_KILLED);
413
pthread_mutex_unlock(&session->LOCK_delete);
416
416
There is a small chance that slave thread might miss the first
481
481
if (start_cond && cond_lock) // caller has cond_lock
483
Session* thd = current_thd;
483
Session* session = current_session;
484
484
while (start_id == *slave_run_id)
486
const char* old_msg = thd->enter_cond(start_cond,cond_lock,
486
const char* old_msg = session->enter_cond(start_cond,cond_lock,
487
487
"Waiting for slave thread to start");
488
488
pthread_cond_wait(start_cond,cond_lock);
489
thd->exit_cond(old_msg);
489
session->exit_cond(old_msg);
490
490
pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
492
return(thd->killed_errno());
492
return(session->killed_errno());
594
static bool io_slave_killed(Session* thd, Master_info* mi)
594
static bool io_slave_killed(Session* session, Master_info* mi)
596
assert(mi->io_thd == thd);
596
assert(mi->io_session == session);
597
597
assert(mi->slave_running); // tracking buffer overrun
598
return(mi->abort_slave || abort_loop || thd->killed);
598
return(mi->abort_slave || abort_loop || session->killed);
602
static bool sql_slave_killed(Session* thd, Relay_log_info* rli)
602
static bool sql_slave_killed(Session* session, Relay_log_info* rli)
604
assert(rli->sql_thd == thd);
604
assert(rli->sql_session == session);
605
605
assert(rli->slave_running == 1);// tracking buffer overrun
606
if (abort_loop || thd->killed || rli->abort_slave)
606
if (abort_loop || session->killed || rli->abort_slave)
609
609
If we are in an unsafe situation (stopping could corrupt replication),
737
static bool check_io_slave_killed(Session *thd, Master_info *mi, const char *info)
737
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info)
739
if (io_slave_killed(thd, mi))
739
if (io_slave_killed(session, mi))
741
741
if (info && global_system_variables.log_warnings)
742
742
sql_print_information("%s",info);
855
855
mi->clock_diff_with_master=
856
856
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
858
else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
858
else if (!check_io_slave_killed(mi->io_session, mi, NULL))
860
860
mi->clock_diff_with_master= 0; /* The "most sensible" value */
861
861
sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
991
991
sprintf(query, query_format, llbuf);
993
993
if (drizzle_real_query(drizzle, query, strlen(query))
994
&& !check_io_slave_killed(mi->io_thd, mi, NULL))
994
&& !check_io_slave_killed(mi->io_session, mi, NULL))
996
996
err_msg.append("The slave I/O thread stops because querying master with '");
997
997
err_msg.append(query);
1026
1026
bool slave_killed=0;
1027
1027
Master_info* mi = rli->mi;
1028
1028
const char *save_proc_info;
1029
Session* thd = mi->io_thd;
1029
Session* session = mi->io_session;
1031
1031
pthread_mutex_lock(&rli->log_space_lock);
1032
save_proc_info= thd->enter_cond(&rli->log_space_cond,
1032
save_proc_info= session->enter_cond(&rli->log_space_cond,
1033
1033
&rli->log_space_lock,
1034
1034
_("Waiting for the slave SQL thread "
1035
1035
"to free enough relay log space"));
1036
1036
while (rli->log_space_limit < rli->log_space_total &&
1037
!(slave_killed=io_slave_killed(thd,mi)) &&
1037
!(slave_killed=io_slave_killed(session,mi)) &&
1038
1038
!rli->ignore_log_space_limit)
1039
1039
pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1040
thd->exit_cond(save_proc_info);
1040
session->exit_cond(save_proc_info);
1041
1041
return(slave_killed);
1055
1055
ignored events' end position for the use of the slave SQL thread, by
1056
1056
calling this function. Only that thread can call it (see assertion).
1058
static void write_ignored_events_info_to_relay_log(Session *thd __attribute__((unused)),
1058
static void write_ignored_events_info_to_relay_log(Session *session __attribute__((unused)),
1059
1059
Master_info *mi)
1061
1061
Relay_log_info *rli= &mi->rli;
1062
1062
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1064
assert(thd == mi->io_thd);
1064
assert(session == mi->io_session);
1065
1065
pthread_mutex_lock(log_lock);
1066
1066
if (rli->ign_master_log_name_end[0])
1069
1069
0, rli->ign_master_log_pos_end,
1070
1070
Rotate_log_event::DUP_NAME);
1071
1071
rli->ign_master_log_name_end[0]= 0;
1072
/* can unlock before writing as slave SQL thd will soon see our Rotate */
1072
/* can unlock before writing as slave SQL session will soon see our Rotate */
1073
1073
pthread_mutex_unlock(log_lock);
1074
1074
if (likely((bool)ev))
1132
1132
*suppress_warnings= true; // Suppress reconnect warning
1134
else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
1134
else if (!check_io_slave_killed(mi->io_session, mi, NULL))
1137
1137
snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle),
1148
bool show_master_info(Session* thd, Master_info* mi)
1148
bool show_master_info(Session* session, Master_info* mi)
1150
1150
// TODO: fix this for multi-master
1151
1151
List<Item> field_list;
1152
Protocol *protocol= thd->protocol;
1152
Protocol *protocol= session->protocol;
1154
1154
field_list.push_back(new Item_empty_string("Slave_IO_State",
1206
1206
if (mi->host[0])
1208
String *packet= &thd->packet;
1208
String *packet= &session->packet;
1209
1209
protocol->prepare_for_resend();
1212
1212
slave_running can be accessed without run_lock but not other
1213
non-volotile members like mi->io_thd, which is guarded by the mutex.
1213
non-volotile members like mi->io_session, which is guarded by the mutex.
1215
1215
pthread_mutex_lock(&mi->run_lock);
1216
protocol->store(mi->io_thd ? mi->io_thd->get_proc_info() : "", &my_charset_bin);
1216
protocol->store(mi->io_session ? mi->io_session->get_proc_info() : "", &my_charset_bin);
1217
1217
pthread_mutex_unlock(&mi->run_lock);
1219
1219
pthread_mutex_lock(&mi->data_lock);
1308
1308
pthread_mutex_unlock(&mi->rli.data_lock);
1309
1309
pthread_mutex_unlock(&mi->data_lock);
1311
if (my_net_write(&thd->net, (unsigned char*) thd->packet.ptr(), packet->length()))
1311
if (my_net_write(&session->net, (unsigned char*) session->packet.ptr(), packet->length()))
1319
void set_slave_thread_options(Session* thd)
1319
void set_slave_thread_options(Session* session)
1322
1322
It's nonsense to constrain the slave threads with max_join_size; if a
1327
1327
when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1328
1328
only for client threads.
1330
uint64_t options= thd->options | OPTION_BIG_SELECTS;
1330
uint64_t options= session->options | OPTION_BIG_SELECTS;
1331
1331
if (opt_log_slave_updates)
1332
1332
options|= OPTION_BIN_LOG;
1334
1334
options&= ~OPTION_BIN_LOG;
1335
thd->options= options;
1336
thd->variables.completion_type= 0;
1335
session->options= options;
1336
session->variables.completion_type= 0;
1341
1341
init_slave_thread()
1344
static int32_t init_slave_thread(Session* thd, SLAVE_Session_TYPE thd_type)
1344
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type)
1346
1346
int32_t simulate_error= 0;
1347
thd->system_thread = (thd_type == SLAVE_Session_SQL) ?
1347
session->system_thread = (session_type == SLAVE_Session_SQL) ?
1348
1348
SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1349
thd->security_ctx->skip_grants();
1350
my_net_init(&thd->net, 0);
1349
session->security_ctx->skip_grants();
1350
my_net_init(&session->net, 0);
1352
1352
Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1353
1353
slave threads, since a replication event can become this much larger
1354
1354
than the corresponding packet (query) sent from client to master.
1356
thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1356
session->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1357
1357
+ MAX_LOG_EVENT_HEADER; /* note, incr over the global not session var */
1358
thd->slave_thread = 1;
1359
set_slave_thread_options(thd);
1360
thd->client_capabilities = CLIENT_LOCAL_FILES;
1358
session->slave_thread = 1;
1359
set_slave_thread_options(session);
1360
session->client_capabilities = CLIENT_LOCAL_FILES;
1361
1361
pthread_mutex_lock(&LOCK_thread_count);
1362
thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
1362
session->thread_id= session->variables.pseudo_thread_id= thread_id++;
1363
1363
pthread_mutex_unlock(&LOCK_thread_count);
1365
1365
simulate_error|= (1 << SLAVE_Session_IO);
1366
1366
simulate_error|= (1 << SLAVE_Session_SQL);
1367
if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
1367
if (init_thr_lock() || session->store_globals() || simulate_error & (1<< session_type))
1374
if (thd_type == SLAVE_Session_SQL)
1375
thd->set_proc_info("Waiting for the next event in relay log");
1374
if (session_type == SLAVE_Session_SQL)
1375
session->set_proc_info("Waiting for the next event in relay log");
1377
thd->set_proc_info("Waiting for master update");
1378
thd->version=refresh_version;
1377
session->set_proc_info("Waiting for master update");
1378
session->version=refresh_version;
1379
session->set_time();
1384
static int32_t safe_sleep(Session* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1384
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1385
1385
void* thread_killed_arg)
1387
1387
int32_t nap_time;
1532
1532
that the error is temporary by pushing a warning with the error code
1533
1533
ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1535
static int32_t has_temporary_error(Session *thd)
1535
static int32_t has_temporary_error(Session *session)
1537
if (thd->is_fatal_error)
1537
if (session->is_fatal_error)
1540
if (thd->main_da.is_error())
1540
if (session->main_da.is_error())
1542
session->clear_error();
1543
1543
my_error(ER_LOCK_DEADLOCK, MYF(0));
1619
1619
has a Rotate etc).
1622
thd->server_id = ev->server_id; // use the original server id for logging
1623
thd->set_time(); // time the query
1624
thd->lex->current_select= 0;
1622
session->server_id = ev->server_id; // use the original server id for logging
1623
session->set_time(); // time the query
1624
session->lex->current_select= 0;
1626
1626
ev->when= my_time(0);
1627
ev->thd = thd; // because up to this point, ev->thd == 0
1627
ev->session = session; // because up to this point, ev->session == 0
1794
end_trans(thd, ROLLBACK);
1794
end_trans(session, ROLLBACK);
1795
1795
/* chance for concurrent connection to get more locks */
1796
safe_sleep(thd, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1796
safe_sleep(session, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1797
1797
(CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1798
1798
pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1799
1799
rli->trans_retries++;
1851
1851
are taken from @c messages array. In case @c master_retry_count is exceeded,
1852
1852
no messages are added to the log.
1854
@param[in] thd Thread context.
1854
@param[in] session Thread context.
1855
1855
@param[in] DRIZZLE DRIZZLE connection.
1856
1856
@param[in] mi Master connection information.
1857
1857
@param[in,out] retry_count Number of attempts to reconnect.
1864
1864
@retval 1 There was an error.
1867
static int32_t try_to_reconnect(Session *thd, DRIZZLE *drizzle, Master_info *mi,
1867
static int32_t try_to_reconnect(Session *session, DRIZZLE *drizzle, Master_info *mi,
1868
1868
uint32_t *retry_count, bool suppress_warnings,
1869
1869
const char *messages[SLAVE_RECON_MSG_MAX])
1871
1871
mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1872
thd->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
1872
session->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
1873
1873
drizzle_disconnect(drizzle);
1874
1874
if ((*retry_count)++)
1876
1876
if (*retry_count > master_retry_count)
1877
1877
return 1; // Don't retry forever
1878
safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1878
safe_sleep(session, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1881
if (check_io_slave_killed(thd, mi,
1881
if (check_io_slave_killed(session, mi,
1882
1882
_(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1884
thd->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
1884
session->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
1885
1885
if (!suppress_warnings)
1887
1887
char buf[256], llbuff[22];
1902
1902
sql_print_information("%s",buf);
1905
if (safe_reconnect(thd, drizzle, mi, 1) || io_slave_killed(thd, mi))
1905
if (safe_reconnect(session, drizzle, mi, 1) || io_slave_killed(session, mi))
1907
1907
if (global_system_variables.log_warnings)
1908
1908
sql_print_information("%s",_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1937
1937
mi->events_till_disconnect = disconnect_slave_event_count;
1940
Session_CHECK_SENTRY(thd);
1939
session= new Session;
1940
Session_CHECK_SENTRY(session);
1941
mi->io_session = session;
1943
1943
pthread_detach_this_thread();
1944
thd->thread_stack= (char*) &thd; // remember where our stack is
1945
if (init_slave_thread(thd, SLAVE_Session_IO))
1944
session->thread_stack= (char*) &session; // remember where our stack is
1945
if (init_slave_thread(session, SLAVE_Session_IO))
1947
1947
pthread_cond_broadcast(&mi->start_cond);
1948
1948
pthread_mutex_unlock(&mi->run_lock);
1967
thd->set_proc_info("Connecting to master");
1967
session->set_proc_info("Connecting to master");
1968
1968
// we can get killed during safe_connect
1969
if (!safe_connect(thd, drizzle, mi))
1969
if (!safe_connect(session, drizzle, mi))
1971
1971
sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
1972
1972
"replication started in log '%s' at position %s"),
1991
1991
// TODO: the assignment below should be under mutex (5.0)
1992
1992
mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
1993
thd->slave_net = &drizzle->net;
1994
thd->set_proc_info("Checking master version");
1993
session->slave_net = &drizzle->net;
1994
session->set_proc_info("Checking master version");
1995
1995
if (get_master_version_and_clock(drizzle, mi))
2001
2001
Register ourselves with the master.
2003
thd->set_proc_info("Registering slave on master");
2003
session->set_proc_info("Registering slave on master");
2004
2004
if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2006
if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
2006
if (!check_io_slave_killed(session, mi, "Slave I/O thread killed "
2007
2007
"while registering slave on master"))
2009
2009
sql_print_error(_("Slave I/O thread couldn't register on master"));
2010
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2010
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2011
2011
reconnect_messages[SLAVE_RECON_ACT_REG]))
2020
2020
retry_count_reg++;
2021
2021
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2022
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2022
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2023
2023
reconnect_messages[SLAVE_RECON_ACT_REG]))
2025
2025
goto connected;
2029
while (!io_slave_killed(thd,mi))
2029
while (!io_slave_killed(session,mi))
2031
thd->set_proc_info("Requesting binlog dump");
2031
session->set_proc_info("Requesting binlog dump");
2032
2032
if (request_dump(drizzle, mi, &suppress_warnings))
2034
2034
sql_print_error(_("Failed on request_dump()"));
2035
if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while \
2035
if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while \
2036
2036
requesting master dump")) ||
2037
try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2037
try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2038
2038
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2040
2040
goto connected;
2044
2044
retry_count_dump++;
2045
2045
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2046
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2046
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2047
2047
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2049
2049
goto connected;
2052
while (!io_slave_killed(thd,mi))
2052
while (!io_slave_killed(session,mi))
2054
2054
uint32_t event_len;
2058
2058
important thing is to not confuse users by saying "reading" whereas
2059
2059
we're in fact receiving nothing.
2061
thd->set_proc_info(_("Waiting for master to send event"));
2061
session->set_proc_info(_("Waiting for master to send event"));
2062
2062
event_len= read_event(drizzle, mi, &suppress_warnings);
2063
if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while "
2063
if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while "
2064
2064
"reading event")))
2066
2066
if (!retry_count_event)
2068
2068
retry_count_event++;
2069
2069
sql_print_information(_("Forcing to reconnect slave I/O thread"));
2070
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2070
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2071
2071
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2073
2073
goto connected;
2083
2083
"slave. If the entry is correct, restart the "
2084
2084
"server with a higher value of "
2085
2085
"max_allowed_packet"),
2086
thd->variables.max_allowed_packet);
2086
session->variables.max_allowed_packet);
2088
2088
case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2089
2089
sql_print_error(ER(drizzle_error_number), drizzle_error_number,
2095
2095
_("Stopping slave I/O thread due to out-of-memory error from master"));
2098
if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2098
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2099
2099
reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2101
2101
goto connected;
2102
2102
} // if (event_len == packet_error)
2104
2104
retry_count=0; // ok event, reset retry counter
2105
thd->set_proc_info(_("Queueing master event to the relay log"));
2105
session->set_proc_info(_("Queueing master event to the relay log"));
2106
2106
if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2143
2143
"position %s"),
2144
2144
IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
2145
2145
pthread_mutex_lock(&LOCK_thread_count);
2146
thd->query = thd->db = 0; // extra safety
2147
thd->query_length= thd->db_length= 0;
2146
session->query = session->db = 0; // extra safety
2147
session->query_length= session->db_length= 0;
2148
2148
pthread_mutex_unlock(&LOCK_thread_count);
2159
2159
drizzle_close(drizzle);
2162
write_ignored_events_info_to_relay_log(thd, mi);
2163
thd->set_proc_info(_("Waiting for slave mutex on exit"));
2162
write_ignored_events_info_to_relay_log(session, mi);
2163
session->set_proc_info(_("Waiting for slave mutex on exit"));
2164
2164
pthread_mutex_lock(&mi->run_lock);
2166
2166
/* Forget the relay log's format */
2167
2167
delete mi->rli.relay_log.description_event_for_queue;
2168
2168
mi->rli.relay_log.description_event_for_queue= 0;
2169
assert(thd->net.buff != 0);
2170
net_end(&thd->net); // destructor will not free it, because net.vio is 0
2171
close_thread_tables(thd);
2169
assert(session->net.buff != 0);
2170
net_end(&session->net); // destructor will not free it, because net.vio is 0
2171
close_thread_tables(session);
2172
2172
pthread_mutex_lock(&LOCK_thread_count);
2173
Session_CHECK_SENTRY(thd);
2173
Session_CHECK_SENTRY(session);
2175
2175
pthread_mutex_unlock(&LOCK_thread_count);
2176
2176
mi->abort_slave= 0;
2177
2177
mi->slave_running= 0;
2180
2180
Note: the order of the two following calls (first broadcast, then unlock)
2181
2181
is important. Otherwise a killer_thread can execute between the calls and
2208
2208
rli->events_till_abort = abort_slave_event_count;
2211
thd->thread_stack = (char*)&thd; // remember where our stack is
2210
session = new Session;
2211
session->thread_stack = (char*)&session; // remember where our stack is
2212
rli->sql_session= session;
2214
2214
/* Inform waiting threads that slave has started */
2215
2215
rli->slave_run_id++;
2216
2216
rli->slave_running = 1;
2218
2218
pthread_detach_this_thread();
2219
if (init_slave_thread(thd, SLAVE_Session_SQL))
2219
if (init_slave_thread(session, SLAVE_Session_SQL))
2222
2222
TODO: this is currently broken - slave start and change master
2227
2227
sql_print_error(_("Failed during slave thread initialization"));
2230
thd->init_for_queries();
2231
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
2230
session->init_for_queries();
2231
session->temporary_tables = rli->save_temporary_tables; // restore temp tables
2232
2232
pthread_mutex_lock(&LOCK_thread_count);
2233
threads.append(thd);
2233
threads.append(session);
2234
2234
pthread_mutex_unlock(&LOCK_thread_count);
2236
2236
We are going to set slave_running to 1. Assuming slave I/O thread is
2301
2301
/* execute init_slave variable */
2302
2302
if (sys_init_slave.value_length)
2304
execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
2305
if (thd->is_slave_error)
2304
execute_init_command(session, &sys_init_slave, &LOCK_sys_init_slave);
2305
if (session->is_slave_error)
2307
2307
sql_print_error(_("Slave SQL thread aborted. "
2308
2308
"Can't execute init_slave query"));
2329
2329
/* Read queries from the IO/THREAD until this thread is killed */
2331
while (!sql_slave_killed(thd,rli))
2331
while (!sql_slave_killed(session,rli))
2333
thd->set_proc_info(_("Reading event from the relay log"));
2334
assert(rli->sql_thd == thd);
2335
Session_CHECK_SENTRY(thd);
2336
if (exec_relay_log_event(thd,rli))
2333
session->set_proc_info(_("Reading event from the relay log"));
2334
assert(rli->sql_session == session);
2335
Session_CHECK_SENTRY(session);
2336
if (exec_relay_log_event(session,rli))
2338
2338
// do not scare the user if SQL thread was simply killed or stopped
2339
if (!sql_slave_killed(thd,rli))
2339
if (!sql_slave_killed(session,rli))
2342
retrieve as much info as possible from the thd and, error
2342
retrieve as much info as possible from the session and, error
2343
2343
codes and warnings and print this to the error log as to
2344
2344
allow the user to locate the error
2346
2346
uint32_t const last_errno= rli->last_error().number;
2348
if (thd->is_error())
2348
if (session->is_error())
2350
char const *const errmsg= thd->main_da.message();
2350
char const *const errmsg= session->main_da.message();
2352
2352
if (last_errno == 0)
2354
rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), "%s", errmsg);
2354
rli->report(ERROR_LEVEL, session->main_da.sql_errno(), "%s", errmsg);
2356
else if (last_errno != thd->main_da.sql_errno())
2356
else if (last_errno != session->main_da.sql_errno())
2358
2358
sql_print_error(_("Slave (additional info): %s Error_code: %d"),
2359
errmsg, thd->main_da.sql_errno());
2359
errmsg, session->main_da.sql_errno());
2363
2363
/* Print any warnings issued */
2364
List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
2364
List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
2365
2365
DRIZZLE_ERROR *err;
2367
2367
Added controlled slave thread cancel for replication
2408
2408
request is detected only by the present function, not by events), so we
2409
2409
must "proactively" clear playgrounds:
2411
rli->cleanup_context(thd, 1);
2411
rli->cleanup_context(session, 1);
2412
2412
pthread_mutex_lock(&LOCK_thread_count);
2414
2414
Some extra safety, which should not been needed (normally, event deletion
2415
2415
should already have done these assignments (each event which sets these
2416
2416
variables is supposed to set them to 0 before terminating)).
2418
thd->query= thd->db= thd->catalog= 0;
2419
thd->query_length= thd->db_length= 0;
2418
session->query= session->db= session->catalog= 0;
2419
session->query_length= session->db_length= 0;
2420
2420
pthread_mutex_unlock(&LOCK_thread_count);
2421
thd->set_proc_info("Waiting for slave mutex on exit");
2421
session->set_proc_info("Waiting for slave mutex on exit");
2422
2422
pthread_mutex_lock(&rli->run_lock);
2423
2423
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
2424
2424
pthread_mutex_lock(&rli->data_lock);
2432
2432
pthread_mutex_unlock(&rli->data_lock);
2433
2433
pthread_cond_broadcast(&rli->data_cond);
2434
2434
rli->ignore_log_space_limit= 0; /* don't need any lock */
2435
rli->save_temporary_tables = thd->temporary_tables;
2435
rli->save_temporary_tables = session->temporary_tables;
2438
2438
TODO: see if we can do this conditionally in next_event() instead
2439
2439
to avoid unneeded position re-init
2441
thd->temporary_tables = 0; // remove tempation from destructor to close them
2442
assert(thd->net.buff != 0);
2443
net_end(&thd->net); // destructor will not free it, because we are weird
2444
assert(rli->sql_thd == thd);
2445
Session_CHECK_SENTRY(thd);
2441
session->temporary_tables = 0; // remove tempation from destructor to close them
2442
assert(session->net.buff != 0);
2443
net_end(&session->net); // destructor will not free it, because we are weird
2444
assert(rli->sql_session == session);
2445
Session_CHECK_SENTRY(session);
2446
rli->sql_session= 0;
2447
2447
pthread_mutex_lock(&LOCK_thread_count);
2448
Session_CHECK_SENTRY(thd);
2448
Session_CHECK_SENTRY(session);
2450
2450
pthread_mutex_unlock(&LOCK_thread_count);
2452
2452
Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
3087
static int32_t safe_connect(Session* thd, DRIZZLE *drizzle, Master_info* mi)
3087
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi)
3089
return(connect_to_master(thd, drizzle, mi, 0, 0));
3089
return(connect_to_master(session, drizzle, mi, 0, 0));
3099
3099
master_retry_count times
3102
static int32_t connect_to_master(Session* thd, DRIZZLE *drizzle, Master_info* mi,
3102
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
3103
3103
bool reconnect, bool suppress_warnings)
3105
3105
int32_t slave_was_killed;
3115
3115
drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3116
3116
drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3118
while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3118
while (!(slave_was_killed = io_slave_killed(session,mi)) &&
3119
3119
(reconnect ? drizzle_reconnect(drizzle) != 0 :
3120
3120
drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
3121
3121
mi->getPort(), 0, client_flag) == 0))
3173
3173
master_retry_count times
3176
static int32_t safe_reconnect(Session* thd, DRIZZLE *drizzle, Master_info* mi,
3176
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
3177
3177
bool suppress_warnings)
3179
return(connect_to_master(thd, drizzle, mi, 1, suppress_warnings));
3179
return(connect_to_master(session, drizzle, mi, 1, suppress_warnings));
3440
3440
pthread_mutex_unlock(&rli->log_space_lock);
3441
3441
pthread_cond_broadcast(&rli->log_space_cond);
3442
3442
// Note that wait_for_update_relay_log unlocks lock_log !
3443
rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
3443
rli->relay_log.wait_for_update_relay_log(rli->sql_session);
3444
3444
// re-acquire data lock since we released it earlier
3445
3445
pthread_mutex_lock(&rli->data_lock);
3446
3446
rli->last_master_timestamp= save_timestamp;
3711
3711
Detect buggy master to work around.
3713
bool rpl_master_erroneous_autoinc(Session *thd)
3713
bool rpl_master_erroneous_autoinc(Session *session)
3715
if (active_mi && active_mi->rli.sql_thd == thd)
3715
if (active_mi && active_mi->rli.sql_session == session)
3717
3717
Relay_log_info *rli= &active_mi->rli;
3718
3718
return rpl_master_has_bug(rli, 33029, false);