1
by brian
clean slate |
1 |
/* Copyright (C) 2000-2003 MySQL AB
|
2 |
||
3 |
This program is free software; you can redistribute it and/or modify
|
|
4 |
it under the terms of the GNU General Public License as published by
|
|
5 |
the Free Software Foundation; version 2 of the License.
|
|
6 |
||
7 |
This program is distributed in the hope that it will be useful,
|
|
8 |
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
9 |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
10 |
GNU General Public License for more details.
|
|
11 |
||
12 |
You should have received a copy of the GNU General Public License
|
|
13 |
along with this program; if not, write to the Free Software
|
|
14 |
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
|
15 |
||
16 |
||
17 |
/**
|
|
18 |
@addtogroup Replication
|
|
19 |
@{
|
|
20 |
||
21 |
@file
|
|
22 |
||
23 |
@brief Code to run the io thread and the sql thread on the
|
|
24 |
replication slave.
|
|
25 |
*/
|
|
26 |
||
27 |
#include "mysql_priv.h" |
|
28 |
||
29 |
#include <mysql.h> |
|
30 |
#include <myisam.h> |
|
31 |
#include "slave.h" |
|
32 |
#include "rpl_mi.h" |
|
33 |
#include "rpl_rli.h" |
|
34 |
#include "sql_repl.h" |
|
35 |
#include "rpl_filter.h" |
|
36 |
#include "repl_failsafe.h" |
|
37 |
#include <thr_alarm.h> |
|
38 |
#include <my_dir.h> |
|
39 |
#include <sql_common.h> |
|
40 |
#include <errmsg.h> |
|
41 |
#include <mysys_err.h> |
|
42 |
||
43 |
#ifdef HAVE_REPLICATION
|
|
44 |
||
45 |
#include "rpl_tblmap.h" |
|
46 |
||
47 |
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
|
|
48 |
||
49 |
#define MAX_SLAVE_RETRY_PAUSE 5
|
|
50 |
bool use_slave_mask = 0; |
|
51 |
MY_BITMAP slave_error_mask; |
|
52 |
||
53 |
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*); |
|
54 |
||
55 |
char* slave_load_tmpdir = 0; |
|
56 |
Master_info *active_mi= 0; |
|
57 |
my_bool replicate_same_server_id; |
|
58 |
ulonglong relay_log_space_limit = 0; |
|
59 |
||
60 |
/*
|
|
61 |
When slave thread exits, we need to remember the temporary tables so we
|
|
62 |
can re-use them on slave start.
|
|
63 |
||
64 |
TODO: move the vars below under Master_info
|
|
65 |
*/
|
|
66 |
||
67 |
int disconnect_slave_event_count = 0, abort_slave_event_count = 0; |
|
68 |
int events_till_abort = -1; |
|
69 |
||
70 |
/*
  Situations in which the slave I/O thread reconnects to the master.
  Used as the first (row) index of reconnect_messages[][].
*/
enum enum_slave_reconnect_actions
{
  SLAVE_RECON_ACT_REG= 0,          /* registration on master failed */
  SLAVE_RECON_ACT_DUMP= 1,         /* binlog dump request failed */
  SLAVE_RECON_ACT_EVENT= 2,        /* reading an event from master failed */
  SLAVE_RECON_ACT_MAX
};

/*
  Kinds of message available for each reconnect situation.
  Used as the second (column) index of reconnect_messages[][].
*/
enum enum_slave_reconnect_messages
{
  SLAVE_RECON_MSG_WAIT= 0,             /* waiting before the reconnect */
  SLAVE_RECON_MSG_KILLED_WAITING= 1,   /* killed during that wait */
  SLAVE_RECON_MSG_AFTER= 2,            /* reconnect in progress */
  SLAVE_RECON_MSG_FAILED= 3,           /* format string: log name, position */
  SLAVE_RECON_MSG_COMMAND= 4,          /* name of the command that failed */
  SLAVE_RECON_MSG_KILLED_AFTER= 5,     /* killed during/after the reconnect */
  SLAVE_RECON_MSG_MAX
};

/*
  Message table indexed [enum_slave_reconnect_actions]
  [enum_slave_reconnect_messages].

  The SLAVE_RECON_MSG_FAILED entries are printf-style formats taking two
  '%s' arguments (log name and position); all other entries are plain text.
  The COMMAND entry for SLAVE_RECON_ACT_EVENT is intentionally empty.
*/
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
{
  {
    "Waiting to reconnect after a failed registration on master",
    "Slave I/O thread killed while waiting to reconnect after a failed "
    "registration on master",
    "Reconnecting after a failed registration on master",
    "failed registering on master, reconnecting to try again, "
    "log '%s' at position %s",
    "COM_REGISTER_SLAVE",
    "Slave I/O thread killed during or after reconnect"
  },
  {
    "Waiting to reconnect after a failed binlog dump request",
    "Slave I/O thread killed while retrying master dump",
    "Reconnecting after a failed binlog dump request",
    "failed dump request, reconnecting to try again, log '%s' at position %s",
    "COM_BINLOG_DUMP",
    "Slave I/O thread killed during or after reconnect"
  },
  {
    "Waiting to reconnect after a failed master event read",
    "Slave I/O thread killed while waiting to reconnect after a failed read",
    "Reconnecting after a failed master event read",
    "Slave I/O thread: Failed reading log event, reconnecting to retry, "
    "log '%s' at position %s",
    "",
    "Slave I/O thread killed during or after a reconnect done to recover from "
    "failed read"
  }
};
|
|
120 |
||
121 |
||
122 |
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE; |
|
123 |
||
124 |
static int process_io_rotate(Master_info* mi, Rotate_log_event* rev); |
|
125 |
static int process_io_create_file(Master_info* mi, Create_file_log_event* cev); |
|
126 |
static bool wait_for_relay_log_space(Relay_log_info* rli); |
|
127 |
static inline bool io_slave_killed(THD* thd,Master_info* mi); |
|
128 |
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli); |
|
129 |
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type); |
|
130 |
static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi); |
|
131 |
static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, |
|
132 |
bool suppress_warnings); |
|
133 |
static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, |
|
134 |
bool reconnect, bool suppress_warnings); |
|
135 |
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed, |
|
136 |
void* thread_killed_arg); |
|
137 |
static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi); |
|
138 |
static Log_event* next_event(Relay_log_info* rli); |
|
139 |
static int queue_event(Master_info* mi,const char* buf,ulong event_len); |
|
140 |
static int terminate_slave_thread(THD *thd, |
|
141 |
pthread_mutex_t* term_lock, |
|
142 |
pthread_cond_t* term_cond, |
|
143 |
volatile uint *slave_running, |
|
144 |
bool skip_lock); |
|
145 |
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info); |
|
146 |
||
147 |
/*
|
|
148 |
Find out which replications threads are running
|
|
149 |
||
150 |
SYNOPSIS
|
|
151 |
init_thread_mask()
|
|
152 |
mask Return value here
|
|
153 |
mi master_info for slave
|
|
154 |
inverse If set, returns which threads are not running
|
|
155 |
||
156 |
IMPLEMENTATION
|
|
157 |
Get a bit mask for which threads are running so that we can later restart
|
|
158 |
these threads.
|
|
159 |
||
160 |
RETURN
|
|
161 |
mask If inverse == 0, running threads
|
|
162 |
If inverse == 1, stopped threads
|
|
163 |
*/
|
|
164 |
||
165 |
void init_thread_mask(int* mask,Master_info* mi,bool inverse) |
|
166 |
{
|
|
167 |
bool set_io = mi->slave_running, set_sql = mi->rli.slave_running; |
|
168 |
register int tmp_mask=0; |
|
169 |
DBUG_ENTER("init_thread_mask"); |
|
170 |
||
171 |
if (set_io) |
|
172 |
tmp_mask |= SLAVE_IO; |
|
173 |
if (set_sql) |
|
174 |
tmp_mask |= SLAVE_SQL; |
|
175 |
if (inverse) |
|
176 |
tmp_mask^= (SLAVE_IO | SLAVE_SQL); |
|
177 |
*mask = tmp_mask; |
|
178 |
DBUG_VOID_RETURN; |
|
179 |
}
|
|
180 |
||
181 |
||
182 |
/*
|
|
183 |
lock_slave_threads()
|
|
184 |
*/
|
|
185 |
||
186 |
void lock_slave_threads(Master_info* mi) |
|
187 |
{
|
|
188 |
DBUG_ENTER("lock_slave_threads"); |
|
189 |
||
190 |
//TODO: see if we can do this without dual mutex
|
|
191 |
pthread_mutex_lock(&mi->run_lock); |
|
192 |
pthread_mutex_lock(&mi->rli.run_lock); |
|
193 |
DBUG_VOID_RETURN; |
|
194 |
}
|
|
195 |
||
196 |
||
197 |
/*
|
|
198 |
unlock_slave_threads()
|
|
199 |
*/
|
|
200 |
||
201 |
void unlock_slave_threads(Master_info* mi) |
|
202 |
{
|
|
203 |
DBUG_ENTER("unlock_slave_threads"); |
|
204 |
||
205 |
//TODO: see if we can do this without dual mutex
|
|
206 |
pthread_mutex_unlock(&mi->rli.run_lock); |
|
207 |
pthread_mutex_unlock(&mi->run_lock); |
|
208 |
DBUG_VOID_RETURN; |
|
209 |
}
|
|
210 |
||
211 |
||
212 |
/* Initialize slave structures */
|
|
213 |
||
214 |
int init_slave() |
|
215 |
{
|
|
216 |
DBUG_ENTER("init_slave"); |
|
217 |
||
218 |
/*
|
|
219 |
This is called when mysqld starts. Before client connections are
|
|
220 |
accepted. However bootstrap may conflict with us if it does START SLAVE.
|
|
221 |
So it's safer to take the lock.
|
|
222 |
*/
|
|
223 |
pthread_mutex_lock(&LOCK_active_mi); |
|
224 |
/*
|
|
225 |
TODO: re-write this to interate through the list of files
|
|
226 |
for multi-master
|
|
227 |
*/
|
|
228 |
active_mi= new Master_info; |
|
229 |
||
230 |
/*
|
|
231 |
If master_host is not specified, try to read it from the master_info file.
|
|
232 |
If master_host is specified, create the master_info file if it doesn't
|
|
233 |
exists.
|
|
234 |
*/
|
|
235 |
if (!active_mi) |
|
236 |
{
|
|
237 |
sql_print_error("Failed to allocate memory for the master info structure"); |
|
238 |
goto err; |
|
239 |
}
|
|
240 |
||
241 |
if (init_master_info(active_mi,master_info_file,relay_log_info_file, |
|
242 |
1, (SLAVE_IO | SLAVE_SQL))) |
|
243 |
{
|
|
244 |
sql_print_error("Failed to initialize the master info structure"); |
|
245 |
goto err; |
|
246 |
}
|
|
247 |
||
248 |
/* If server id is not set, start_slave_thread() will say it */
|
|
249 |
||
250 |
if (active_mi->host[0] && !opt_skip_slave_start) |
|
251 |
{
|
|
252 |
if (start_slave_threads(1 /* need mutex */, |
|
253 |
0 /* no wait for start*/, |
|
254 |
active_mi, |
|
255 |
master_info_file, |
|
256 |
relay_log_info_file, |
|
257 |
SLAVE_IO | SLAVE_SQL)) |
|
258 |
{
|
|
259 |
sql_print_error("Failed to create slave threads"); |
|
260 |
goto err; |
|
261 |
}
|
|
262 |
}
|
|
263 |
pthread_mutex_unlock(&LOCK_active_mi); |
|
264 |
DBUG_RETURN(0); |
|
265 |
||
266 |
err: |
|
267 |
pthread_mutex_unlock(&LOCK_active_mi); |
|
268 |
DBUG_RETURN(1); |
|
269 |
}
|
|
270 |
||
271 |
||
272 |
/*
|
|
273 |
Init function to set up array for errors that should be skipped for slave
|
|
274 |
||
275 |
SYNOPSIS
|
|
276 |
init_slave_skip_errors()
|
|
277 |
arg List of errors numbers to skip, separated with ','
|
|
278 |
||
279 |
NOTES
|
|
280 |
Called from get_options() in mysqld.cc on start-up
|
|
281 |
*/
|
|
282 |
||
283 |
void init_slave_skip_errors(const char* arg) |
|
284 |
{
|
|
285 |
const char *p; |
|
286 |
DBUG_ENTER("init_slave_skip_errors"); |
|
287 |
||
288 |
if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0)) |
|
289 |
{
|
|
290 |
fprintf(stderr, "Badly out of memory, please check your system status\n"); |
|
291 |
exit(1); |
|
292 |
}
|
|
293 |
use_slave_mask = 1; |
|
294 |
for (;my_isspace(system_charset_info,*arg);++arg) |
|
295 |
/* empty */; |
|
296 |
if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4)) |
|
297 |
{
|
|
298 |
bitmap_set_all(&slave_error_mask); |
|
299 |
DBUG_VOID_RETURN; |
|
300 |
}
|
|
301 |
for (p= arg ; *p; ) |
|
302 |
{
|
|
303 |
long err_code; |
|
304 |
if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code))) |
|
305 |
break; |
|
306 |
if (err_code < MAX_SLAVE_ERROR) |
|
307 |
bitmap_set_bit(&slave_error_mask,(uint)err_code); |
|
308 |
while (!my_isdigit(system_charset_info,*p) && *p) |
|
309 |
p++; |
|
310 |
}
|
|
311 |
DBUG_VOID_RETURN; |
|
312 |
}
|
|
313 |
||
314 |
||
315 |
/*
  Request termination of the slave threads selected by thread_mask and wait
  for each of them to stop.

  SYNOPSIS
    terminate_slave_threads()
    mi            Master_info owning the threads
    thread_mask   Combination of SLAVE_IO, SLAVE_SQL and SLAVE_FORCE_ALL.
                  SLAVE_FORCE_ALL selects both threads and makes a failure
                  to stop one of them non-fatal.
    skip_lock     Passed on to terminate_slave_thread(): if true the caller
                  already owns the run locks.

  RETURN
    0       ok, or mi was never initialized (nothing to do)
    other   error returned by terminate_slave_thread() (only when
            SLAVE_FORCE_ALL is not set)
*/
int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
{
  DBUG_ENTER("terminate_slave_threads");

  if (!mi->inited)
    DBUG_RETURN(0); /* successfully do nothing */

  const int force_all= (thread_mask & SLAVE_FORCE_ALL);
  int error;

  if (thread_mask & (SLAVE_IO | SLAVE_FORCE_ALL))
  {
    DBUG_PRINT("info",("Terminating IO thread"));
    mi->abort_slave= 1;
    error= terminate_slave_thread(mi->io_thd, &mi->run_lock,
                                  &mi->stop_cond,
                                  &mi->slave_running,
                                  skip_lock);
    if (error && !force_all)
      DBUG_RETURN(error);
  }

  if (thread_mask & (SLAVE_SQL | SLAVE_FORCE_ALL))
  {
    DBUG_PRINT("info",("Terminating SQL thread"));
    mi->rli.abort_slave= 1;
    error= terminate_slave_thread(mi->rli.sql_thd, &mi->rli.run_lock,
                                  &mi->rli.stop_cond,
                                  &mi->rli.slave_running,
                                  skip_lock);
    if (error && !force_all)
      DBUG_RETURN(error);
  }
  DBUG_RETURN(0);
}
|
|
348 |
||
349 |
||
350 |
/**
|
|
351 |
Wait for a slave thread to terminate.
|
|
352 |
||
353 |
This function is called after requesting the thread to terminate
|
|
354 |
(by setting @c abort_slave member of @c Relay_log_info or @c
|
|
355 |
Master_info structure to 1). Termination of the thread is
|
|
356 |
controlled with the the predicate <code>*slave_running</code>.
|
|
357 |
||
358 |
Function will acquire @c term_lock before waiting on the condition
|
|
359 |
unless @c skip_lock is true in which case the mutex should be owned
|
|
360 |
by the caller of this function and will remain acquired after
|
|
361 |
return from the function.
|
|
362 |
||
363 |
@param term_lock
|
|
364 |
Associated lock to use when waiting for @c term_cond
|
|
365 |
||
366 |
@param term_cond
|
|
367 |
Condition that is signalled when the thread has terminated
|
|
368 |
||
369 |
@param slave_running
|
|
370 |
Pointer to predicate to check for slave thread termination
|
|
371 |
||
372 |
@param skip_lock
|
|
373 |
If @c true the lock will not be acquired before waiting on
|
|
374 |
the condition. In this case, it is assumed that the calling
|
|
375 |
function acquires the lock before calling this function.
|
|
376 |
||
377 |
@retval 0 All OK
|
|
378 |
*/
|
|
379 |
static int |
|
380 |
terminate_slave_thread(THD *thd, |
|
381 |
pthread_mutex_t* term_lock, |
|
382 |
pthread_cond_t* term_cond, |
|
383 |
volatile uint *slave_running, |
|
384 |
bool skip_lock) |
|
385 |
{
|
|
386 |
int error; |
|
387 |
||
388 |
DBUG_ENTER("terminate_slave_thread"); |
|
389 |
||
390 |
if (!skip_lock) |
|
391 |
pthread_mutex_lock(term_lock); |
|
392 |
||
393 |
safe_mutex_assert_owner(term_lock); |
|
394 |
||
395 |
if (!*slave_running) |
|
396 |
{
|
|
397 |
if (!skip_lock) |
|
398 |
pthread_mutex_unlock(term_lock); |
|
399 |
DBUG_RETURN(ER_SLAVE_NOT_RUNNING); |
|
400 |
}
|
|
401 |
DBUG_ASSERT(thd != 0); |
|
402 |
THD_CHECK_SENTRY(thd); |
|
403 |
||
404 |
/*
|
|
405 |
Is is critical to test if the slave is running. Otherwise, we might
|
|
406 |
be referening freed memory trying to kick it
|
|
407 |
*/
|
|
408 |
||
409 |
while (*slave_running) // Should always be true |
|
410 |
{
|
|
411 |
DBUG_PRINT("loop", ("killing slave thread")); |
|
412 |
||
413 |
pthread_mutex_lock(&thd->LOCK_delete); |
|
414 |
#ifndef DONT_USE_THR_ALARM
|
|
415 |
/*
|
|
416 |
Error codes from pthread_kill are:
|
|
417 |
EINVAL: invalid signal number (can't happen)
|
|
418 |
ESRCH: thread already killed (can happen, should be ignored)
|
|
419 |
*/
|
|
420 |
IF_DBUG(int err= ) pthread_kill(thd->real_id, thr_client_alarm); |
|
421 |
DBUG_ASSERT(err != EINVAL); |
|
422 |
#endif
|
|
423 |
thd->awake(THD::NOT_KILLED); |
|
424 |
pthread_mutex_unlock(&thd->LOCK_delete); |
|
425 |
||
426 |
/*
|
|
427 |
There is a small chance that slave thread might miss the first
|
|
428 |
alarm. To protect againts it, resend the signal until it reacts
|
|
429 |
*/
|
|
430 |
struct timespec abstime; |
|
431 |
set_timespec(abstime,2); |
|
432 |
error= pthread_cond_timedwait(term_cond, term_lock, &abstime); |
|
433 |
DBUG_ASSERT(error == ETIMEDOUT || error == 0); |
|
434 |
}
|
|
435 |
||
436 |
DBUG_ASSERT(*slave_running == 0); |
|
437 |
||
438 |
if (!skip_lock) |
|
439 |
pthread_mutex_unlock(term_lock); |
|
440 |
DBUG_RETURN(0); |
|
441 |
}
|
|
442 |
||
443 |
||
444 |
/*
  Create one slave thread and optionally wait until it has started.

  SYNOPSIS
    start_slave_thread()
    h_func          Thread entry point; it is passed mi as its argument
    start_lock      If non-NULL, mutex this function locks on entry and
                    unlocks on every return path
    cond_lock       Mutex paired with start_cond; when waiting is requested
                    it must be owned on entry, either by the caller or
                    because it is the same mutex as start_lock
    start_cond      If non-NULL (together with cond_lock), wait on it until
                    *slave_run_id changes
    slave_running   Non-zero if the thread is already running
    slave_run_id    Compared against its value on entry to detect that the
                    new thread has started
    mi              Master_info; must already be initialized
    high_priority   If set, CONNECT_PRIOR is applied to connection_attrib
                    before the thread is created

  RETURN
    0                     ok
    ER_BAD_SLAVE          server_id is not set
    ER_SLAVE_MUST_STOP    the thread is already running
    ER_SLAVE_THREAD       pthread_create() failed
    other                 thd->killed_errno() if the waiting connection was
                          killed before the slave thread reported start-up
*/
int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
                       pthread_mutex_t *cond_lock,
                       pthread_cond_t *start_cond,
                       volatile uint *slave_running,
                       volatile ulong *slave_run_id,
                       Master_info* mi,
                       bool high_priority)
{
  pthread_t th;
  ulong start_id;
  int early_error= 0;
  DBUG_ENTER("start_slave_thread");

  DBUG_ASSERT(mi->inited);

  if (start_lock)
    pthread_mutex_lock(start_lock);

  /* Preconditions; both failures are handled the same way below */
  if (!server_id)
    early_error= ER_BAD_SLAVE;
  else if (*slave_running)
    early_error= ER_SLAVE_MUST_STOP;

  if (early_error)
  {
    if (start_cond)
      pthread_cond_broadcast(start_cond);
    if (start_lock)
      pthread_mutex_unlock(start_lock);
    if (early_error == ER_BAD_SLAVE)
      sql_print_error("Server id not set, will not start slave");
    DBUG_RETURN(early_error);
  }

  start_id= *slave_run_id;
  DBUG_PRINT("info",("Creating new slave thread"));
  if (high_priority)
    my_pthread_attr_setprio(&connection_attrib,CONNECT_PRIOR);
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
  {
    if (start_lock)
      pthread_mutex_unlock(start_lock);
    DBUG_RETURN(ER_SLAVE_THREAD);
  }

  if (start_cond && cond_lock) // caller has cond_lock
  {
    THD* thd = current_thd;
    while (start_id == *slave_run_id)
    {
      DBUG_PRINT("sleep",("Waiting for slave thread to start"));
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
                                            "Waiting for slave thread to start");
      pthread_cond_wait(start_cond,cond_lock);
      thd->exit_cond(old_msg);
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
      if (thd->killed)
      {
        /*
          Do not leak start_lock: like every other return path, release
          the mutex this function acquired on entry. If the caller owns
          cond_lock (start_lock == NULL) it stays locked for the caller.
        */
        if (start_lock)
          pthread_mutex_unlock(start_lock);
        DBUG_RETURN(thd->killed_errno());
      }
    }
  }
  if (start_lock)
    pthread_mutex_unlock(start_lock);
  DBUG_RETURN(0);
}
|
|
507 |
||
508 |
||
509 |
/*
|
|
510 |
start_slave_threads()
|
|
511 |
||
512 |
NOTES
|
|
513 |
SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
|
|
514 |
sense to do that for starting a slave--we always care if it actually
|
|
515 |
started the threads that were not previously running
|
|
516 |
*/
|
|
517 |
||
518 |
int start_slave_threads(bool need_slave_mutex, bool wait_for_start, |
|
519 |
Master_info* mi, const char* master_info_fname, |
|
520 |
const char* slave_info_fname, int thread_mask) |
|
521 |
{
|
|
522 |
pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0; |
|
523 |
pthread_cond_t* cond_io=0,*cond_sql=0; |
|
524 |
int error=0; |
|
525 |
DBUG_ENTER("start_slave_threads"); |
|
526 |
||
527 |
if (need_slave_mutex) |
|
528 |
{
|
|
529 |
lock_io = &mi->run_lock; |
|
530 |
lock_sql = &mi->rli.run_lock; |
|
531 |
}
|
|
532 |
if (wait_for_start) |
|
533 |
{
|
|
534 |
cond_io = &mi->start_cond; |
|
535 |
cond_sql = &mi->rli.start_cond; |
|
536 |
lock_cond_io = &mi->run_lock; |
|
537 |
lock_cond_sql = &mi->rli.run_lock; |
|
538 |
}
|
|
539 |
||
540 |
if (thread_mask & SLAVE_IO) |
|
541 |
error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io, |
|
542 |
cond_io, |
|
543 |
&mi->slave_running, &mi->slave_run_id, |
|
544 |
mi, 1); //high priority, to read the most possible |
|
545 |
if (!error && (thread_mask & SLAVE_SQL)) |
|
546 |
{
|
|
547 |
error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql, |
|
548 |
cond_sql, |
|
549 |
&mi->rli.slave_running, &mi->rli.slave_run_id, |
|
550 |
mi, 0); |
|
551 |
if (error) |
|
552 |
terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0); |
|
553 |
}
|
|
554 |
DBUG_RETURN(error); |
|
555 |
}
|
|
556 |
||
557 |
||
558 |
#ifdef NOT_USED_YET
/*
  List-walk action that releases one Master_info with end_master_info().
  Compiled only when NOT_USED_YET is defined; the second argument is
  ignored. Always returns 0.
*/
static int end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
{
  const int walk_result= 0;
  DBUG_ENTER("end_slave_on_walk");

  end_master_info(mi);
  DBUG_RETURN(walk_result);
}
#endif
|
|
567 |
||
568 |
||
569 |
/*
|
|
570 |
Free all resources used by slave
|
|
571 |
||
572 |
SYNOPSIS
|
|
573 |
end_slave()
|
|
574 |
*/
|
|
575 |
||
576 |
void end_slave() |
|
577 |
{
|
|
578 |
DBUG_ENTER("end_slave"); |
|
579 |
||
580 |
/*
|
|
581 |
This is called when the server terminates, in close_connections().
|
|
582 |
It terminates slave threads. However, some CHANGE MASTER etc may still be
|
|
583 |
running presently. If a START SLAVE was in progress, the mutex lock below
|
|
584 |
will make us wait until slave threads have started, and START SLAVE
|
|
585 |
returns, then we terminate them here.
|
|
586 |
*/
|
|
587 |
pthread_mutex_lock(&LOCK_active_mi); |
|
588 |
if (active_mi) |
|
589 |
{
|
|
590 |
/*
|
|
591 |
TODO: replace the line below with
|
|
592 |
list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
|
|
593 |
once multi-master code is ready.
|
|
594 |
*/
|
|
595 |
terminate_slave_threads(active_mi,SLAVE_FORCE_ALL); |
|
596 |
end_master_info(active_mi); |
|
597 |
delete active_mi; |
|
598 |
active_mi= 0; |
|
599 |
}
|
|
600 |
pthread_mutex_unlock(&LOCK_active_mi); |
|
601 |
DBUG_VOID_RETURN; |
|
602 |
}
|
|
603 |
||
604 |
||
605 |
/*
  Tell whether the slave I/O thread has been asked to stop.

  Returns true if mi->abort_slave is set, the server is shutting down
  (abort_loop) or thd has been killed. thd must be the I/O thread of mi.
*/
static bool io_slave_killed(THD* thd, Master_info* mi)
{
  bool stop_requested;
  DBUG_ENTER("io_slave_killed");

  DBUG_ASSERT(mi->io_thd == thd);
  DBUG_ASSERT(mi->slave_running); // tracking buffer overrun

  stop_requested= (mi->abort_slave || abort_loop || thd->killed);
  DBUG_RETURN(stop_requested);
}
|
|
613 |
||
614 |
||
615 |
/*
  Tell whether the slave SQL thread should stop now.

  Returns 0 if no stop was requested (abort_loop, thd->killed and
  rli->abort_slave are all unset). If a stop was requested, returns 1 at
  once when rli->last_event_start_time is 0; otherwise the thread is given
  a grace period and 1 is returned (after reporting an error) only once
  more than 60 seconds have passed since rli->last_event_start_time.
  thd must be the SQL thread of rli.
*/
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
{
  DBUG_ENTER("sql_slave_killed");

  DBUG_ASSERT(rli->sql_thd == thd);
  DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun

  if (!(abort_loop || thd->killed || rli->abort_slave))
    DBUG_RETURN(0);

  /*
    If we are in an unsafe situation (stopping could corrupt replication),
    we give one minute to the slave SQL thread of grace before really
    terminating, in the hope that it will be able to read more events and
    the unsafe situation will soon be left. Note that this one minute starts
    from the last time anything happened in the slave SQL thread. So it's
    really one minute of idleness, we don't timeout if the slave SQL thread
    is actively working.
  */
  if (rli->last_event_start_time == 0)
    DBUG_RETURN(1);

  DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving "
                      "it some grace period"));
  if (!(difftime(time(0), rli->last_event_start_time) > 60))
    DBUG_RETURN(0);                             // still within grace period

  rli->report(ERROR_LEVEL, 0,
              "SQL thread had to stop in an unsafe situation, in "
              "the middle of applying updates to a "
              "non-transactional table without any primary key. "
              "There is a risk of duplicate updates when the slave "
              "SQL thread is restarted. Please check your tables' "
              "contents after restart.");
  DBUG_RETURN(1);
}
|
|
650 |
||
651 |
||
652 |
/*
|
|
653 |
skip_load_data_infile()
|
|
654 |
||
655 |
NOTES
|
|
656 |
This is used to tell a 3.23 master to break send_file()
|
|
657 |
*/
|
|
658 |
||
659 |
void skip_load_data_infile(NET *net) |
|
660 |
{
|
|
661 |
DBUG_ENTER("skip_load_data_infile"); |
|
662 |
||
663 |
(void)net_request_file(net, "/dev/null"); |
|
664 |
(void)my_net_read(net); // discard response |
|
665 |
(void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok |
|
666 |
DBUG_VOID_RETURN; |
|
667 |
}
|
|
668 |
||
669 |
||
670 |
/*
  Send command byte 251 with fname as its header and an empty packet body
  over net. Returns the result of net_write_command().
*/
bool net_request_file(NET* net, const char* fname)
{
  bool write_failed;
  DBUG_ENTER("net_request_file");

  write_failed= net_write_command(net, 251, (uchar*) fname, strlen(fname),
                                  (uchar*) "", 0);
  DBUG_RETURN(write_failed);
}
|
|
676 |
||
677 |
/*
|
|
678 |
From other comments and tests in code, it looks like
|
|
679 |
sometimes Query_log_event and Load_log_event can have db == 0
|
|
680 |
(see rewrite_db() above for example)
|
|
681 |
(cases where this happens are unclear; it may be when the master is 3.23).
|
|
682 |
*/
|
|
683 |
||
684 |
const char *print_slave_db_safe(const char* db) |
|
685 |
{
|
|
686 |
DBUG_ENTER("*print_slave_db_safe"); |
|
687 |
||
688 |
DBUG_RETURN((db ? db : "")); |
|
689 |
}
|
|
690 |
||
691 |
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, |
|
692 |
const char *default_val) |
|
693 |
{
|
|
694 |
uint length; |
|
695 |
DBUG_ENTER("init_strvar_from_file"); |
|
696 |
||
697 |
if ((length=my_b_gets(f,var, max_size))) |
|
698 |
{
|
|
699 |
char* last_p = var + length -1; |
|
700 |
if (*last_p == '\n') |
|
701 |
*last_p = 0; // if we stopped on newline, kill it |
|
702 |
else
|
|
703 |
{
|
|
704 |
/*
|
|
705 |
If we truncated a line or stopped on last char, remove all chars
|
|
706 |
up to and including newline.
|
|
707 |
*/
|
|
708 |
int c; |
|
709 |
while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)); |
|
710 |
}
|
|
711 |
DBUG_RETURN(0); |
|
712 |
}
|
|
713 |
else if (default_val) |
|
714 |
{
|
|
715 |
strmake(var, default_val, max_size-1); |
|
716 |
DBUG_RETURN(0); |
|
717 |
}
|
|
718 |
DBUG_RETURN(1); |
|
719 |
}
|
|
720 |
||
721 |
||
722 |
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val) |
|
723 |
{
|
|
724 |
char buf[32]; |
|
725 |
DBUG_ENTER("init_intvar_from_file"); |
|
726 |
||
727 |
||
728 |
if (my_b_gets(f, buf, sizeof(buf))) |
|
729 |
{
|
|
730 |
*var = atoi(buf); |
|
731 |
DBUG_RETURN(0); |
|
732 |
}
|
|
733 |
else if (default_val) |
|
734 |
{
|
|
735 |
*var = default_val; |
|
736 |
DBUG_RETURN(0); |
|
737 |
}
|
|
738 |
DBUG_RETURN(1); |
|
739 |
}
|
|
740 |
||
741 |
int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val) |
|
742 |
{
|
|
743 |
char buf[16]; |
|
744 |
DBUG_ENTER("init_floatvar_from_file"); |
|
745 |
||
746 |
||
747 |
if (my_b_gets(f, buf, sizeof(buf))) |
|
748 |
{
|
|
749 |
if (sscanf(buf, "%f", var) != 1) |
|
750 |
DBUG_RETURN(1); |
|
751 |
else
|
|
752 |
DBUG_RETURN(0); |
|
753 |
}
|
|
754 |
else if (default_val != 0.0) |
|
755 |
{
|
|
756 |
*var = default_val; |
|
757 |
DBUG_RETURN(0); |
|
758 |
}
|
|
759 |
DBUG_RETURN(1); |
|
760 |
}
|
|
761 |
||
762 |
/*
  Check whether the slave I/O thread was asked to stop and optionally log it.

  SYNOPSIS
    check_io_slave_killed()
    thd     THD of the I/O thread
    mi      Master_info of the I/O thread
    info    Message written to the error log when the thread was killed and
            log_warnings is enabled; may be NULL for "log nothing"

  NOTES
    info is logged as plain text through a "%s" format, never as a format
    string itself, so a '%' inside the message cannot be misinterpreted.

  RETURN
    TRUE    io_slave_killed() reported that the thread must stop
    FALSE   otherwise
*/
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
{
  if (!io_slave_killed(thd, mi))
    return FALSE;

  if (info && global_system_variables.log_warnings)
    sql_print_information("%s", info);
  return TRUE;
}
|
|
772 |
||
773 |
||
774 |
/*
|
|
775 |
Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
|
|
776 |
relying on the binlog's version. This is not perfect: imagine an upgrade
|
|
777 |
of the master without waiting that all slaves are in sync with the master;
|
|
778 |
then a slave could be fooled about the binlog's format. This is what happens
|
|
779 |
when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
|
|
780 |
slaves are fooled. So we do this only to distinguish between 3.23 and more
|
|
781 |
recent masters (it's too late to change things for 3.23).
|
|
782 |
||
783 |
RETURNS
|
|
784 |
0 ok
|
|
785 |
1 error
|
|
786 |
*/
|
|
787 |
||
788 |
static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) |
|
789 |
{
|
|
790 |
char error_buf[512]; |
|
791 |
String err_msg(error_buf, sizeof(error_buf), &my_charset_bin); |
|
792 |
char err_buff[MAX_SLAVE_ERRMSG]; |
|
793 |
const char* errmsg= 0; |
|
794 |
int err_code= 0; |
|
795 |
MYSQL_RES *master_res= 0; |
|
796 |
MYSQL_ROW master_row; |
|
797 |
DBUG_ENTER("get_master_version_and_clock"); |
|
798 |
||
799 |
err_msg.length(0); |
|
800 |
/*
|
|
801 |
Free old description_event_for_queue (that is needed if we are in
|
|
802 |
a reconnection).
|
|
803 |
*/
|
|
804 |
delete mi->rli.relay_log.description_event_for_queue; |
|
805 |
mi->rli.relay_log.description_event_for_queue= 0; |
|
806 |
||
807 |
if (!my_isdigit(&my_charset_bin,*mysql->server_version)) |
|
808 |
{
|
|
809 |
errmsg = "Master reported unrecognized MySQL version"; |
|
810 |
err_code= ER_SLAVE_FATAL_ERROR; |
|
811 |
sprintf(err_buff, ER(err_code), errmsg); |
|
812 |
err_msg.append(err_buff); |
|
813 |
}
|
|
814 |
else
|
|
815 |
{
|
|
816 |
/*
|
|
817 |
Note the following switch will bug when we have MySQL branch 30 ;)
|
|
818 |
*/
|
|
819 |
switch (*mysql->server_version) |
|
820 |
{
|
|
821 |
case '0': |
|
822 |
case '1': |
|
823 |
case '2': |
|
824 |
errmsg = "Master reported unrecognized MySQL version"; |
|
825 |
err_code= ER_SLAVE_FATAL_ERROR; |
|
826 |
sprintf(err_buff, ER(err_code), errmsg); |
|
827 |
err_msg.append(err_buff); |
|
828 |
break; |
|
829 |
case '3': |
|
830 |
mi->rli.relay_log.description_event_for_queue= new |
|
831 |
Format_description_log_event(1, mysql->server_version); |
|
832 |
break; |
|
833 |
case '4': |
|
834 |
mi->rli.relay_log.description_event_for_queue= new |
|
835 |
Format_description_log_event(3, mysql->server_version); |
|
836 |
break; |
|
837 |
default: |
|
838 |
/*
|
|
839 |
Master is MySQL >=5.0. Give a default Format_desc event, so that we can
|
|
840 |
take the early steps (like tests for "is this a 3.23 master") which we
|
|
841 |
have to take before we receive the real master's Format_desc which will
|
|
842 |
override this one. Note that the Format_desc we create below is garbage
|
|
843 |
(it has the format of the *slave*); it's only good to help know if the
|
|
844 |
master is 3.23, 4.0, etc.
|
|
845 |
*/
|
|
846 |
mi->rli.relay_log.description_event_for_queue= new |
|
847 |
Format_description_log_event(4, mysql->server_version); |
|
848 |
break; |
|
849 |
}
|
|
850 |
}
|
|
851 |
||
852 |
/*
|
|
853 |
This does not mean that a 5.0 slave will be able to read a 6.0 master; but
|
|
854 |
as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
|
|
855 |
can't read a 6.0 master, this will show up when the slave can't read some
|
|
856 |
events sent by the master, and there will be error messages.
|
|
857 |
*/
|
|
858 |
||
859 |
if (err_msg.length() != 0) |
|
860 |
goto err; |
|
861 |
||
862 |
/* as we are here, we tried to allocate the event */
|
|
863 |
if (!mi->rli.relay_log.description_event_for_queue) |
|
864 |
{
|
|
865 |
errmsg= "default Format_description_log_event"; |
|
866 |
err_code= ER_SLAVE_CREATE_EVENT_FAILURE; |
|
867 |
sprintf(err_buff, ER(err_code), errmsg); |
|
868 |
err_msg.append(err_buff); |
|
869 |
goto err; |
|
870 |
}
|
|
871 |
||
872 |
/*
|
|
873 |
Compare the master and slave's clock. Do not die if master's clock is
|
|
874 |
unavailable (very old master not supporting UNIX_TIMESTAMP()?).
|
|
875 |
*/
|
|
876 |
||
877 |
if (!mysql_real_query(mysql, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) && |
|
878 |
(master_res= mysql_store_result(mysql)) && |
|
879 |
(master_row= mysql_fetch_row(master_res))) |
|
880 |
{
|
|
881 |
mi->clock_diff_with_master= |
|
882 |
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10)); |
|
883 |
}
|
|
884 |
else if (!check_io_slave_killed(mi->io_thd, mi, NULL)) |
|
885 |
{
|
|
886 |
mi->clock_diff_with_master= 0; /* The "most sensible" value */ |
|
887 |
sql_print_warning("\"SELECT UNIX_TIMESTAMP()\" failed on master, " |
|
888 |
"do not trust column Seconds_Behind_Master of SHOW "
|
|
889 |
"SLAVE STATUS. Error: %s (%d)", |
|
890 |
mysql_error(mysql), mysql_errno(mysql)); |
|
891 |
}
|
|
892 |
if (master_res) |
|
893 |
mysql_free_result(master_res); |
|
894 |
||
895 |
/*
|
|
896 |
Check that the master's server id and ours are different. Because if they
|
|
897 |
are equal (which can result from a simple copy of master's datadir to slave,
|
|
898 |
thus copying some my.cnf), replication will work but all events will be
|
|
899 |
skipped.
|
|
900 |
Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
|
|
901 |
master?).
|
|
902 |
Note: we could have put a @@SERVER_ID in the previous SELECT
|
|
903 |
UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
|
|
904 |
*/
|
|
905 |
if (!mysql_real_query(mysql, |
|
906 |
STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) && |
|
907 |
(master_res= mysql_store_result(mysql))) |
|
908 |
{
|
|
909 |
if ((master_row= mysql_fetch_row(master_res)) && |
|
910 |
(::server_id == strtoul(master_row[1], 0, 10)) && |
|
911 |
!mi->rli.replicate_same_server_id) |
|
912 |
{
|
|
913 |
errmsg= |
|
914 |
"The slave I/O thread stops because master and slave have equal"
|
|
915 |
" MySQL server ids; these ids must be different for replication to work (or"
|
|
916 |
" the --replicate-same-server-id option must be used on slave but this does"
|
|
917 |
" not always make sense; please check the manual before using it)."; |
|
918 |
err_code= ER_SLAVE_FATAL_ERROR; |
|
919 |
sprintf(err_buff, ER(err_code), errmsg); |
|
920 |
err_msg.append(err_buff); |
|
921 |
}
|
|
922 |
mysql_free_result(master_res); |
|
923 |
if (errmsg) |
|
924 |
goto err; |
|
925 |
}
|
|
926 |
||
927 |
/*
|
|
928 |
Check that the master's global character_set_server and ours are the same.
|
|
929 |
Not fatal if query fails (old master?).
|
|
930 |
Note that we don't check for equality of global character_set_client and
|
|
931 |
collation_connection (neither do we prevent their setting in
|
|
932 |
set_var.cc). That's because from what I (Guilhem) have tested, the global
|
|
933 |
values of these 2 are never used (new connections don't use them).
|
|
934 |
We don't test equality of global collation_database either as it is
|
|
935 |
going to be deprecated (made read-only) in 4.1 very soon.
|
|
936 |
The test is only relevant if master < 5.0.3 (we'll test only if it's older
|
|
937 |
than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
|
|
938 |
charset info in each binlog event.
|
|
939 |
We don't do it for 3.23 because masters <3.23.50 hang on
|
|
940 |
SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
|
|
941 |
test only if master is 4.x.
|
|
942 |
*/
|
|
943 |
||
944 |
/* redundant with rest of code but safer against later additions */
|
|
945 |
if (*mysql->server_version == '3') |
|
946 |
goto err; |
|
947 |
||
948 |
if ((*mysql->server_version == '4') && |
|
949 |
!mysql_real_query(mysql, |
|
950 |
STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) && |
|
951 |
(master_res= mysql_store_result(mysql))) |
|
952 |
{
|
|
953 |
if ((master_row= mysql_fetch_row(master_res)) && |
|
954 |
strcmp(master_row[0], global_system_variables.collation_server->name)) |
|
955 |
{
|
|
956 |
errmsg= |
|
957 |
"The slave I/O thread stops because master and slave have"
|
|
958 |
" different values for the COLLATION_SERVER global variable."
|
|
959 |
" The values must be equal for replication to work"; |
|
960 |
err_code= ER_SLAVE_FATAL_ERROR; |
|
961 |
sprintf(err_buff, ER(err_code), errmsg); |
|
962 |
err_msg.append(err_buff); |
|
963 |
}
|
|
964 |
mysql_free_result(master_res); |
|
965 |
if (errmsg) |
|
966 |
goto err; |
|
967 |
}
|
|
968 |
||
969 |
/*
|
|
970 |
Perform analogous check for time zone. Theoretically we also should
|
|
971 |
perform check here to verify that SYSTEM time zones are the same on
|
|
972 |
slave and master, but we can't rely on value of @@system_time_zone
|
|
973 |
variable (it is time zone abbreviation) since it determined at start
|
|
974 |
time and so could differ for slave and master even if they are really
|
|
975 |
in the same system time zone. So we are omitting this check and just
|
|
976 |
relying on documentation. Also according to Monty there are many users
|
|
977 |
who are using replication between servers in various time zones. Hence
|
|
978 |
such a check would break everything for them. (And now everything will
|
|
979 |
work for them because by default both their master and slave will have
|
|
980 |
'SYSTEM' time zone).
|
|
981 |
This check is only necessary for 4.x masters (and < 5.0.4 masters but
|
|
982 |
those were alpha).
|
|
983 |
*/
|
|
984 |
if ((*mysql->server_version == '4') && |
|
985 |
!mysql_real_query(mysql, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) && |
|
986 |
(master_res= mysql_store_result(mysql))) |
|
987 |
{
|
|
988 |
if ((master_row= mysql_fetch_row(master_res)) && |
|
989 |
strcmp(master_row[0], |
|
990 |
global_system_variables.time_zone->get_name()->ptr())) |
|
991 |
{
|
|
992 |
errmsg= |
|
993 |
"The slave I/O thread stops because master and slave have"
|
|
994 |
" different values for the TIME_ZONE global variable."
|
|
995 |
" The values must be equal for replication to work"; |
|
996 |
err_code= ER_SLAVE_FATAL_ERROR; |
|
997 |
sprintf(err_buff, ER(err_code), errmsg); |
|
998 |
err_msg.append(err_buff); |
|
999 |
}
|
|
1000 |
mysql_free_result(master_res); |
|
1001 |
||
1002 |
if (errmsg) |
|
1003 |
goto err; |
|
1004 |
}
|
|
1005 |
||
1006 |
if (mi->heartbeat_period != 0.0) |
|
1007 |
{
|
|
1008 |
char llbuf[22]; |
|
1009 |
const char query_format[]= "SET @master_heartbeat_period= %s"; |
|
1010 |
char query[sizeof(query_format) - 2 + sizeof(llbuf)]; |
|
1011 |
/*
|
|
1012 |
the period is an ulonglong of nano-secs.
|
|
1013 |
*/
|
|
1014 |
llstr((ulonglong) (mi->heartbeat_period*1000000000UL), llbuf); |
|
1015 |
my_sprintf(query, (query, query_format, llbuf)); |
|
1016 |
||
1017 |
if (mysql_real_query(mysql, query, strlen(query)) |
|
1018 |
&& !check_io_slave_killed(mi->io_thd, mi, NULL)) |
|
1019 |
{
|
|
1020 |
err_msg.append("The slave I/O thread stops because querying master with '"); |
|
1021 |
err_msg.append(query); |
|
1022 |
err_msg.append("' failed;"); |
|
1023 |
err_msg.append(" error: "); |
|
1024 |
err_code= mysql_errno(mysql); |
|
1025 |
err_msg.qs_append(err_code); |
|
1026 |
err_msg.append(" '"); |
|
1027 |
err_msg.append(mysql_error(mysql)); |
|
1028 |
err_msg.append("'"); |
|
1029 |
mysql_free_result(mysql_store_result(mysql)); |
|
1030 |
goto err; |
|
1031 |
}
|
|
1032 |
mysql_free_result(mysql_store_result(mysql)); |
|
1033 |
}
|
|
1034 |
||
1035 |
err: |
|
1036 |
if (err_msg.length() != 0) |
|
1037 |
{
|
|
1038 |
sql_print_error(err_msg.ptr()); |
|
1039 |
DBUG_ASSERT(err_code != 0); |
|
1040 |
mi->report(ERROR_LEVEL, err_code, err_msg.ptr()); |
|
1041 |
DBUG_RETURN(1); |
|
1042 |
}
|
|
1043 |
||
1044 |
DBUG_RETURN(0); |
|
1045 |
}
|
|
1046 |
||
1047 |
||
1048 |
/*
  Block the slave I/O thread while the relay logs use more space than the
  configured limit.

  The wait ends as soon as one of the following holds (checked in this
  order on every wake-up):
    - rli->log_space_total no longer exceeds rli->log_space_limit,
    - io_slave_killed() reports that the I/O thread was killed,
    - rli->ignore_log_space_limit is set.

  RETURN
    The last value returned by io_slave_killed(); false if it was never
    called or never reported a kill.

  NOTE(review): log_space_lock is taken here but not explicitly released;
  presumably THD::exit_cond() releases the mutex handed to enter_cond() -
  confirm against THD.
*/
static bool wait_for_relay_log_space(Relay_log_info* rli)
{
  Master_info *master= rli->mi;
  THD *io_thd= master->io_thd;
  bool killed= 0;
  DBUG_ENTER("wait_for_relay_log_space");

  pthread_mutex_lock(&rli->log_space_lock);
  const char *old_proc_info=
    io_thd->enter_cond(&rli->log_space_cond, &rli->log_space_lock,
                       "Waiting for the slave SQL thread to free enough relay log space");

  for (;;)
  {
    /* Enough room again: nothing to wait for. */
    if (!(rli->log_space_limit < rli->log_space_total))
      break;
    /* Only poll the kill state while we are actually over the limit. */
    killed= io_slave_killed(io_thd, master);
    if (killed)
      break;
    if (rli->ignore_log_space_limit)
      break;
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
  }

  io_thd->exit_cond(old_proc_info);
  DBUG_RETURN(killed);
}
|
|
1068 |
||
1069 |
||
1070 |
/*
|
|
1071 |
Builds a Rotate from the ignored events' info and writes it to relay log.
|
|
1072 |
||
1073 |
SYNOPSIS
|
|
1074 |
write_ignored_events_info_to_relay_log()
|
|
1075 |
thd pointer to I/O thread's thd
|
|
1076 |
mi
|
|
1077 |
||
1078 |
DESCRIPTION
|
|
1079 |
Slave I/O thread, going to die, must leave a durable trace of the
|
|
1080 |
ignored events' end position for the use of the slave SQL thread, by
|
|
1081 |
calling this function. Only that thread can call it (see assertion).
|
|
1082 |
*/
|
|
1083 |
static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi) |
|
1084 |
{
|
|
1085 |
Relay_log_info *rli= &mi->rli; |
|
1086 |
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock(); |
|
1087 |
DBUG_ENTER("write_ignored_events_info_to_relay_log"); |
|
1088 |
||
1089 |
DBUG_ASSERT(thd == mi->io_thd); |
|
1090 |
pthread_mutex_lock(log_lock); |
|
1091 |
if (rli->ign_master_log_name_end[0]) |
|
1092 |
{
|
|
1093 |
DBUG_PRINT("info",("writing a Rotate event to track down ignored events")); |
|
1094 |
Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end, |
|
1095 |
0, rli->ign_master_log_pos_end, |
|
1096 |
Rotate_log_event::DUP_NAME); |
|
1097 |
rli->ign_master_log_name_end[0]= 0; |
|
1098 |
/* can unlock before writing as slave SQL thd will soon see our Rotate */
|
|
1099 |
pthread_mutex_unlock(log_lock); |
|
1100 |
if (likely((bool)ev)) |
|
1101 |
{
|
|
1102 |
ev->server_id= 0; // don't be ignored by slave SQL thread |
|
1103 |
if (unlikely(rli->relay_log.append(ev))) |
|
1104 |
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, |
|
1105 |
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), |
|
1106 |
"failed to write a Rotate event"
|
|
1107 |
" to the relay log, SHOW SLAVE STATUS may be"
|
|
1108 |
" inaccurate"); |
|
1109 |
rli->relay_log.harvest_bytes_written(&rli->log_space_total); |
|
1110 |
if (flush_master_info(mi, 1)) |
|
1111 |
sql_print_error("Failed to flush master info file"); |
|
1112 |
delete ev; |
|
1113 |
}
|
|
1114 |
else
|
|
1115 |
mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, |
|
1116 |
ER(ER_SLAVE_CREATE_EVENT_FAILURE), |
|
1117 |
"Rotate_event (out of memory?),"
|
|
1118 |
" SHOW SLAVE STATUS may be inaccurate"); |
|
1119 |
}
|
|
1120 |
else
|
|
1121 |
pthread_mutex_unlock(log_lock); |
|
1122 |
DBUG_VOID_RETURN; |
|
1123 |
}
|
|
1124 |
||
1125 |
||
1126 |
/*
  Send COM_REGISTER_SLAVE to the master, describing this slave.

  The request body is built in this order: 4-byte server_id, then
  report_host, report_user and report_password (each via net_store_data()),
  2-byte report_port, 4-byte rpl_recovery_rank and a 4-byte zero that the
  master fills in as master_id.

  PARAMETERS
    mysql              connection to the master
    mi                 master info; used for the kill check and for
                       reporting the error
    suppress_warnings  out: reset to FALSE on entry, set to TRUE when the
                       command failed with ER_NET_READ_INTERRUPTED

  RETURN
    0  registration was sent, or was skipped because report_host is not
       set or the strings do not fit into the request buffer
    1  the command failed
*/
int register_slave_on_master(MYSQL* mysql, Master_info *mi,
                             bool *suppress_warnings)
{
  uchar request[1024];
  uchar *cursor= request;
  uint host_len;
  uint user_len= 0;
  uint password_len= 0;
  DBUG_ENTER("register_slave_on_master");

  *suppress_warnings= FALSE;
  if (!report_host)
    DBUG_RETURN(0);

  host_len= strlen(report_host);
  if (report_user)
    user_len= strlen(report_user);
  if (report_password)
    password_len= strlen(report_password);

  /* 30 is a good safety margin */
  if (host_len + user_len + password_len + 30 > sizeof(request))
    DBUG_RETURN(0); // safety

  int4store(cursor, server_id);
  cursor+= 4;
  cursor= net_store_data(cursor, (uchar*) report_host, host_len);
  cursor= net_store_data(cursor, (uchar*) report_user, user_len);
  cursor= net_store_data(cursor, (uchar*) report_password, password_len);
  int2store(cursor, (uint16) report_port);
  cursor+= 2;
  int4store(cursor, rpl_recovery_rank);
  cursor+= 4;
  /* The master will fill in master_id */
  int4store(cursor, 0);
  cursor+= 4;

  if (simple_command(mysql, COM_REGISTER_SLAVE, request,
                     (size_t) (cursor - request), 0) == 0)
    DBUG_RETURN(0);

  if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
  {
    *suppress_warnings= TRUE; // Suppress reconnect warning
  }
  else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
  {
    /* Only report when the failure is not due to the I/O thread being killed. */
    char reason[256];
    my_snprintf(reason, sizeof(reason), "%s (Errno: %d)", mysql_error(mysql),
                mysql_errno(mysql));
    mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
               ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", reason);
  }
  DBUG_RETURN(1);
}
|
|
1173 |
||
1174 |
||
1175 |
/*
  Send the slave status result set to the client of 'thd'.

  The column metadata is always sent. A single data row follows only when
  mi->host is non-empty; the values are stored in exactly the same order
  as the columns are declared. The result set is terminated with my_eof().

  PARAMETERS
    thd  session whose protocol/net is used to send the result
    mi   master info (and its embedded relay log info) to report

  RETURN
    TRUE   sending the metadata or the row failed
    FALSE  result set sent
*/
bool show_master_info(THD* thd, Master_info* mi)
{
  // TODO: fix this for multi-master
  List<Item> field_list;
  Protocol *protocol= thd->protocol;
  DBUG_ENTER("show_master_info");

  /* Connection to the master and I/O thread position. */
  field_list.push_back(new Item_empty_string("Slave_IO_State", 14));
  field_list.push_back(new Item_empty_string("Master_Host", sizeof(mi->host)));
  field_list.push_back(new Item_empty_string("Master_User", sizeof(mi->user)));
  field_list.push_back(new Item_return_int("Master_Port", 7, MYSQL_TYPE_LONG));
  field_list.push_back(new Item_return_int("Connect_Retry", 10,
                                           MYSQL_TYPE_LONG));
  field_list.push_back(new Item_empty_string("Master_Log_File", FN_REFLEN));
  field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
                                           MYSQL_TYPE_LONGLONG));
  field_list.push_back(new Item_empty_string("Relay_Log_File", FN_REFLEN));
  field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
                                           MYSQL_TYPE_LONGLONG));
  field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
                                             FN_REFLEN));
  field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
  field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));

  /* Replication filters. */
  field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
  field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
  field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
  field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
  field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
  field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
                                             28));

  /* SQL thread state. */
  field_list.push_back(new Item_return_int("Last_Errno", 4, MYSQL_TYPE_LONG));
  field_list.push_back(new Item_empty_string("Last_Error", 20));
  field_list.push_back(new Item_return_int("Skip_Counter", 10,
                                           MYSQL_TYPE_LONG));
  field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
                                           MYSQL_TYPE_LONGLONG));
  field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
                                           MYSQL_TYPE_LONGLONG));
  field_list.push_back(new Item_empty_string("Until_Condition", 6));
  field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
  field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
                                           MYSQL_TYPE_LONGLONG));

  /* SSL settings. */
  field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
  field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
                                             sizeof(mi->ssl_ca)));
  field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
                                             sizeof(mi->ssl_capath)));
  field_list.push_back(new Item_empty_string("Master_SSL_Cert",
                                             sizeof(mi->ssl_cert)));
  field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
                                             sizeof(mi->ssl_cipher)));
  field_list.push_back(new Item_empty_string("Master_SSL_Key",
                                             sizeof(mi->ssl_key)));

  field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
                                           MYSQL_TYPE_LONGLONG));
  field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
                                             3));

  /* Separate last-error columns for the I/O and the SQL thread. */
  field_list.push_back(new Item_return_int("Last_IO_Errno", 4,
                                           MYSQL_TYPE_LONG));
  field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
  field_list.push_back(new Item_return_int("Last_SQL_Errno", 4,
                                           MYSQL_TYPE_LONG));
  field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));

  if (protocol->send_fields(&field_list,
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
    DBUG_RETURN(TRUE);

  if (mi->host[0])
  {
    DBUG_PRINT("info",("host is set: '%s'", mi->host));
    String *packet= &thd->packet;
    Relay_log_info *rli= &mi->rli;
    protocol->prepare_for_resend();

    /*
      slave_running can be accessed without run_lock but not other
      non-volatile members like mi->io_thd, which is guarded by the mutex.
    */
    pthread_mutex_lock(&mi->run_lock);
    const char *io_state= mi->io_thd ? mi->io_thd->proc_info : "";
    protocol->store(io_state, &my_charset_bin);
    pthread_mutex_unlock(&mi->run_lock);

    /* Everything below reads mi and mi->rli under both data locks. */
    pthread_mutex_lock(&mi->data_lock);
    pthread_mutex_lock(&rli->data_lock);

    protocol->store(mi->host, &my_charset_bin);
    protocol->store(mi->user, &my_charset_bin);
    protocol->store((uint32) mi->port);
    protocol->store((uint32) mi->connect_retry);
    protocol->store(mi->master_log_name, &my_charset_bin);
    protocol->store((ulonglong) mi->master_log_pos);
    /* Relay_Log_File is shown without its directory part. */
    protocol->store(rli->group_relay_log_name +
                    dirname_length(rli->group_relay_log_name),
                    &my_charset_bin);
    protocol->store((ulonglong) rli->group_relay_log_pos);
    protocol->store(rli->group_master_log_name, &my_charset_bin);
    protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
                    "Yes" : "No", &my_charset_bin);
    protocol->store(rli->slave_running ? "Yes":"No", &my_charset_bin);
    protocol->store(rpl_filter->get_do_db());
    protocol->store(rpl_filter->get_ignore_db());

    /* One scratch String is reused for the four table filter lists. */
    char filter_buf[256];
    String filter_list(filter_buf, sizeof(filter_buf), &my_charset_bin);
    rpl_filter->get_do_table(&filter_list);
    protocol->store(&filter_list);
    rpl_filter->get_ignore_table(&filter_list);
    protocol->store(&filter_list);
    rpl_filter->get_wild_do_table(&filter_list);
    protocol->store(&filter_list);
    rpl_filter->get_wild_ignore_table(&filter_list);
    protocol->store(&filter_list);

    protocol->store(rli->last_error().number);
    protocol->store(rli->last_error().message, &my_charset_bin);
    protocol->store((uint32) rli->slave_skip_counter);
    protocol->store((ulonglong) rli->group_master_log_pos);
    protocol->store((ulonglong) rli->log_space_total);

    const char *until_name;
    if (rli->until_condition == Relay_log_info::UNTIL_NONE)
      until_name= "None";
    else if (rli->until_condition == Relay_log_info::UNTIL_MASTER_POS)
      until_name= "Master";
    else
      until_name= "Relay";
    protocol->store(until_name, &my_charset_bin);
    protocol->store(rli->until_log_name, &my_charset_bin);
    protocol->store((ulonglong) rli->until_log_pos);

    protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
    protocol->store(mi->ssl_ca, &my_charset_bin);
    protocol->store(mi->ssl_capath, &my_charset_bin);
    protocol->store(mi->ssl_cert, &my_charset_bin);
    protocol->store(mi->ssl_cipher, &my_charset_bin);
    protocol->store(mi->ssl_key, &my_charset_bin);

    /*
      Seconds_Behind_Master: if SQL thread is running and I/O thread is
      connected, we can compute it otherwise show NULL (i.e. unknown).
    */
    if ((mi->slave_running == MYSQL_SLAVE_RUN_CONNECT) &&
        rli->slave_running)
    {
      long time_diff= ((long)(time(0) - rli->last_master_timestamp)
                       - mi->clock_diff_with_master);
      /*
        time_diff can come out negative. Possible reasons related to MySQL:
        - the master is itself a slave of another master whose time is
          ahead;
        - somebody used an explicit SET TIMESTAMP on the master.
        A value of -1 can also result purely from the one-second
        granularity of the time functions: with perfectly synchronized
        clocks, the master's timestamp may be read at the very end of
        second 1 and the slave's at the very beginning of second 2 when
        the slave connects, so the recorded clock difference is 1. If the
        slave's time and last_master_timestamp then fall into the same
        second, the result is 0-(2-1)=-1. This confuses users, so the
        value is clamped at 0: hence the max().

        last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
        special marker to say "consider we have caught up".
      */
      longlong seconds_behind= 0;
      if (rli->last_master_timestamp)
        seconds_behind= max(0, time_diff);
      protocol->store(seconds_behind);
    }
    else
    {
      protocol->store_null();
    }
    protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);

    // Last_IO_Errno
    protocol->store(mi->last_error().number);
    // Last_IO_Error
    protocol->store(mi->last_error().message, &my_charset_bin);
    // Last_SQL_Errno
    protocol->store(rli->last_error().number);
    // Last_SQL_Error
    protocol->store(rli->last_error().message, &my_charset_bin);

    pthread_mutex_unlock(&rli->data_lock);
    pthread_mutex_unlock(&mi->data_lock);

    if (my_net_write(&thd->net, (uchar*) packet->ptr(), packet->length()))
      DBUG_RETURN(TRUE);
  }
  my_eof(thd);
  DBUG_RETURN(FALSE);
}
|
|
1367 |
||
1368 |
||
1369 |
/*
  Set the option bits and completion type a slave thread must run with.

  - OPTION_BIG_SELECTS is always turned on: it makes no sense to constrain
    slave threads with max_join_size, because a query that succeeded on the
    master HAS to be executed here. Setting max_join_size to HA_POS_ERROR
    is not enough (and is unnecessary once OPTION_BIG_SELECTS is set),
    since an INSERT SELECT examining more than 4 billion rows would still
    fail: OPTION_BIG_SELECTS is set automatically when max_join_size is 4G,
    but only for client threads.
  - OPTION_BIN_LOG follows opt_log_slave_updates.
  - thd->variables.completion_type is reset to 0.
*/
void set_slave_thread_options(THD* thd)
{
  DBUG_ENTER("set_slave_thread_options");

  ulonglong new_options= thd->options | OPTION_BIG_SELECTS;
  new_options= opt_log_slave_updates ? (new_options | OPTION_BIN_LOG)
                                     : (new_options & ~OPTION_BIN_LOG);

  thd->options= new_options;
  thd->variables.completion_type= 0;
  DBUG_VOID_RETURN;
}
|
|
1390 |
||
1391 |
/*
  Reset a slave thread's character set settings to the global defaults.

  Copies character_set_client, collation_connection and collation_server
  from global_system_variables into thd->variables, calls
  thd->update_charset(), and then invalidates the charset cached in 'rli'.
*/
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
{
  DBUG_ENTER("set_slave_thread_default_charset");

  thd->variables.collation_server=
    global_system_variables.collation_server;
  thd->variables.collation_connection=
    global_system_variables.collation_connection;
  thd->variables.character_set_client=
    global_system_variables.character_set_client;
  thd->update_charset();

  /*
    'rli' is const for callers because the visible purpose of this function
    is to set the thread's default charset; invalidating the cache is only
    a secondary effect, so the constness is cast away just for that call.
  */
  Relay_log_info *writable_rli= const_cast<Relay_log_info*>(rli);
  writable_rli->cached_charset_invalidate();
  DBUG_VOID_RETURN;
}
|
|
1412 |
||
1413 |
/*
|
|
1414 |
init_slave_thread()
|
|
1415 |
*/
|
|
1416 |
||
1417 |
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type) |
|
1418 |
{
|
|
1419 |
DBUG_ENTER("init_slave_thread"); |
|
1420 |
#if !defined(DBUG_OFF)
|
|
1421 |
int simulate_error= 0; |
|
1422 |
#endif
|
|
1423 |
thd->system_thread = (thd_type == SLAVE_THD_SQL) ? |
|
1424 |
SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO; |
|
1425 |
thd->security_ctx->skip_grants(); |
|
1426 |
my_net_init(&thd->net, 0); |
|
1427 |
/*
|
|
1428 |
Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
|
|
1429 |
slave threads, since a replication event can become this much larger
|
|
1430 |
than the corresponding packet (query) sent from client to master.
|
|
1431 |
*/
|
|
1432 |
thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet |
|
1433 |
+ MAX_LOG_EVENT_HEADER; /* note, incr over the global not session var */ |
|
1434 |
thd->slave_thread = 1; |
|
1435 |
thd->enable_slow_log= opt_log_slow_slave_statements; |
|
1436 |
set_slave_thread_options(thd); |
|
1437 |
thd->client_capabilities = CLIENT_LOCAL_FILES; |
|
1438 |
pthread_mutex_lock(&LOCK_thread_count); |
|
1439 |
thd->thread_id= thd->variables.pseudo_thread_id= thread_id++; |
|
1440 |
pthread_mutex_unlock(&LOCK_thread_count); |
|
1441 |
||
1442 |
DBUG_EXECUTE_IF("simulate_io_slave_error_on_init", |
|
1443 |
simulate_error|= (1 << SLAVE_THD_IO);); |
|
1444 |
DBUG_EXECUTE_IF("simulate_sql_slave_error_on_init", |
|
1445 |
simulate_error|= (1 << SLAVE_THD_SQL);); |
|
1446 |
#if !defined(DBUG_OFF)
|
|
1447 |
if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type)) |
|
1448 |
#else
|
|
1449 |
if (init_thr_lock() || thd->store_globals()) |
|
1450 |
#endif
|
|
1451 |
{
|
|
1452 |
thd->cleanup(); |
|
1453 |
DBUG_RETURN(-1); |
|
1454 |
}
|
|
1455 |
lex_start(thd); |
|
1456 |
||
1457 |
if (thd_type == SLAVE_THD_SQL) |
|
1458 |
thd_proc_info(thd, "Waiting for the next event in relay log"); |
|
1459 |
else
|
|
1460 |
thd_proc_info(thd, "Waiting for master update"); |
|
1461 |
thd->version=refresh_version; |
|
1462 |
thd->set_time(); |
|
1463 |
DBUG_RETURN(0); |
|
1464 |
}
|
|
1465 |
||
1466 |
||
1467 |
/*
  Sleep for 'sec' seconds, checking after every wake-up whether the
  thread was killed.

  PARAMETERS
    thd                thread descriptor, passed through to thread_killed
    sec                number of seconds to sleep
    thread_killed      callback deciding whether the thread was killed
    thread_killed_arg  opaque second argument for thread_killed

  RETURN
    1  thread_killed() returned non-zero after a wake-up
    0  the full time elapsed (or sec was not positive)
*/
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
                      void* thread_killed_arg)
{
  thr_alarm_t alarmed;
  DBUG_ENTER("safe_sleep");

  thr_alarm_init(&alarmed);
  time_t now= my_time(0);
  const time_t deadline= now + sec;

  for (;;)
  {
    const int remaining= (int) (deadline - now);
    if (remaining <= 0)
      break;

    ALARM alarm_buff;
    /*
      The only reason we are asking for alarm is so that
      we will be woken up in case of murder, so if we do not get killed,
      set the alarm so it goes off after we wake up naturally
    */
    thr_alarm(&alarmed, 2 * remaining, &alarm_buff);
    sleep(remaining);
    thr_end_alarm(&alarmed);

    if (thread_killed(thd, thread_killed_arg))
      DBUG_RETURN(1);
    /* Woken early: recompute how much of the interval is left. */
    now= my_time(0);
  }
  DBUG_RETURN(0);
}
|
|
1496 |
||
1497 |
||
1498 |
/*
  Ask the master to start sending binlog events (COM_BINLOG_DUMP).

  The request consists of the 4-byte start position (mi->master_log_pos),
  2 bytes of flags (currently always 0), the 4-byte server_id of this
  slave, and the master log file name without a terminating NUL.

  PARAMETERS
    mysql              connection to the master
    mi                 master info supplying log name, position and
                       connect_retry (for the error message)
    suppress_warnings  out: reset to FALSE on entry, set to TRUE when the
                       command failed with ER_NET_READ_INTERRUPTED

  RETURN
    0  request sent
    1  the command failed; the caller is expected to reconnect and retry
*/
static int request_dump(MYSQL* mysql, Master_info* mi,
                        bool *suppress_warnings)
{
  uchar request[FN_REFLEN + 10];
  int name_len;
  int binlog_flags= 0; // for now
  char *binlog_name= mi->master_log_name;
  DBUG_ENTER("request_dump");

  *suppress_warnings= FALSE;

  // TODO if big log files: Change next to int8store()
  int4store(request, (ulong) mi->master_log_pos);
  int2store(request + 4, binlog_flags);
  int4store(request + 6, server_id);
  name_len= (uint) strlen(binlog_name);
  memcpy(request + 10, binlog_name, name_len);

  if (simple_command(mysql, COM_BINLOG_DUMP, request, name_len + 10, 1) == 0)
    DBUG_RETURN(0);

  /*
    Something went wrong, so we will just reconnect and retry later
    in the future, we should do a better error analysis, but for
    now we just fill up the error log :-)
  */
  if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
  {
    *suppress_warnings= TRUE; // Suppress reconnect warning
  }
  else
  {
    sql_print_error("Error on COM_BINLOG_DUMP: %d %s, will retry in %d secs",
                    mysql_errno(mysql), mysql_error(mysql),
                    mi->connect_retry);
  }
  DBUG_RETURN(1);
}
|
|
1533 |
||
1534 |
/*
|
|
1535 |
Read one event from the master
|
|
1536 |
||
1537 |
SYNOPSIS
|
|
1538 |
read_event()
|
|
1539 |
mysql MySQL connection
|
|
1540 |
mi Master connection information
|
|
1541 |
suppress_warnings TRUE when a normal net read timeout has caused us to
|
|
1542 |
try a reconnect. We do not want to print anything to
|
|
1543 |
the error log in this case because this a anormal
|
|
1544 |
event in an idle server.
|
|
1545 |
||
1546 |
RETURN VALUES
|
|
1547 |
'packet_error' Error
|
|
1548 |
number Length of packet
|
|
1549 |
*/
|
|
1550 |
||
1551 |
static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings) |
|
1552 |
{
|
|
1553 |
ulong len; |
|
1554 |
DBUG_ENTER("read_event"); |
|
1555 |
||
1556 |
*suppress_warnings= FALSE; |
|
1557 |
/*
|
|
1558 |
my_real_read() will time us out
|
|
1559 |
We check if we were told to die, and if not, try reading again
|
|
1560 |
*/
|
|
1561 |
#ifndef DBUG_OFF
|
|
1562 |
if (disconnect_slave_event_count && !(mi->events_till_disconnect--)) |
|
1563 |
DBUG_RETURN(packet_error); |
|
1564 |
#endif
|
|
1565 |
||
1566 |
len = cli_safe_read(mysql); |
|
1567 |
if (len == packet_error || (long) len < 1) |
|
1568 |
{
|
|
1569 |
if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED) |
|
1570 |
{
|
|
1571 |
/*
|
|
1572 |
We are trying a normal reconnect after a read timeout;
|
|
1573 |
we suppress prints to .err file as long as the reconnect
|
|
1574 |
happens without problems
|
|
1575 |
*/
|
|
1576 |
*suppress_warnings= TRUE; |
|
1577 |
}
|
|
1578 |
else
|
|
1579 |
sql_print_error("Error reading packet from server: %s ( server_errno=%d)", |
|
1580 |
mysql_error(mysql), mysql_errno(mysql)); |
|
1581 |
DBUG_RETURN(packet_error); |
|
1582 |
}
|
|
1583 |
||
1584 |
/* Check if eof packet */
|
|
1585 |
if (len < 8 && mysql->net.read_pos[0] == 254) |
|
1586 |
{
|
|
1587 |
sql_print_information("Slave: received end packet from server, apparent " |
|
1588 |
"master shutdown: %s", |
|
1589 |
mysql_error(mysql)); |
|
1590 |
DBUG_RETURN(packet_error); |
|
1591 |
}
|
|
1592 |
||
1593 |
DBUG_PRINT("exit", ("len: %lu net->read_pos[4]: %d", |
|
1594 |
len, mysql->net.read_pos[4])); |
|
1595 |
DBUG_RETURN(len - 1); |
|
1596 |
}
|
|
1597 |
||
1598 |
||
1599 |
/*
  Tell whether 'expected_error' is one of the network/interruption error
  codes singled out here: ER_NET_READ_ERROR, ER_NET_ERROR_ON_WRITE,
  ER_QUERY_INTERRUPTED, ER_SERVER_SHUTDOWN or ER_NEW_ABORTING_CONNECTION.

  'thd' and 'rli' are not used.

  RETURN
    1  expected_error is one of the codes above
    0  otherwise
*/
int check_expected_error(THD* thd, Relay_log_info const *rli,
                         int expected_error)
{
  DBUG_ENTER("check_expected_error");

  const bool in_list= (expected_error == ER_NET_READ_ERROR ||
                       expected_error == ER_NET_ERROR_ON_WRITE ||
                       expected_error == ER_QUERY_INTERRUPTED ||
                       expected_error == ER_SERVER_SHUTDOWN ||
                       expected_error == ER_NEW_ABORTING_CONNECTION);
  DBUG_RETURN(in_list ? 1 : 0);
}
|
|
1615 |
||
1616 |
||
1617 |
/*
|
|
1618 |
Check if the current error is of temporary nature of not.
|
|
1619 |
Some errors are temporary in nature, such as
|
|
1620 |
ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. Ndb also signals
|
|
1621 |
that the error is temporary by pushing a warning with the error code
|
|
1622 |
ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
|
|
1623 |
*/
|
|
1624 |
static int has_temporary_error(THD *thd) |
|
1625 |
{
|
|
1626 |
DBUG_ENTER("has_temporary_error"); |
|
1627 |
||
1628 |
if (thd->is_fatal_error) |
|
1629 |
DBUG_RETURN(0); |
|
1630 |
||
1631 |
DBUG_EXECUTE_IF("all_errors_are_temporary_errors", |
|
1632 |
if (thd->main_da.is_error()) |
|
1633 |
{
|
|
1634 |
thd->clear_error(); |
|
1635 |
my_error(ER_LOCK_DEADLOCK, MYF(0)); |
|
1636 |
});
|
|
1637 |
||
1638 |
/*
|
|
1639 |
If there is no message in THD, we can't say if it's a temporary
|
|
1640 |
error or not. This is currently the case for Incident_log_event,
|
|
1641 |
which sets no message. Return FALSE.
|
|
1642 |
*/
|
|
1643 |
if (!thd->is_error()) |
|
1644 |
DBUG_RETURN(0); |
|
1645 |
||
1646 |
/*
|
|
1647 |
Temporary error codes:
|
|
1648 |
currently, InnoDB deadlock detected by InnoDB or lock
|
|
1649 |
wait timeout (innodb_lock_wait_timeout exceeded
|
|
1650 |
*/
|
|
1651 |
if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK || |
|
1652 |
thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT) |
|
1653 |
DBUG_RETURN(1); |
|
1654 |
||
1655 |
DBUG_RETURN(0); |
|
1656 |
}
|
|
1657 |
||
1658 |
||
1659 |
/**
|
|
1660 |
Applies the given event and advances the relay log position.
|
|
1661 |
||
1662 |
In essence, this function does:
|
|
1663 |
||
1664 |
@code
|
|
1665 |
ev->apply_event(rli);
|
|
1666 |
ev->update_pos(rli);
|
|
1667 |
@endcode
|
|
1668 |
||
1669 |
But it also does some maintainance, such as skipping events if
|
|
1670 |
needed and reporting errors.
|
|
1671 |
||
1672 |
If the @c skip flag is set, then it is tested whether the event
|
|
1673 |
should be skipped, by looking at the slave_skip_counter and the
|
|
1674 |
server id. The skip flag should be set when calling this from a
|
|
1675 |
replication thread but not set when executing an explicit BINLOG
|
|
1676 |
statement.
|
|
1677 |
||
1678 |
@retval 0 OK.
|
|
1679 |
||
1680 |
@retval 1 Error calling ev->apply_event().
|
|
1681 |
||
1682 |
@retval 2 No error calling ev->apply_event(), but error calling
|
|
1683 |
ev->update_pos().
|
|
1684 |
*/
|
|
1685 |
int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli, |
|
1686 |
bool skip) |
|
1687 |
{
|
|
1688 |
int exec_res= 0; |
|
1689 |
||
1690 |
DBUG_ENTER("apply_event_and_update_pos"); |
|
1691 |
||
1692 |
DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)", |
|
1693 |
ev->get_type_str(), ev->get_type_code(), |
|
1694 |
ev->server_id)); |
|
1695 |
DBUG_PRINT("info", ("thd->options: %s%s; rli->last_event_start_time: %lu", |
|
1696 |
FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), |
|
1697 |
FLAGSTR(thd->options, OPTION_BEGIN), |
|
1698 |
rli->last_event_start_time)); |
|
1699 |
||
1700 |
/*
|
|
1701 |
Execute the event to change the database and update the binary
|
|
1702 |
log coordinates, but first we set some data that is needed for
|
|
1703 |
the thread.
|
|
1704 |
||
1705 |
The event will be executed unless it is supposed to be skipped.
|
|
1706 |
||
1707 |
Queries originating from this server must be skipped. Low-level
|
|
1708 |
events (Format_description_log_event, Rotate_log_event,
|
|
1709 |
Stop_log_event) from this server must also be skipped. But for
|
|
1710 |
those we don't want to modify 'group_master_log_pos', because
|
|
1711 |
these events did not exist on the master.
|
|
1712 |
Format_description_log_event is not completely skipped.
|
|
1713 |
||
1714 |
Skip queries specified by the user in 'slave_skip_counter'. We
|
|
1715 |
can't however skip events that has something to do with the log
|
|
1716 |
files themselves.
|
|
1717 |
||
1718 |
Filtering on own server id is extremely important, to ignore
|
|
1719 |
execution of events created by the creation/rotation of the relay
|
|
1720 |
log (remember that now the relay log starts with its Format_desc,
|
|
1721 |
has a Rotate etc).
|
|
1722 |
*/
|
|
1723 |
||
1724 |
thd->server_id = ev->server_id; // use the original server id for logging |
|
1725 |
thd->set_time(); // time the query |
|
1726 |
thd->lex->current_select= 0; |
|
1727 |
if (!ev->when) |
|
1728 |
ev->when= my_time(0); |
|
1729 |
ev->thd = thd; // because up to this point, ev->thd == 0 |
|
1730 |
||
1731 |
if (skip) |
|
1732 |
{
|
|
1733 |
int reason= ev->shall_skip(rli); |
|
1734 |
if (reason == Log_event::EVENT_SKIP_COUNT) |
|
1735 |
--rli->slave_skip_counter; |
|
1736 |
pthread_mutex_unlock(&rli->data_lock); |
|
1737 |
if (reason == Log_event::EVENT_SKIP_NOT) |
|
1738 |
exec_res= ev->apply_event(rli); |
|
1739 |
#ifndef DBUG_OFF
|
|
1740 |
/*
|
|
1741 |
This only prints information to the debug trace.
|
|
1742 |
||
1743 |
TODO: Print an informational message to the error log?
|
|
1744 |
*/
|
|
1745 |
static const char *const explain[] = { |
|
1746 |
// EVENT_SKIP_NOT,
|
|
1747 |
"not skipped", |
|
1748 |
// EVENT_SKIP_IGNORE,
|
|
1749 |
"skipped because event should be ignored", |
|
1750 |
// EVENT_SKIP_COUNT
|
|
1751 |
"skipped because event skip counter was non-zero"
|
|
1752 |
};
|
|
1753 |
DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d", |
|
1754 |
thd->options & OPTION_BEGIN ? 1 : 0, |
|
1755 |
rli->get_flag(Relay_log_info::IN_STMT))); |
|
1756 |
DBUG_PRINT("skip_event", ("%s event was %s", |
|
1757 |
ev->get_type_str(), explain[reason])); |
|
1758 |
#endif
|
|
1759 |
}
|
|
1760 |
else
|
|
1761 |
exec_res= ev->apply_event(rli); |
|
1762 |
||
1763 |
DBUG_PRINT("info", ("apply_event error = %d", exec_res)); |
|
1764 |
if (exec_res == 0) |
|
1765 |
{
|
|
1766 |
int error= ev->update_pos(rli); |
|
1767 |
#ifdef HAVE_purify
|
|
1768 |
if (!rli->is_fake) |
|
1769 |
#endif
|
|
1770 |
{
|
|
1771 |
#ifndef DBUG_OFF
|
|
1772 |
char buf[22]; |
|
1773 |
#endif
|
|
1774 |
DBUG_PRINT("info", ("update_pos error = %d", error)); |
|
1775 |
DBUG_PRINT("info", ("group %s %s", |
|
1776 |
llstr(rli->group_relay_log_pos, buf), |
|
1777 |
rli->group_relay_log_name)); |
|
1778 |
DBUG_PRINT("info", ("event %s %s", |
|
1779 |
llstr(rli->event_relay_log_pos, buf), |
|
1780 |
rli->event_relay_log_name)); |
|
1781 |
}
|
|
1782 |
/*
|
|
1783 |
The update should not fail, so print an error message and
|
|
1784 |
return an error code.
|
|
1785 |
||
1786 |
TODO: Replace this with a decent error message when merged
|
|
1787 |
with BUG#24954 (which adds several new error message).
|
|
1788 |
*/
|
|
1789 |
if (error) |
|
1790 |
{
|
|
1791 |
char buf[22]; |
|
1792 |
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, |
|
1793 |
"It was not possible to update the positions"
|
|
1794 |
" of the relay log information: the slave may"
|
|
1795 |
" be in an inconsistent state."
|
|
1796 |
" Stopped in %s position %s", |
|
1797 |
rli->group_relay_log_name, |
|
1798 |
llstr(rli->group_relay_log_pos, buf)); |
|
1799 |
DBUG_RETURN(2); |
|
1800 |
}
|
|
1801 |
}
|
|
1802 |
||
1803 |
DBUG_RETURN(exec_res ? 1 : 0); |
|
1804 |
}
|
|
1805 |
||
1806 |
||
1807 |
/**
|
|
1808 |
Top-level function for executing the next event from the relay log.
|
|
1809 |
||
1810 |
This function reads the event from the relay log, executes it, and
|
|
1811 |
advances the relay log position. It also handles errors, etc.
|
|
1812 |
||
1813 |
This function may fail to apply the event for the following reasons:
|
|
1814 |
||
1815 |
- The position specfied by the UNTIL condition of the START SLAVE
|
|
1816 |
command is reached.
|
|
1817 |
||
1818 |
- It was not possible to read the event from the log.
|
|
1819 |
||
1820 |
- The slave is killed.
|
|
1821 |
||
1822 |
- An error occurred when applying the event, and the event has been
|
|
1823 |
tried slave_trans_retries times. If the event has been retried
|
|
1824 |
fewer times, 0 is returned.
|
|
1825 |
||
1826 |
- init_master_info or init_relay_log_pos failed. (These are called
|
|
1827 |
if a failure occurs when applying the event.)</li>
|
|
1828 |
||
1829 |
- An error occurred when updating the binlog position.
|
|
1830 |
||
1831 |
@retval 0 The event was applied.
|
|
1832 |
||
1833 |
@retval 1 The event was not applied.
|
|
1834 |
*/
|
|
1835 |
static int exec_relay_log_event(THD* thd, Relay_log_info* rli) |
|
1836 |
{
|
|
1837 |
DBUG_ENTER("exec_relay_log_event"); |
|
1838 |
||
1839 |
/*
|
|
1840 |
We acquire this mutex since we need it for all operations except
|
|
1841 |
event execution. But we will release it in places where we will
|
|
1842 |
wait for something for example inside of next_event().
|
|
1843 |
*/
|
|
1844 |
pthread_mutex_lock(&rli->data_lock); |
|
1845 |
||
1846 |
Log_event * ev = next_event(rli); |
|
1847 |
||
1848 |
DBUG_ASSERT(rli->sql_thd==thd); |
|
1849 |
||
1850 |
if (sql_slave_killed(thd,rli)) |
|
1851 |
{
|
|
1852 |
pthread_mutex_unlock(&rli->data_lock); |
|
1853 |
delete ev; |
|
1854 |
DBUG_RETURN(1); |
|
1855 |
}
|
|
1856 |
if (ev) |
|
1857 |
{
|
|
1858 |
int exec_res; |
|
1859 |
||
1860 |
/*
|
|
1861 |
This tests if the position of the beginning of the current event
|
|
1862 |
hits the UNTIL barrier.
|
|
1863 |
*/
|
|
1864 |
if (rli->until_condition != Relay_log_info::UNTIL_NONE && |
|
1865 |
rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ? |
|
1866 |
rli->group_master_log_pos : |
|
1867 |
ev->log_pos - ev->data_written)) |
|
1868 |
{
|
|
1869 |
char buf[22]; |
|
1870 |
sql_print_information("Slave SQL thread stopped because it reached its" |
|
1871 |
" UNTIL position %s", llstr(rli->until_pos(), buf)); |
|
1872 |
/*
|
|
1873 |
Setting abort_slave flag because we do not want additional message about
|
|
1874 |
error in query execution to be printed.
|
|
1875 |
*/
|
|
1876 |
rli->abort_slave= 1; |
|
1877 |
pthread_mutex_unlock(&rli->data_lock); |
|
1878 |
delete ev; |
|
1879 |
DBUG_RETURN(1); |
|
1880 |
}
|
|
1881 |
exec_res= apply_event_and_update_pos(ev, thd, rli, TRUE); |
|
1882 |
||
1883 |
/*
|
|
1884 |
Format_description_log_event should not be deleted because it will be
|
|
1885 |
used to read info about the relay log's format; it will be deleted when
|
|
1886 |
the SQL thread does not need it, i.e. when this thread terminates.
|
|
1887 |
*/
|
|
1888 |
if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) |
|
1889 |
{
|
|
1890 |
DBUG_PRINT("info", ("Deleting the event after it has been executed")); |
|
1891 |
delete ev; |
|
1892 |
}
|
|
1893 |
||
1894 |
/*
|
|
1895 |
update_log_pos failed: this should not happen, so we don't
|
|
1896 |
retry.
|
|
1897 |
*/
|
|
1898 |
if (exec_res == 2) |
|
1899 |
DBUG_RETURN(1); |
|
1900 |
||
1901 |
if (slave_trans_retries) |
|
1902 |
{
|
|
1903 |
int temp_err= 0; |
|
1904 |
if (exec_res && (temp_err= has_temporary_error(thd))) |
|
1905 |
{
|
|
1906 |
const char *errmsg; |
|
1907 |
/*
|
|
1908 |
We were in a transaction which has been rolled back because of a
|
|
1909 |
temporary error;
|
|
1910 |
let's seek back to BEGIN log event and retry it all again.
|
|
1911 |
Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
|
|
1912 |
there is no rollback since 5.0.13 (ref: manual).
|
|
1913 |
We have to not only seek but also
|
|
1914 |
a) init_master_info(), to seek back to hot relay log's start for later
|
|
1915 |
(for when we will come back to this hot log after re-processing the
|
|
1916 |
possibly existing old logs where BEGIN is: check_binlog_magic() will
|
|
1917 |
then need the cache to be at position 0 (see comments at beginning of
|
|
1918 |
init_master_info()).
|
|
1919 |
b) init_relay_log_pos(), because the BEGIN may be an older relay log.
|
|
1920 |
*/
|
|
1921 |
if (rli->trans_retries < slave_trans_retries) |
|
1922 |
{
|
|
1923 |
if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL)) |
|
1924 |
sql_print_error("Failed to initialize the master info structure"); |
|
1925 |
else if (init_relay_log_pos(rli, |
|
1926 |
rli->group_relay_log_name, |
|
1927 |
rli->group_relay_log_pos, |
|
1928 |
1, &errmsg, 1)) |
|
1929 |
sql_print_error("Error initializing relay log position: %s", |
|
1930 |
errmsg); |
|
1931 |
else
|
|
1932 |
{
|
|
1933 |
exec_res= 0; |
|
1934 |
end_trans(thd, ROLLBACK); |
|
1935 |
/* chance for concurrent connection to get more locks */
|
|
1936 |
safe_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE), |
|
1937 |
(CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli); |
|
1938 |
pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS |
|
1939 |
rli->trans_retries++; |
|
1940 |
rli->retried_trans++; |
|
1941 |
pthread_mutex_unlock(&rli->data_lock); |
|
1942 |
DBUG_PRINT("info", ("Slave retries transaction " |
|
1943 |
"rli->trans_retries: %lu", rli->trans_retries)); |
|
1944 |
}
|
|
1945 |
}
|
|
1946 |
else
|
|
1947 |
sql_print_error("Slave SQL thread retried transaction %lu time(s) " |
|
1948 |
"in vain, giving up. Consider raising the value of "
|
|
1949 |
"the slave_transaction_retries variable.", |
|
1950 |
slave_trans_retries); |
|
1951 |
}
|
|
1952 |
else if (exec_res && !temp_err || |
|
1953 |
(opt_using_transactions && |
|
1954 |
rli->group_relay_log_pos == rli->event_relay_log_pos)) |
|
1955 |
{
|
|
1956 |
/*
|
|
1957 |
Only reset the retry counter if the entire group succeeded
|
|
1958 |
or failed with a non-transient error. On a successful
|
|
1959 |
event, the execution will proceed as usual; in the case of a
|
|
1960 |
non-transient error, the slave will stop with an error.
|
|
1961 |
*/
|
|
1962 |
rli->trans_retries= 0; // restart from fresh |
|
1963 |
DBUG_PRINT("info", ("Resetting retry counter, rli->trans_retries: %lu", |
|
1964 |
rli->trans_retries)); |
|
1965 |
}
|
|
1966 |
}
|
|
1967 |
DBUG_RETURN(exec_res); |
|
1968 |
}
|
|
1969 |
pthread_mutex_unlock(&rli->data_lock); |
|
1970 |
rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE, |
|
1971 |
ER(ER_SLAVE_RELAY_LOG_READ_FAILURE), "\ |
|
1972 |
Could not parse relay log event entry. The possible reasons are: the master's \
|
|
1973 |
binary log is corrupted (you can check this by running 'mysqlbinlog' on the \
|
|
1974 |
binary log), the slave's relay log is corrupted (you can check this by running \
|
|
1975 |
'mysqlbinlog' on the relay log), a network problem, or a bug in the master's \
|
|
1976 |
or slave's MySQL code. If you want to check the master's binary log or slave's \
|
|
1977 |
relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' \
|
|
1978 |
on this slave.\
|
|
1979 |
"); |
|
1980 |
DBUG_RETURN(1); |
|
1981 |
}
|
|
1982 |
||
1983 |
||
1984 |
/**
|
|
1985 |
@brief Try to reconnect slave IO thread.
|
|
1986 |
||
1987 |
@details Terminates current connection to master, sleeps for
|
|
1988 |
@c mi->connect_retry msecs and initiates new connection with
|
|
1989 |
@c safe_reconnect(). Variable pointed by @c retry_count is increased -
|
|
1990 |
if it exceeds @c master_retry_count then connection is not re-established
|
|
1991 |
and function signals error.
|
|
1992 |
Unless @c suppres_warnings is TRUE, a warning is put in the server error log
|
|
1993 |
when reconnecting. The warning message and messages used to report errors
|
|
1994 |
are taken from @c messages array. In case @c master_retry_count is exceeded,
|
|
1995 |
no messages are added to the log.
|
|
1996 |
||
1997 |
@param[in] thd Thread context.
|
|
1998 |
@param[in] mysql MySQL connection.
|
|
1999 |
@param[in] mi Master connection information.
|
|
2000 |
@param[in,out] retry_count Number of attempts to reconnect.
|
|
2001 |
@param[in] suppress_warnings TRUE when a normal net read timeout
|
|
2002 |
has caused to reconnecting.
|
|
2003 |
@param[in] messages Messages to print/log, see
|
|
2004 |
reconnect_messages[] array.
|
|
2005 |
||
2006 |
@retval 0 OK.
|
|
2007 |
@retval 1 There was an error.
|
|
2008 |
*/
|
|
2009 |
||
2010 |
static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi, |
|
2011 |
uint *retry_count, bool suppress_warnings, |
|
2012 |
const char *messages[SLAVE_RECON_MSG_MAX]) |
|
2013 |
{
|
|
2014 |
mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT; |
|
2015 |
thd->proc_info= messages[SLAVE_RECON_MSG_WAIT]; |
|
2016 |
#ifdef SIGNAL_WITH_VIO_CLOSE
|
|
2017 |
thd->clear_active_vio(); |
|
2018 |
#endif
|
|
2019 |
end_server(mysql); |
|
2020 |
if ((*retry_count)++) |
|
2021 |
{
|
|
2022 |
if (*retry_count > master_retry_count) |
|
2023 |
return 1; // Don't retry forever |
|
2024 |
safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed, |
|
2025 |
(void *) mi); |
|
2026 |
}
|
|
2027 |
if (check_io_slave_killed(thd, mi, messages[SLAVE_RECON_MSG_KILLED_WAITING])) |
|
2028 |
return 1; |
|
2029 |
thd->proc_info = messages[SLAVE_RECON_MSG_AFTER]; |
|
2030 |
if (!suppress_warnings) |
|
2031 |
{
|
|
2032 |
char buf[256], llbuff[22]; |
|
2033 |
my_snprintf(buf, sizeof(buf), messages[SLAVE_RECON_MSG_FAILED], |
|
2034 |
IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff)); |
|
2035 |
/*
|
|
2036 |
Raise a warining during registering on master/requesting dump.
|
|
2037 |
Log a message reading event.
|
|
2038 |
*/
|
|
2039 |
if (messages[SLAVE_RECON_MSG_COMMAND][0]) |
|
2040 |
{
|
|
2041 |
mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE, |
|
2042 |
ER(ER_SLAVE_MASTER_COM_FAILURE), |
|
2043 |
messages[SLAVE_RECON_MSG_COMMAND], buf); |
|
2044 |
}
|
|
2045 |
else
|
|
2046 |
{
|
|
2047 |
sql_print_information(buf); |
|
2048 |
}
|
|
2049 |
}
|
|
2050 |
if (safe_reconnect(thd, mysql, mi, 1) || io_slave_killed(thd, mi)) |
|
2051 |
{
|
|
2052 |
if (global_system_variables.log_warnings) |
|
2053 |
sql_print_information(messages[SLAVE_RECON_MSG_KILLED_AFTER]); |
|
2054 |
return 1; |
|
2055 |
}
|
|
2056 |
return 0; |
|
2057 |
}
|
|
2058 |
||
2059 |
||
2060 |
/* Slave I/O Thread entry point */
|
|
2061 |
||
2062 |
pthread_handler_t handle_slave_io(void *arg) |
|
2063 |
{
|
|
2064 |
THD *thd; // needs to be first for thread_stack |
|
2065 |
MYSQL *mysql; |
|
2066 |
Master_info *mi = (Master_info*)arg; |
|
2067 |
Relay_log_info *rli= &mi->rli; |
|
2068 |
char llbuff[22]; |
|
2069 |
uint retry_count; |
|
2070 |
bool suppress_warnings; |
|
2071 |
#ifndef DBUG_OFF
|
|
2072 |
uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0; |
|
2073 |
#endif
|
|
2074 |
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
|
|
2075 |
my_thread_init(); |
|
2076 |
DBUG_ENTER("handle_slave_io"); |
|
2077 |
||
2078 |
DBUG_ASSERT(mi->inited); |
|
2079 |
mysql= NULL ; |
|
2080 |
retry_count= 0; |
|
2081 |
||
2082 |
pthread_mutex_lock(&mi->run_lock); |
|
2083 |
/* Inform waiting threads that slave has started */
|
|
2084 |
mi->slave_run_id++; |
|
2085 |
||
2086 |
#ifndef DBUG_OFF
|
|
2087 |
mi->events_till_disconnect = disconnect_slave_event_count; |
|
2088 |
#endif
|
|
2089 |
||
2090 |
thd= new THD; // note that contructor of THD uses DBUG_ ! |
|
2091 |
THD_CHECK_SENTRY(thd); |
|
2092 |
mi->io_thd = thd; |
|
2093 |
||
2094 |
pthread_detach_this_thread(); |
|
2095 |
thd->thread_stack= (char*) &thd; // remember where our stack is |
|
2096 |
if (init_slave_thread(thd, SLAVE_THD_IO)) |
|
2097 |
{
|
|
2098 |
pthread_cond_broadcast(&mi->start_cond); |
|
2099 |
pthread_mutex_unlock(&mi->run_lock); |
|
2100 |
sql_print_error("Failed during slave I/O thread initialization"); |
|
2101 |
goto err; |
|
2102 |
}
|
|
2103 |
pthread_mutex_lock(&LOCK_thread_count); |
|
2104 |
threads.append(thd); |
|
2105 |
pthread_mutex_unlock(&LOCK_thread_count); |
|
2106 |
mi->slave_running = 1; |
|
2107 |
mi->abort_slave = 0; |
|
2108 |
pthread_mutex_unlock(&mi->run_lock); |
|
2109 |
pthread_cond_broadcast(&mi->start_cond); |
|
2110 |
||
2111 |
DBUG_PRINT("master_info",("log_file_name: '%s' position: %s", |
|
2112 |
mi->master_log_name, |
|
2113 |
llstr(mi->master_log_pos,llbuff))); |
|
2114 |
||
2115 |
if (!(mi->mysql = mysql = mysql_init(NULL))) |
|
2116 |
{
|
|
2117 |
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, |
|
2118 |
ER(ER_SLAVE_FATAL_ERROR), "error in mysql_init()"); |
|
2119 |
goto err; |
|
2120 |
}
|
|
2121 |
||
2122 |
thd_proc_info(thd, "Connecting to master"); |
|
2123 |
// we can get killed during safe_connect
|
|
2124 |
if (!safe_connect(thd, mysql, mi)) |
|
2125 |
{
|
|
2126 |
sql_print_information("Slave I/O thread: connected to master '%s@%s:%d'," |
|
2127 |
"replication started in log '%s' at position %s", |
|
2128 |
mi->user, mi->host, mi->port, |
|
2129 |
IO_RPL_LOG_NAME, |
|
2130 |
llstr(mi->master_log_pos,llbuff)); |
|
2131 |
/*
|
|
2132 |
Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
|
|
2133 |
thread, since a replication event can become this much larger than
|
|
2134 |
the corresponding packet (query) sent from client to master.
|
|
2135 |
*/
|
|
2136 |
mysql->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER; |
|
2137 |
}
|
|
2138 |
else
|
|
2139 |
{
|
|
2140 |
sql_print_information("Slave I/O thread killed while connecting to master"); |
|
2141 |
goto err; |
|
2142 |
}
|
|
2143 |
||
2144 |
connected: |
|
2145 |
||
2146 |
// TODO: the assignment below should be under mutex (5.0)
|
|
2147 |
mi->slave_running= MYSQL_SLAVE_RUN_CONNECT; |
|
2148 |
thd->slave_net = &mysql->net; |
|
2149 |
thd_proc_info(thd, "Checking master version"); |
|
2150 |
if (get_master_version_and_clock(mysql, mi)) |
|
2151 |
goto err; |
|
2152 |
||
2153 |
if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1) |
|
2154 |
{
|
|
2155 |
/*
|
|
2156 |
Register ourselves with the master.
|
|
2157 |
*/
|
|
2158 |
thd_proc_info(thd, "Registering slave on master"); |
|
2159 |
if (register_slave_on_master(mysql, mi, &suppress_warnings)) |
|
2160 |
{
|
|
2161 |
if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed " |
|
2162 |
"while registering slave on master")) |
|
2163 |
{
|
|
2164 |
sql_print_error("Slave I/O thread couldn't register on master"); |
|
2165 |
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, |
|
2166 |
reconnect_messages[SLAVE_RECON_ACT_REG])) |
|
2167 |
goto err; |
|
2168 |
}
|
|
2169 |
else
|
|
2170 |
goto err; |
|
2171 |
goto connected; |
|
2172 |
}
|
|
2173 |
DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_REG", |
|
2174 |
if (!retry_count_reg) |
|
2175 |
{
|
|
2176 |
retry_count_reg++; |
|
2177 |
sql_print_information("Forcing to reconnect slave I/O thread"); |
|
2178 |
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, |
|
2179 |
reconnect_messages[SLAVE_RECON_ACT_REG])) |
|
2180 |
goto err; |
|
2181 |
goto connected; |
|
2182 |
});
|
|
2183 |
}
|
|
2184 |
||
2185 |
DBUG_PRINT("info",("Starting reading binary log from master")); |
|
2186 |
while (!io_slave_killed(thd,mi)) |
|
2187 |
{
|
|
2188 |
thd_proc_info(thd, "Requesting binlog dump"); |
|
2189 |
if (request_dump(mysql, mi, &suppress_warnings)) |
|
2190 |
{
|
|
2191 |
sql_print_error("Failed on request_dump()"); |
|
2192 |
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \ |
|
2193 |
requesting master dump") || |
|
2194 |
try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, |
|
2195 |
reconnect_messages[SLAVE_RECON_ACT_DUMP])) |
|
2196 |
goto err; |
|
2197 |
goto connected; |
|
2198 |
}
|
|
2199 |
DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_DUMP", |
|
2200 |
if (!retry_count_dump) |
|
2201 |
{
|
|
2202 |
retry_count_dump++; |
|
2203 |
sql_print_information("Forcing to reconnect slave I/O thread"); |
|
2204 |
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, |
|
2205 |
reconnect_messages[SLAVE_RECON_ACT_DUMP])) |
|
2206 |
goto err; |
|
2207 |
goto connected; |
|
2208 |
});
|
|
2209 |
||
2210 |
while (!io_slave_killed(thd,mi)) |
|
2211 |
{
|
|
2212 |
ulong event_len; |
|
2213 |
/*
|
|
2214 |
We say "waiting" because read_event() will wait if there's nothing to
|
|
2215 |
read. But if there's something to read, it will not wait. The
|
|
2216 |
important thing is to not confuse users by saying "reading" whereas
|
|
2217 |
we're in fact receiving nothing.
|
|
2218 |
*/
|
|
2219 |
thd_proc_info(thd, "Waiting for master to send event"); |
|
2220 |
event_len= read_event(mysql, mi, &suppress_warnings); |
|
2221 |
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \ |
|
2222 |
reading event")) |
|
2223 |
goto err; |
|
2224 |
DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_EVENT", |
|
2225 |
if (!retry_count_event) |
|
2226 |
{
|
|
2227 |
retry_count_event++; |
|
2228 |
sql_print_information("Forcing to reconnect slave I/O thread"); |
|
2229 |
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, |
|
2230 |
reconnect_messages[SLAVE_RECON_ACT_EVENT])) |
|
2231 |
goto err; |
|
2232 |
goto connected; |
|
2233 |
});
|
|
2234 |
||
2235 |
if (event_len == packet_error) |
|
2236 |
{
|
|
2237 |
uint mysql_error_number= mysql_errno(mysql); |
|
2238 |
switch (mysql_error_number) { |
|
2239 |
case CR_NET_PACKET_TOO_LARGE: |
|
2240 |
sql_print_error("\ |
|
2241 |
Log entry on master is longer than max_allowed_packet (%ld) on \
|
|
2242 |
slave. If the entry is correct, restart the server with a higher value of \
|
|
2243 |
max_allowed_packet", |
|
2244 |
thd->variables.max_allowed_packet); |
|
2245 |
goto err; |
|
2246 |
case ER_MASTER_FATAL_ERROR_READING_BINLOG: |
|
2247 |
sql_print_error(ER(mysql_error_number), mysql_error_number, |
|
2248 |
mysql_error(mysql)); |
|
2249 |
goto err; |
|
2250 |
case EE_OUTOFMEMORY: |
|
2251 |
case ER_OUTOFMEMORY: |
|
2252 |
sql_print_error("\ |
|
2253 |
Stopping slave I/O thread due to out-of-memory error from master"); |
|
2254 |
goto err; |
|
2255 |
}
|
|
2256 |
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, |
|
2257 |
reconnect_messages[SLAVE_RECON_ACT_EVENT])) |
|
2258 |
goto err; |
|
2259 |
goto connected; |
|
2260 |
} // if (event_len == packet_error) |
|
2261 |
||
2262 |
retry_count=0; // ok event, reset retry counter |
|
2263 |
thd_proc_info(thd, "Queueing master event to the relay log"); |
|
2264 |
if (queue_event(mi,(const char*)mysql->net.read_pos + 1, event_len)) |
|
2265 |
{
|
|
2266 |
goto err; |
|
2267 |
}
|
|
2268 |
if (flush_master_info(mi, 1)) |
|
2269 |
{
|
|
2270 |
sql_print_error("Failed to flush master info file"); |
|
2271 |
goto err; |
|
2272 |
}
|
|
2273 |
/*
|
|
2274 |
See if the relay logs take too much space.
|
|
2275 |
We don't lock mi->rli.log_space_lock here; this dirty read saves time
|
|
2276 |
and does not introduce any problem:
|
|
2277 |
- if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
|
|
2278 |
the clean value is 0), then we are reading only one more event as we
|
|
2279 |
should, and we'll block only at the next event. No big deal.
|
|
2280 |
- if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
|
|
2281 |
the clean value is 1), then we are going into wait_for_relay_log_space()
|
|
2282 |
for no reason, but this function will do a clean read, notice the clean
|
|
2283 |
value and exit immediately.
|
|
2284 |
*/
|
|
2285 |
#ifndef DBUG_OFF
|
|
2286 |
{
|
|
2287 |
char llbuf1[22], llbuf2[22]; |
|
2288 |
DBUG_PRINT("info", ("log_space_limit=%s log_space_total=%s \ |
|
2289 |
ignore_log_space_limit=%d", |
|
2290 |
llstr(rli->log_space_limit,llbuf1), |
|
2291 |
llstr(rli->log_space_total,llbuf2), |
|
2292 |
(int) rli->ignore_log_space_limit)); |
|
2293 |
}
|
|
2294 |
#endif
|
|
2295 |
||
2296 |
if (rli->log_space_limit && rli->log_space_limit < |
|
2297 |
rli->log_space_total && |
|
2298 |
!rli->ignore_log_space_limit) |
|
2299 |
if (wait_for_relay_log_space(rli)) |
|
2300 |
{
|
|
2301 |
sql_print_error("Slave I/O thread aborted while waiting for relay \ |
|
2302 |
log space"); |
|
2303 |
goto err; |
|
2304 |
}
|
|
2305 |
}
|
|
2306 |
}
|
|
2307 |
||
2308 |
// error = 0;
|
|
2309 |
err: |
|
2310 |
// print the current replication position
|
|
2311 |
sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s", |
|
2312 |
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); |
|
2313 |
VOID(pthread_mutex_lock(&LOCK_thread_count)); |
|
2314 |
thd->query = thd->db = 0; // extra safety |
|
2315 |
thd->query_length= thd->db_length= 0; |
|
2316 |
VOID(pthread_mutex_unlock(&LOCK_thread_count)); |
|
2317 |
if (mysql) |
|
2318 |
{
|
|
2319 |
/*
|
|
2320 |
Here we need to clear the active VIO before closing the
|
|
2321 |
connection with the master. The reason is that THD::awake()
|
|
2322 |
might be called from terminate_slave_thread() because somebody
|
|
2323 |
issued a STOP SLAVE. If that happends, the close_active_vio()
|
|
2324 |
can be called in the middle of closing the VIO associated with
|
|
2325 |
the 'mysql' object, causing a crash.
|
|
2326 |
*/
|
|
2327 |
#ifdef SIGNAL_WITH_VIO_CLOSE
|
|
2328 |
thd->clear_active_vio(); |
|
2329 |
#endif
|
|
2330 |
mysql_close(mysql); |
|
2331 |
mi->mysql=0; |
|
2332 |
}
|
|
2333 |
write_ignored_events_info_to_relay_log(thd, mi); |
|
2334 |
thd_proc_info(thd, "Waiting for slave mutex on exit"); |
|
2335 |
pthread_mutex_lock(&mi->run_lock); |
|
2336 |
||
2337 |
/* Forget the relay log's format */
|
|
2338 |
delete mi->rli.relay_log.description_event_for_queue; |
|
2339 |
mi->rli.relay_log.description_event_for_queue= 0; |
|
2340 |
// TODO: make rpl_status part of Master_info
|
|
2341 |
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE); |
|
2342 |
DBUG_ASSERT(thd->net.buff != 0); |
|
2343 |
net_end(&thd->net); // destructor will not free it, because net.vio is 0 |
|
2344 |
close_thread_tables(thd); |
|
2345 |
pthread_mutex_lock(&LOCK_thread_count); |
|
2346 |
THD_CHECK_SENTRY(thd); |
|
2347 |
delete thd; |
|
2348 |
pthread_mutex_unlock(&LOCK_thread_count); |
|
2349 |
mi->abort_slave= 0; |
|
2350 |
mi->slave_running= 0; |
|
2351 |
mi->io_thd= 0; |
|
2352 |
/*
|
|
2353 |
Note: the order of the two following calls (first broadcast, then unlock)
|
|
2354 |
is important. Otherwise a killer_thread can execute between the calls and
|
|
2355 |
delete the mi structure leading to a crash! (see BUG#25306 for details)
|
|
2356 |
*/
|
|
2357 |
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done |
|
2358 |
pthread_mutex_unlock(&mi->run_lock); |
|
2359 |
my_thread_end(); |
|
2360 |
pthread_exit(0); |
|
2361 |
DBUG_RETURN(0); // Can't return anything here |
|
2362 |
}
|
|
2363 |
||
2364 |
||
2365 |
/* Slave SQL Thread entry point */
|
|
2366 |
||
2367 |
pthread_handler_t handle_slave_sql(void *arg) |
|
2368 |
{
|
|
2369 |
THD *thd; /* needs to be first for thread_stack */ |
|
2370 |
char llbuff[22],llbuff1[22]; |
|
2371 |
||
2372 |
Relay_log_info* rli = &((Master_info*)arg)->rli; |
|
2373 |
const char *errmsg; |
|
2374 |
||
2375 |
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
|
|
2376 |
my_thread_init(); |
|
2377 |
DBUG_ENTER("handle_slave_sql"); |
|
2378 |
||
2379 |
DBUG_ASSERT(rli->inited); |
|
2380 |
pthread_mutex_lock(&rli->run_lock); |
|
2381 |
DBUG_ASSERT(!rli->slave_running); |
|
2382 |
errmsg= 0; |
|
2383 |
#ifndef DBUG_OFF
|
|
2384 |
rli->events_till_abort = abort_slave_event_count; |
|
2385 |
#endif
|
|
2386 |
||
2387 |
thd = new THD; // note that contructor of THD uses DBUG_ ! |
|
2388 |
thd->thread_stack = (char*)&thd; // remember where our stack is |
|
2389 |
rli->sql_thd= thd; |
|
2390 |
||
2391 |
/* Inform waiting threads that slave has started */
|
|
2392 |
rli->slave_run_id++; |
|
2393 |
rli->slave_running = 1; |
|
2394 |
||
2395 |
pthread_detach_this_thread(); |
|
2396 |
if (init_slave_thread(thd, SLAVE_THD_SQL)) |
|
2397 |
{
|
|
2398 |
/*
|
|
2399 |
TODO: this is currently broken - slave start and change master
|
|
2400 |
will be stuck if we fail here
|
|
2401 |
*/
|
|
2402 |
pthread_cond_broadcast(&rli->start_cond); |
|
2403 |
pthread_mutex_unlock(&rli->run_lock); |
|
2404 |
sql_print_error("Failed during slave thread initialization"); |
|
2405 |
goto err; |
|
2406 |
}
|
|
2407 |
thd->init_for_queries(); |
|
2408 |
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables |
|
2409 |
pthread_mutex_lock(&LOCK_thread_count); |
|
2410 |
threads.append(thd); |
|
2411 |
pthread_mutex_unlock(&LOCK_thread_count); |
|
2412 |
/*
|
|
2413 |
We are going to set slave_running to 1. Assuming slave I/O thread is
|
|
2414 |
alive and connected, this is going to make Seconds_Behind_Master be 0
|
|
2415 |
i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
|
|
2416 |
the moment we start we can think we are caught up, and the next second we
|
|
2417 |
start receiving data so we realize we are not caught up and
|
|
2418 |
Seconds_Behind_Master grows. No big deal.
|
|
2419 |
*/
|
|
2420 |
rli->abort_slave = 0; |
|
2421 |
pthread_mutex_unlock(&rli->run_lock); |
|
2422 |
pthread_cond_broadcast(&rli->start_cond); |
|
2423 |
||
2424 |
/*
|
|
2425 |
Reset errors for a clean start (otherwise, if the master is idle, the SQL
|
|
2426 |
thread may execute no Query_log_event, so the error will remain even
|
|
2427 |
though there's no problem anymore). Do not reset the master timestamp
|
|
2428 |
(imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
|
|
2429 |
as we are not sure that we are going to receive a query, we want to
|
|
2430 |
remember the last master timestamp (to say how many seconds behind we are
|
|
2431 |
now.
|
|
2432 |
But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
|
|
2433 |
*/
|
|
2434 |
rli->clear_error(); |
|
2435 |
||
2436 |
//tell the I/O thread to take relay_log_space_limit into account from now on
|
|
2437 |
pthread_mutex_lock(&rli->log_space_lock); |
|
2438 |
rli->ignore_log_space_limit= 0; |
|
2439 |
pthread_mutex_unlock(&rli->log_space_lock); |
|
2440 |
rli->trans_retries= 0; // start from "no error" |
|
2441 |
DBUG_PRINT("info", ("rli->trans_retries: %lu", rli->trans_retries)); |
|
2442 |
||
2443 |
if (init_relay_log_pos(rli, |
|
2444 |
rli->group_relay_log_name, |
|
2445 |
rli->group_relay_log_pos, |
|
2446 |
1 /*need data lock*/, &errmsg, |
|
2447 |
1 /*look for a description_event*/)) |
|
2448 |
{
|
|
2449 |
sql_print_error("Error initializing relay log position: %s", |
|
2450 |
errmsg); |
|
2451 |
goto err; |
|
2452 |
}
|
|
2453 |
THD_CHECK_SENTRY(thd); |
|
2454 |
#ifndef DBUG_OFF
|
|
2455 |
{
|
|
2456 |
char llbuf1[22], llbuf2[22]; |
|
2457 |
DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s", |
|
2458 |
llstr(my_b_tell(rli->cur_log),llbuf1), |
|
2459 |
llstr(rli->event_relay_log_pos,llbuf2))); |
|
2460 |
DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE); |
|
2461 |
/*
|
|
2462 |
Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
|
|
2463 |
correct position when it's called just after my_b_seek() (the questionable
|
|
2464 |
stuff is those "seek is done on next read" comments in the my_b_seek()
|
|
2465 |
source code).
|
|
2466 |
The crude reality is that this assertion randomly fails whereas
|
|
2467 |
replication seems to work fine. And there is no easy explanation why it
|
|
2468 |
fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
|
|
2469 |
init_relay_log_pos() called above). Maybe the assertion would be
|
|
2470 |
meaningful if we held rli->data_lock between the my_b_seek() and the
|
|
2471 |
DBUG_ASSERT().
|
|
2472 |
*/
|
|
2473 |
#ifdef SHOULD_BE_CHECKED
|
|
2474 |
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos); |
|
2475 |
#endif
|
|
2476 |
}
|
|
2477 |
#endif
|
|
2478 |
DBUG_ASSERT(rli->sql_thd == thd); |
|
2479 |
||
2480 |
DBUG_PRINT("master_info",("log_file_name: %s position: %s", |
|
2481 |
rli->group_master_log_name, |
|
2482 |
llstr(rli->group_master_log_pos,llbuff))); |
|
2483 |
if (global_system_variables.log_warnings) |
|
2484 |
sql_print_information("Slave SQL thread initialized, starting replication in \ |
|
2485 |
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, |
|
2486 |
llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name, |
|
2487 |
llstr(rli->group_relay_log_pos,llbuff1)); |
|
2488 |
||
2489 |
/* execute init_slave variable */
|
|
2490 |
if (sys_init_slave.value_length) |
|
2491 |
{
|
|
2492 |
execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave); |
|
2493 |
if (thd->is_slave_error) |
|
2494 |
{
|
|
2495 |
sql_print_error("\ |
|
2496 |
Slave SQL thread aborted. Can't execute init_slave query"); |
|
2497 |
goto err; |
|
2498 |
}
|
|
2499 |
}
|
|
2500 |
||
2501 |
/*
|
|
2502 |
First check until condition - probably there is nothing to execute. We
|
|
2503 |
do not want to wait for next event in this case.
|
|
2504 |
*/
|
|
2505 |
pthread_mutex_lock(&rli->data_lock); |
|
2506 |
if (rli->until_condition != Relay_log_info::UNTIL_NONE && |
|
2507 |
rli->is_until_satisfied(rli->group_master_log_pos)) |
|
2508 |
{
|
|
2509 |
char buf[22]; |
|
2510 |
sql_print_information("Slave SQL thread stopped because it reached its" |
|
2511 |
" UNTIL position %s", llstr(rli->until_pos(), buf)); |
|
2512 |
pthread_mutex_unlock(&rli->data_lock); |
|
2513 |
goto err; |
|
2514 |
}
|
|
2515 |
pthread_mutex_unlock(&rli->data_lock); |
|
2516 |
||
2517 |
/* Read queries from the IO/THREAD until this thread is killed */
|
|
2518 |
||
2519 |
while (!sql_slave_killed(thd,rli)) |
|
2520 |
{
|
|
2521 |
thd_proc_info(thd, "Reading event from the relay log"); |
|
2522 |
DBUG_ASSERT(rli->sql_thd == thd); |
|
2523 |
THD_CHECK_SENTRY(thd); |
|
2524 |
if (exec_relay_log_event(thd,rli)) |
|
2525 |
{
|
|
2526 |
DBUG_PRINT("info", ("exec_relay_log_event() failed")); |
|
2527 |
// do not scare the user if SQL thread was simply killed or stopped
|
|
2528 |
if (!sql_slave_killed(thd,rli)) |
|
2529 |
{
|
|
2530 |
/*
|
|
2531 |
retrieve as much info as possible from the thd and, error
|
|
2532 |
codes and warnings and print this to the error log as to
|
|
2533 |
allow the user to locate the error
|
|
2534 |
*/
|
|
2535 |
uint32 const last_errno= rli->last_error().number; |
|
2536 |
||
2537 |
if (thd->is_error()) |
|
2538 |
{
|
|
2539 |
char const *const errmsg= thd->main_da.message(); |
|
2540 |
||
2541 |
DBUG_PRINT("info", |
|
2542 |
("thd->main_da.sql_errno()=%d; rli->last_error.number=%d", |
|
2543 |
thd->main_da.sql_errno(), last_errno)); |
|
2544 |
if (last_errno == 0) |
|
2545 |
{
|
|
2546 |
rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), errmsg); |
|
2547 |
}
|
|
2548 |
else if (last_errno != thd->main_da.sql_errno()) |
|
2549 |
{
|
|
2550 |
sql_print_error("Slave (additional info): %s Error_code: %d", |
|
2551 |
errmsg, thd->main_da.sql_errno()); |
|
2552 |
}
|
|
2553 |
}
|
|
2554 |
||
2555 |
/* Print any warnings issued */
|
|
2556 |
List_iterator_fast<MYSQL_ERROR> it(thd->warn_list); |
|
2557 |
MYSQL_ERROR *err; |
|
2558 |
/*
|
|
2559 |
Added controlled slave thread cancel for replication
|
|
2560 |
of user-defined variables.
|
|
2561 |
*/
|
|
2562 |
bool udf_error = false; |
|
2563 |
while ((err= it++)) |
|
2564 |
{
|
|
2565 |
if (err->code == ER_CANT_OPEN_LIBRARY) |
|
2566 |
udf_error = true; |
|
2567 |
sql_print_warning("Slave: %s Error_code: %d",err->msg, err->code); |
|
2568 |
}
|
|
2569 |
if (udf_error) |
|
2570 |
sql_print_error("Error loading user-defined library, slave SQL " |
|
2571 |
"thread aborted. Install the missing library, and restart the "
|
|
2572 |
"slave SQL thread with \"SLAVE START\". We stopped at log '%s' " |
|
2573 |
"position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, |
|
2574 |
llbuff)); |
|
2575 |
else
|
|
2576 |
sql_print_error("\ |
|
2577 |
Error running query, slave SQL thread aborted. Fix the problem, and restart \
|
|
2578 |
the slave SQL thread with \"SLAVE START\". We stopped at log \ |
|
2579 |
'%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff)); |
|
2580 |
}
|
|
2581 |
goto err; |
|
2582 |
}
|
|
2583 |
}
|
|
2584 |
||
2585 |
/* Thread stopped. Print the current replication position to the log */
|
|
2586 |
sql_print_information("Slave SQL thread exiting, replication stopped in log " |
|
2587 |
"'%s' at position %s", |
|
2588 |
RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff)); |
|
2589 |
||
2590 |
err: |
|
2591 |
||
2592 |
/*
|
|
2593 |
Some events set some playgrounds, which won't be cleared because thread
|
|
2594 |
stops. Stopping of this thread may not be known to these events ("stop"
|
|
2595 |
request is detected only by the present function, not by events), so we
|
|
2596 |
must "proactively" clear playgrounds:
|
|
2597 |
*/
|
|
2598 |
rli->cleanup_context(thd, 1); |
|
2599 |
VOID(pthread_mutex_lock(&LOCK_thread_count)); |
|
2600 |
/*
|
|
2601 |
Some extra safety, which should not been needed (normally, event deletion
|
|
2602 |
should already have done these assignments (each event which sets these
|
|
2603 |
variables is supposed to set them to 0 before terminating)).
|
|
2604 |
*/
|
|
2605 |
thd->query= thd->db= thd->catalog= 0; |
|
2606 |
thd->query_length= thd->db_length= 0; |
|
2607 |
VOID(pthread_mutex_unlock(&LOCK_thread_count)); |
|
2608 |
thd_proc_info(thd, "Waiting for slave mutex on exit"); |
|
2609 |
pthread_mutex_lock(&rli->run_lock); |
|
2610 |
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
|
|
2611 |
pthread_mutex_lock(&rli->data_lock); |
|
2612 |
DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun |
|
2613 |
/* When master_pos_wait() wakes up it will check this and terminate */
|
|
2614 |
rli->slave_running= 0; |
|
2615 |
/* Forget the relay log's format */
|
|
2616 |
delete rli->relay_log.description_event_for_exec; |
|
2617 |
rli->relay_log.description_event_for_exec= 0; |
|
2618 |
/* Wake up master_pos_wait() */
|
|
2619 |
pthread_mutex_unlock(&rli->data_lock); |
|
2620 |
DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions")); |
|
2621 |
pthread_cond_broadcast(&rli->data_cond); |
|
2622 |
rli->ignore_log_space_limit= 0; /* don't need any lock */ |
|
2623 |
/* we die so won't remember charset - re-update them on next thread start */
|
|
2624 |
rli->cached_charset_invalidate(); |
|
2625 |
rli->save_temporary_tables = thd->temporary_tables; |
|
2626 |
||
2627 |
/*
|
|
2628 |
TODO: see if we can do this conditionally in next_event() instead
|
|
2629 |
to avoid unneeded position re-init
|
|
2630 |
*/
|
|
2631 |
thd->temporary_tables = 0; // remove tempation from destructor to close them |
|
2632 |
DBUG_ASSERT(thd->net.buff != 0); |
|
2633 |
net_end(&thd->net); // destructor will not free it, because we are weird |
|
2634 |
DBUG_ASSERT(rli->sql_thd == thd); |
|
2635 |
THD_CHECK_SENTRY(thd); |
|
2636 |
rli->sql_thd= 0; |
|
2637 |
pthread_mutex_lock(&LOCK_thread_count); |
|
2638 |
THD_CHECK_SENTRY(thd); |
|
2639 |
delete thd; |
|
2640 |
pthread_mutex_unlock(&LOCK_thread_count); |
|
2641 |
/*
|
|
2642 |
Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
|
|
2643 |
is important. Otherwise a killer_thread can execute between the calls and
|
|
2644 |
delete the mi structure leading to a crash! (see BUG#25306 for details)
|
|
2645 |
*/
|
|
2646 |
pthread_cond_broadcast(&rli->stop_cond); |
|
2647 |
pthread_mutex_unlock(&rli->run_lock); // tell the world we are done |
|
2648 |
||
2649 |
my_thread_end(); |
|
2650 |
pthread_exit(0); |
|
2651 |
DBUG_RETURN(0); // Can't return anything here |
|
2652 |
}
|
|
2653 |
||
2654 |
||
2655 |
/*
|
|
2656 |
process_io_create_file()
|
|
2657 |
*/
|
|
2658 |
||
2659 |
/**
  Fetch the file named by a Create_file event from the master and queue it
  in the relay log.

  The first data packet is written together with the Create_file event,
  every later packet as an Append_block event, and an Execute_load event is
  written when the master sends an empty packet (end of file). If the file
  is empty, nothing at all is written to the relay log.

  @param mi   master info; provides the I/O THD, the master connection and
              the relay log
  @param cev  the Create_file event to process

  @retval 0  success, or cev->db was rejected by rpl_filter->db_ok()
  @retval 1  invalid event, network error or relay log write error
*/
static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
{
  int result= 1;                        /* stays 1 unless we reach the end */
  ulong packet_len;
  bool create_file_pending= 1;          /* Create_file not yet in relay log */
  THD *thd= mi->io_thd;
  NET *net= &mi->mysql->net;
  Relay_log_info *rli= &mi->rli;
  DBUG_ENTER("process_io_create_file");

  if (unlikely(!cev->is_valid()))
    DBUG_RETURN(1);

  if (!rpl_filter->db_ok(cev->db))
  {
    skip_load_data_infile(net);
    DBUG_RETURN(0);
  }
  DBUG_ASSERT(cev->inited_from_old);
  thd->file_id= cev->file_id= mi->file_id++;
  thd->server_id= cev->server_id;

  if (unlikely(net_request_file(net, cev->fname)))
  {
    sql_print_error("Slave I/O: failed requesting download of '%s'",
                    cev->fname);
    goto err;
  }

  /*
    The extra scope lets one Append_block_log_event be constructed once and
    re-pointed at each incoming packet instead of being rebuilt per packet.
  */
  {
    Append_block_log_event aev(thd,0,0,0,0);

    for (;;)
    {
      packet_len= my_net_read(net);
      if (unlikely(packet_len == packet_error))
      {
        sql_print_error("Network read error downloading '%s' from master",
                        cev->fname);
        goto err;
      }

      if (unlikely(packet_len == 0))            /* end of file */
      {
        /* 3.23 master wants it */
        net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0);
        /*
          An Execute_load event is needed only if a Create_file event was
          written; otherwise the file was empty and we behave as if the
          LOAD DATA INFILE never existed.
        */
        if (likely(!create_file_pending))
        {
          Execute_load_log_event xev(thd,0,0);
          xev.log_pos= cev->log_pos;
          if (unlikely(rli->relay_log.append(&xev)))
          {
            mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
                       ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
                       "error writing Exec_load event to relay log");
            goto err;
          }
          rli->relay_log.harvest_bytes_written(&rli->log_space_total);
        }
        break;
      }

      if (unlikely(create_file_pending))
      {
        /* First data packet: it travels inside the Create_file event */
        cev->block= net->read_pos;
        cev->block_len= packet_len;
        if (unlikely(rli->relay_log.append(cev)))
        {
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
                     "error writing Create_file event to relay log");
          goto err;
        }
        create_file_pending= 0;
        rli->relay_log.harvest_bytes_written(&rli->log_space_total);
      }
      else
      {
        /* Subsequent packets: one Append_block event each */
        aev.block= net->read_pos;
        aev.block_len= packet_len;
        aev.log_pos= cev->log_pos;
        if (unlikely(rli->relay_log.append(&aev)))
        {
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
                     "error writing Append_block event to relay log");
          goto err;
        }
        rli->relay_log.harvest_bytes_written(&rli->log_space_total);
      }
    }
  }
  result= 0;

err:
  DBUG_RETURN(result);
}
|
|
2762 |
||
2763 |
||
2764 |
/*
|
|
2765 |
Start using a new binary log on the master
|
|
2766 |
||
2767 |
SYNOPSIS
|
|
2768 |
process_io_rotate()
|
|
2769 |
mi master_info for the slave
|
|
2770 |
rev The rotate log event read from the binary log
|
|
2771 |
||
2772 |
DESCRIPTION
|
|
2773 |
Updates the master info with the place in the next binary
|
|
2774 |
log where we should start reading.
|
|
2775 |
Rotate the relay log to avoid mixed-format relay logs.
|
|
2776 |
||
2777 |
NOTES
|
|
2778 |
We assume we already locked mi->data_lock
|
|
2779 |
||
2780 |
RETURN VALUES
|
|
2781 |
0 ok
|
|
2782 |
1 Log event is illegal
|
|
2783 |
||
2784 |
*/
|
|
2785 |
||
2786 |
/**
  Switch the I/O thread to the master binary log named by a Rotate event.

  Copies the new log name and position into the master info, resets the
  queue description event to format 3 when it is currently format 4 or
  newer, and rotates the relay log.

  @param mi   master info; the caller must own mi->data_lock
  @param rev  the Rotate event received from the master

  @retval 0  ok
  @retval 1  the Rotate event is not valid
*/
static int process_io_rotate(Master_info *mi, Rotate_log_event *rev)
{
  DBUG_ENTER("process_io_rotate");
  safe_mutex_assert_owner(&mi->data_lock);

  if (unlikely(!rev->is_valid()))
    DBUG_RETURN(1);

  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
  mi->master_log_pos= rev->pos;
  DBUG_PRINT("info", ("master_log_pos: '%s' %lu",
                      mi->master_log_name, (ulong) mi->master_log_pos));
#ifndef DBUG_OFF
  /*
    Without this we would keep receiving the first Rotate event forever,
    so a Rotate must not count towards the debug disconnect counter.
  */
  if (disconnect_slave_event_count)
    mi->events_till_disconnect++;
#endif

  /*
    If description_event_for_queue is format <4, events are converted to the
    slave's format (4) in the relay log, and a Rotate means either an upgrade
    (to 5.0 or newer, in which case a Format_desc will follow) or nothing;
    in both cases the description event can stay. Only a format >=4
    description event is replaced below.
  */
  Format_description_log_event *&queue_fdle=
    mi->rli.relay_log.description_event_for_queue;
  if (queue_fdle->binlog_version >= 4)
  {
    delete queue_fdle;
    /* start from format 3 (MySQL 4.0) again */
    queue_fdle= new Format_description_log_event(3);
  }

  /*
    Rotating the relay log makes binlog format detection easier (at next
    slave start or for mysqlbinlog).
  */
  rotate_relay_log(mi); /* will take the right mutexes */
  DBUG_RETURN(0);
}
|
|
2829 |
||
2830 |
/*
|
|
2831 |
Reads a 3.23 event and converts it to the slave's format. This code was
|
|
2832 |
copied from MySQL 4.0.
|
|
2833 |
*/
|
|
2834 |
/**
  Read a 3.23 (binlog version 1) event and queue it in the slave's format.

  A LOAD_EVENT is first copied into a private, 0-terminated buffer;
  read_log_event() then turns it into a Create_file event, whose file is
  downloaded and queued by process_io_create_file().

  @param mi         master info; mi->data_lock is taken and released here
  @param buf        raw event bytes received from the master
  @param event_len  number of bytes in buf

  @retval 0        event handled
  @retval nonzero  allocation, parsing, rotate or relay log write failure,
                   or the error returned by process_io_create_file()
*/
static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
                                    ulong event_len)
{
  const char *parse_error= 0;
  ulong pos_delta;
  bool skip_append= 0;
  char *load_copy= 0;
  Relay_log_info *rli= &mi->rli;
  DBUG_ENTER("queue_binlog_ver_1_event");

  /*
    A Load event must be handed to read_log_event() in a buffer that is not
    reused, so it gets its own copy.
  */
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
  {
    load_copy= (char*) my_malloc(event_len+1, MYF(MY_WME));
    if (unlikely(!load_copy))
    {
      mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
                 ER(ER_SLAVE_FATAL_ERROR), "Memory allocation failed");
      DBUG_RETURN(1);
    }
    memcpy(load_copy, buf, event_len);
    /*
      The Create_file constructor wants a 0 as the last char of the buffer;
      it terminates the file name stored at the end. event_len must grow by
      one so that the constructor actually sees this 0.
    */
    load_copy[event_len++]= 0;
    int4store(load_copy+EVENT_LEN_OFFSET, event_len);
    buf= (const char*) load_copy;
  }

  /*
    This transforms LOAD_EVENT into CREATE_FILE_EVENT; the file is later
    requested from the master and written to the relay log as
    Append_block/Exec_load, because the SQL thread has no master connection.
  */
  Log_event *ev= Log_event::read_log_event(buf, event_len, &parse_error,
                                           rli->relay_log.description_event_for_queue);
  if (unlikely(!ev))
  {
    sql_print_error("Read invalid event from master: '%s',\
master could be corrupt but a more likely cause of this is a bug",
                    parse_error);
    my_free((char*) load_copy, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(1);
  }

  pthread_mutex_lock(&mi->data_lock);
  ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
  switch (ev->get_type_code()) {
  case ROTATE_EVENT:
    if (unlikely(process_io_rotate(mi, (Rotate_log_event*) ev)))
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(1);
    }
    pos_delta= 0;
    break;
  case CREATE_FILE_EVENT:
    /*
      CREATE_FILE_EVENT does not exist in 3.23; we get here because
      read_log_event() above has just transformed a LOAD_EVENT.
    */
  {
    /* We come here when and only when load_copy != 0 */
    DBUG_ASSERT(load_copy != 0);
    pos_delta= event_len;
    ev->log_pos+= pos_delta;
    int create_file_error=
      process_io_create_file(mi, (Create_file_log_event*) ev);
    delete ev;
    mi->master_log_pos+= pos_delta;
    DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
    pthread_mutex_unlock(&mi->data_lock);
    my_free((char*) load_copy, MYF(0));
    DBUG_RETURN(create_file_error);
  }
  case STOP_EVENT:
    skip_append= 1;                     /* not written to the relay log */
    /* fall through: position still advances by the event length */
  default:
    pos_delta= event_len;
    break;
  }

  if (likely(!skip_append))
  {
    /*
      Make log_pos the end position of the event, except for fake Rotate
      events whose log_pos is 0 (see the comment in
      Log_event::Log_event(const char* buf...) in log_event.cc).
    */
    if (ev->log_pos)
      ev->log_pos+= event_len;
    if (unlikely(rli->relay_log.append(ev)))
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(1);
    }
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  }
  delete ev;
  mi->master_log_pos+= pos_delta;
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
  pthread_mutex_unlock(&mi->data_lock);
  DBUG_RETURN(0);
}
|
|
2947 |
||
2948 |
/*
|
|
2949 |
Reads a 4.0 event and converts it to the slave's format. This code was copied
|
|
2950 |
from queue_binlog_ver_1_event(), with some affordable simplifications.
|
|
2951 |
*/
|
|
2952 |
/**
  Read a 4.0 (binlog version 3) event and queue it in the slave's format.

  Stop events are not written to the relay log and do not advance the
  master position. Rotate events update the master coordinates through
  process_io_rotate() and are then queued without advancing the position.

  @param mi         master info; mi->data_lock is taken and released here
  @param buf        raw event bytes received from the master
  @param event_len  number of bytes in buf

  @retval 0  event queued, or a Stop event was skipped
  @retval 1  parsing, rotate or relay log write failure
*/
static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
                                    ulong event_len)
{
  const char *errmsg= 0;
  ulong inc_pos;
  Relay_log_info *rli= &mi->rli;
  DBUG_ENTER("queue_binlog_ver_3_event");

  /* read_log_event() will adjust log_pos to be end_log_pos */
  Log_event *ev= Log_event::read_log_event(buf, event_len, &errmsg,
                                           rli->relay_log.description_event_for_queue);
  if (unlikely(!ev))
  {
    sql_print_error("Read invalid event from master: '%s',\
master could be corrupt but a more likely cause of this is a bug",
                    errmsg);
    DBUG_RETURN(1);
  }

  pthread_mutex_lock(&mi->data_lock);
  switch (ev->get_type_code()) {
  case STOP_EVENT:
    /* Not queued; the event is released at the common exit below */
    goto err;
  case ROTATE_EVENT:
    if (unlikely(process_io_rotate(mi, (Rotate_log_event*) ev)))
    {
      delete ev;
      pthread_mutex_unlock(&mi->data_lock);
      DBUG_RETURN(1);
    }
    inc_pos= 0;
    break;
  default:
    inc_pos= event_len;
    break;
  }

  if (unlikely(rli->relay_log.append(ev)))
  {
    delete ev;
    pthread_mutex_unlock(&mi->data_lock);
    DBUG_RETURN(1);
  }
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  mi->master_log_pos+= inc_pos;

err:
  /*
    Every path reaching this label still owns the event, including the
    Stop event path, so it is freed here exactly once.
  */
  delete ev;
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
  pthread_mutex_unlock(&mi->data_lock);
  DBUG_RETURN(0);
}
|
|
3003 |
||
3004 |
/*
|
|
3005 |
queue_old_event()
|
|
3006 |
||
3007 |
Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
|
|
3008 |
(exactly, slave's) format. To do the conversion, we create a 5.0 event from
|
|
3009 |
the 3.23/4.0 bytes, then write this event to the relay log.
|
|
3010 |
||
3011 |
TODO:
|
|
3012 |
Test this code before release - it has to be tested on a separate
|
|
3013 |
setup with 3.23 master or 4.0 master
|
|
3014 |
*/
|
|
3015 |
||
3016 |
/**
  Dispatch a pre-5.0 event to the converter matching the master's binlog
  version, as recorded in the relay log's description_event_for_queue.

  @param mi         master info
  @param buf        raw event bytes received from the master
  @param event_len  number of bytes in buf

  @return the converter's result for versions 1 and 3; 1 for any other
          (unsupported) version
*/
static int queue_old_event(Master_info *mi, const char *buf,
                           ulong event_len)
{
  DBUG_ENTER("queue_old_event");

  const Format_description_log_event *queue_fdle=
    mi->rli.relay_log.description_event_for_queue;

  if (queue_fdle->binlog_version == 1)
    DBUG_RETURN(queue_binlog_ver_1_event(mi,buf,event_len));

  if (queue_fdle->binlog_version == 3)
    DBUG_RETURN(queue_binlog_ver_3_event(mi,buf,event_len));

  /* unsupported format; eg version 2 */
  DBUG_PRINT("info",("unsupported binlog format %d in queue_old_event()",
                     queue_fdle->binlog_version));
  DBUG_RETURN(1);
}
|
|
3033 |
||
3034 |
/*
|
|
3035 |
queue_event()
|
|
3036 |
||
3037 |
If the event is 3.23/4.0, passes it to queue_old_event() which will convert
|
|
3038 |
it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
|
|
3039 |
no format conversion, it's pure read/write of bytes.
|
|
3040 |
So a 5.0.0 slave's relay log can contain events in the slave's format or in
|
|
3041 |
any >=5.0.0 format.
|
|
3042 |
*/
|
|
3043 |
||
3044 |
/**
  Queue one event received from the master into the relay log.

  Events from a master whose description event has binlog version <4 are
  handed to queue_old_event(), except a Format_description event which is
  always handled here. Other events are written byte-for-byte, unless they
  carry this server's own id and replicate_same_server_id is off, in which
  case they are only accounted for in the master position.

  @param mi         master info; mi->data_lock and the relay log lock are
                    taken and released here
  @param buf        raw event bytes received from the master
  @param event_len  number of bytes in buf

  @retval 0        ok
  @retval nonzero  error code, which has also been passed to mi->report()
                   (or the return value of queue_old_event())
*/
static int queue_event(Master_info* mi,const char* buf, ulong event_len)
{
  int error= 0;
  String error_msg;
  ulong pos_delta;
  bool from_this_server;
  Relay_log_info *rli= &mi->rli;
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
  DBUG_ENTER("queue_event");

  if (rli->relay_log.description_event_for_queue->binlog_version<4 &&
      buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
    DBUG_RETURN(queue_old_event(mi,buf,event_len));

  pthread_mutex_lock(&mi->data_lock);

  switch (buf[EVENT_TYPE_OFFSET]) {
  case STOP_EVENT:
    /*
      This event need not be written to the relay log: it only indicates a
      master server shutdown, and the cleanup it would trigger has already
      been done per master thread.

      mi->master_log_pos is not even incremented, because we may be just
      after a Rotate event; a Start event from the next binlog will follow
      shortly (unless the master is running without --log-bin).
    */
    goto err;

  case ROTATE_EVENT:
  {
    Rotate_log_event rev(buf, event_len,
                         rli->relay_log.description_event_for_queue);
    if (unlikely(process_io_rotate(mi, &rev)))
    {
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
      goto err;
    }
    /*
      The I/O thread has just changed mi->master_log_name, so incrementing
      mi->master_log_pos would be nonsense.
    */
    pos_delta= 0;
    break;
  }

  case FORMAT_DESCRIPTION_EVENT:
  {
    /*
      Create an event and save it: it must be written again whenever the
      relay log is rotated.

      This is the only thread which reads/writes
      description_event_for_queue, and the relay_log struct does not move,
      so no lock is needed (no rli->data_lock, no log lock).
    */
    const char *errmsg;
    Format_description_log_event *new_fdle=
      (Format_description_log_event*)
      Log_event::read_log_event(buf, event_len, &errmsg,
                                rli->relay_log.description_event_for_queue);
    if (!new_fdle)
    {
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
      goto err;
    }
    delete rli->relay_log.description_event_for_queue;
    rli->relay_log.description_event_for_queue= new_fdle;
    /*
      Though this does some conversion to the slave's format, it preserves
      the master's binlog format version and number of event types.

      If the event was not requested by the slave, i.e. has end_log_pos=0,
      mi->master_log_pos is not incremented.
    */
    pos_delta= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
    DBUG_PRINT("info",("binlog format is now %d",
                       rli->relay_log.description_event_for_queue->binlog_version));
    break;
  }

  case HEARTBEAT_LOG_EVENT:
  {
    /*
      HB (heartbeat) cannot come before RL (Relay)
    */
    char llbuf[22];
    Heartbeat_log_event hb(buf, event_len,
                           rli->relay_log.description_event_for_queue);
    if (!hb.is_valid())
    {
      error= ER_SLAVE_HEARTBEAT_FAILURE;
      error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
      error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
      error_msg.append(STRING_WITH_LEN(" log_pos "));
      llstr(hb.log_pos, llbuf);
      error_msg.append(llbuf, strlen(llbuf));
      goto err;
    }
    mi->received_heartbeats++;
    /*
      Compare the local and the event's versions of log_file, log_pos.

      A heartbeat is sent only after an event corresponding to the
      coordinates the heartbeat carries. The slave cannot have different
      coordinates, except in the special case where mi->master_log_name and
      master_log_pos have never been updated by a Rotate event, i.e. when
      the slave has no history with the master.

      NOTE(review): the `mi->master_log_name != NULL` test looks always
      true if master_log_name is a char array (it is the destination of a
      memcpy elsewhere in this file) -- confirm against the declaration.

      TODO: handling `when' for SHOW SLAVE STATUS' seconds behind
    */
    if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
         && mi->master_log_name != NULL)
        || mi->master_log_pos != hb.log_pos)
    {
      /* missed events of heartbeat from the past */
      error= ER_SLAVE_HEARTBEAT_FAILURE;
      error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
      error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
      error_msg.append(STRING_WITH_LEN(" log_pos "));
      llstr(hb.log_pos, llbuf);
      error_msg.append(llbuf, strlen(llbuf));
      goto err;
    }
    /* A valid heartbeat is never written to the relay log */
    goto skip_relay_logging;
  }

  default:
    pos_delta= event_len;
    break;
  }

  /*
    If this event originates from this server, don't queue it.
    This is not checked for 3.23 events because it is simpler that way; they
    are filtered anyway by the SQL slave thread, which also tests the server
    id (that test must stay in the SQL thread, in case somebody upgrades a
    4.0 slave which has a not-filtered relay log).

    ANY event coming from ourselves can be ignored: it is obvious for
    queries; STOP_EVENT/ROTATE_EVENT/START_EVENT cannot come from ourselves
    (--log-slave-updates would not log them) unless this slave is also its
    own direct master (an unsupported, useless setup!).
  */

  pthread_mutex_lock(log_lock);

  from_this_server= (uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
                    !rli->replicate_same_server_id;
  if (from_this_server)
  {
    /*
      Do not write it to the relay log.
      a) mi->master_log_pos is still incremented, so that the event is not
      re-read from the master if the slave I/O thread is stopped/restarted
      (more efficient if the ignored events are big LOAD DATA INFILE).
      b) The skipping is recorded for the slave SQL thread, which could
      otherwise let rli->group_relay_log_pos stay too small if the last
      binlog event is ignored.
      Events generated by this slave which do not exist in the master's
      binlog (i.e. Format_desc, Rotate & Stop) must not increment
      mi->master_log_pos.
    */
    if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
        buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
    {
      mi->master_log_pos+= pos_delta;
      memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
      DBUG_ASSERT(rli->ign_master_log_name_end[0]);
      rli->ign_master_log_pos_end= mi->master_log_pos;
    }
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
    DBUG_PRINT("info", ("master_log_pos: %lu, event originating from the same server, ignored",
                        (ulong) mi->master_log_pos));
  }
  else
  {
    /* write the event to the relay log */
    if (unlikely(rli->relay_log.appendv(buf,event_len,0)))
    {
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
    }
    else
    {
      mi->master_log_pos+= pos_delta;
      DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
    }
    rli->ign_master_log_name_end[0]= 0; // last event is not ignored
  }
  pthread_mutex_unlock(log_lock);

skip_relay_logging:

err:
  pthread_mutex_unlock(&mi->data_lock);
  DBUG_PRINT("info", ("error: %d", error));
  if (error)
    mi->report(ERROR_LEVEL, error, ER(error),
               (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
               "could not queue event from master" :
               error_msg.ptr());
  DBUG_RETURN(error);
}
|
|
3255 |
||
3256 |
||
3257 |
/*
  Shut down a Relay_log_info and release what it holds open.

  SYNOPSIS
    end_relay_log_info()
    rli                 Relay log information to shut down

  DESCRIPTION
    Nothing happens unless rli->inited is set. Otherwise the info file
    and the current relay log file are closed (when their descriptors
    are valid), rli->inited is cleared, the relay log is closed with
    LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT, the written bytes are
    harvested into rli->log_space_total and the slave's temporary tables
    are closed.
*/

void end_relay_log_info(Relay_log_info* rli)
{
  DBUG_ENTER("end_relay_log_info");

  if (rli->inited)
  {
    /* relay-log.info file */
    if (rli->info_fd >= 0)
    {
      end_io_cache(&rli->info_file);
      (void) my_close(rli->info_fd, MYF(MY_WME));
      rli->info_fd= -1;
    }

    /* relay log file currently opened for reading */
    if (rli->cur_log_fd >= 0)
    {
      end_io_cache(&rli->cache_buf);
      (void) my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd= -1;
    }

    rli->inited= 0;
    rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);

    /*
      Delete the slave's temporary tables from memory.
      In the future there will be other actions than this, to ensure
      persistence of slave's temp tables after shutdown.
    */
    rli->close_temporary_tables();
  }

  DBUG_VOID_RETURN;
}
|
|
3286 |
||
3287 |
/*
|
|
3288 |
Try to connect until successful or slave killed
|
|
3289 |
||
3290 |
SYNPOSIS
|
|
3291 |
safe_connect()
|
|
3292 |
thd Thread handler for slave
|
|
3293 |
mysql MySQL connection handle
|
|
3294 |
mi Replication handle
|
|
3295 |
||
3296 |
RETURN
|
|
3297 |
0 ok
|
|
3298 |
# Error
|
|
3299 |
*/
|
|
3300 |
||
3301 |
static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi) |
|
3302 |
{
|
|
3303 |
DBUG_ENTER("safe_connect"); |
|
3304 |
||
3305 |
DBUG_RETURN(connect_to_master(thd, mysql, mi, 0, 0)); |
|
3306 |
}
|
|
3307 |
||
3308 |
||
3309 |
/*
|
|
3310 |
SYNPOSIS
|
|
3311 |
connect_to_master()
|
|
3312 |
||
3313 |
IMPLEMENTATION
|
|
3314 |
Try to connect until successful or slave killed or we have retried
|
|
3315 |
master_retry_count times
|
|
3316 |
*/
|
|
3317 |
||
3318 |
static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, |
|
3319 |
bool reconnect, bool suppress_warnings) |
|
3320 |
{
|
|
3321 |
int slave_was_killed; |
|
3322 |
int last_errno= -2; // impossible error |
|
3323 |
ulong err_count=0; |
|
3324 |
char llbuff[22]; |
|
3325 |
DBUG_ENTER("connect_to_master"); |
|
3326 |
||
3327 |
#ifndef DBUG_OFF
|
|
3328 |
mi->events_till_disconnect = disconnect_slave_event_count; |
|
3329 |
#endif
|
|
3330 |
ulong client_flag= CLIENT_REMEMBER_OPTIONS; |
|
3331 |
if (opt_slave_compressed_protocol) |
|
3332 |
client_flag=CLIENT_COMPRESS; /* We will use compression */ |
|
3333 |
||
3334 |
mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout); |
|
3335 |
mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout); |
|
3336 |
||
3337 |
mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname); |
|
3338 |
/* This one is not strictly needed but we have it here for completeness */
|
|
3339 |
mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir); |
|
3340 |
||
3341 |
while (!(slave_was_killed = io_slave_killed(thd,mi)) && |
|
3342 |
(reconnect ? mysql_reconnect(mysql) != 0 : |
|
3343 |
mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0, |
|
3344 |
mi->port, 0, client_flag) == 0)) |
|
3345 |
{
|
|
3346 |
/* Don't repeat last error */
|
|
3347 |
if ((int)mysql_errno(mysql) != last_errno) |
|
3348 |
{
|
|
3349 |
last_errno=mysql_errno(mysql); |
|
3350 |
suppress_warnings= 0; |
|
3351 |
mi->report(ERROR_LEVEL, last_errno, |
|
3352 |
"error %s to master '%s@%s:%d'"
|
|
3353 |
" - retry-time: %d retries: %lu", |
|
3354 |
(reconnect ? "reconnecting" : "connecting"), |
|
3355 |
mi->user, mi->host, mi->port, |
|
3356 |
mi->connect_retry, master_retry_count); |
|
3357 |
}
|
|
3358 |
/*
|
|
3359 |
By default we try forever. The reason is that failure will trigger
|
|
3360 |
master election, so if the user did not set master_retry_count we
|
|
3361 |
do not want to have election triggered on the first failure to
|
|
3362 |
connect
|
|
3363 |
*/
|
|
3364 |
if (++err_count == master_retry_count) |
|
3365 |
{
|
|
3366 |
slave_was_killed=1; |
|
3367 |
if (reconnect) |
|
3368 |
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER); |
|
3369 |
break; |
|
3370 |
}
|
|
3371 |
safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed, |
|
3372 |
(void*)mi); |
|
3373 |
}
|
|
3374 |
||
3375 |
if (!slave_was_killed) |
|
3376 |
{
|
|
3377 |
if (reconnect) |
|
3378 |
{
|
|
3379 |
if (!suppress_warnings && global_system_variables.log_warnings) |
|
3380 |
sql_print_information("Slave: connected to master '%s@%s:%d',\ |
|
3381 |
replication resumed in log '%s' at position %s", mi->user, |
|
3382 |
mi->host, mi->port, |
|
3383 |
IO_RPL_LOG_NAME, |
|
3384 |
llstr(mi->master_log_pos,llbuff)); |
|
3385 |
}
|
|
3386 |
else
|
|
3387 |
{
|
|
3388 |
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE); |
|
3389 |
general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d", |
|
3390 |
mi->user, mi->host, mi->port); |
|
3391 |
}
|
|
3392 |
#ifdef SIGNAL_WITH_VIO_CLOSE
|
|
3393 |
thd->set_active_vio(mysql->net.vio); |
|
3394 |
#endif
|
|
3395 |
}
|
|
3396 |
mysql->reconnect= 1; |
|
3397 |
DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed)); |
|
3398 |
DBUG_RETURN(slave_was_killed); |
|
3399 |
}
|
|
3400 |
||
3401 |
||
3402 |
/*
|
|
3403 |
safe_reconnect()
|
|
3404 |
||
3405 |
IMPLEMENTATION
|
|
3406 |
Try to connect until successful or slave killed or we have retried
|
|
3407 |
master_retry_count times
|
|
3408 |
*/
|
|
3409 |
||
3410 |
static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, |
|
3411 |
bool suppress_warnings) |
|
3412 |
{
|
|
3413 |
DBUG_ENTER("safe_reconnect"); |
|
3414 |
DBUG_RETURN(connect_to_master(thd, mysql, mi, 1, suppress_warnings)); |
|
3415 |
}
|
|
3416 |
||
3417 |
||
3418 |
/*
|
|
3419 |
Store the file and position where the execute-slave thread are in the
|
|
3420 |
relay log.
|
|
3421 |
||
3422 |
SYNOPSIS
|
|
3423 |
flush_relay_log_info()
|
|
3424 |
rli Relay log information
|
|
3425 |
||
3426 |
NOTES
|
|
3427 |
- As this is only called by the slave thread, we don't need to
|
|
3428 |
have a lock on this.
|
|
3429 |
- If there is an active transaction, then we don't update the position
|
|
3430 |
in the relay log. This is to ensure that we re-execute statements
|
|
3431 |
if we die in the middle of an transaction that was rolled back.
|
|
3432 |
- As a transaction never spans binary logs, we don't have to handle the
|
|
3433 |
case where we do a relay-log-rotation in the middle of the transaction.
|
|
3434 |
If this would not be the case, we would have to ensure that we
|
|
3435 |
don't delete the relay log file where the transaction started when
|
|
3436 |
we switch to a new relay log file.
|
|
3437 |
||
3438 |
TODO
|
|
3439 |
- Change the log file information to a binary format to avoid calling
|
|
3440 |
longlong2str.
|
|
3441 |
||
3442 |
RETURN VALUES
|
|
3443 |
0 ok
|
|
3444 |
1 write error
|
|
3445 |
*/
|
|
3446 |
||
3447 |
bool flush_relay_log_info(Relay_log_info* rli) |
|
3448 |
{
|
|
3449 |
bool error=0; |
|
3450 |
DBUG_ENTER("flush_relay_log_info"); |
|
3451 |
||
3452 |
if (unlikely(rli->no_storage)) |
|
3453 |
DBUG_RETURN(0); |
|
3454 |
||
3455 |
IO_CACHE *file = &rli->info_file; |
|
3456 |
char buff[FN_REFLEN*2+22*2+4], *pos; |
|
3457 |
||
3458 |
my_b_seek(file, 0L); |
|
3459 |
pos=strmov(buff, rli->group_relay_log_name); |
|
3460 |
*pos++='\n'; |
|
3461 |
pos=longlong2str(rli->group_relay_log_pos, pos, 10); |
|
3462 |
*pos++='\n'; |
|
3463 |
pos=strmov(pos, rli->group_master_log_name); |
|
3464 |
*pos++='\n'; |
|
3465 |
pos=longlong2str(rli->group_master_log_pos, pos, 10); |
|
3466 |
*pos='\n'; |
|
3467 |
if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1)) |
|
3468 |
error=1; |
|
3469 |
if (flush_io_cache(file)) |
|
3470 |
error=1; |
|
3471 |
||
3472 |
/* Flushing the relay log is done by the slave I/O thread */
|
|
3473 |
DBUG_RETURN(error); |
|
3474 |
}
|
|
3475 |
||
3476 |
||
3477 |
/*
|
|
3478 |
Called when we notice that the current "hot" log got rotated under our feet.
|
|
3479 |
*/
|
|
3480 |
||
3481 |
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg) |
|
3482 |
{
|
|
3483 |
DBUG_ENTER("reopen_relay_log"); |
|
3484 |
DBUG_ASSERT(rli->cur_log != &rli->cache_buf); |
|
3485 |
DBUG_ASSERT(rli->cur_log_fd == -1); |
|
3486 |
||
3487 |
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf; |
|
3488 |
if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name, |
|
3489 |
errmsg)) <0) |
|
3490 |
DBUG_RETURN(0); |
|
3491 |
/*
|
|
3492 |
We want to start exactly where we was before:
|
|
3493 |
relay_log_pos Current log pos
|
|
3494 |
pending Number of bytes already processed from the event
|
|
3495 |
*/
|
|
3496 |
rli->event_relay_log_pos= max(rli->event_relay_log_pos, BIN_LOG_HEADER_SIZE); |
|
3497 |
my_b_seek(cur_log,rli->event_relay_log_pos); |
|
3498 |
DBUG_RETURN(cur_log); |
|
3499 |
}
|
|
3500 |
||
3501 |
||
3502 |
/*
  Read the next event for the slave SQL thread from the relay log.

  SYNOPSIS
    next_event()
    rli                 Relay log information; rli->data_lock must be
                        held by the caller (asserted below)

  DESCRIPTION
    Loops until an event could be read, the SQL thread is killed or an
    error occurs. At EOF of the "hot" log (the one the I/O thread is
    writing) it either returns a generated Rotate event covering an
    ignored end segment, or waits for the I/O thread to append more.
    At EOF of a "cold" log it closes it and moves on to the next relay
    log, purging the finished one when relay_log_purge is set.

  RETURN
    The event read (or generated), or 0 on kill/error. Errors are
    written to the error log when a message is available.
*/

static Log_event* next_event(Relay_log_info* rli)
{
  Log_event* ev;
  IO_CACHE* cur_log = rli->cur_log;
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
  const char* errmsg=0;
  THD* thd = rli->sql_thd;
  DBUG_ENTER("next_event");

  DBUG_ASSERT(thd != 0);

#ifndef DBUG_OFF
  if (abort_slave_event_count && !rli->events_till_abort--)
    DBUG_RETURN(0);
#endif

  /*
    For most operations we need to protect rli members with data_lock,
    so we assume calling function acquired this mutex for us and we will
    hold it for the most of the loop below. However, we will release it
    whenever it is worth the hassle, and in the cases when we go into a
    pthread_cond_wait() with the non-data_lock mutex
  */
  safe_mutex_assert_owner(&rli->data_lock);

  while (!sql_slave_killed(thd,rli))
  {
    /*
      We can have two kinds of log reading:
      hot_log:
        rli->cur_log points at the IO_CACHE of relay_log, which
        is actively being updated by the I/O thread. We need to be careful
        in this case and make sure that we are not looking at a stale log that
        has already been rotated. If it has been, we reopen the log.

      The other case is much simpler:
        We just have a read only log that nobody else will be updating.
    */
    bool hot_log;
    if ((hot_log = (cur_log != &rli->cache_buf)))
    {
      DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
      pthread_mutex_lock(log_lock);

      /*
        Reading xxx_file_id is safe because the log will only
        be rotated when we hold relay_log.LOCK_log
      */
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
      {
        // The master has switched to a new log file; Reopen the old log file
        cur_log=reopen_relay_log(rli, &errmsg);
        pthread_mutex_unlock(log_lock);
        if (!cur_log)                           // No more log files
          goto err;
        hot_log=0;                              // Using old binary log
      }
    }
    /*
      As there is no guarantee that the relay is open (for example, an I/O
      error during a write by the slave I/O thread may have closed it), we
      have to test it.
    */
    if (!my_b_inited(cur_log))
    {
      /*
        On a hot log we still own LOCK_log here; release it before
        bailing out, like every other error path below does.
      */
      if (hot_log)
        pthread_mutex_unlock(log_lock);
      goto err;
    }
#ifndef DBUG_OFF
    {
      /* This is an assertion which sometimes fails, let's try to track it */
      char llbuf1[22], llbuf2[22];
      DBUG_PRINT("info", ("my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
                          llstr(my_b_tell(cur_log),llbuf1),
                          llstr(rli->event_relay_log_pos,llbuf2)));
      DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
      DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
    }
#endif
    /*
      Relay log is always in new format - if the master is 3.23, the
      I/O thread will convert the format for us.
      A problem: the description event may be in a previous relay log. So if
      the slave has been shutdown meanwhile, we would have to look in old relay
      logs, which may even have been deleted. So we need to write this
      description event at the beginning of the relay log.
      When the relay log is created when the I/O thread starts, easy: the
      master will send the description event and we will queue it.
      But if the relay log is created by new_file(): then the solution is:
      MYSQL_BIN_LOG::open() will write the buffered description event.
    */
    if ((ev=Log_event::read_log_event(cur_log,0,
                                      rli->relay_log.description_event_for_exec)))

    {
      DBUG_ASSERT(thd==rli->sql_thd);
      /*
        read it while we have a lock, to avoid a mutex lock in
        inc_event_relay_log_pos()
      */
      rli->future_event_relay_log_pos= my_b_tell(cur_log);
      if (hot_log)
        pthread_mutex_unlock(log_lock);
      DBUG_RETURN(ev);
    }
    DBUG_ASSERT(thd==rli->sql_thd);
    if (opt_reckless_slave)                     // For mysql-test
      cur_log->error = 0;
    if (cur_log->error < 0)
    {
      errmsg = "slave SQL thread aborted because of I/O error";
      if (hot_log)
        pthread_mutex_unlock(log_lock);
      goto err;
    }
    if (!cur_log->error) /* EOF */
    {
      /*
        On a hot log, EOF means that there are no more updates to
        process and we must block until I/O thread adds some and
        signals us to continue
      */
      if (hot_log)
      {
        /*
          We say in Seconds_Behind_Master that we have "caught up". Note that
          for example if network link is broken but I/O slave thread hasn't
          noticed it (slave_net_timeout not elapsed), then we'll say "caught
          up" whereas we're not really caught up. Fixing that would require
          internally cutting timeout in smaller pieces in network read, no
          thanks. Another example: SQL has caught up on I/O, now I/O has read
          a new event and is queuing it; the false "0" will exist until SQL
          finishes executing the new event; it will look abnormal only if
          the events have old timestamps (then you get "many", 0, "many").

          Transient phases like this can be fixed with implementing
          Heartbeat event which provides the slave the status of the
          master at time the master does not have any new update to send.
          Seconds_Behind_Master would be zero only when master has no
          more updates in binlog for slave. The heartbeat can be sent
          in a (small) fraction of slave_net_timeout. Until it's done
          rli->last_master_timestamp is temporarily (for time of
          waiting for the following event) reset whenever EOF is
          reached.
        */
        time_t save_timestamp= rli->last_master_timestamp;
        rli->last_master_timestamp= 0;

        DBUG_ASSERT(rli->relay_log.get_open_count() ==
                    rli->cur_log_old_open_count);

        if (rli->ign_master_log_name_end[0])
        {
          /* We generate and return a Rotate, to make our positions advance */
          DBUG_PRINT("info",("seeing an ignored end segment"));
          ev= new Rotate_log_event(rli->ign_master_log_name_end,
                                   0, rli->ign_master_log_pos_end,
                                   Rotate_log_event::DUP_NAME);
          rli->ign_master_log_name_end[0]= 0;
          pthread_mutex_unlock(log_lock);
          if (unlikely(!ev))
          {
            errmsg= "Slave SQL thread failed to create a Rotate event "
              "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
            goto err;
          }
          ev->server_id= 0; // don't be ignored by slave SQL thread
          DBUG_RETURN(ev);
        }

        /*
          We can, and should release data_lock while we are waiting for
          update. If we do not, show slave status will block
        */
        pthread_mutex_unlock(&rli->data_lock);

        /*
          Possible deadlock :
          - the I/O thread has reached log_space_limit
          - the SQL thread has read all relay logs, but cannot purge for some
          reason:
            * it has already purged all logs except the current one
            * there are other logs than the current one but they're involved in
            a transaction that finishes in the current one (or is not finished)
          Solution :
          Wake up the possibly waiting I/O thread, and set a boolean asking
          the I/O thread to temporarily ignore the log_space_limit
          constraint, because we do not want the I/O thread to block because of
          space (it's ok if it blocks for any other reason (e.g. because the
          master does not send anything). Then the I/O thread stops waiting
          and reads more events.
          The SQL thread decides when the I/O thread should take log_space_limit
          into account again : ignore_log_space_limit is reset to 0
          in purge_first_log (when the SQL thread purges the just-read relay
          log), and also when the SQL thread starts. We should also reset
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
          fact, no need as RESET SLAVE requires that the slave
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
          it stops.
        */
        pthread_mutex_lock(&rli->log_space_lock);
        // prevent the I/O thread from blocking next times
        rli->ignore_log_space_limit= 1;
        /*
          If the I/O thread is blocked, unblock it.  Ok to broadcast
          after unlock, because the mutex is only destroyed in
          ~Relay_log_info(), i.e. when rli is destroyed, and rli will
          not be destroyed before we exit the present function.
        */
        pthread_mutex_unlock(&rli->log_space_lock);
        pthread_cond_broadcast(&rli->log_space_cond);
        // Note that wait_for_update_relay_log unlocks lock_log !
        rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
        // re-acquire data lock since we released it earlier
        pthread_mutex_lock(&rli->data_lock);
        rli->last_master_timestamp= save_timestamp;
        continue;
      }
      /*
        If the log was not hot, we need to move to the next log in
        sequence. The next log could be hot or cold, we deal with both
        cases separately after doing some common initialization
      */
      end_io_cache(cur_log);
      DBUG_ASSERT(rli->cur_log_fd >= 0);
      my_close(rli->cur_log_fd, MYF(MY_WME));
      rli->cur_log_fd = -1;

      if (relay_log_purge)
      {
        /*
          purge_first_log will properly set up relay log coordinates in rli.
          If the group's coordinates are equal to the event's coordinates
          (i.e. the relay log was not rotated in the middle of a group),
          we can purge this relay log too.
          We do ulonglong and string comparisons, this may be slow but
          - purging the last relay log is nice (it can save 1GB of disk), so we
          like to detect the case where we can do it, and given this,
          - I see no better detection method
          - purge_first_log is not called that often
        */
        if (rli->relay_log.purge_first_log
            (rli,
             rli->group_relay_log_pos == rli->event_relay_log_pos
             && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
        {
          errmsg = "Error purging processed logs";
          goto err;
        }
      }
      else
      {
        /*
          If hot_log is set, then we already have a lock on
          LOCK_log.  If not, we have to get the lock.

          According to Sasha, the only time this code will ever be executed
          is if we are recovering from a bug.
        */
        if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
        {
          errmsg = "error switching to the next log";
          goto err;
        }
        rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
        strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
                sizeof(rli->event_relay_log_name)-1);
        flush_relay_log_info(rli);
      }

      /*
        Now we want to open this next log. To know if it's a hot log (the one
        being written by the I/O thread now) or a cold log, we can use
        is_active(); if it is hot, we use the I/O cache; if it's cold we open
        the file normally. But if is_active() reports that the log is hot, this
        may change between the test and the consequence of the test. So we may
        open the I/O cache whereas the log is now cold, which is nonsense.
        To guard against this, we need to have LOCK_log.
      */

      DBUG_PRINT("info",("hot_log: %d",hot_log));
      if (!hot_log) /* if hot_log, we already have this mutex */
        pthread_mutex_lock(log_lock);
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
      {
#ifdef EXTRA_DEBUG
        if (global_system_variables.log_warnings)
          sql_print_information("next log '%s' is currently active",
                                rli->linfo.log_file_name);
#endif
        rli->cur_log= cur_log= rli->relay_log.get_log_file();
        rli->cur_log_old_open_count= rli->relay_log.get_open_count();
        DBUG_ASSERT(rli->cur_log_fd == -1);

        /*
          Read pointer has to be at the start since we are the only
          reader.
          We must keep the LOCK_log to read the 4 first bytes, as this is a hot
          log (same as when we call read_log_event() above: for a hot log we
          take the mutex).
        */
        if (check_binlog_magic(cur_log,&errmsg))
        {
          if (!hot_log) pthread_mutex_unlock(log_lock);
          goto err;
        }
        if (!hot_log) pthread_mutex_unlock(log_lock);
        continue;
      }
      if (!hot_log) pthread_mutex_unlock(log_lock);
      /*
        if we get here, the log was not hot, so we will have to open it
        ourselves. We are sure that the log is still not hot now (a log can get
        from hot to cold, but not from cold to hot). No need for LOCK_log.
      */
#ifdef EXTRA_DEBUG
      if (global_system_variables.log_warnings)
        sql_print_information("next log '%s' is not active",
                              rli->linfo.log_file_name);
#endif
      // open_binlog() will check the magic header
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
                                       &errmsg)) <0)
        goto err;
    }
    else
    {
      /*
        Read failed with a non-EOF error.
        TODO: come up with something better to handle this error
      */
      if (hot_log)
        pthread_mutex_unlock(log_lock);
      sql_print_error("Slave SQL thread: I/O error reading "
                      "event(errno: %d cur_log->error: %d)",
                      my_errno,cur_log->error);
      // set read position to the beginning of the event
      my_b_seek(cur_log,rli->event_relay_log_pos);
      /* otherwise, we have had a partial read */
      errmsg = "Aborting slave SQL thread because of partial event read";
      break;                                    // To end of function
    }
  }
  if (!errmsg && global_system_variables.log_warnings)
  {
    sql_print_information("Error reading relay log event: %s",
                          "slave SQL thread was killed");
    DBUG_RETURN(0);
  }

err:
  if (errmsg)
    sql_print_error("Error reading relay log event: %s", errmsg);
  DBUG_RETURN(0);
}
|
|
3854 |
||
3855 |
/*
|
|
3856 |
Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
|
|
3857 |
because of size is simpler because when we do it we already have all relevant
|
|
3858 |
locks; here we don't, so this function is mainly taking locks).
|
|
3859 |
Returns nothing as we cannot catch any error (MYSQL_BIN_LOG::new_file()
|
|
3860 |
is void).
|
|
3861 |
*/
|
|
3862 |
||
3863 |
void rotate_relay_log(Master_info* mi) |
|
3864 |
{
|
|
3865 |
DBUG_ENTER("rotate_relay_log"); |
|
3866 |
Relay_log_info* rli= &mi->rli; |
|
3867 |
||
3868 |
/* We don't lock rli->run_lock. This would lead to deadlocks. */
|
|
3869 |
pthread_mutex_lock(&mi->run_lock); |
|
3870 |
||
3871 |
/*
|
|
3872 |
We need to test inited because otherwise, new_file() will attempt to lock
|
|
3873 |
LOCK_log, which may not be inited (if we're not a slave).
|
|
3874 |
*/
|
|
3875 |
if (!rli->inited) |
|
3876 |
{
|
|
3877 |
DBUG_PRINT("info", ("rli->inited == 0")); |
|
3878 |
goto end; |
|
3879 |
}
|
|
3880 |
||
3881 |
/* If the relay log is closed, new_file() will do nothing. */
|
|
3882 |
rli->relay_log.new_file(); |
|
3883 |
||
3884 |
/*
|
|
3885 |
We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
|
|
3886 |
be counted, so imagine a succession of FLUSH LOGS and assume the slave
|
|
3887 |
threads are started:
|
|
3888 |
relay_log_space decreases by the size of the deleted relay log, but does
|
|
3889 |
not increase, so flush-after-flush we may become negative, which is wrong.
|
|
3890 |
Even if this will be corrected as soon as a query is replicated on the
|
|
3891 |
slave (because the I/O thread will then call harvest_bytes_written() which
|
|
3892 |
will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
|
|
3893 |
output in SHOW SLAVE STATUS meanwhile. So we harvest now.
|
|
3894 |
If the log is closed, then this will just harvest the last writes, probably
|
|
3895 |
0 as they probably have been harvested.
|
|
3896 |
*/
|
|
3897 |
rli->relay_log.harvest_bytes_written(&rli->log_space_total); |
|
3898 |
end: |
|
3899 |
pthread_mutex_unlock(&mi->run_lock); |
|
3900 |
DBUG_VOID_RETURN; |
|
3901 |
}
|
|
3902 |
||
3903 |
||
3904 |
/**
|
|
3905 |
Detects, based on master's version (as found in the relay log), if master
|
|
3906 |
has a certain bug.
|
|
3907 |
@param rli Relay_log_info which tells the master's version
|
|
3908 |
@param bug_id Number of the bug as found in bugs.mysql.com
|
|
3909 |
@param report bool report error message, default TRUE
|
|
3910 |
@return TRUE if master has the bug, FALSE if it does not.
|
|
3911 |
*/
|
|
3912 |
bool rpl_master_has_bug(Relay_log_info *rli, uint bug_id, bool report) |
|
3913 |
{
|
|
3914 |
struct st_version_range_for_one_bug { |
|
3915 |
uint bug_id; |
|
3916 |
const uchar introduced_in[3]; // first version with bug |
|
3917 |
const uchar fixed_in[3]; // first version with fix |
|
3918 |
};
|
|
3919 |
static struct st_version_range_for_one_bug versions_for_all_bugs[]= |
|
3920 |
{
|
|
3921 |
{24432, { 5, 0, 24 }, { 5, 0, 38 } }, |
|
3922 |
{24432, { 5, 1, 12 }, { 5, 1, 17 } }, |
|
3923 |
{33029, { 5, 0, 0 }, { 5, 0, 58 } }, |
|
3924 |
{33029, { 5, 1, 0 }, { 5, 1, 12 } }, |
|
3925 |
};
|
|
3926 |
const uchar *master_ver= |
|
3927 |
rli->relay_log.description_event_for_exec->server_version_split; |
|
3928 |
||
3929 |
DBUG_ASSERT(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3); |
|
3930 |
||
3931 |
for (uint i= 0; |
|
3932 |
i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++) |
|
3933 |
{
|
|
3934 |
const uchar *introduced_in= versions_for_all_bugs[i].introduced_in, |
|
3935 |
*fixed_in= versions_for_all_bugs[i].fixed_in; |
|
3936 |
if ((versions_for_all_bugs[i].bug_id == bug_id) && |
|
3937 |
(memcmp(introduced_in, master_ver, 3) <= 0) && |
|
3938 |
(memcmp(fixed_in, master_ver, 3) > 0)) |
|
3939 |
{
|
|
3940 |
if (!report) |
|
3941 |
return TRUE; |
|
3942 |
||
3943 |
// a short message for SHOW SLAVE STATUS (message length constraints)
|
|
3944 |
my_printf_error(ER_UNKNOWN_ERROR, "master may suffer from" |
|
3945 |
" http://bugs.mysql.com/bug.php?id=%u"
|
|
3946 |
" so slave stops; check error log on slave"
|
|
3947 |
" for more info", MYF(0), bug_id); |
|
3948 |
// a verbose message for the error log
|
|
3949 |
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, |
|
3950 |
"According to the master's version ('%s'),"
|
|
3951 |
" it is probable that master suffers from this bug:"
|
|
3952 |
" http://bugs.mysql.com/bug.php?id=%u"
|
|
3953 |
" and thus replicating the current binary log event"
|
|
3954 |
" may make the slave's data become different from the"
|
|
3955 |
" master's data."
|
|
3956 |
" To take no risk, slave refuses to replicate"
|
|
3957 |
" this event and stops."
|
|
3958 |
" We recommend that all updates be stopped on the"
|
|
3959 |
" master and slave, that the data of both be"
|
|
3960 |
" manually synchronized,"
|
|
3961 |
" that master's binary logs be deleted,"
|
|
3962 |
" that master be upgraded to a version at least"
|
|
3963 |
" equal to '%d.%d.%d'. Then replication can be"
|
|
3964 |
" restarted.", |
|
3965 |
rli->relay_log.description_event_for_exec->server_version, |
|
3966 |
bug_id, |
|
3967 |
fixed_in[0], fixed_in[1], fixed_in[2]); |
|
3968 |
return TRUE; |
|
3969 |
}
|
|
3970 |
}
|
|
3971 |
return FALSE; |
|
3972 |
}
|
|
3973 |
||
3974 |
/**
|
|
3975 |
BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
|
|
3976 |
exclusive, if one statement in a SP generated AUTO_INCREMENT value
|
|
3977 |
by the top statement, all statements after it would be considered
|
|
3978 |
generated AUTO_INCREMENT value by the top statement, and a
|
|
3979 |
erroneous INSERT_ID value might be associated with these statement,
|
|
3980 |
which could cause duplicate entry error and stop the slave.
|
|
3981 |
||
3982 |
Detect buggy master to work around.
|
|
3983 |
*/
|
|
3984 |
bool rpl_master_erroneous_autoinc(THD *thd) |
|
3985 |
{
|
|
3986 |
if (active_mi && active_mi->rli.sql_thd == thd) |
|
3987 |
{
|
|
3988 |
Relay_log_info *rli= &active_mi->rli; |
|
3989 |
return rpl_master_has_bug(rli, 33029, FALSE); |
|
3990 |
}
|
|
3991 |
return FALSE; |
|
3992 |
}
|
|
3993 |
||
3994 |
/*
  Explicit instantiations of the list iterators over i_string and
  i_string_pair, for builds that define
  HAVE_EXPLICIT_TEMPLATE_INSTANTIATION.
*/
#if defined(HAVE_EXPLICIT_TEMPLATE_INSTANTIATION)
template class I_List_iterator<i_string>;
template class I_List_iterator<i_string_pair>;
#endif
|
|
3998 |
||
3999 |
/**
|
|
4000 |
@} (end of group Replication)
|
|
4001 |
*/
|
|
4002 |
||
4003 |
#endif /* HAVE_REPLICATION */ |