1
1
/* Copyright (C) 2000-2003 DRIZZLE AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
143
143
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type);
144
144
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi);
145
145
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
146
bool suppress_warnings);
146
bool suppress_warnings);
147
147
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
148
bool reconnect, bool suppress_warnings);
148
bool reconnect, bool suppress_warnings);
149
149
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
150
void* thread_killed_arg);
150
void* thread_killed_arg);
151
151
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
152
152
static Log_event* next_event(Relay_log_info* rli);
153
153
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
154
154
static int32_t terminate_slave_thread(Session *session,
155
pthread_mutex_t* term_lock,
156
pthread_cond_t* term_cond,
157
volatile uint32_t *slave_running,
155
pthread_mutex_t* term_lock,
156
pthread_cond_t* term_cond,
157
volatile uint32_t *slave_running,
159
159
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info);
162
162
Find out which replications threads are running
166
mask Return value here
167
mi master_info for slave
168
inverse If set, returns which threads are not running
166
mask Return value here
167
mi master_info for slave
168
inverse If set, returns which threads are not running
171
Get a bit mask for which threads are running so that we can later restart
171
Get a bit mask for which threads are running so that we can later restart
175
mask If inverse == 0, running threads
176
If inverse == 1, stopped threads
175
mask If inverse == 0, running threads
176
If inverse == 1, stopped threads
179
179
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
352
Wait for a slave thread to terminate.
354
This function is called after requesting the thread to terminate
355
(by setting @c abort_slave member of @c Relay_log_info or @c
356
Master_info structure to 1). Termination of the thread is
357
controlled with the predicate <code>*slave_running</code>.
359
Function will acquire @c term_lock before waiting on the condition
360
unless @c skip_lock is true in which case the mutex should be owned
361
by the caller of this function and will remain acquired after
362
return from the function.
365
Associated lock to use when waiting for @c term_cond
368
Condition that is signalled when the thread has terminated
371
Pointer to predicate to check for slave thread termination
374
If @c true the lock will not be acquired before waiting on
375
the condition. In this case, it is assumed that the calling
376
function acquires the lock before calling this function.
351
Wait for a slave thread to terminate.
353
This function is called after requesting the thread to terminate
354
(by setting @c abort_slave member of @c Relay_log_info or @c
355
Master_info structure to 1). Termination of the thread is
356
controlled with the predicate <code>*slave_running</code>.
358
Function will acquire @c term_lock before waiting on the condition
359
unless @c skip_lock is true in which case the mutex should be owned
360
by the caller of this function and will remain acquired after
361
return from the function.
364
Associated lock to use when waiting for @c term_cond
367
Condition that is signalled when the thread has terminated
370
Pointer to predicate to check for slave thread termination
373
If @c true the lock will not be acquired before waiting on
374
the condition. In this case, it is assumed that the calling
375
function acquires the lock before calling this function.
381
380
terminate_slave_thread(Session *session,
382
381
pthread_mutex_t* term_lock,
1961
1960
// we can get killed during safe_connect
1962
1961
if (!safe_connect(session, drizzle, mi))
1964
sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
1965
"replication started in log '%s' at position %s"),
1966
mi->getUsername(), mi->getHostname(), mi->getPort(),
1968
llstr(mi->getLogPosition(), llbuff));
1970
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
1971
thread, since a replication event can become this much larger than
1972
the corresponding packet (query) sent from client to master.
1963
errmsg_printf(ERRMSG_LVL_INFO, _("Slave I/O thread: connected to master '%s@%s:%d',"
1964
"replication started in log '%s' at position %s"),
1965
mi->getUsername(), mi->getHostname(), mi->getPort(),
1967
llstr(mi->getLogPosition(), llbuff));
1969
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
1970
thread, since a replication event can become this much larger than
1971
the corresponding packet (query) sent from client to master.
1974
1973
drizzle->net.max_packet_size= session->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
1978
sql_print_information(_("Slave I/O thread killed while connecting to master"));
1977
errmsg_printf(ERRMSG_LVL_INFO, _("Slave I/O thread killed while connecting to master"));
2071
2070
uint32_t drizzle_error_number= drizzle_errno(drizzle);
2072
2071
switch (drizzle_error_number) {
2073
2072
case CR_NET_PACKET_TOO_LARGE:
2074
sql_print_error(_("Log entry on master is longer than "
2075
"max_allowed_packet (%u) on "
2076
"slave. If the entry is correct, restart the "
2077
"server with a higher value of "
2078
"max_allowed_packet"),
2079
session->variables.max_allowed_packet);
2073
errmsg_printf(ERRMSG_LVL_ERROR, _("Log entry on master is longer than "
2074
"max_allowed_packet (%u) on "
2075
"slave. If the entry is correct, restart the "
2076
"server with a higher value of "
2077
"max_allowed_packet"),
2078
session->variables.max_allowed_packet);
2081
2080
case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2082
sql_print_error(ER(drizzle_error_number), drizzle_error_number,
2083
drizzle_error(drizzle));
2081
errmsg_printf(ERRMSG_LVL_ERROR, ER(drizzle_error_number), drizzle_error_number,
2082
drizzle_error(drizzle));
2085
2084
case EE_OUTOFMEMORY:
2086
2085
case ER_OUTOFMEMORY:
2088
_("Stopping slave I/O thread due to out-of-memory error from master"));
2086
errmsg_printf(ERRMSG_LVL_ERROR,
2087
_("Stopping slave I/O thread due to out-of-memory error from master"));
2091
2090
if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2283
2282
assert(rli->sql_session == session);
2285
2284
if (global_system_variables.log_warnings)
2286
sql_print_information(_("Slave SQL thread initialized, "
2287
"starting replication in log '%s' at "
2288
"position %s, relay log '%s' position: %s"),
2290
llstr(rli->group_master_log_pos,llbuff),
2291
rli->group_relay_log_name.c_str(),
2292
llstr(rli->group_relay_log_pos,llbuff1));
2285
errmsg_printf(ERRMSG_LVL_INFO, _("Slave SQL thread initialized, "
2286
"starting replication in log '%s' at "
2287
"position %s, relay log '%s' position: %s"),
2289
llstr(rli->group_master_log_pos,llbuff),
2290
rli->group_relay_log_name.c_str(),
2291
llstr(rli->group_relay_log_pos,llbuff1));
2294
2293
/* execute init_slave variable */
2295
2294
if (sys_init_slave.value_length)
2366
2365
if (err->code == ER_CANT_OPEN_LIBRARY)
2367
2366
udf_error = true;
2368
sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
2367
errmsg_printf(ERRMSG_LVL_WARN, _("Slave: %s Error_code: %d"),err->msg, err->code);
2371
sql_print_error(_("Error loading user-defined library, slave SQL "
2372
"thread aborted. Install the missing library, "
2373
"and restart the slave SQL thread with "
2374
"\"SLAVE START\". We stopped at log '%s' "
2376
RPL_LOG_NAME, llstr(rli->group_master_log_pos,
2370
errmsg_printf(ERRMSG_LVL_ERROR, _("Error loading user-defined library, slave SQL "
2371
"thread aborted. Install the missing library, "
2372
"and restart the slave SQL thread with "
2373
"\"SLAVE START\". We stopped at log '%s' "
2375
RPL_LOG_NAME, llstr(rli->group_master_log_pos,
2379
sql_print_error(_("Error running query, slave SQL thread aborted. "
2380
"Fix the problem, and restart "
2381
"the slave SQL thread with \"SLAVE START\". "
2382
"We stopped at log '%s' position %s"),
2384
llstr(rli->group_master_log_pos, llbuff));
2378
errmsg_printf(ERRMSG_LVL_ERROR, _("Error running query, slave SQL thread aborted. "
2379
"Fix the problem, and restart "
2380
"the slave SQL thread with \"SLAVE START\". "
2381
"We stopped at log '%s' position %s"),
2383
llstr(rli->group_master_log_pos, llbuff));
2390
2389
/* Thread stopped. Print the current replication position to the log */
2391
sql_print_information(_("Slave SQL thread exiting, replication stopped in "
2392
"log '%s' at position %s"),
2394
llstr(rli->group_master_log_pos,llbuff));
2390
errmsg_printf(ERRMSG_LVL_INFO, _("Slave SQL thread exiting, replication stopped in "
2391
"log '%s' at position %s"),
2393
llstr(rli->group_master_log_pos,llbuff));
2399
2398
Some events set some playgrounds, which won't be cleared because thread
2859
2858
case ROTATE_EVENT:
2861
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
2862
if (unlikely(process_io_rotate(mi,&rev)))
2864
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2860
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
2861
if (unlikely(process_io_rotate(mi,&rev)))
2863
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2867
Now the I/O thread has just changed its mi->master_log_name, so
2868
incrementing mi->master_log_pos is nonsense.
2868
Now the I/O thread has just changed its mi->master_log_name, so
2869
incrementing mi->master_log_pos is nonsense.
2874
2873
case FORMAT_DESCRIPTION_EVENT:
2877
Create an event, and save it (when we rotate the relay log, we will have
2878
to write this event again).
2881
We are the only thread which reads/writes description_event_for_queue.
2882
The relay_log struct does not move (though some members of it can
2883
change), so we needn't any lock (no rli->data_lock, no log lock).
2885
Format_description_log_event* tmp;
2887
if (!(tmp= (Format_description_log_event*)
2888
Log_event::read_log_event(buf, event_len, &errmsg,
2889
mi->rli.relay_log.description_event_for_queue)))
2891
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2876
Create an event, and save it (when we rotate the relay log, we will have
2877
to write this event again).
2880
We are the only thread which reads/writes description_event_for_queue.
2881
The relay_log struct does not move (though some members of it can
2882
change), so we needn't any lock (no rli->data_lock, no log lock).
2884
Format_description_log_event* tmp;
2886
if (!(tmp= (Format_description_log_event*)
2887
Log_event::read_log_event(buf, event_len, &errmsg,
2888
mi->rli.relay_log.description_event_for_queue)))
2890
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2893
delete mi->rli.relay_log.description_event_for_queue;
2894
mi->rli.relay_log.description_event_for_queue= tmp;
2896
Though this does some conversion to the slave's format, this will
2897
preserve the master's binlog format version, and number of event types.
2900
If the event was not requested by the slave (the slave did not ask for
2901
it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
2903
inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
2894
delete mi->rli.relay_log.description_event_for_queue;
2895
mi->rli.relay_log.description_event_for_queue= tmp;
2897
Though this does some conversion to the slave's format, this will
2898
preserve the master's binlog format version, and number of event types.
2901
If the event was not requested by the slave (the slave did not ask for
2902
it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
2904
inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
2908
2907
case HEARTBEAT_LOG_EVENT:
2911
HB (heartbeat) cannot come before RL (Relay)
2914
Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
2917
error= ER_SLAVE_HEARTBEAT_FAILURE;
2918
error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2919
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2920
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2921
error_msg.append(STRING_WITH_LEN(" log_pos "));
2922
llstr(hb.log_pos, llbuf);
2923
error_msg.append(llbuf, strlen(llbuf));
2926
mi->received_heartbeats++;
2928
compare local and event's versions of log_file, log_pos.
2930
Heartbeat is sent only after an event corresponding to the coordinates
2931
the heartbeat carries.
2932
Slave can not have a difference in coordinates except in the only
2933
special case when mi->master_log_name, master_log_pos have never
2934
been updated by Rotate event i.e when slave does not have any history
2935
with the master (and thereafter mi->master_log_pos is NULL).
2937
TODO: handling `when' for SHOW SLAVE STATUS' seconds behind
2939
if ((mi->setLogName(hb.get_log_ident()) && mi->getLogName() != NULL)
2940
|| mi->getLogPosition() != hb.log_pos)
2942
/* missed events of heartbeat from the past */
2943
error= ER_SLAVE_HEARTBEAT_FAILURE;
2944
error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
2945
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2946
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2947
error_msg.append(STRING_WITH_LEN(" log_pos "));
2948
llstr(hb.log_pos, llbuf);
2949
error_msg.append(llbuf, strlen(llbuf));
2952
goto skip_relay_logging;
2910
HB (heartbeat) cannot come before RL (Relay)
2913
Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
2916
error= ER_SLAVE_HEARTBEAT_FAILURE;
2917
error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2918
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2919
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2920
error_msg.append(STRING_WITH_LEN(" log_pos "));
2921
llstr(hb.log_pos, llbuf);
2922
error_msg.append(llbuf, strlen(llbuf));
2925
mi->received_heartbeats++;
2927
compare local and event's versions of log_file, log_pos.
2929
Heartbeat is sent only after an event corresponding to the coordinates
2930
the heartbeat carries.
2931
Slave can not have a difference in coordinates except in the only
2932
special case when mi->master_log_name, master_log_pos have never
2933
been updated by Rotate event i.e when slave does not have any history
2934
with the master (and thereafter mi->master_log_pos is NULL).
2936
TODO: handling `when' for SHOW SLAVE STATUS' seconds behind
2938
if ((mi->setLogName(hb.get_log_ident()) && mi->getLogName() != NULL)
2939
|| mi->getLogPosition() != hb.log_pos)
2941
/* missed events of heartbeat from the past */
2942
error= ER_SLAVE_HEARTBEAT_FAILURE;
2943
error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
2944
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2945
error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2946
error_msg.append(STRING_WITH_LEN(" log_pos "));
2947
llstr(hb.log_pos, llbuf);
2948
error_msg.append(llbuf, strlen(llbuf));
2951
goto skip_relay_logging;
2957
2956
inc_pos= event_len;
2962
If this event is originating from this server, don't queue it.
2963
We don't check this for 3.23 events because it's simpler like this; 3.23
2964
will be filtered anyway by the SQL slave thread which also tests the
2965
server id (we must also keep this test in the SQL thread, in case somebody
2966
upgrades a 4.0 slave which has a not-filtered relay log).
2961
If this event is originating from this server, don't queue it.
2962
We don't check this for 3.23 events because it's simpler like this; 3.23
2963
will be filtered anyway by the SQL slave thread which also tests the
2964
server id (we must also keep this test in the SQL thread, in case somebody
2965
upgrades a 4.0 slave which has a not-filtered relay log).
2968
ANY event coming from ourselves can be ignored: it is obvious for queries;
2969
for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
2970
(--log-slave-updates would not log that) unless this slave is also its
2971
direct master (an unsupported, useless setup!).
2967
ANY event coming from ourselves can be ignored: it is obvious for queries;
2968
for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
2969
(--log-slave-updates would not log that) unless this slave is also its
2970
direct master (an unsupported, useless setup!).
2974
2973
pthread_mutex_lock(log_lock);
3176
flush_relay_log_info()
3177
rli Relay log information
3175
flush_relay_log_info()
3176
rli Relay log information
3180
- As this is only called by the slave thread, we don't need to
3181
have a lock on this.
3182
- If there is an active transaction, then we don't update the position
3183
in the relay log. This is to ensure that we re-execute statements
3184
if we die in the middle of a transaction that was rolled back.
3185
- As a transaction never spans binary logs, we don't have to handle the
3186
case where we do a relay-log-rotation in the middle of the transaction.
3187
If this would not be the case, we would have to ensure that we
3188
don't delete the relay log file where the transaction started when
3189
we switch to a new relay log file.
3179
- As this is only called by the slave thread, we don't need to
3180
have a lock on this.
3181
- If there is an active transaction, then we don't update the position
3182
in the relay log. This is to ensure that we re-execute statements
3183
if we die in the middle of a transaction that was rolled back.
3184
- As a transaction never spans binary logs, we don't have to handle the
3185
case where we do a relay-log-rotation in the middle of the transaction.
3186
If this would not be the case, we would have to ensure that we
3187
don't delete the relay log file where the transaction started when
3188
we switch to a new relay log file.
3192
- Change the log file information to a binary format to avoid calling
3191
- Change the log file information to a binary format to avoid calling
3200
3199
bool flush_relay_log_info(Relay_log_info* rli)
3396
3395
Possible deadlock :
3397
3396
- the I/O thread has reached log_space_limit
3398
3397
- the SQL thread has read all relay logs, but cannot purge for some
3400
* it has already purged all logs except the current one
3401
* there are other logs than the current one but they're involved in
3402
a transaction that finishes in the current one (or is not finished)
3404
Wake up the possibly waiting I/O thread, and set a boolean asking
3405
the I/O thread to temporarily ignore the log_space_limit
3406
constraint, because we do not want the I/O thread to block because of
3407
space (it's ok if it blocks for any other reason (e.g. because the
3408
master does not send anything)). Then the I/O thread stops waiting
3409
and reads more events.
3410
The SQL thread decides when the I/O thread should take log_space_limit
3411
into account again : ignore_log_space_limit is reset to 0
3412
in purge_first_log (when the SQL thread purges the just-read relay
3413
log), and also when the SQL thread starts. We should also reset
3414
ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
3415
fact, no need as RESET SLAVE requires that the slave
3416
be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
3399
* it has already purged all logs except the current one
3400
* there are other logs than the current one but they're involved in
3401
a transaction that finishes in the current one (or is not finished)
3403
Wake up the possibly waiting I/O thread, and set a boolean asking
3404
the I/O thread to temporarily ignore the log_space_limit
3405
constraint, because we do not want the I/O thread to block because of
3406
space (it's ok if it blocks for any other reason (e.g. because the
3407
master does not send anything)). Then the I/O thread stops waiting
3408
and reads more events.
3409
The SQL thread decides when the I/O thread should take log_space_limit
3410
into account again : ignore_log_space_limit is reset to 0
3411
in purge_first_log (when the SQL thread purges the just-read relay
3412
log), and also when the SQL thread starts. We should also reset
3413
ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
3414
fact, no need as RESET SLAVE requires that the slave
3415
be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
3419
3418
pthread_mutex_lock(&rli->log_space_lock);
3420
3419
// prevent the I/O thread from blocking next times
3692
BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
3693
exclusive, if one statement in a SP generated AUTO_INCREMENT value
3694
by the top statement, all statements after it would be considered
3695
generated AUTO_INCREMENT value by the top statement, and an
3696
erroneous INSERT_ID value might be associated with these statements,
3697
which could cause duplicate entry error and stop the slave.
3691
BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
3692
exclusive, if one statement in a SP generated AUTO_INCREMENT value
3693
by the top statement, all statements after it would be considered
3694
generated AUTO_INCREMENT value by the top statement, and an
3695
erroneous INSERT_ID value might be associated with these statements,
3696
which could cause duplicate entry error and stop the slave.
3699
Detect buggy master to work around.
3698
Detect buggy master to work around.
3701
3700
bool rpl_master_erroneous_autoinc(Session *session)
3703
3702
if (active_mi && active_mi->rli.sql_session == session)