~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to sql/slave.cc

  • Committer: brian
  • Date: 2008-06-25 05:29:13 UTC
  • Revision ID: brian@localhost.localdomain-20080625052913-6upwo0jsrl4lnapl
clean slate

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/**
 
18
  @addtogroup Replication
 
19
  @{
 
20
 
 
21
  @file
 
22
 
 
23
  @brief Code to run the io thread and the sql thread on the
 
24
  replication slave.
 
25
*/
 
26
 
 
27
#include "mysql_priv.h"
 
28
 
 
29
#include <mysql.h>
 
30
#include <myisam.h>
 
31
#include "slave.h"
 
32
#include "rpl_mi.h"
 
33
#include "rpl_rli.h"
 
34
#include "sql_repl.h"
 
35
#include "rpl_filter.h"
 
36
#include "repl_failsafe.h"
 
37
#include <thr_alarm.h>
 
38
#include <my_dir.h>
 
39
#include <sql_common.h>
 
40
#include <errmsg.h>
 
41
#include <mysys_err.h>
 
42
 
 
43
#ifdef HAVE_REPLICATION
 
44
 
 
45
#include "rpl_tblmap.h"
 
46
 
 
47
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
 
48
 
 
49
#define MAX_SLAVE_RETRY_PAUSE 5
 
50
bool use_slave_mask = 0;
 
51
MY_BITMAP slave_error_mask;
 
52
 
 
53
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
 
54
 
 
55
char* slave_load_tmpdir = 0;
 
56
Master_info *active_mi= 0;
 
57
my_bool replicate_same_server_id;
 
58
ulonglong relay_log_space_limit = 0;
 
59
 
 
60
/*
 
61
  When slave thread exits, we need to remember the temporary tables so we
 
62
  can re-use them on slave start.
 
63
 
 
64
  TODO: move the vars below under Master_info
 
65
*/
 
66
 
 
67
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
 
68
int events_till_abort = -1;
 
69
 
 
70
enum enum_slave_reconnect_actions
 
71
{
 
72
  SLAVE_RECON_ACT_REG= 0,
 
73
  SLAVE_RECON_ACT_DUMP= 1,
 
74
  SLAVE_RECON_ACT_EVENT= 2,
 
75
  SLAVE_RECON_ACT_MAX
 
76
};
 
77
 
 
78
enum enum_slave_reconnect_messages
 
79
{
 
80
  SLAVE_RECON_MSG_WAIT= 0,
 
81
  SLAVE_RECON_MSG_KILLED_WAITING= 1,
 
82
  SLAVE_RECON_MSG_AFTER= 2,
 
83
  SLAVE_RECON_MSG_FAILED= 3,
 
84
  SLAVE_RECON_MSG_COMMAND= 4,
 
85
  SLAVE_RECON_MSG_KILLED_AFTER= 5,
 
86
  SLAVE_RECON_MSG_MAX
 
87
};
 
88
 
 
89
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
 
90
{
 
91
  {
 
92
    "Waiting to reconnect after a failed registration on master",
 
93
    "Slave I/O thread killed while waitnig to reconnect after a failed \
 
94
registration on master",
 
95
    "Reconnecting after a failed registration on master",
 
96
    "failed registering on master, reconnecting to try again, \
 
97
log '%s' at postion %s",
 
98
    "COM_REGISTER_SLAVE",
 
99
    "Slave I/O thread killed during or after reconnect"
 
100
  },
 
101
  {
 
102
    "Waiting to reconnect after a failed binlog dump request",
 
103
    "Slave I/O thread killed while retrying master dump",
 
104
    "Reconnecting after a failed binlog dump request",
 
105
    "failed dump request, reconnecting to try again, log '%s' at postion %s",
 
106
    "COM_BINLOG_DUMP",
 
107
    "Slave I/O thread killed during or after reconnect"
 
108
  },
 
109
  {
 
110
    "Waiting to reconnect after a failed master event read",
 
111
    "Slave I/O thread killed while waiting to reconnect after a failed read",
 
112
    "Reconnecting after a failed master event read",
 
113
    "Slave I/O thread: Failed reading log event, reconnecting to retry, \
 
114
log '%s' at postion %s",
 
115
    "",
 
116
    "Slave I/O thread killed during or after a reconnect done to recover from \
 
117
failed read"
 
118
  }
 
119
};
 
120
 
 
121
 
 
122
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
 
123
 
 
124
static int process_io_rotate(Master_info* mi, Rotate_log_event* rev);
 
125
static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
 
126
static bool wait_for_relay_log_space(Relay_log_info* rli);
 
127
static inline bool io_slave_killed(THD* thd,Master_info* mi);
 
128
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
 
129
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
 
130
static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi);
 
131
static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
 
132
                          bool suppress_warnings);
 
133
static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
 
134
                             bool reconnect, bool suppress_warnings);
 
135
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
 
136
                      void* thread_killed_arg);
 
137
static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi);
 
138
static Log_event* next_event(Relay_log_info* rli);
 
139
static int queue_event(Master_info* mi,const char* buf,ulong event_len);
 
140
static int terminate_slave_thread(THD *thd,
 
141
                                  pthread_mutex_t* term_lock,
 
142
                                  pthread_cond_t* term_cond,
 
143
                                  volatile uint *slave_running,
 
144
                                  bool skip_lock);
 
145
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
 
146
 
 
147
/*
 
148
  Find out which replications threads are running
 
149
 
 
150
  SYNOPSIS
 
151
    init_thread_mask()
 
152
    mask                Return value here
 
153
    mi                  master_info for slave
 
154
    inverse             If set, returns which threads are not running
 
155
 
 
156
  IMPLEMENTATION
 
157
    Get a bit mask for which threads are running so that we can later restart
 
158
    these threads.
 
159
 
 
160
  RETURN
 
161
    mask        If inverse == 0, running threads
 
162
                If inverse == 1, stopped threads
 
163
*/
 
164
 
 
165
void init_thread_mask(int* mask,Master_info* mi,bool inverse)
 
166
{
 
167
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
 
168
  register int tmp_mask=0;
 
169
  DBUG_ENTER("init_thread_mask");
 
170
 
 
171
  if (set_io)
 
172
    tmp_mask |= SLAVE_IO;
 
173
  if (set_sql)
 
174
    tmp_mask |= SLAVE_SQL;
 
175
  if (inverse)
 
176
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
 
177
  *mask = tmp_mask;
 
178
  DBUG_VOID_RETURN;
 
179
}
 
180
 
 
181
 
 
182
/*
 
183
  lock_slave_threads()
 
184
*/
 
185
 
 
186
void lock_slave_threads(Master_info* mi)
 
187
{
 
188
  DBUG_ENTER("lock_slave_threads");
 
189
 
 
190
  //TODO: see if we can do this without dual mutex
 
191
  pthread_mutex_lock(&mi->run_lock);
 
192
  pthread_mutex_lock(&mi->rli.run_lock);
 
193
  DBUG_VOID_RETURN;
 
194
}
 
195
 
 
196
 
 
197
/*
 
198
  unlock_slave_threads()
 
199
*/
 
200
 
 
201
void unlock_slave_threads(Master_info* mi)
 
202
{
 
203
  DBUG_ENTER("unlock_slave_threads");
 
204
 
 
205
  //TODO: see if we can do this without dual mutex
 
206
  pthread_mutex_unlock(&mi->rli.run_lock);
 
207
  pthread_mutex_unlock(&mi->run_lock);
 
208
  DBUG_VOID_RETURN;
 
209
}
 
210
 
 
211
 
 
212
/* Initialize slave structures */
 
213
 
 
214
int init_slave()
 
215
{
 
216
  DBUG_ENTER("init_slave");
 
217
 
 
218
  /*
 
219
    This is called when mysqld starts. Before client connections are
 
220
    accepted. However bootstrap may conflict with us if it does START SLAVE.
 
221
    So it's safer to take the lock.
 
222
  */
 
223
  pthread_mutex_lock(&LOCK_active_mi);
 
224
  /*
 
225
    TODO: re-write this to interate through the list of files
 
226
    for multi-master
 
227
  */
 
228
  active_mi= new Master_info;
 
229
 
 
230
  /*
 
231
    If master_host is not specified, try to read it from the master_info file.
 
232
    If master_host is specified, create the master_info file if it doesn't
 
233
    exists.
 
234
  */
 
235
  if (!active_mi)
 
236
  {
 
237
    sql_print_error("Failed to allocate memory for the master info structure");
 
238
    goto err;
 
239
  }
 
240
 
 
241
  if (init_master_info(active_mi,master_info_file,relay_log_info_file,
 
242
                       1, (SLAVE_IO | SLAVE_SQL)))
 
243
  {
 
244
    sql_print_error("Failed to initialize the master info structure");
 
245
    goto err;
 
246
  }
 
247
 
 
248
  /* If server id is not set, start_slave_thread() will say it */
 
249
 
 
250
  if (active_mi->host[0] && !opt_skip_slave_start)
 
251
  {
 
252
    if (start_slave_threads(1 /* need mutex */,
 
253
                            0 /* no wait for start*/,
 
254
                            active_mi,
 
255
                            master_info_file,
 
256
                            relay_log_info_file,
 
257
                            SLAVE_IO | SLAVE_SQL))
 
258
    {
 
259
      sql_print_error("Failed to create slave threads");
 
260
      goto err;
 
261
    }
 
262
  }
 
263
  pthread_mutex_unlock(&LOCK_active_mi);
 
264
  DBUG_RETURN(0);
 
265
 
 
266
err:
 
267
  pthread_mutex_unlock(&LOCK_active_mi);
 
268
  DBUG_RETURN(1);
 
269
}
 
270
 
 
271
 
 
272
/*
 
273
  Init function to set up array for errors that should be skipped for slave
 
274
 
 
275
  SYNOPSIS
 
276
    init_slave_skip_errors()
 
277
    arg         List of errors numbers to skip, separated with ','
 
278
 
 
279
  NOTES
 
280
    Called from get_options() in mysqld.cc on start-up
 
281
*/
 
282
 
 
283
void init_slave_skip_errors(const char* arg)
 
284
{
 
285
  const char *p;
 
286
  DBUG_ENTER("init_slave_skip_errors");
 
287
 
 
288
  if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
 
289
  {
 
290
    fprintf(stderr, "Badly out of memory, please check your system status\n");
 
291
    exit(1);
 
292
  }
 
293
  use_slave_mask = 1;
 
294
  for (;my_isspace(system_charset_info,*arg);++arg)
 
295
    /* empty */;
 
296
  if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
 
297
  {
 
298
    bitmap_set_all(&slave_error_mask);
 
299
    DBUG_VOID_RETURN;
 
300
  }
 
301
  for (p= arg ; *p; )
 
302
  {
 
303
    long err_code;
 
304
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
 
305
      break;
 
306
    if (err_code < MAX_SLAVE_ERROR)
 
307
       bitmap_set_bit(&slave_error_mask,(uint)err_code);
 
308
    while (!my_isdigit(system_charset_info,*p) && *p)
 
309
      p++;
 
310
  }
 
311
  DBUG_VOID_RETURN;
 
312
}
 
313
 
 
314
 
 
315
int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
 
316
{
 
317
  DBUG_ENTER("terminate_slave_threads");
 
318
 
 
319
  if (!mi->inited)
 
320
    DBUG_RETURN(0); /* successfully do nothing */
 
321
  int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
 
322
  pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
 
323
 
 
324
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
 
325
  {
 
326
    DBUG_PRINT("info",("Terminating IO thread"));
 
327
    mi->abort_slave=1;
 
328
    if ((error=terminate_slave_thread(mi->io_thd,io_lock,
 
329
                                      &mi->stop_cond,
 
330
                                      &mi->slave_running,
 
331
                                      skip_lock)) &&
 
332
        !force_all)
 
333
      DBUG_RETURN(error);
 
334
  }
 
335
  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
 
336
  {
 
337
    DBUG_PRINT("info",("Terminating SQL thread"));
 
338
    mi->rli.abort_slave=1;
 
339
    if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
 
340
                                      &mi->rli.stop_cond,
 
341
                                      &mi->rli.slave_running,
 
342
                                      skip_lock)) &&
 
343
        !force_all)
 
344
      DBUG_RETURN(error);
 
345
  }
 
346
  DBUG_RETURN(0);
 
347
}
 
348
 
 
349
 
 
350
/**
 
351
   Wait for a slave thread to terminate.
 
352
 
 
353
   This function is called after requesting the thread to terminate
 
354
   (by setting @c abort_slave member of @c Relay_log_info or @c
 
355
   Master_info structure to 1). Termination of the thread is
 
356
   controlled with the the predicate <code>*slave_running</code>.
 
357
 
 
358
   Function will acquire @c term_lock before waiting on the condition
 
359
   unless @c skip_lock is true in which case the mutex should be owned
 
360
   by the caller of this function and will remain acquired after
 
361
   return from the function.
 
362
 
 
363
   @param term_lock
 
364
          Associated lock to use when waiting for @c term_cond
 
365
 
 
366
   @param term_cond
 
367
          Condition that is signalled when the thread has terminated
 
368
 
 
369
   @param slave_running
 
370
          Pointer to predicate to check for slave thread termination
 
371
 
 
372
   @param skip_lock
 
373
          If @c true the lock will not be acquired before waiting on
 
374
          the condition. In this case, it is assumed that the calling
 
375
          function acquires the lock before calling this function.
 
376
 
 
377
   @retval 0 All OK
 
378
 */
 
379
static int
 
380
terminate_slave_thread(THD *thd,
 
381
                       pthread_mutex_t* term_lock,
 
382
                       pthread_cond_t* term_cond,
 
383
                       volatile uint *slave_running,
 
384
                       bool skip_lock)
 
385
{
 
386
  int error;
 
387
 
 
388
  DBUG_ENTER("terminate_slave_thread");
 
389
 
 
390
  if (!skip_lock)
 
391
    pthread_mutex_lock(term_lock);
 
392
 
 
393
  safe_mutex_assert_owner(term_lock);
 
394
 
 
395
  if (!*slave_running)
 
396
  {
 
397
    if (!skip_lock)
 
398
      pthread_mutex_unlock(term_lock);
 
399
    DBUG_RETURN(ER_SLAVE_NOT_RUNNING);
 
400
  }
 
401
  DBUG_ASSERT(thd != 0);
 
402
  THD_CHECK_SENTRY(thd);
 
403
 
 
404
  /*
 
405
    Is is critical to test if the slave is running. Otherwise, we might
 
406
    be referening freed memory trying to kick it
 
407
  */
 
408
 
 
409
  while (*slave_running)                        // Should always be true
 
410
  {
 
411
    DBUG_PRINT("loop", ("killing slave thread"));
 
412
 
 
413
    pthread_mutex_lock(&thd->LOCK_delete);
 
414
#ifndef DONT_USE_THR_ALARM
 
415
    /*
 
416
      Error codes from pthread_kill are:
 
417
      EINVAL: invalid signal number (can't happen)
 
418
      ESRCH: thread already killed (can happen, should be ignored)
 
419
    */
 
420
    IF_DBUG(int err= ) pthread_kill(thd->real_id, thr_client_alarm);
 
421
    DBUG_ASSERT(err != EINVAL);
 
422
#endif
 
423
    thd->awake(THD::NOT_KILLED);
 
424
    pthread_mutex_unlock(&thd->LOCK_delete);
 
425
 
 
426
    /*
 
427
      There is a small chance that slave thread might miss the first
 
428
      alarm. To protect againts it, resend the signal until it reacts
 
429
    */
 
430
    struct timespec abstime;
 
431
    set_timespec(abstime,2);
 
432
    error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
 
433
    DBUG_ASSERT(error == ETIMEDOUT || error == 0);
 
434
  }
 
435
 
 
436
  DBUG_ASSERT(*slave_running == 0);
 
437
 
 
438
  if (!skip_lock)
 
439
    pthread_mutex_unlock(term_lock);
 
440
  DBUG_RETURN(0);
 
441
}
 
442
 
 
443
 
 
444
int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
 
445
                       pthread_mutex_t *cond_lock,
 
446
                       pthread_cond_t *start_cond,
 
447
                       volatile uint *slave_running,
 
448
                       volatile ulong *slave_run_id,
 
449
                       Master_info* mi,
 
450
                       bool high_priority)
 
451
{
 
452
  pthread_t th;
 
453
  ulong start_id;
 
454
  DBUG_ENTER("start_slave_thread");
 
455
 
 
456
  DBUG_ASSERT(mi->inited);
 
457
 
 
458
  if (start_lock)
 
459
    pthread_mutex_lock(start_lock);
 
460
  if (!server_id)
 
461
  {
 
462
    if (start_cond)
 
463
      pthread_cond_broadcast(start_cond);
 
464
    if (start_lock)
 
465
      pthread_mutex_unlock(start_lock);
 
466
    sql_print_error("Server id not set, will not start slave");
 
467
    DBUG_RETURN(ER_BAD_SLAVE);
 
468
  }
 
469
 
 
470
  if (*slave_running)
 
471
  {
 
472
    if (start_cond)
 
473
      pthread_cond_broadcast(start_cond);
 
474
    if (start_lock)
 
475
      pthread_mutex_unlock(start_lock);
 
476
    DBUG_RETURN(ER_SLAVE_MUST_STOP);
 
477
  }
 
478
  start_id= *slave_run_id;
 
479
  DBUG_PRINT("info",("Creating new slave thread"));
 
480
  if (high_priority)
 
481
    my_pthread_attr_setprio(&connection_attrib,CONNECT_PRIOR);
 
482
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
 
483
  {
 
484
    if (start_lock)
 
485
      pthread_mutex_unlock(start_lock);
 
486
    DBUG_RETURN(ER_SLAVE_THREAD);
 
487
  }
 
488
  if (start_cond && cond_lock) // caller has cond_lock
 
489
  {
 
490
    THD* thd = current_thd;
 
491
    while (start_id == *slave_run_id)
 
492
    {
 
493
      DBUG_PRINT("sleep",("Waiting for slave thread to start"));
 
494
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
 
495
                                            "Waiting for slave thread to start");
 
496
      pthread_cond_wait(start_cond,cond_lock);
 
497
      thd->exit_cond(old_msg);
 
498
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
 
499
      if (thd->killed)
 
500
        DBUG_RETURN(thd->killed_errno());
 
501
    }
 
502
  }
 
503
  if (start_lock)
 
504
    pthread_mutex_unlock(start_lock);
 
505
  DBUG_RETURN(0);
 
506
}
 
507
 
 
508
 
 
509
/*
 
510
  start_slave_threads()
 
511
 
 
512
  NOTES
 
513
    SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
 
514
    sense to do that for starting a slave--we always care if it actually
 
515
    started the threads that were not previously running
 
516
*/
 
517
 
 
518
int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
 
519
                        Master_info* mi, const char* master_info_fname,
 
520
                        const char* slave_info_fname, int thread_mask)
 
521
{
 
522
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
 
523
  pthread_cond_t* cond_io=0,*cond_sql=0;
 
524
  int error=0;
 
525
  DBUG_ENTER("start_slave_threads");
 
526
 
 
527
  if (need_slave_mutex)
 
528
  {
 
529
    lock_io = &mi->run_lock;
 
530
    lock_sql = &mi->rli.run_lock;
 
531
  }
 
532
  if (wait_for_start)
 
533
  {
 
534
    cond_io = &mi->start_cond;
 
535
    cond_sql = &mi->rli.start_cond;
 
536
    lock_cond_io = &mi->run_lock;
 
537
    lock_cond_sql = &mi->rli.run_lock;
 
538
  }
 
539
 
 
540
  if (thread_mask & SLAVE_IO)
 
541
    error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
 
542
                             cond_io,
 
543
                             &mi->slave_running, &mi->slave_run_id,
 
544
                             mi, 1); //high priority, to read the most possible
 
545
  if (!error && (thread_mask & SLAVE_SQL))
 
546
  {
 
547
    error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
 
548
                             cond_sql,
 
549
                             &mi->rli.slave_running, &mi->rli.slave_run_id,
 
550
                             mi, 0);
 
551
    if (error)
 
552
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
 
553
  }
 
554
  DBUG_RETURN(error);
 
555
}
 
556
 
 
557
 
 
558
#ifdef NOT_USED_YET
 
559
static int end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
 
560
{
 
561
  DBUG_ENTER("end_slave_on_walk");
 
562
 
 
563
  end_master_info(mi);
 
564
  DBUG_RETURN(0);
 
565
}
 
566
#endif
 
567
 
 
568
 
 
569
/*
 
570
  Free all resources used by slave
 
571
 
 
572
  SYNOPSIS
 
573
    end_slave()
 
574
*/
 
575
 
 
576
void end_slave()
 
577
{
 
578
  DBUG_ENTER("end_slave");
 
579
 
 
580
  /*
 
581
    This is called when the server terminates, in close_connections().
 
582
    It terminates slave threads. However, some CHANGE MASTER etc may still be
 
583
    running presently. If a START SLAVE was in progress, the mutex lock below
 
584
    will make us wait until slave threads have started, and START SLAVE
 
585
    returns, then we terminate them here.
 
586
  */
 
587
  pthread_mutex_lock(&LOCK_active_mi);
 
588
  if (active_mi)
 
589
  {
 
590
    /*
 
591
      TODO: replace the line below with
 
592
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
 
593
      once multi-master code is ready.
 
594
    */
 
595
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
 
596
    end_master_info(active_mi);
 
597
    delete active_mi;
 
598
    active_mi= 0;
 
599
  }
 
600
  pthread_mutex_unlock(&LOCK_active_mi);
 
601
  DBUG_VOID_RETURN;
 
602
}
 
603
 
 
604
 
 
605
static bool io_slave_killed(THD* thd, Master_info* mi)
 
606
{
 
607
  DBUG_ENTER("io_slave_killed");
 
608
 
 
609
  DBUG_ASSERT(mi->io_thd == thd);
 
610
  DBUG_ASSERT(mi->slave_running); // tracking buffer overrun
 
611
  DBUG_RETURN(mi->abort_slave || abort_loop || thd->killed);
 
612
}
 
613
 
 
614
 
 
615
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
 
616
{
 
617
  DBUG_ENTER("sql_slave_killed");
 
618
 
 
619
  DBUG_ASSERT(rli->sql_thd == thd);
 
620
  DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
 
621
  if (abort_loop || thd->killed || rli->abort_slave)
 
622
  {
 
623
    /*
 
624
      If we are in an unsafe situation (stopping could corrupt replication),
 
625
      we give one minute to the slave SQL thread of grace before really
 
626
      terminating, in the hope that it will be able to read more events and
 
627
      the unsafe situation will soon be left. Note that this one minute starts
 
628
      from the last time anything happened in the slave SQL thread. So it's
 
629
      really one minute of idleness, we don't timeout if the slave SQL thread
 
630
      is actively working.
 
631
    */
 
632
    if (rli->last_event_start_time == 0)
 
633
      DBUG_RETURN(1);
 
634
    DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving "
 
635
                        "it some grace period"));
 
636
    if (difftime(time(0), rli->last_event_start_time) > 60)
 
637
    {
 
638
      rli->report(ERROR_LEVEL, 0,
 
639
                  "SQL thread had to stop in an unsafe situation, in "
 
640
                  "the middle of applying updates to a "
 
641
                  "non-transactional table without any primary key. "
 
642
                  "There is a risk of duplicate updates when the slave "
 
643
                  "SQL thread is restarted. Please check your tables' "
 
644
                  "contents after restart.");
 
645
      DBUG_RETURN(1);
 
646
    }
 
647
  }
 
648
  DBUG_RETURN(0);
 
649
}
 
650
 
 
651
 
 
652
/*
 
653
  skip_load_data_infile()
 
654
 
 
655
  NOTES
 
656
    This is used to tell a 3.23 master to break send_file()
 
657
*/
 
658
 
 
659
void skip_load_data_infile(NET *net)
 
660
{
 
661
  DBUG_ENTER("skip_load_data_infile");
 
662
 
 
663
  (void)net_request_file(net, "/dev/null");
 
664
  (void)my_net_read(net);                               // discard response
 
665
  (void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok
 
666
  DBUG_VOID_RETURN;
 
667
}
 
668
 
 
669
 
 
670
bool net_request_file(NET* net, const char* fname)
 
671
{
 
672
  DBUG_ENTER("net_request_file");
 
673
  DBUG_RETURN(net_write_command(net, 251, (uchar*) fname, strlen(fname),
 
674
                                (uchar*) "", 0));
 
675
}
 
676
 
 
677
/*
 
678
  From other comments and tests in code, it looks like
 
679
  sometimes Query_log_event and Load_log_event can have db == 0
 
680
  (see rewrite_db() above for example)
 
681
  (cases where this happens are unclear; it may be when the master is 3.23).
 
682
*/
 
683
 
 
684
const char *print_slave_db_safe(const char* db)
 
685
{
 
686
  DBUG_ENTER("*print_slave_db_safe");
 
687
 
 
688
  DBUG_RETURN((db ? db : ""));
 
689
}
 
690
 
 
691
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
 
692
                                 const char *default_val)
 
693
{
 
694
  uint length;
 
695
  DBUG_ENTER("init_strvar_from_file");
 
696
 
 
697
  if ((length=my_b_gets(f,var, max_size)))
 
698
  {
 
699
    char* last_p = var + length -1;
 
700
    if (*last_p == '\n')
 
701
      *last_p = 0; // if we stopped on newline, kill it
 
702
    else
 
703
    {
 
704
      /*
 
705
        If we truncated a line or stopped on last char, remove all chars
 
706
        up to and including newline.
 
707
      */
 
708
      int c;
 
709
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF));
 
710
    }
 
711
    DBUG_RETURN(0);
 
712
  }
 
713
  else if (default_val)
 
714
  {
 
715
    strmake(var,  default_val, max_size-1);
 
716
    DBUG_RETURN(0);
 
717
  }
 
718
  DBUG_RETURN(1);
 
719
}
 
720
 
 
721
 
 
722
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
 
723
{
 
724
  char buf[32];
 
725
  DBUG_ENTER("init_intvar_from_file");
 
726
 
 
727
 
 
728
  if (my_b_gets(f, buf, sizeof(buf)))
 
729
  {
 
730
    *var = atoi(buf);
 
731
    DBUG_RETURN(0);
 
732
  }
 
733
  else if (default_val)
 
734
  {
 
735
    *var = default_val;
 
736
    DBUG_RETURN(0);
 
737
  }
 
738
  DBUG_RETURN(1);
 
739
}
 
740
 
 
741
int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
 
742
{
 
743
  char buf[16];
 
744
  DBUG_ENTER("init_floatvar_from_file");
 
745
 
 
746
 
 
747
  if (my_b_gets(f, buf, sizeof(buf)))
 
748
  {
 
749
    if (sscanf(buf, "%f", var) != 1)
 
750
      DBUG_RETURN(1);
 
751
    else
 
752
      DBUG_RETURN(0);
 
753
  }
 
754
  else if (default_val != 0.0)
 
755
  {
 
756
    *var = default_val;
 
757
    DBUG_RETURN(0);
 
758
  }
 
759
  DBUG_RETURN(1);
 
760
}
 
761
 
 
762
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
 
763
{
 
764
  if (io_slave_killed(thd, mi))
 
765
  {
 
766
    if (info && global_system_variables.log_warnings)
 
767
      sql_print_information(info);
 
768
    return TRUE;
 
769
  }
 
770
  return FALSE;
 
771
}
 
772
 
 
773
 
 
774
/*
 
775
  Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
 
776
  relying on the binlog's version. This is not perfect: imagine an upgrade
 
777
  of the master without waiting that all slaves are in sync with the master;
 
778
  then a slave could be fooled about the binlog's format. This is what happens
 
779
  when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
 
780
  slaves are fooled. So we do this only to distinguish between 3.23 and more
 
781
  recent masters (it's too late to change things for 3.23).
 
782
 
 
783
  RETURNS
 
784
  0       ok
 
785
  1       error
 
786
*/
 
787
 
 
788
static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
 
789
{
 
790
  char error_buf[512];
 
791
  String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
 
792
  char err_buff[MAX_SLAVE_ERRMSG];
 
793
  const char* errmsg= 0;
 
794
  int err_code= 0;
 
795
  MYSQL_RES *master_res= 0;
 
796
  MYSQL_ROW master_row;
 
797
  DBUG_ENTER("get_master_version_and_clock");
 
798
 
 
799
  err_msg.length(0);
 
800
  /*
 
801
    Free old description_event_for_queue (that is needed if we are in
 
802
    a reconnection).
 
803
  */
 
804
  delete mi->rli.relay_log.description_event_for_queue;
 
805
  mi->rli.relay_log.description_event_for_queue= 0;
 
806
 
 
807
  if (!my_isdigit(&my_charset_bin,*mysql->server_version))
 
808
  {
 
809
    errmsg = "Master reported unrecognized MySQL version";
 
810
    err_code= ER_SLAVE_FATAL_ERROR;
 
811
    sprintf(err_buff, ER(err_code), errmsg);
 
812
    err_msg.append(err_buff);
 
813
  }
 
814
  else
 
815
  {
 
816
    /*
 
817
      Note the following switch will bug when we have MySQL branch 30 ;)
 
818
    */
 
819
    switch (*mysql->server_version)
 
820
    {
 
821
    case '0':
 
822
    case '1':
 
823
    case '2':
 
824
      errmsg = "Master reported unrecognized MySQL version";
 
825
      err_code= ER_SLAVE_FATAL_ERROR;
 
826
      sprintf(err_buff, ER(err_code), errmsg);
 
827
      err_msg.append(err_buff);
 
828
      break;
 
829
    case '3':
 
830
      mi->rli.relay_log.description_event_for_queue= new
 
831
        Format_description_log_event(1, mysql->server_version);
 
832
      break;
 
833
    case '4':
 
834
      mi->rli.relay_log.description_event_for_queue= new
 
835
        Format_description_log_event(3, mysql->server_version);
 
836
      break;
 
837
    default:
 
838
      /*
 
839
        Master is MySQL >=5.0. Give a default Format_desc event, so that we can
 
840
        take the early steps (like tests for "is this a 3.23 master") which we
 
841
        have to take before we receive the real master's Format_desc which will
 
842
        override this one. Note that the Format_desc we create below is garbage
 
843
        (it has the format of the *slave*); it's only good to help know if the
 
844
        master is 3.23, 4.0, etc.
 
845
      */
 
846
      mi->rli.relay_log.description_event_for_queue= new
 
847
        Format_description_log_event(4, mysql->server_version);
 
848
      break;
 
849
    }
 
850
  }
 
851
 
 
852
  /*
 
853
     This does not mean that a 5.0 slave will be able to read a 6.0 master; but
 
854
     as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
 
855
     can't read a 6.0 master, this will show up when the slave can't read some
 
856
     events sent by the master, and there will be error messages.
 
857
  */
 
858
 
 
859
  if (err_msg.length() != 0)
 
860
    goto err;
 
861
 
 
862
  /* as we are here, we tried to allocate the event */
 
863
  if (!mi->rli.relay_log.description_event_for_queue)
 
864
  {
 
865
    errmsg= "default Format_description_log_event";
 
866
    err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
 
867
    sprintf(err_buff, ER(err_code), errmsg);
 
868
    err_msg.append(err_buff);
 
869
    goto err;
 
870
  }
 
871
 
 
872
  /*
 
873
    Compare the master and slave's clock. Do not die if master's clock is
 
874
    unavailable (very old master not supporting UNIX_TIMESTAMP()?).
 
875
  */
 
876
 
 
877
  if (!mysql_real_query(mysql, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
 
878
      (master_res= mysql_store_result(mysql)) &&
 
879
      (master_row= mysql_fetch_row(master_res)))
 
880
  {
 
881
    mi->clock_diff_with_master=
 
882
      (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
 
883
  }
 
884
  else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
 
885
  {
 
886
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
 
887
    sql_print_warning("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
 
888
                      "do not trust column Seconds_Behind_Master of SHOW "
 
889
                      "SLAVE STATUS. Error: %s (%d)",
 
890
                      mysql_error(mysql), mysql_errno(mysql));
 
891
  }
 
892
  if (master_res)
 
893
    mysql_free_result(master_res);
 
894
 
 
895
  /*
 
896
    Check that the master's server id and ours are different. Because if they
 
897
    are equal (which can result from a simple copy of master's datadir to slave,
 
898
    thus copying some my.cnf), replication will work but all events will be
 
899
    skipped.
 
900
    Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
 
901
    master?).
 
902
    Note: we could have put a @@SERVER_ID in the previous SELECT
 
903
    UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
 
904
  */
 
905
  if (!mysql_real_query(mysql,
 
906
                        STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
 
907
      (master_res= mysql_store_result(mysql)))
 
908
  {
 
909
    if ((master_row= mysql_fetch_row(master_res)) &&
 
910
        (::server_id == strtoul(master_row[1], 0, 10)) &&
 
911
        !mi->rli.replicate_same_server_id)
 
912
    {
 
913
      errmsg=
 
914
        "The slave I/O thread stops because master and slave have equal"
 
915
        " MySQL server ids; these ids must be different for replication to work (or"
 
916
        " the --replicate-same-server-id option must be used on slave but this does"
 
917
        " not always make sense; please check the manual before using it).";
 
918
      err_code= ER_SLAVE_FATAL_ERROR;
 
919
      sprintf(err_buff, ER(err_code), errmsg);
 
920
      err_msg.append(err_buff);
 
921
    }
 
922
    mysql_free_result(master_res);
 
923
    if (errmsg)
 
924
      goto err;
 
925
  }
 
926
 
 
927
  /*
 
928
    Check that the master's global character_set_server and ours are the same.
 
929
    Not fatal if query fails (old master?).
 
930
    Note that we don't check for equality of global character_set_client and
 
931
    collation_connection (neither do we prevent their setting in
 
932
    set_var.cc). That's because from what I (Guilhem) have tested, the global
 
933
    values of these 2 are never used (new connections don't use them).
 
934
    We don't test equality of global collation_database either as it's is
 
935
    going to be deprecated (made read-only) in 4.1 very soon.
 
936
    The test is only relevant if master < 5.0.3 (we'll test only if it's older
 
937
    than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
 
938
    charset info in each binlog event.
 
939
    We don't do it for 3.23 because masters <3.23.50 hang on
 
940
    SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
 
941
    test only if master is 4.x.
 
942
  */
 
943
 
 
944
  /* redundant with rest of code but safer against later additions */
 
945
  if (*mysql->server_version == '3')
 
946
    goto err;
 
947
 
 
948
  if ((*mysql->server_version == '4') &&
 
949
      !mysql_real_query(mysql,
 
950
                        STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
 
951
      (master_res= mysql_store_result(mysql)))
 
952
  {
 
953
    if ((master_row= mysql_fetch_row(master_res)) &&
 
954
        strcmp(master_row[0], global_system_variables.collation_server->name))
 
955
    {
 
956
      errmsg=
 
957
        "The slave I/O thread stops because master and slave have"
 
958
        " different values for the COLLATION_SERVER global variable."
 
959
        " The values must be equal for replication to work";
 
960
      err_code= ER_SLAVE_FATAL_ERROR;
 
961
      sprintf(err_buff, ER(err_code), errmsg);
 
962
      err_msg.append(err_buff);
 
963
    }
 
964
    mysql_free_result(master_res);
 
965
    if (errmsg)
 
966
      goto err;
 
967
  }
 
968
 
 
969
  /*
 
970
    Perform analogous check for time zone. Theoretically we also should
 
971
    perform check here to verify that SYSTEM time zones are the same on
 
972
    slave and master, but we can't rely on value of @@system_time_zone
 
973
    variable (it is time zone abbreviation) since it determined at start
 
974
    time and so could differ for slave and master even if they are really
 
975
    in the same system time zone. So we are omiting this check and just
 
976
    relying on documentation. Also according to Monty there are many users
 
977
    who are using replication between servers in various time zones. Hence
 
978
    such check will broke everything for them. (And now everything will
 
979
    work for them because by default both their master and slave will have
 
980
    'SYSTEM' time zone).
 
981
    This check is only necessary for 4.x masters (and < 5.0.4 masters but
 
982
    those were alpha).
 
983
  */
 
984
  if ((*mysql->server_version == '4') &&
 
985
      !mysql_real_query(mysql, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
 
986
      (master_res= mysql_store_result(mysql)))
 
987
  {
 
988
    if ((master_row= mysql_fetch_row(master_res)) &&
 
989
        strcmp(master_row[0],
 
990
               global_system_variables.time_zone->get_name()->ptr()))
 
991
    {
 
992
      errmsg=
 
993
        "The slave I/O thread stops because master and slave have"
 
994
        " different values for the TIME_ZONE global variable."
 
995
        " The values must be equal for replication to work";
 
996
      err_code= ER_SLAVE_FATAL_ERROR;
 
997
      sprintf(err_buff, ER(err_code), errmsg);
 
998
      err_msg.append(err_buff);
 
999
    }
 
1000
    mysql_free_result(master_res);
 
1001
 
 
1002
    if (errmsg)
 
1003
      goto err;
 
1004
  }
 
1005
 
 
1006
  if (mi->heartbeat_period != 0.0)
 
1007
  {
 
1008
    char llbuf[22];
 
1009
    const char query_format[]= "SET @master_heartbeat_period= %s";
 
1010
    char query[sizeof(query_format) - 2 + sizeof(llbuf)];
 
1011
    /* 
 
1012
       the period is an ulonglong of nano-secs. 
 
1013
    */
 
1014
    llstr((ulonglong) (mi->heartbeat_period*1000000000UL), llbuf);
 
1015
    my_sprintf(query, (query, query_format, llbuf));
 
1016
 
 
1017
    if (mysql_real_query(mysql, query, strlen(query))
 
1018
        && !check_io_slave_killed(mi->io_thd, mi, NULL))
 
1019
    {
 
1020
      err_msg.append("The slave I/O thread stops because querying master with '");
 
1021
      err_msg.append(query);
 
1022
      err_msg.append("' failed;");
 
1023
      err_msg.append(" error: ");
 
1024
      err_code= mysql_errno(mysql);
 
1025
      err_msg.qs_append(err_code);
 
1026
      err_msg.append("  '");
 
1027
      err_msg.append(mysql_error(mysql));
 
1028
      err_msg.append("'");
 
1029
      mysql_free_result(mysql_store_result(mysql));
 
1030
      goto err;
 
1031
    }
 
1032
    mysql_free_result(mysql_store_result(mysql));
 
1033
  }
 
1034
  
 
1035
err:
 
1036
  if (err_msg.length() != 0)
 
1037
  {
 
1038
    sql_print_error(err_msg.ptr());
 
1039
    DBUG_ASSERT(err_code != 0);
 
1040
    mi->report(ERROR_LEVEL, err_code, err_msg.ptr());
 
1041
    DBUG_RETURN(1);
 
1042
  }
 
1043
 
 
1044
  DBUG_RETURN(0);
 
1045
}
 
1046
 
 
1047
 
 
1048
static bool wait_for_relay_log_space(Relay_log_info* rli)
 
1049
{
 
1050
  bool slave_killed=0;
 
1051
  Master_info* mi = rli->mi;
 
1052
  const char *save_proc_info;
 
1053
  THD* thd = mi->io_thd;
 
1054
  DBUG_ENTER("wait_for_relay_log_space");
 
1055
 
 
1056
  pthread_mutex_lock(&rli->log_space_lock);
 
1057
  save_proc_info= thd->enter_cond(&rli->log_space_cond,
 
1058
                                  &rli->log_space_lock,
 
1059
                                  "\
 
1060
Waiting for the slave SQL thread to free enough relay log space");
 
1061
  while (rli->log_space_limit < rli->log_space_total &&
 
1062
         !(slave_killed=io_slave_killed(thd,mi)) &&
 
1063
         !rli->ignore_log_space_limit)
 
1064
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
 
1065
  thd->exit_cond(save_proc_info);
 
1066
  DBUG_RETURN(slave_killed);
 
1067
}
 
1068
 
 
1069
 
 
1070
/*
 
1071
  Builds a Rotate from the ignored events' info and writes it to relay log.
 
1072
 
 
1073
  SYNOPSIS
 
1074
  write_ignored_events_info_to_relay_log()
 
1075
    thd             pointer to I/O thread's thd
 
1076
    mi
 
1077
 
 
1078
  DESCRIPTION
 
1079
    Slave I/O thread, going to die, must leave a durable trace of the
 
1080
    ignored events' end position for the use of the slave SQL thread, by
 
1081
    calling this function. Only that thread can call it (see assertion).
 
1082
 */
 
1083
static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
 
1084
{
 
1085
  Relay_log_info *rli= &mi->rli;
 
1086
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
 
1087
  DBUG_ENTER("write_ignored_events_info_to_relay_log");
 
1088
 
 
1089
  DBUG_ASSERT(thd == mi->io_thd);
 
1090
  pthread_mutex_lock(log_lock);
 
1091
  if (rli->ign_master_log_name_end[0])
 
1092
  {
 
1093
    DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
 
1094
    Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
 
1095
                                               0, rli->ign_master_log_pos_end,
 
1096
                                               Rotate_log_event::DUP_NAME);
 
1097
    rli->ign_master_log_name_end[0]= 0;
 
1098
    /* can unlock before writing as slave SQL thd will soon see our Rotate */
 
1099
    pthread_mutex_unlock(log_lock);
 
1100
    if (likely((bool)ev))
 
1101
    {
 
1102
      ev->server_id= 0; // don't be ignored by slave SQL thread
 
1103
      if (unlikely(rli->relay_log.append(ev)))
 
1104
        mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
1105
                   ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
1106
                   "failed to write a Rotate event"
 
1107
                   " to the relay log, SHOW SLAVE STATUS may be"
 
1108
                   " inaccurate");
 
1109
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
1110
      if (flush_master_info(mi, 1))
 
1111
        sql_print_error("Failed to flush master info file");
 
1112
      delete ev;
 
1113
    }
 
1114
    else
 
1115
      mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
 
1116
                 ER(ER_SLAVE_CREATE_EVENT_FAILURE),
 
1117
                 "Rotate_event (out of memory?),"
 
1118
                 " SHOW SLAVE STATUS may be inaccurate");
 
1119
  }
 
1120
  else
 
1121
    pthread_mutex_unlock(log_lock);
 
1122
  DBUG_VOID_RETURN;
 
1123
}
 
1124
 
 
1125
 
 
1126
int register_slave_on_master(MYSQL* mysql, Master_info *mi,
 
1127
                             bool *suppress_warnings)
 
1128
{
 
1129
  uchar buf[1024], *pos= buf;
 
1130
  uint report_host_len, report_user_len=0, report_password_len=0;
 
1131
  DBUG_ENTER("register_slave_on_master");
 
1132
 
 
1133
  *suppress_warnings= FALSE;
 
1134
  if (!report_host)
 
1135
    DBUG_RETURN(0);
 
1136
  report_host_len= strlen(report_host);
 
1137
  if (report_user)
 
1138
    report_user_len= strlen(report_user);
 
1139
  if (report_password)
 
1140
    report_password_len= strlen(report_password);
 
1141
  /* 30 is a good safety margin */
 
1142
  if (report_host_len + report_user_len + report_password_len + 30 >
 
1143
      sizeof(buf))
 
1144
    DBUG_RETURN(0);                                     // safety
 
1145
 
 
1146
  int4store(pos, server_id); pos+= 4;
 
1147
  pos= net_store_data(pos, (uchar*) report_host, report_host_len);
 
1148
  pos= net_store_data(pos, (uchar*) report_user, report_user_len);
 
1149
  pos= net_store_data(pos, (uchar*) report_password, report_password_len);
 
1150
  int2store(pos, (uint16) report_port); pos+= 2;
 
1151
  int4store(pos, rpl_recovery_rank);    pos+= 4;
 
1152
  /* The master will fill in master_id */
 
1153
  int4store(pos, 0);                    pos+= 4;
 
1154
 
 
1155
  if (simple_command(mysql, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
 
1156
  {
 
1157
    if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
 
1158
    {
 
1159
      *suppress_warnings= TRUE;                 // Suppress reconnect warning
 
1160
    }
 
1161
    else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
 
1162
    {
 
1163
      char buf[256];
 
1164
      my_snprintf(buf, sizeof(buf), "%s (Errno: %d)", mysql_error(mysql), 
 
1165
                  mysql_errno(mysql));
 
1166
      mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
 
1167
                 ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
 
1168
    }
 
1169
    DBUG_RETURN(1);
 
1170
  }
 
1171
  DBUG_RETURN(0);
 
1172
}
 
1173
 
 
1174
 
 
1175
bool show_master_info(THD* thd, Master_info* mi)
 
1176
{
 
1177
  // TODO: fix this for multi-master
 
1178
  List<Item> field_list;
 
1179
  Protocol *protocol= thd->protocol;
 
1180
  DBUG_ENTER("show_master_info");
 
1181
 
 
1182
  field_list.push_back(new Item_empty_string("Slave_IO_State",
 
1183
                                                     14));
 
1184
  field_list.push_back(new Item_empty_string("Master_Host",
 
1185
                                                     sizeof(mi->host)));
 
1186
  field_list.push_back(new Item_empty_string("Master_User",
 
1187
                                                     sizeof(mi->user)));
 
1188
  field_list.push_back(new Item_return_int("Master_Port", 7,
 
1189
                                           MYSQL_TYPE_LONG));
 
1190
  field_list.push_back(new Item_return_int("Connect_Retry", 10,
 
1191
                                           MYSQL_TYPE_LONG));
 
1192
  field_list.push_back(new Item_empty_string("Master_Log_File",
 
1193
                                             FN_REFLEN));
 
1194
  field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
 
1195
                                           MYSQL_TYPE_LONGLONG));
 
1196
  field_list.push_back(new Item_empty_string("Relay_Log_File",
 
1197
                                             FN_REFLEN));
 
1198
  field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
 
1199
                                           MYSQL_TYPE_LONGLONG));
 
1200
  field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
 
1201
                                             FN_REFLEN));
 
1202
  field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
 
1203
  field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
 
1204
  field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
 
1205
  field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
 
1206
  field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
 
1207
  field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
 
1208
  field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
 
1209
  field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
 
1210
                                             28));
 
1211
  field_list.push_back(new Item_return_int("Last_Errno", 4, MYSQL_TYPE_LONG));
 
1212
  field_list.push_back(new Item_empty_string("Last_Error", 20));
 
1213
  field_list.push_back(new Item_return_int("Skip_Counter", 10,
 
1214
                                           MYSQL_TYPE_LONG));
 
1215
  field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
 
1216
                                           MYSQL_TYPE_LONGLONG));
 
1217
  field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
 
1218
                                           MYSQL_TYPE_LONGLONG));
 
1219
  field_list.push_back(new Item_empty_string("Until_Condition", 6));
 
1220
  field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
 
1221
  field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
 
1222
                                           MYSQL_TYPE_LONGLONG));
 
1223
  field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
 
1224
  field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
 
1225
                                             sizeof(mi->ssl_ca)));
 
1226
  field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
 
1227
                                             sizeof(mi->ssl_capath)));
 
1228
  field_list.push_back(new Item_empty_string("Master_SSL_Cert",
 
1229
                                             sizeof(mi->ssl_cert)));
 
1230
  field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
 
1231
                                             sizeof(mi->ssl_cipher)));
 
1232
  field_list.push_back(new Item_empty_string("Master_SSL_Key",
 
1233
                                             sizeof(mi->ssl_key)));
 
1234
  field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
 
1235
                                           MYSQL_TYPE_LONGLONG));
 
1236
  field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
 
1237
                                             3));
 
1238
  field_list.push_back(new Item_return_int("Last_IO_Errno", 4, MYSQL_TYPE_LONG));
 
1239
  field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
 
1240
  field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, MYSQL_TYPE_LONG));
 
1241
  field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
 
1242
 
 
1243
  if (protocol->send_fields(&field_list,
 
1244
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1245
    DBUG_RETURN(TRUE);
 
1246
 
 
1247
  if (mi->host[0])
 
1248
  {
 
1249
    DBUG_PRINT("info",("host is set: '%s'", mi->host));
 
1250
    String *packet= &thd->packet;
 
1251
    protocol->prepare_for_resend();
 
1252
 
 
1253
    /*
 
1254
      slave_running can be accessed without run_lock but not other
 
1255
      non-volotile members like mi->io_thd, which is guarded by the mutex.
 
1256
    */
 
1257
    pthread_mutex_lock(&mi->run_lock);
 
1258
    protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
 
1259
    pthread_mutex_unlock(&mi->run_lock);
 
1260
 
 
1261
    pthread_mutex_lock(&mi->data_lock);
 
1262
    pthread_mutex_lock(&mi->rli.data_lock);
 
1263
    protocol->store(mi->host, &my_charset_bin);
 
1264
    protocol->store(mi->user, &my_charset_bin);
 
1265
    protocol->store((uint32) mi->port);
 
1266
    protocol->store((uint32) mi->connect_retry);
 
1267
    protocol->store(mi->master_log_name, &my_charset_bin);
 
1268
    protocol->store((ulonglong) mi->master_log_pos);
 
1269
    protocol->store(mi->rli.group_relay_log_name +
 
1270
                    dirname_length(mi->rli.group_relay_log_name),
 
1271
                    &my_charset_bin);
 
1272
    protocol->store((ulonglong) mi->rli.group_relay_log_pos);
 
1273
    protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
 
1274
    protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
 
1275
                    "Yes" : "No", &my_charset_bin);
 
1276
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
 
1277
    protocol->store(rpl_filter->get_do_db());
 
1278
    protocol->store(rpl_filter->get_ignore_db());
 
1279
 
 
1280
    char buf[256];
 
1281
    String tmp(buf, sizeof(buf), &my_charset_bin);
 
1282
    rpl_filter->get_do_table(&tmp);
 
1283
    protocol->store(&tmp);
 
1284
    rpl_filter->get_ignore_table(&tmp);
 
1285
    protocol->store(&tmp);
 
1286
    rpl_filter->get_wild_do_table(&tmp);
 
1287
    protocol->store(&tmp);
 
1288
    rpl_filter->get_wild_ignore_table(&tmp);
 
1289
    protocol->store(&tmp);
 
1290
 
 
1291
    protocol->store(mi->rli.last_error().number);
 
1292
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
 
1293
    protocol->store((uint32) mi->rli.slave_skip_counter);
 
1294
    protocol->store((ulonglong) mi->rli.group_master_log_pos);
 
1295
    protocol->store((ulonglong) mi->rli.log_space_total);
 
1296
 
 
1297
    protocol->store(
 
1298
      mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
 
1299
        ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
 
1300
          "Relay"), &my_charset_bin);
 
1301
    protocol->store(mi->rli.until_log_name, &my_charset_bin);
 
1302
    protocol->store((ulonglong) mi->rli.until_log_pos);
 
1303
 
 
1304
    protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
 
1305
    protocol->store(mi->ssl_ca, &my_charset_bin);
 
1306
    protocol->store(mi->ssl_capath, &my_charset_bin);
 
1307
    protocol->store(mi->ssl_cert, &my_charset_bin);
 
1308
    protocol->store(mi->ssl_cipher, &my_charset_bin);
 
1309
    protocol->store(mi->ssl_key, &my_charset_bin);
 
1310
 
 
1311
    /*
 
1312
      Seconds_Behind_Master: if SQL thread is running and I/O thread is
 
1313
      connected, we can compute it otherwise show NULL (i.e. unknown).
 
1314
    */
 
1315
    if ((mi->slave_running == MYSQL_SLAVE_RUN_CONNECT) &&
 
1316
        mi->rli.slave_running)
 
1317
    {
 
1318
      long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
 
1319
                       - mi->clock_diff_with_master);
 
1320
      /*
 
1321
        Apparently on some systems time_diff can be <0. Here are possible
 
1322
        reasons related to MySQL:
 
1323
        - the master is itself a slave of another master whose time is ahead.
 
1324
        - somebody used an explicit SET TIMESTAMP on the master.
 
1325
        Possible reason related to granularity-to-second of time functions
 
1326
        (nothing to do with MySQL), which can explain a value of -1:
 
1327
        assume the master's and slave's time are perfectly synchronized, and
 
1328
        that at slave's connection time, when the master's timestamp is read,
 
1329
        it is at the very end of second 1, and (a very short time later) when
 
1330
        the slave's timestamp is read it is at the very beginning of second
 
1331
        2. Then the recorded value for master is 1 and the recorded value for
 
1332
        slave is 2. At SHOW SLAVE STATUS time, assume that the difference
 
1333
        between timestamp of slave and rli->last_master_timestamp is 0
 
1334
        (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
 
1335
        This confuses users, so we don't go below 0: hence the max().
 
1336
 
 
1337
        last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
 
1338
        special marker to say "consider we have caught up".
 
1339
      */
 
1340
      protocol->store((longlong)(mi->rli.last_master_timestamp ?
 
1341
                                 max(0, time_diff) : 0));
 
1342
    }
 
1343
    else
 
1344
    {
 
1345
      protocol->store_null();
 
1346
    }
 
1347
    protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
 
1348
 
 
1349
    // Last_IO_Errno
 
1350
    protocol->store(mi->last_error().number);
 
1351
    // Last_IO_Error
 
1352
    protocol->store(mi->last_error().message, &my_charset_bin);
 
1353
    // Last_SQL_Errno
 
1354
    protocol->store(mi->rli.last_error().number);
 
1355
    // Last_SQL_Error
 
1356
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
 
1357
 
 
1358
    pthread_mutex_unlock(&mi->rli.data_lock);
 
1359
    pthread_mutex_unlock(&mi->data_lock);
 
1360
 
 
1361
    if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
 
1362
      DBUG_RETURN(TRUE);
 
1363
  }
 
1364
  my_eof(thd);
 
1365
  DBUG_RETURN(FALSE);
 
1366
}
 
1367
 
 
1368
 
 
1369
void set_slave_thread_options(THD* thd)
 
1370
{
 
1371
  DBUG_ENTER("set_slave_thread_options");
 
1372
  /*
 
1373
     It's nonsense to constrain the slave threads with max_join_size; if a
 
1374
     query succeeded on master, we HAVE to execute it. So set
 
1375
     OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
 
1376
     (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
 
1377
     SELECT examining more than 4 billion rows would still fail (yes, because
 
1378
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
 
1379
     only for client threads.
 
1380
  */
 
1381
  ulonglong options= thd->options | OPTION_BIG_SELECTS;
 
1382
  if (opt_log_slave_updates)
 
1383
    options|= OPTION_BIN_LOG;
 
1384
  else
 
1385
    options&= ~OPTION_BIN_LOG;
 
1386
  thd->options= options;
 
1387
  thd->variables.completion_type= 0;
 
1388
  DBUG_VOID_RETURN;
 
1389
}
 
1390
 
 
1391
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
 
1392
{
 
1393
  DBUG_ENTER("set_slave_thread_default_charset");
 
1394
 
 
1395
  thd->variables.character_set_client=
 
1396
    global_system_variables.character_set_client;
 
1397
  thd->variables.collation_connection=
 
1398
    global_system_variables.collation_connection;
 
1399
  thd->variables.collation_server=
 
1400
    global_system_variables.collation_server;
 
1401
  thd->update_charset();
 
1402
 
 
1403
  /*
 
1404
    We use a const cast here since the conceptual (and externally
 
1405
    visible) behavior of the function is to set the default charset of
 
1406
    the thread.  That the cache has to be invalidated is a secondary
 
1407
    effect.
 
1408
   */
 
1409
  const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
 
1410
  DBUG_VOID_RETURN;
 
1411
}
 
1412
 
 
1413
/*
 
1414
  init_slave_thread()
 
1415
*/
 
1416
 
 
1417
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
 
1418
{
 
1419
  DBUG_ENTER("init_slave_thread");
 
1420
#if !defined(DBUG_OFF)
 
1421
  int simulate_error= 0;
 
1422
#endif
 
1423
  thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
 
1424
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
 
1425
  thd->security_ctx->skip_grants();
 
1426
  my_net_init(&thd->net, 0);
 
1427
/*
 
1428
  Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
 
1429
  slave threads, since a replication event can become this much larger
 
1430
  than the corresponding packet (query) sent from client to master.
 
1431
*/
 
1432
  thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
 
1433
    + MAX_LOG_EVENT_HEADER;  /* note, incr over the global not session var */
 
1434
  thd->slave_thread = 1;
 
1435
  thd->enable_slow_log= opt_log_slow_slave_statements;
 
1436
  set_slave_thread_options(thd);
 
1437
  thd->client_capabilities = CLIENT_LOCAL_FILES;
 
1438
  pthread_mutex_lock(&LOCK_thread_count);
 
1439
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
 
1440
  pthread_mutex_unlock(&LOCK_thread_count);
 
1441
 
 
1442
  DBUG_EXECUTE_IF("simulate_io_slave_error_on_init",
 
1443
                  simulate_error|= (1 << SLAVE_THD_IO););
 
1444
  DBUG_EXECUTE_IF("simulate_sql_slave_error_on_init",
 
1445
                  simulate_error|= (1 << SLAVE_THD_SQL););
 
1446
#if !defined(DBUG_OFF)
 
1447
  if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
 
1448
#else
 
1449
  if (init_thr_lock() || thd->store_globals())
 
1450
#endif
 
1451
  {
 
1452
    thd->cleanup();
 
1453
    DBUG_RETURN(-1);
 
1454
  }
 
1455
  lex_start(thd);
 
1456
 
 
1457
  if (thd_type == SLAVE_THD_SQL)
 
1458
    thd_proc_info(thd, "Waiting for the next event in relay log");
 
1459
  else
 
1460
    thd_proc_info(thd, "Waiting for master update");
 
1461
  thd->version=refresh_version;
 
1462
  thd->set_time();
 
1463
  DBUG_RETURN(0);
 
1464
}
 
1465
 
 
1466
 
 
1467
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
 
1468
                      void* thread_killed_arg)
 
1469
{
 
1470
  int nap_time;
 
1471
  thr_alarm_t alarmed;
 
1472
  DBUG_ENTER("safe_sleep");
 
1473
 
 
1474
  thr_alarm_init(&alarmed);
 
1475
  time_t start_time= my_time(0);
 
1476
  time_t end_time= start_time+sec;
 
1477
 
 
1478
  while ((nap_time= (int) (end_time - start_time)) > 0)
 
1479
  {
 
1480
    ALARM alarm_buff;
 
1481
    /*
 
1482
      The only reason we are asking for alarm is so that
 
1483
      we will be woken up in case of murder, so if we do not get killed,
 
1484
      set the alarm so it goes off after we wake up naturally
 
1485
    */
 
1486
    thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
 
1487
    sleep(nap_time);
 
1488
    thr_end_alarm(&alarmed);
 
1489
 
 
1490
    if ((*thread_killed)(thd,thread_killed_arg))
 
1491
      DBUG_RETURN(1);
 
1492
    start_time= my_time(0);
 
1493
  }
 
1494
  DBUG_RETURN(0);
 
1495
}
 
1496
 
 
1497
 
 
1498
static int request_dump(MYSQL* mysql, Master_info* mi,
 
1499
                        bool *suppress_warnings)
 
1500
{
 
1501
  uchar buf[FN_REFLEN + 10];
 
1502
  int len;
 
1503
  int binlog_flags = 0; // for now
 
1504
  char* logname = mi->master_log_name;
 
1505
  DBUG_ENTER("request_dump");
 
1506
  
 
1507
  *suppress_warnings= FALSE;
 
1508
 
 
1509
  // TODO if big log files: Change next to int8store()
 
1510
  int4store(buf, (ulong) mi->master_log_pos);
 
1511
  int2store(buf + 4, binlog_flags);
 
1512
  int4store(buf + 6, server_id);
 
1513
  len = (uint) strlen(logname);
 
1514
  memcpy(buf + 10, logname,len);
 
1515
  if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
 
1516
  {
 
1517
    /*
 
1518
      Something went wrong, so we will just reconnect and retry later
 
1519
      in the future, we should do a better error analysis, but for
 
1520
      now we just fill up the error log :-)
 
1521
    */
 
1522
    if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
 
1523
      *suppress_warnings= TRUE;                 // Suppress reconnect warning
 
1524
    else
 
1525
      sql_print_error("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs",
 
1526
                      mysql_errno(mysql), mysql_error(mysql),
 
1527
                      mi->connect_retry);
 
1528
    DBUG_RETURN(1);
 
1529
  }
 
1530
 
 
1531
  DBUG_RETURN(0);
 
1532
}
 
1533
 
 
1534
/*
 
1535
  Read one event from the master
 
1536
 
 
1537
  SYNOPSIS
 
1538
    read_event()
 
1539
    mysql               MySQL connection
 
1540
    mi                  Master connection information
 
1541
    suppress_warnings   TRUE when a normal net read timeout has caused us to
 
1542
                        try a reconnect.  We do not want to print anything to
 
1543
                        the error log in this case because this a anormal
 
1544
                        event in an idle server.
 
1545
 
 
1546
    RETURN VALUES
 
1547
    'packet_error'      Error
 
1548
    number              Length of packet
 
1549
*/
 
1550
 
 
1551
static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
 
1552
{
 
1553
  ulong len;
 
1554
  DBUG_ENTER("read_event");
 
1555
 
 
1556
  *suppress_warnings= FALSE;
 
1557
  /*
 
1558
    my_real_read() will time us out
 
1559
    We check if we were told to die, and if not, try reading again
 
1560
  */
 
1561
#ifndef DBUG_OFF
 
1562
  if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
 
1563
    DBUG_RETURN(packet_error);
 
1564
#endif
 
1565
 
 
1566
  len = cli_safe_read(mysql);
 
1567
  if (len == packet_error || (long) len < 1)
 
1568
  {
 
1569
    if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
 
1570
    {
 
1571
      /*
 
1572
        We are trying a normal reconnect after a read timeout;
 
1573
        we suppress prints to .err file as long as the reconnect
 
1574
        happens without problems
 
1575
      */
 
1576
      *suppress_warnings= TRUE;
 
1577
    }
 
1578
    else
 
1579
      sql_print_error("Error reading packet from server: %s ( server_errno=%d)",
 
1580
                      mysql_error(mysql), mysql_errno(mysql));
 
1581
    DBUG_RETURN(packet_error);
 
1582
  }
 
1583
 
 
1584
  /* Check if eof packet */
 
1585
  if (len < 8 && mysql->net.read_pos[0] == 254)
 
1586
  {
 
1587
    sql_print_information("Slave: received end packet from server, apparent "
 
1588
                          "master shutdown: %s",
 
1589
                     mysql_error(mysql));
 
1590
     DBUG_RETURN(packet_error);
 
1591
  }
 
1592
 
 
1593
  DBUG_PRINT("exit", ("len: %lu  net->read_pos[4]: %d",
 
1594
                      len, mysql->net.read_pos[4]));
 
1595
  DBUG_RETURN(len - 1);
 
1596
}
 
1597
 
 
1598
 
 
1599
int check_expected_error(THD* thd, Relay_log_info const *rli,
 
1600
                         int expected_error)
 
1601
{
 
1602
  DBUG_ENTER("check_expected_error");
 
1603
 
 
1604
  switch (expected_error) {
 
1605
  case ER_NET_READ_ERROR:
 
1606
  case ER_NET_ERROR_ON_WRITE:
 
1607
  case ER_QUERY_INTERRUPTED:
 
1608
  case ER_SERVER_SHUTDOWN:
 
1609
  case ER_NEW_ABORTING_CONNECTION:
 
1610
    DBUG_RETURN(1);
 
1611
  default:
 
1612
    DBUG_RETURN(0);
 
1613
  }
 
1614
}
 
1615
 
 
1616
 
 
1617
/*
 
1618
  Check if the current error is of temporary nature of not.
 
1619
  Some errors are temporary in nature, such as
 
1620
  ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT.  Ndb also signals
 
1621
  that the error is temporary by pushing a warning with the error code
 
1622
  ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
 
1623
*/
 
1624
static int has_temporary_error(THD *thd)
 
1625
{
 
1626
  DBUG_ENTER("has_temporary_error");
 
1627
 
 
1628
  if (thd->is_fatal_error)
 
1629
    DBUG_RETURN(0);
 
1630
 
 
1631
  DBUG_EXECUTE_IF("all_errors_are_temporary_errors",
 
1632
                  if (thd->main_da.is_error())
 
1633
                  {
 
1634
                    thd->clear_error();
 
1635
                    my_error(ER_LOCK_DEADLOCK, MYF(0));
 
1636
                  });
 
1637
 
 
1638
  /*
 
1639
    If there is no message in THD, we can't say if it's a temporary
 
1640
    error or not. This is currently the case for Incident_log_event,
 
1641
    which sets no message. Return FALSE.
 
1642
  */
 
1643
  if (!thd->is_error())
 
1644
    DBUG_RETURN(0);
 
1645
 
 
1646
  /*
 
1647
    Temporary error codes:
 
1648
    currently, InnoDB deadlock detected by InnoDB or lock
 
1649
    wait timeout (innodb_lock_wait_timeout exceeded
 
1650
  */
 
1651
  if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
 
1652
      thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
 
1653
    DBUG_RETURN(1);
 
1654
 
 
1655
  DBUG_RETURN(0);
 
1656
}
 
1657
 
 
1658
 
 
1659
/**
 
1660
  Applies the given event and advances the relay log position.
 
1661
 
 
1662
  In essence, this function does:
 
1663
 
 
1664
  @code
 
1665
    ev->apply_event(rli);
 
1666
    ev->update_pos(rli);
 
1667
  @endcode
 
1668
 
 
1669
  But it also does some maintainance, such as skipping events if
 
1670
  needed and reporting errors.
 
1671
 
 
1672
  If the @c skip flag is set, then it is tested whether the event
 
1673
  should be skipped, by looking at the slave_skip_counter and the
 
1674
  server id.  The skip flag should be set when calling this from a
 
1675
  replication thread but not set when executing an explicit BINLOG
 
1676
  statement.
 
1677
 
 
1678
  @retval 0 OK.
 
1679
 
 
1680
  @retval 1 Error calling ev->apply_event().
 
1681
 
 
1682
  @retval 2 No error calling ev->apply_event(), but error calling
 
1683
  ev->update_pos().
 
1684
*/
 
1685
int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
 
1686
                               bool skip)
 
1687
{
 
1688
  int exec_res= 0;
 
1689
 
 
1690
  DBUG_ENTER("apply_event_and_update_pos");
 
1691
 
 
1692
  DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)",
 
1693
                           ev->get_type_str(), ev->get_type_code(),
 
1694
                           ev->server_id));
 
1695
  DBUG_PRINT("info", ("thd->options: %s%s; rli->last_event_start_time: %lu",
 
1696
                      FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
 
1697
                      FLAGSTR(thd->options, OPTION_BEGIN),
 
1698
                      rli->last_event_start_time));
 
1699
 
 
1700
  /*
 
1701
    Execute the event to change the database and update the binary
 
1702
    log coordinates, but first we set some data that is needed for
 
1703
    the thread.
 
1704
 
 
1705
    The event will be executed unless it is supposed to be skipped.
 
1706
 
 
1707
    Queries originating from this server must be skipped.  Low-level
 
1708
    events (Format_description_log_event, Rotate_log_event,
 
1709
    Stop_log_event) from this server must also be skipped. But for
 
1710
    those we don't want to modify 'group_master_log_pos', because
 
1711
    these events did not exist on the master.
 
1712
    Format_description_log_event is not completely skipped.
 
1713
 
 
1714
    Skip queries specified by the user in 'slave_skip_counter'.  We
 
1715
    can't however skip events that has something to do with the log
 
1716
    files themselves.
 
1717
 
 
1718
    Filtering on own server id is extremely important, to ignore
 
1719
    execution of events created by the creation/rotation of the relay
 
1720
    log (remember that now the relay log starts with its Format_desc,
 
1721
    has a Rotate etc).
 
1722
  */
 
1723
 
 
1724
  thd->server_id = ev->server_id; // use the original server id for logging
 
1725
  thd->set_time();                            // time the query
 
1726
  thd->lex->current_select= 0;
 
1727
  if (!ev->when)
 
1728
    ev->when= my_time(0);
 
1729
  ev->thd = thd; // because up to this point, ev->thd == 0
 
1730
 
 
1731
  if (skip)
 
1732
  {
 
1733
    int reason= ev->shall_skip(rli);
 
1734
    if (reason == Log_event::EVENT_SKIP_COUNT)
 
1735
      --rli->slave_skip_counter;
 
1736
    pthread_mutex_unlock(&rli->data_lock);
 
1737
    if (reason == Log_event::EVENT_SKIP_NOT)
 
1738
      exec_res= ev->apply_event(rli);
 
1739
#ifndef DBUG_OFF
 
1740
    /*
 
1741
      This only prints information to the debug trace.
 
1742
 
 
1743
      TODO: Print an informational message to the error log?
 
1744
    */
 
1745
    static const char *const explain[] = {
 
1746
      // EVENT_SKIP_NOT,
 
1747
      "not skipped",
 
1748
      // EVENT_SKIP_IGNORE,
 
1749
      "skipped because event should be ignored",
 
1750
      // EVENT_SKIP_COUNT
 
1751
      "skipped because event skip counter was non-zero"
 
1752
    };
 
1753
    DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
 
1754
                        thd->options & OPTION_BEGIN ? 1 : 0,
 
1755
                        rli->get_flag(Relay_log_info::IN_STMT)));
 
1756
    DBUG_PRINT("skip_event", ("%s event was %s",
 
1757
                              ev->get_type_str(), explain[reason]));
 
1758
#endif
 
1759
  }
 
1760
  else
 
1761
    exec_res= ev->apply_event(rli);
 
1762
 
 
1763
  DBUG_PRINT("info", ("apply_event error = %d", exec_res));
 
1764
  if (exec_res == 0)
 
1765
  {
 
1766
    int error= ev->update_pos(rli);
 
1767
#ifdef HAVE_purify
 
1768
    if (!rli->is_fake)
 
1769
#endif
 
1770
    {
 
1771
#ifndef DBUG_OFF
 
1772
      char buf[22];
 
1773
#endif
 
1774
      DBUG_PRINT("info", ("update_pos error = %d", error));
 
1775
      DBUG_PRINT("info", ("group %s %s",
 
1776
                          llstr(rli->group_relay_log_pos, buf),
 
1777
                          rli->group_relay_log_name));
 
1778
      DBUG_PRINT("info", ("event %s %s",
 
1779
                          llstr(rli->event_relay_log_pos, buf),
 
1780
                          rli->event_relay_log_name));
 
1781
    }
 
1782
    /*
 
1783
      The update should not fail, so print an error message and
 
1784
      return an error code.
 
1785
 
 
1786
      TODO: Replace this with a decent error message when merged
 
1787
      with BUG#24954 (which adds several new error message).
 
1788
    */
 
1789
    if (error)
 
1790
    {
 
1791
      char buf[22];
 
1792
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
 
1793
                  "It was not possible to update the positions"
 
1794
                  " of the relay log information: the slave may"
 
1795
                  " be in an inconsistent state."
 
1796
                  " Stopped in %s position %s",
 
1797
                  rli->group_relay_log_name,
 
1798
                  llstr(rli->group_relay_log_pos, buf));
 
1799
      DBUG_RETURN(2);
 
1800
    }
 
1801
  }
 
1802
 
 
1803
  DBUG_RETURN(exec_res ? 1 : 0);
 
1804
}
 
1805
 
 
1806
 
 
1807
/**
 
1808
  Top-level function for executing the next event from the relay log.
 
1809
 
 
1810
  This function reads the event from the relay log, executes it, and
 
1811
  advances the relay log position.  It also handles errors, etc.
 
1812
 
 
1813
  This function may fail to apply the event for the following reasons:
 
1814
 
 
1815
   - The position specfied by the UNTIL condition of the START SLAVE
 
1816
     command is reached.
 
1817
 
 
1818
   - It was not possible to read the event from the log.
 
1819
 
 
1820
   - The slave is killed.
 
1821
 
 
1822
   - An error occurred when applying the event, and the event has been
 
1823
     tried slave_trans_retries times.  If the event has been retried
 
1824
     fewer times, 0 is returned.
 
1825
 
 
1826
   - init_master_info or init_relay_log_pos failed. (These are called
 
1827
     if a failure occurs when applying the event.)</li>
 
1828
 
 
1829
   - An error occurred when updating the binlog position.
 
1830
 
 
1831
  @retval 0 The event was applied.
 
1832
 
 
1833
  @retval 1 The event was not applied.
 
1834
*/
 
1835
static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
 
1836
{
 
1837
  DBUG_ENTER("exec_relay_log_event");
 
1838
 
 
1839
  /*
 
1840
     We acquire this mutex since we need it for all operations except
 
1841
     event execution. But we will release it in places where we will
 
1842
     wait for something for example inside of next_event().
 
1843
   */
 
1844
  pthread_mutex_lock(&rli->data_lock);
 
1845
 
 
1846
  Log_event * ev = next_event(rli);
 
1847
 
 
1848
  DBUG_ASSERT(rli->sql_thd==thd);
 
1849
 
 
1850
  if (sql_slave_killed(thd,rli))
 
1851
  {
 
1852
    pthread_mutex_unlock(&rli->data_lock);
 
1853
    delete ev;
 
1854
    DBUG_RETURN(1);
 
1855
  }
 
1856
  if (ev)
 
1857
  {
 
1858
    int exec_res;
 
1859
 
 
1860
    /*
 
1861
      This tests if the position of the beginning of the current event
 
1862
      hits the UNTIL barrier.
 
1863
    */
 
1864
    if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
 
1865
        rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
 
1866
                                rli->group_master_log_pos :
 
1867
                                ev->log_pos - ev->data_written))
 
1868
    {
 
1869
      char buf[22];
 
1870
      sql_print_information("Slave SQL thread stopped because it reached its"
 
1871
                            " UNTIL position %s", llstr(rli->until_pos(), buf));
 
1872
      /*
 
1873
        Setting abort_slave flag because we do not want additional message about
 
1874
        error in query execution to be printed.
 
1875
      */
 
1876
      rli->abort_slave= 1;
 
1877
      pthread_mutex_unlock(&rli->data_lock);
 
1878
      delete ev;
 
1879
      DBUG_RETURN(1);
 
1880
    }
 
1881
    exec_res= apply_event_and_update_pos(ev, thd, rli, TRUE);
 
1882
 
 
1883
    /*
 
1884
      Format_description_log_event should not be deleted because it will be
 
1885
      used to read info about the relay log's format; it will be deleted when
 
1886
      the SQL thread does not need it, i.e. when this thread terminates.
 
1887
    */
 
1888
    if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
 
1889
    {
 
1890
      DBUG_PRINT("info", ("Deleting the event after it has been executed"));
 
1891
      delete ev;
 
1892
    }
 
1893
 
 
1894
    /*
 
1895
      update_log_pos failed: this should not happen, so we don't
 
1896
      retry.
 
1897
    */
 
1898
    if (exec_res == 2)
 
1899
      DBUG_RETURN(1);
 
1900
 
 
1901
    if (slave_trans_retries)
 
1902
    {
 
1903
      int temp_err= 0;
 
1904
      if (exec_res && (temp_err= has_temporary_error(thd)))
 
1905
      {
 
1906
        const char *errmsg;
 
1907
        /*
 
1908
          We were in a transaction which has been rolled back because of a
 
1909
          temporary error;
 
1910
          let's seek back to BEGIN log event and retry it all again.
 
1911
          Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
 
1912
          there is no rollback since 5.0.13 (ref: manual).
 
1913
          We have to not only seek but also
 
1914
          a) init_master_info(), to seek back to hot relay log's start for later
 
1915
          (for when we will come back to this hot log after re-processing the
 
1916
          possibly existing old logs where BEGIN is: check_binlog_magic() will
 
1917
          then need the cache to be at position 0 (see comments at beginning of
 
1918
          init_master_info()).
 
1919
          b) init_relay_log_pos(), because the BEGIN may be an older relay log.
 
1920
        */
 
1921
        if (rli->trans_retries < slave_trans_retries)
 
1922
        {
 
1923
          if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
 
1924
            sql_print_error("Failed to initialize the master info structure");
 
1925
          else if (init_relay_log_pos(rli,
 
1926
                                      rli->group_relay_log_name,
 
1927
                                      rli->group_relay_log_pos,
 
1928
                                      1, &errmsg, 1))
 
1929
            sql_print_error("Error initializing relay log position: %s",
 
1930
                            errmsg);
 
1931
          else
 
1932
          {
 
1933
            exec_res= 0;
 
1934
            end_trans(thd, ROLLBACK);
 
1935
            /* chance for concurrent connection to get more locks */
 
1936
            safe_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
 
1937
                       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
 
1938
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
 
1939
            rli->trans_retries++;
 
1940
            rli->retried_trans++;
 
1941
            pthread_mutex_unlock(&rli->data_lock);
 
1942
            DBUG_PRINT("info", ("Slave retries transaction "
 
1943
                                "rli->trans_retries: %lu", rli->trans_retries));
 
1944
          }
 
1945
        }
 
1946
        else
 
1947
          sql_print_error("Slave SQL thread retried transaction %lu time(s) "
 
1948
                          "in vain, giving up. Consider raising the value of "
 
1949
                          "the slave_transaction_retries variable.",
 
1950
                          slave_trans_retries);
 
1951
      }
 
1952
      else if (exec_res && !temp_err ||
 
1953
               (opt_using_transactions &&
 
1954
                rli->group_relay_log_pos == rli->event_relay_log_pos))
 
1955
      {
 
1956
        /*
 
1957
          Only reset the retry counter if the entire group succeeded
 
1958
          or failed with a non-transient error.  On a successful
 
1959
          event, the execution will proceed as usual; in the case of a
 
1960
          non-transient error, the slave will stop with an error.
 
1961
         */
 
1962
        rli->trans_retries= 0; // restart from fresh
 
1963
        DBUG_PRINT("info", ("Resetting retry counter, rli->trans_retries: %lu",
 
1964
                            rli->trans_retries));
 
1965
      }
 
1966
    }
 
1967
    DBUG_RETURN(exec_res);
 
1968
  }
 
1969
  pthread_mutex_unlock(&rli->data_lock);
 
1970
  rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
 
1971
              ER(ER_SLAVE_RELAY_LOG_READ_FAILURE), "\
 
1972
Could not parse relay log event entry. The possible reasons are: the master's \
 
1973
binary log is corrupted (you can check this by running 'mysqlbinlog' on the \
 
1974
binary log), the slave's relay log is corrupted (you can check this by running \
 
1975
'mysqlbinlog' on the relay log), a network problem, or a bug in the master's \
 
1976
or slave's MySQL code. If you want to check the master's binary log or slave's \
 
1977
relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' \
 
1978
on this slave.\
 
1979
");
 
1980
  DBUG_RETURN(1);
 
1981
}
 
1982
 
 
1983
 
 
1984
/**
 
1985
  @brief Try to reconnect slave IO thread.
 
1986
 
 
1987
  @details Terminates current connection to master, sleeps for
 
1988
  @c mi->connect_retry msecs and initiates new connection with
 
1989
  @c safe_reconnect(). Variable pointed by @c retry_count is increased -
 
1990
  if it exceeds @c master_retry_count then connection is not re-established
 
1991
  and function signals error.
 
1992
  Unless @c suppres_warnings is TRUE, a warning is put in the server error log
 
1993
  when reconnecting. The warning message and messages used to report errors
 
1994
  are taken from @c messages array. In case @c master_retry_count is exceeded,
 
1995
  no messages are added to the log.
 
1996
 
 
1997
  @param[in]     thd                 Thread context.
 
1998
  @param[in]     mysql               MySQL connection.
 
1999
  @param[in]     mi                  Master connection information.
 
2000
  @param[in,out] retry_count         Number of attempts to reconnect.
 
2001
  @param[in]     suppress_warnings   TRUE when a normal net read timeout 
 
2002
                                     has caused to reconnecting.
 
2003
  @param[in]     messages            Messages to print/log, see 
 
2004
                                     reconnect_messages[] array.
 
2005
 
 
2006
  @retval        0                   OK.
 
2007
  @retval        1                   There was an error.
 
2008
*/
 
2009
 
 
2010
static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
 
2011
                            uint *retry_count, bool suppress_warnings,
 
2012
                            const char *messages[SLAVE_RECON_MSG_MAX])
 
2013
{
 
2014
  mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
 
2015
  thd->proc_info= messages[SLAVE_RECON_MSG_WAIT];
 
2016
#ifdef SIGNAL_WITH_VIO_CLOSE  
 
2017
  thd->clear_active_vio();
 
2018
#endif
 
2019
  end_server(mysql);
 
2020
  if ((*retry_count)++)
 
2021
  {
 
2022
    if (*retry_count > master_retry_count)
 
2023
      return 1;                             // Don't retry forever
 
2024
    safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
 
2025
               (void *) mi);
 
2026
  }
 
2027
  if (check_io_slave_killed(thd, mi, messages[SLAVE_RECON_MSG_KILLED_WAITING]))
 
2028
    return 1;
 
2029
  thd->proc_info = messages[SLAVE_RECON_MSG_AFTER];
 
2030
  if (!suppress_warnings) 
 
2031
  {
 
2032
    char buf[256], llbuff[22];
 
2033
    my_snprintf(buf, sizeof(buf), messages[SLAVE_RECON_MSG_FAILED], 
 
2034
                IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
 
2035
    /* 
 
2036
      Raise a warining during registering on master/requesting dump.
 
2037
      Log a message reading event.
 
2038
    */
 
2039
    if (messages[SLAVE_RECON_MSG_COMMAND][0])
 
2040
    {
 
2041
      mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
 
2042
                 ER(ER_SLAVE_MASTER_COM_FAILURE), 
 
2043
                 messages[SLAVE_RECON_MSG_COMMAND], buf);
 
2044
    }
 
2045
    else
 
2046
    {
 
2047
      sql_print_information(buf);
 
2048
    }
 
2049
  }
 
2050
  if (safe_reconnect(thd, mysql, mi, 1) || io_slave_killed(thd, mi))
 
2051
  {
 
2052
    if (global_system_variables.log_warnings)
 
2053
      sql_print_information(messages[SLAVE_RECON_MSG_KILLED_AFTER]);
 
2054
    return 1;
 
2055
  }
 
2056
  return 0;
 
2057
}
 
2058
 
 
2059
 
 
2060
/* Slave I/O Thread entry point */
 
2061
 
 
2062
pthread_handler_t handle_slave_io(void *arg)
 
2063
{
 
2064
  THD *thd; // needs to be first for thread_stack
 
2065
  MYSQL *mysql;
 
2066
  Master_info *mi = (Master_info*)arg;
 
2067
  Relay_log_info *rli= &mi->rli;
 
2068
  char llbuff[22];
 
2069
  uint retry_count;
 
2070
  bool suppress_warnings;
 
2071
#ifndef DBUG_OFF
 
2072
  uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
 
2073
#endif
 
2074
  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
 
2075
  my_thread_init();
 
2076
  DBUG_ENTER("handle_slave_io");
 
2077
 
 
2078
  DBUG_ASSERT(mi->inited);
 
2079
  mysql= NULL ;
 
2080
  retry_count= 0;
 
2081
 
 
2082
  pthread_mutex_lock(&mi->run_lock);
 
2083
  /* Inform waiting threads that slave has started */
 
2084
  mi->slave_run_id++;
 
2085
 
 
2086
#ifndef DBUG_OFF
 
2087
  mi->events_till_disconnect = disconnect_slave_event_count;
 
2088
#endif
 
2089
 
 
2090
  thd= new THD; // note that contructor of THD uses DBUG_ !
 
2091
  THD_CHECK_SENTRY(thd);
 
2092
  mi->io_thd = thd;
 
2093
 
 
2094
  pthread_detach_this_thread();
 
2095
  thd->thread_stack= (char*) &thd; // remember where our stack is
 
2096
  if (init_slave_thread(thd, SLAVE_THD_IO))
 
2097
  {
 
2098
    pthread_cond_broadcast(&mi->start_cond);
 
2099
    pthread_mutex_unlock(&mi->run_lock);
 
2100
    sql_print_error("Failed during slave I/O thread initialization");
 
2101
    goto err;
 
2102
  }
 
2103
  pthread_mutex_lock(&LOCK_thread_count);
 
2104
  threads.append(thd);
 
2105
  pthread_mutex_unlock(&LOCK_thread_count);
 
2106
  mi->slave_running = 1;
 
2107
  mi->abort_slave = 0;
 
2108
  pthread_mutex_unlock(&mi->run_lock);
 
2109
  pthread_cond_broadcast(&mi->start_cond);
 
2110
 
 
2111
  DBUG_PRINT("master_info",("log_file_name: '%s'  position: %s",
 
2112
                            mi->master_log_name,
 
2113
                            llstr(mi->master_log_pos,llbuff)));
 
2114
 
 
2115
  if (!(mi->mysql = mysql = mysql_init(NULL)))
 
2116
  {
 
2117
    mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
 
2118
               ER(ER_SLAVE_FATAL_ERROR), "error in mysql_init()");
 
2119
    goto err;
 
2120
  }
 
2121
 
 
2122
  thd_proc_info(thd, "Connecting to master");
 
2123
  // we can get killed during safe_connect
 
2124
  if (!safe_connect(thd, mysql, mi))
 
2125
  {
 
2126
    sql_print_information("Slave I/O thread: connected to master '%s@%s:%d',"
 
2127
                          "replication started in log '%s' at position %s",
 
2128
                          mi->user, mi->host, mi->port,
 
2129
                          IO_RPL_LOG_NAME,
 
2130
                          llstr(mi->master_log_pos,llbuff));
 
2131
  /*
 
2132
    Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
 
2133
    thread, since a replication event can become this much larger than
 
2134
    the corresponding packet (query) sent from client to master.
 
2135
  */
 
2136
    mysql->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
 
2137
  }
 
2138
  else
 
2139
  {
 
2140
    sql_print_information("Slave I/O thread killed while connecting to master");
 
2141
    goto err;
 
2142
  }
 
2143
 
 
2144
connected:
 
2145
 
 
2146
  // TODO: the assignment below should be under mutex (5.0)
 
2147
  mi->slave_running= MYSQL_SLAVE_RUN_CONNECT;
 
2148
  thd->slave_net = &mysql->net;
 
2149
  thd_proc_info(thd, "Checking master version");
 
2150
  if (get_master_version_and_clock(mysql, mi))
 
2151
    goto err;
 
2152
 
 
2153
  if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
 
2154
  {
 
2155
    /*
 
2156
      Register ourselves with the master.
 
2157
    */
 
2158
    thd_proc_info(thd, "Registering slave on master");
 
2159
    if (register_slave_on_master(mysql, mi, &suppress_warnings))
 
2160
    {
 
2161
      if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
 
2162
                                "while registering slave on master"))
 
2163
      {
 
2164
        sql_print_error("Slave I/O thread couldn't register on master");
 
2165
        if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
 
2166
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
 
2167
          goto err;
 
2168
      }
 
2169
      else
 
2170
        goto err;
 
2171
      goto connected;
 
2172
    }
 
2173
    DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_REG", 
 
2174
      if (!retry_count_reg)
 
2175
      {
 
2176
        retry_count_reg++;
 
2177
        sql_print_information("Forcing to reconnect slave I/O thread");
 
2178
        if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
 
2179
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
 
2180
          goto err;
 
2181
        goto connected;
 
2182
      });
 
2183
  }
 
2184
 
 
2185
  DBUG_PRINT("info",("Starting reading binary log from master"));
 
2186
  while (!io_slave_killed(thd,mi))
 
2187
  {
 
2188
    thd_proc_info(thd, "Requesting binlog dump");
 
2189
    if (request_dump(mysql, mi, &suppress_warnings))
 
2190
    {
 
2191
      sql_print_error("Failed on request_dump()");
 
2192
      if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
 
2193
requesting master dump") ||
 
2194
          try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
 
2195
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
 
2196
        goto err;
 
2197
      goto connected;
 
2198
    }
 
2199
    DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_DUMP", 
 
2200
      if (!retry_count_dump)
 
2201
      {
 
2202
        retry_count_dump++;
 
2203
        sql_print_information("Forcing to reconnect slave I/O thread");
 
2204
        if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
 
2205
                             reconnect_messages[SLAVE_RECON_ACT_DUMP]))
 
2206
          goto err;
 
2207
        goto connected;
 
2208
      });
 
2209
 
 
2210
    while (!io_slave_killed(thd,mi))
 
2211
    {
 
2212
      ulong event_len;
 
2213
      /*
 
2214
         We say "waiting" because read_event() will wait if there's nothing to
 
2215
         read. But if there's something to read, it will not wait. The
 
2216
         important thing is to not confuse users by saying "reading" whereas
 
2217
         we're in fact receiving nothing.
 
2218
      */
 
2219
      thd_proc_info(thd, "Waiting for master to send event");
 
2220
      event_len= read_event(mysql, mi, &suppress_warnings);
 
2221
      if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
 
2222
reading event"))
 
2223
        goto err;
 
2224
      DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_EVENT",
 
2225
        if (!retry_count_event)
 
2226
        {
 
2227
          retry_count_event++;
 
2228
          sql_print_information("Forcing to reconnect slave I/O thread");
 
2229
          if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
 
2230
                               reconnect_messages[SLAVE_RECON_ACT_EVENT]))
 
2231
            goto err;
 
2232
          goto connected;
 
2233
        });
 
2234
 
 
2235
      if (event_len == packet_error)
 
2236
      {
 
2237
        uint mysql_error_number= mysql_errno(mysql);
 
2238
        switch (mysql_error_number) {
 
2239
        case CR_NET_PACKET_TOO_LARGE:
 
2240
          sql_print_error("\
 
2241
Log entry on master is longer than max_allowed_packet (%ld) on \
 
2242
slave. If the entry is correct, restart the server with a higher value of \
 
2243
max_allowed_packet",
 
2244
                          thd->variables.max_allowed_packet);
 
2245
          goto err;
 
2246
        case ER_MASTER_FATAL_ERROR_READING_BINLOG:
 
2247
          sql_print_error(ER(mysql_error_number), mysql_error_number,
 
2248
                          mysql_error(mysql));
 
2249
          goto err;
 
2250
        case EE_OUTOFMEMORY:
 
2251
        case ER_OUTOFMEMORY:
 
2252
          sql_print_error("\
 
2253
Stopping slave I/O thread due to out-of-memory error from master");
 
2254
          goto err;
 
2255
        }
 
2256
        if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
 
2257
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
 
2258
          goto err;
 
2259
        goto connected;
 
2260
      } // if (event_len == packet_error)
 
2261
 
 
2262
      retry_count=0;                    // ok event, reset retry counter
 
2263
      thd_proc_info(thd, "Queueing master event to the relay log");
 
2264
      if (queue_event(mi,(const char*)mysql->net.read_pos + 1, event_len))
 
2265
      {
 
2266
        goto err;
 
2267
      }
 
2268
      if (flush_master_info(mi, 1))
 
2269
      {
 
2270
        sql_print_error("Failed to flush master info file");
 
2271
        goto err;
 
2272
      }
 
2273
      /*
 
2274
        See if the relay logs take too much space.
 
2275
        We don't lock mi->rli.log_space_lock here; this dirty read saves time
 
2276
        and does not introduce any problem:
 
2277
        - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
 
2278
        the clean value is 0), then we are reading only one more event as we
 
2279
        should, and we'll block only at the next event. No big deal.
 
2280
        - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
 
2281
        the clean value is 1), then we are going into wait_for_relay_log_space()
 
2282
        for no reason, but this function will do a clean read, notice the clean
 
2283
        value and exit immediately.
 
2284
      */
 
2285
#ifndef DBUG_OFF
 
2286
      {
 
2287
        char llbuf1[22], llbuf2[22];
 
2288
        DBUG_PRINT("info", ("log_space_limit=%s log_space_total=%s \
 
2289
ignore_log_space_limit=%d",
 
2290
                            llstr(rli->log_space_limit,llbuf1),
 
2291
                            llstr(rli->log_space_total,llbuf2),
 
2292
                            (int) rli->ignore_log_space_limit));
 
2293
      }
 
2294
#endif
 
2295
 
 
2296
      if (rli->log_space_limit && rli->log_space_limit <
 
2297
          rli->log_space_total &&
 
2298
          !rli->ignore_log_space_limit)
 
2299
        if (wait_for_relay_log_space(rli))
 
2300
        {
 
2301
          sql_print_error("Slave I/O thread aborted while waiting for relay \
 
2302
log space");
 
2303
          goto err;
 
2304
        }
 
2305
    }
 
2306
  }
 
2307
 
 
2308
  // error = 0;
 
2309
err:
 
2310
  // print the current replication position
 
2311
  sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
 
2312
                  IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
 
2313
  VOID(pthread_mutex_lock(&LOCK_thread_count));
 
2314
  thd->query = thd->db = 0; // extra safety
 
2315
  thd->query_length= thd->db_length= 0;
 
2316
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
 
2317
  if (mysql)
 
2318
  {
 
2319
    /*
 
2320
      Here we need to clear the active VIO before closing the
 
2321
      connection with the master.  The reason is that THD::awake()
 
2322
      might be called from terminate_slave_thread() because somebody
 
2323
      issued a STOP SLAVE.  If that happends, the close_active_vio()
 
2324
      can be called in the middle of closing the VIO associated with
 
2325
      the 'mysql' object, causing a crash.
 
2326
    */
 
2327
#ifdef SIGNAL_WITH_VIO_CLOSE
 
2328
    thd->clear_active_vio();
 
2329
#endif
 
2330
    mysql_close(mysql);
 
2331
    mi->mysql=0;
 
2332
  }
 
2333
  write_ignored_events_info_to_relay_log(thd, mi);
 
2334
  thd_proc_info(thd, "Waiting for slave mutex on exit");
 
2335
  pthread_mutex_lock(&mi->run_lock);
 
2336
 
 
2337
  /* Forget the relay log's format */
 
2338
  delete mi->rli.relay_log.description_event_for_queue;
 
2339
  mi->rli.relay_log.description_event_for_queue= 0;
 
2340
  // TODO: make rpl_status part of Master_info
 
2341
  change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
 
2342
  DBUG_ASSERT(thd->net.buff != 0);
 
2343
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
 
2344
  close_thread_tables(thd);
 
2345
  pthread_mutex_lock(&LOCK_thread_count);
 
2346
  THD_CHECK_SENTRY(thd);
 
2347
  delete thd;
 
2348
  pthread_mutex_unlock(&LOCK_thread_count);
 
2349
  mi->abort_slave= 0;
 
2350
  mi->slave_running= 0;
 
2351
  mi->io_thd= 0;
 
2352
  /*
 
2353
    Note: the order of the two following calls (first broadcast, then unlock)
 
2354
    is important. Otherwise a killer_thread can execute between the calls and
 
2355
    delete the mi structure leading to a crash! (see BUG#25306 for details)
 
2356
   */ 
 
2357
  pthread_cond_broadcast(&mi->stop_cond);       // tell the world we are done
 
2358
  pthread_mutex_unlock(&mi->run_lock);
 
2359
  my_thread_end();
 
2360
  pthread_exit(0);
 
2361
  DBUG_RETURN(0);                               // Can't return anything here
 
2362
}
 
2363
 
 
2364
 
 
2365
/* Slave SQL Thread entry point */
 
2366
 
 
2367
pthread_handler_t handle_slave_sql(void *arg)
 
2368
{
 
2369
  THD *thd;                     /* needs to be first for thread_stack */
 
2370
  char llbuff[22],llbuff1[22];
 
2371
 
 
2372
  Relay_log_info* rli = &((Master_info*)arg)->rli;
 
2373
  const char *errmsg;
 
2374
 
 
2375
  // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
 
2376
  my_thread_init();
 
2377
  DBUG_ENTER("handle_slave_sql");
 
2378
 
 
2379
  DBUG_ASSERT(rli->inited);
 
2380
  pthread_mutex_lock(&rli->run_lock);
 
2381
  DBUG_ASSERT(!rli->slave_running);
 
2382
  errmsg= 0;
 
2383
#ifndef DBUG_OFF
 
2384
  rli->events_till_abort = abort_slave_event_count;
 
2385
#endif
 
2386
 
 
2387
  thd = new THD; // note that contructor of THD uses DBUG_ !
 
2388
  thd->thread_stack = (char*)&thd; // remember where our stack is
 
2389
  rli->sql_thd= thd;
 
2390
  
 
2391
  /* Inform waiting threads that slave has started */
 
2392
  rli->slave_run_id++;
 
2393
  rli->slave_running = 1;
 
2394
 
 
2395
  pthread_detach_this_thread();
 
2396
  if (init_slave_thread(thd, SLAVE_THD_SQL))
 
2397
  {
 
2398
    /*
 
2399
      TODO: this is currently broken - slave start and change master
 
2400
      will be stuck if we fail here
 
2401
    */
 
2402
    pthread_cond_broadcast(&rli->start_cond);
 
2403
    pthread_mutex_unlock(&rli->run_lock);
 
2404
    sql_print_error("Failed during slave thread initialization");
 
2405
    goto err;
 
2406
  }
 
2407
  thd->init_for_queries();
 
2408
  thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
 
2409
  pthread_mutex_lock(&LOCK_thread_count);
 
2410
  threads.append(thd);
 
2411
  pthread_mutex_unlock(&LOCK_thread_count);
 
2412
  /*
 
2413
    We are going to set slave_running to 1. Assuming slave I/O thread is
 
2414
    alive and connected, this is going to make Seconds_Behind_Master be 0
 
2415
    i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
 
2416
    the moment we start we can think we are caught up, and the next second we
 
2417
    start receiving data so we realize we are not caught up and
 
2418
    Seconds_Behind_Master grows. No big deal.
 
2419
  */
 
2420
  rli->abort_slave = 0;
 
2421
  pthread_mutex_unlock(&rli->run_lock);
 
2422
  pthread_cond_broadcast(&rli->start_cond);
 
2423
 
 
2424
  /*
 
2425
    Reset errors for a clean start (otherwise, if the master is idle, the SQL
 
2426
    thread may execute no Query_log_event, so the error will remain even
 
2427
    though there's no problem anymore). Do not reset the master timestamp
 
2428
    (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
 
2429
    as we are not sure that we are going to receive a query, we want to
 
2430
    remember the last master timestamp (to say how many seconds behind we are
 
2431
    now.
 
2432
    But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
 
2433
  */
 
2434
  rli->clear_error();
 
2435
 
 
2436
  //tell the I/O thread to take relay_log_space_limit into account from now on
 
2437
  pthread_mutex_lock(&rli->log_space_lock);
 
2438
  rli->ignore_log_space_limit= 0;
 
2439
  pthread_mutex_unlock(&rli->log_space_lock);
 
2440
  rli->trans_retries= 0; // start from "no error"
 
2441
  DBUG_PRINT("info", ("rli->trans_retries: %lu", rli->trans_retries));
 
2442
 
 
2443
  if (init_relay_log_pos(rli,
 
2444
                         rli->group_relay_log_name,
 
2445
                         rli->group_relay_log_pos,
 
2446
                         1 /*need data lock*/, &errmsg,
 
2447
                         1 /*look for a description_event*/))
 
2448
  {
 
2449
    sql_print_error("Error initializing relay log position: %s",
 
2450
                    errmsg);
 
2451
    goto err;
 
2452
  }
 
2453
  THD_CHECK_SENTRY(thd);
 
2454
#ifndef DBUG_OFF
 
2455
  {
 
2456
    char llbuf1[22], llbuf2[22];
 
2457
    DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
 
2458
                        llstr(my_b_tell(rli->cur_log),llbuf1),
 
2459
                        llstr(rli->event_relay_log_pos,llbuf2)));
 
2460
    DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
 
2461
    /*
 
2462
      Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
 
2463
      correct position when it's called just after my_b_seek() (the questionable
 
2464
      stuff is those "seek is done on next read" comments in the my_b_seek()
 
2465
      source code).
 
2466
      The crude reality is that this assertion randomly fails whereas
 
2467
      replication seems to work fine. And there is no easy explanation why it
 
2468
      fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
 
2469
      init_relay_log_pos() called above). Maybe the assertion would be
 
2470
      meaningful if we held rli->data_lock between the my_b_seek() and the
 
2471
      DBUG_ASSERT().
 
2472
    */
 
2473
#ifdef SHOULD_BE_CHECKED
 
2474
    DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
 
2475
#endif
 
2476
  }
 
2477
#endif
 
2478
  DBUG_ASSERT(rli->sql_thd == thd);
 
2479
 
 
2480
  DBUG_PRINT("master_info",("log_file_name: %s  position: %s",
 
2481
                            rli->group_master_log_name,
 
2482
                            llstr(rli->group_master_log_pos,llbuff)));
 
2483
  if (global_system_variables.log_warnings)
 
2484
    sql_print_information("Slave SQL thread initialized, starting replication in \
 
2485
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
 
2486
                    llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
 
2487
                    llstr(rli->group_relay_log_pos,llbuff1));
 
2488
 
 
2489
  /* execute init_slave variable */
 
2490
  if (sys_init_slave.value_length)
 
2491
  {
 
2492
    execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
 
2493
    if (thd->is_slave_error)
 
2494
    {
 
2495
      sql_print_error("\
 
2496
Slave SQL thread aborted. Can't execute init_slave query");
 
2497
      goto err;
 
2498
    }
 
2499
  }
 
2500
 
 
2501
  /*
 
2502
    First check until condition - probably there is nothing to execute. We
 
2503
    do not want to wait for next event in this case.
 
2504
  */
 
2505
  pthread_mutex_lock(&rli->data_lock);
 
2506
  if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
 
2507
      rli->is_until_satisfied(rli->group_master_log_pos))
 
2508
  {
 
2509
    char buf[22];
 
2510
    sql_print_information("Slave SQL thread stopped because it reached its"
 
2511
                          " UNTIL position %s", llstr(rli->until_pos(), buf));
 
2512
    pthread_mutex_unlock(&rli->data_lock);
 
2513
    goto err;
 
2514
  }
 
2515
  pthread_mutex_unlock(&rli->data_lock);
 
2516
 
 
2517
  /* Read queries from the IO/THREAD until this thread is killed */
 
2518
 
 
2519
  while (!sql_slave_killed(thd,rli))
 
2520
  {
 
2521
    thd_proc_info(thd, "Reading event from the relay log");
 
2522
    DBUG_ASSERT(rli->sql_thd == thd);
 
2523
    THD_CHECK_SENTRY(thd);
 
2524
    if (exec_relay_log_event(thd,rli))
 
2525
    {
 
2526
      DBUG_PRINT("info", ("exec_relay_log_event() failed"));
 
2527
      // do not scare the user if SQL thread was simply killed or stopped
 
2528
      if (!sql_slave_killed(thd,rli))
 
2529
      {
 
2530
        /*
 
2531
          retrieve as much info as possible from the thd and, error
 
2532
          codes and warnings and print this to the error log as to
 
2533
          allow the user to locate the error
 
2534
        */
 
2535
        uint32 const last_errno= rli->last_error().number;
 
2536
 
 
2537
        if (thd->is_error())
 
2538
        {
 
2539
          char const *const errmsg= thd->main_da.message();
 
2540
 
 
2541
          DBUG_PRINT("info",
 
2542
                     ("thd->main_da.sql_errno()=%d; rli->last_error.number=%d",
 
2543
                      thd->main_da.sql_errno(), last_errno));
 
2544
          if (last_errno == 0)
 
2545
          {
 
2546
            rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), errmsg);
 
2547
          }
 
2548
          else if (last_errno != thd->main_da.sql_errno())
 
2549
          {
 
2550
            sql_print_error("Slave (additional info): %s Error_code: %d",
 
2551
                            errmsg, thd->main_da.sql_errno());
 
2552
          }
 
2553
        }
 
2554
 
 
2555
        /* Print any warnings issued */
 
2556
        List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
 
2557
        MYSQL_ERROR *err;
 
2558
        /*
 
2559
          Added controlled slave thread cancel for replication
 
2560
          of user-defined variables.
 
2561
        */
 
2562
        bool udf_error = false;
 
2563
        while ((err= it++))
 
2564
        {
 
2565
          if (err->code == ER_CANT_OPEN_LIBRARY)
 
2566
            udf_error = true;
 
2567
          sql_print_warning("Slave: %s Error_code: %d",err->msg, err->code);
 
2568
        }
 
2569
        if (udf_error)
 
2570
          sql_print_error("Error loading user-defined library, slave SQL "
 
2571
            "thread aborted. Install the missing library, and restart the "
 
2572
            "slave SQL thread with \"SLAVE START\". We stopped at log '%s' "
 
2573
            "position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, 
 
2574
            llbuff));
 
2575
        else
 
2576
        sql_print_error("\
 
2577
Error running query, slave SQL thread aborted. Fix the problem, and restart \
 
2578
the slave SQL thread with \"SLAVE START\". We stopped at log \
 
2579
'%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff));
 
2580
      }
 
2581
      goto err;
 
2582
    }
 
2583
  }
 
2584
 
 
2585
  /* Thread stopped. Print the current replication position to the log */
 
2586
  sql_print_information("Slave SQL thread exiting, replication stopped in log "
 
2587
                        "'%s' at position %s",
 
2588
                        RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
 
2589
 
 
2590
 err:
 
2591
 
 
2592
  /*
 
2593
    Some events set some playgrounds, which won't be cleared because thread
 
2594
    stops. Stopping of this thread may not be known to these events ("stop"
 
2595
    request is detected only by the present function, not by events), so we
 
2596
    must "proactively" clear playgrounds:
 
2597
  */
 
2598
  rli->cleanup_context(thd, 1);
 
2599
  VOID(pthread_mutex_lock(&LOCK_thread_count));
 
2600
  /*
 
2601
    Some extra safety, which should not been needed (normally, event deletion
 
2602
    should already have done these assignments (each event which sets these
 
2603
    variables is supposed to set them to 0 before terminating)).
 
2604
  */
 
2605
  thd->query= thd->db= thd->catalog= 0;
 
2606
  thd->query_length= thd->db_length= 0;
 
2607
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
 
2608
  thd_proc_info(thd, "Waiting for slave mutex on exit");
 
2609
  pthread_mutex_lock(&rli->run_lock);
 
2610
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
 
2611
  pthread_mutex_lock(&rli->data_lock);
 
2612
  DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
 
2613
  /* When master_pos_wait() wakes up it will check this and terminate */
 
2614
  rli->slave_running= 0;
 
2615
  /* Forget the relay log's format */
 
2616
  delete rli->relay_log.description_event_for_exec;
 
2617
  rli->relay_log.description_event_for_exec= 0;
 
2618
  /* Wake up master_pos_wait() */
 
2619
  pthread_mutex_unlock(&rli->data_lock);
 
2620
  DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
 
2621
  pthread_cond_broadcast(&rli->data_cond);
 
2622
  rli->ignore_log_space_limit= 0; /* don't need any lock */
 
2623
  /* we die so won't remember charset - re-update them on next thread start */
 
2624
  rli->cached_charset_invalidate();
 
2625
  rli->save_temporary_tables = thd->temporary_tables;
 
2626
 
 
2627
  /*
 
2628
    TODO: see if we can do this conditionally in next_event() instead
 
2629
    to avoid unneeded position re-init
 
2630
  */
 
2631
  thd->temporary_tables = 0; // remove tempation from destructor to close them
 
2632
  DBUG_ASSERT(thd->net.buff != 0);
 
2633
  net_end(&thd->net); // destructor will not free it, because we are weird
 
2634
  DBUG_ASSERT(rli->sql_thd == thd);
 
2635
  THD_CHECK_SENTRY(thd);
 
2636
  rli->sql_thd= 0;
 
2637
  pthread_mutex_lock(&LOCK_thread_count);
 
2638
  THD_CHECK_SENTRY(thd);
 
2639
  delete thd;
 
2640
  pthread_mutex_unlock(&LOCK_thread_count);
 
2641
 /*
 
2642
  Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
 
2643
  is important. Otherwise a killer_thread can execute between the calls and
 
2644
  delete the mi structure leading to a crash! (see BUG#25306 for details)
 
2645
 */ 
 
2646
  pthread_cond_broadcast(&rli->stop_cond);
 
2647
  pthread_mutex_unlock(&rli->run_lock);  // tell the world we are done
 
2648
  
 
2649
  my_thread_end();
 
2650
  pthread_exit(0);
 
2651
  DBUG_RETURN(0);                               // Can't return anything here
 
2652
}
 
2653
 
 
2654
 
 
2655
/*
 
2656
  process_io_create_file()
 
2657
*/
 
2658
 
 
2659
static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
 
2660
{
 
2661
  int error = 1;
 
2662
  ulong num_bytes;
 
2663
  bool cev_not_written;
 
2664
  THD *thd = mi->io_thd;
 
2665
  NET *net = &mi->mysql->net;
 
2666
  DBUG_ENTER("process_io_create_file");
 
2667
 
 
2668
  if (unlikely(!cev->is_valid()))
 
2669
    DBUG_RETURN(1);
 
2670
 
 
2671
  if (!rpl_filter->db_ok(cev->db))
 
2672
  {
 
2673
    skip_load_data_infile(net);
 
2674
    DBUG_RETURN(0);
 
2675
  }
 
2676
  DBUG_ASSERT(cev->inited_from_old);
 
2677
  thd->file_id = cev->file_id = mi->file_id++;
 
2678
  thd->server_id = cev->server_id;
 
2679
  cev_not_written = 1;
 
2680
 
 
2681
  if (unlikely(net_request_file(net,cev->fname)))
 
2682
  {
 
2683
    sql_print_error("Slave I/O: failed requesting download of '%s'",
 
2684
                    cev->fname);
 
2685
    goto err;
 
2686
  }
 
2687
 
 
2688
  /*
 
2689
    This dummy block is so we could instantiate Append_block_log_event
 
2690
    once and then modify it slightly instead of doing it multiple times
 
2691
    in the loop
 
2692
  */
 
2693
  {
 
2694
    Append_block_log_event aev(thd,0,0,0,0);
 
2695
 
 
2696
    for (;;)
 
2697
    {
 
2698
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
 
2699
      {
 
2700
        sql_print_error("Network read error downloading '%s' from master",
 
2701
                        cev->fname);
 
2702
        goto err;
 
2703
      }
 
2704
      if (unlikely(!num_bytes)) /* eof */
 
2705
      {
 
2706
        /* 3.23 master wants it */
 
2707
        net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0);
 
2708
        /*
 
2709
          If we wrote Create_file_log_event, then we need to write
 
2710
          Execute_load_log_event. If we did not write Create_file_log_event,
 
2711
          then this is an empty file and we can just do as if the LOAD DATA
 
2712
          INFILE had not existed, i.e. write nothing.
 
2713
        */
 
2714
        if (unlikely(cev_not_written))
 
2715
          break;
 
2716
        Execute_load_log_event xev(thd,0,0);
 
2717
        xev.log_pos = cev->log_pos;
 
2718
        if (unlikely(mi->rli.relay_log.append(&xev)))
 
2719
        {
 
2720
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2721
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2722
                     "error writing Exec_load event to relay log");
 
2723
          goto err;
 
2724
        }
 
2725
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
 
2726
        break;
 
2727
      }
 
2728
      if (unlikely(cev_not_written))
 
2729
      {
 
2730
        cev->block = net->read_pos;
 
2731
        cev->block_len = num_bytes;
 
2732
        if (unlikely(mi->rli.relay_log.append(cev)))
 
2733
        {
 
2734
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2735
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2736
                     "error writing Create_file event to relay log");
 
2737
          goto err;
 
2738
        }
 
2739
        cev_not_written=0;
 
2740
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
 
2741
      }
 
2742
      else
 
2743
      {
 
2744
        aev.block = net->read_pos;
 
2745
        aev.block_len = num_bytes;
 
2746
        aev.log_pos = cev->log_pos;
 
2747
        if (unlikely(mi->rli.relay_log.append(&aev)))
 
2748
        {
 
2749
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2750
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2751
                     "error writing Append_block event to relay log");
 
2752
          goto err;
 
2753
        }
 
2754
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
 
2755
      }
 
2756
    }
 
2757
  }
 
2758
  error=0;
 
2759
err:
 
2760
  DBUG_RETURN(error);
 
2761
}
 
2762
 
 
2763
 
 
2764
/*
 
2765
  Start using a new binary log on the master
 
2766
 
 
2767
  SYNOPSIS
 
2768
    process_io_rotate()
 
2769
    mi                  master_info for the slave
 
2770
    rev                 The rotate log event read from the binary log
 
2771
 
 
2772
  DESCRIPTION
 
2773
    Updates the master info with the place in the next binary
 
2774
    log where we should start reading.
 
2775
    Rotate the relay log to avoid mixed-format relay logs.
 
2776
 
 
2777
  NOTES
 
2778
    We assume we already locked mi->data_lock
 
2779
 
 
2780
  RETURN VALUES
 
2781
    0           ok
 
2782
    1           Log event is illegal
 
2783
 
 
2784
*/
 
2785
 
 
2786
static int process_io_rotate(Master_info *mi, Rotate_log_event *rev)
 
2787
{
 
2788
  DBUG_ENTER("process_io_rotate");
 
2789
  safe_mutex_assert_owner(&mi->data_lock);
 
2790
 
 
2791
  if (unlikely(!rev->is_valid()))
 
2792
    DBUG_RETURN(1);
 
2793
 
 
2794
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
 
2795
  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
 
2796
  mi->master_log_pos= rev->pos;
 
2797
  DBUG_PRINT("info", ("master_log_pos: '%s' %lu",
 
2798
                      mi->master_log_name, (ulong) mi->master_log_pos));
 
2799
#ifndef DBUG_OFF
 
2800
  /*
 
2801
    If we do not do this, we will be getting the first
 
2802
    rotate event forever, so we need to not disconnect after one.
 
2803
  */
 
2804
  if (disconnect_slave_event_count)
 
2805
    mi->events_till_disconnect++;
 
2806
#endif
 
2807
 
 
2808
  /*
 
2809
    If description_event_for_queue is format <4, there is conversion in the
 
2810
    relay log to the slave's format (4). And Rotate can mean upgrade or
 
2811
    nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
 
2812
    no need to reset description_event_for_queue now. And if it's nothing (same
 
2813
    master version as before), no need (still using the slave's format).
 
2814
  */
 
2815
  if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
 
2816
  {
 
2817
    delete mi->rli.relay_log.description_event_for_queue;
 
2818
    /* start from format 3 (MySQL 4.0) again */
 
2819
    mi->rli.relay_log.description_event_for_queue= new
 
2820
      Format_description_log_event(3);
 
2821
  }
 
2822
  /*
 
2823
    Rotate the relay log makes binlog format detection easier (at next slave
 
2824
    start or mysqlbinlog)
 
2825
  */
 
2826
  rotate_relay_log(mi); /* will take the right mutexes */
 
2827
  DBUG_RETURN(0);
 
2828
}
 
2829
 
 
2830
/*
 
2831
  Reads a 3.23 event and converts it to the slave's format. This code was
 
2832
  copied from MySQL 4.0.
 
2833
*/
 
2834
static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
 
2835
                           ulong event_len)
 
2836
{
 
2837
  const char *errmsg = 0;
 
2838
  ulong inc_pos;
 
2839
  bool ignore_event= 0;
 
2840
  char *tmp_buf = 0;
 
2841
  Relay_log_info *rli= &mi->rli;
 
2842
  DBUG_ENTER("queue_binlog_ver_1_event");
 
2843
 
 
2844
  /*
 
2845
    If we get Load event, we need to pass a non-reusable buffer
 
2846
    to read_log_event, so we do a trick
 
2847
  */
 
2848
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
 
2849
  {
 
2850
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
 
2851
    {
 
2852
      mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
 
2853
                 ER(ER_SLAVE_FATAL_ERROR), "Memory allocation failed");
 
2854
      DBUG_RETURN(1);
 
2855
    }
 
2856
    memcpy(tmp_buf,buf,event_len);
 
2857
    /*
 
2858
      Create_file constructor wants a 0 as last char of buffer, this 0 will
 
2859
      serve as the string-termination char for the file's name (which is at the
 
2860
      end of the buffer)
 
2861
      We must increment event_len, otherwise the event constructor will not see
 
2862
      this end 0, which leads to segfault.
 
2863
    */
 
2864
    tmp_buf[event_len++]=0;
 
2865
    int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
 
2866
    buf = (const char*)tmp_buf;
 
2867
  }
 
2868
  /*
 
2869
    This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
 
2870
    send the loaded file, and write it to the relay log in the form of
 
2871
    Append_block/Exec_load (the SQL thread needs the data, as that thread is not
 
2872
    connected to the master).
 
2873
  */
 
2874
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
 
2875
                                            mi->rli.relay_log.description_event_for_queue);
 
2876
  if (unlikely(!ev))
 
2877
  {
 
2878
    sql_print_error("Read invalid event from master: '%s',\
 
2879
 master could be corrupt but a more likely cause of this is a bug",
 
2880
                    errmsg);
 
2881
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
 
2882
    DBUG_RETURN(1);
 
2883
  }
 
2884
 
 
2885
  pthread_mutex_lock(&mi->data_lock);
 
2886
  ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
 
2887
  switch (ev->get_type_code()) {
 
2888
  case STOP_EVENT:
 
2889
    ignore_event= 1;
 
2890
    inc_pos= event_len;
 
2891
    break;
 
2892
  case ROTATE_EVENT:
 
2893
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
 
2894
    {
 
2895
      delete ev;
 
2896
      pthread_mutex_unlock(&mi->data_lock);
 
2897
      DBUG_RETURN(1);
 
2898
    }
 
2899
    inc_pos= 0;
 
2900
    break;
 
2901
  case CREATE_FILE_EVENT:
 
2902
    /*
 
2903
      Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
 
2904
      queue_old_event() which is for 3.23 events which don't comprise
 
2905
      CREATE_FILE_EVENT. This is because read_log_event() above has just
 
2906
      transformed LOAD_EVENT into CREATE_FILE_EVENT.
 
2907
    */
 
2908
  {
 
2909
    /* We come here when and only when tmp_buf != 0 */
 
2910
    DBUG_ASSERT(tmp_buf != 0);
 
2911
    inc_pos=event_len;
 
2912
    ev->log_pos+= inc_pos;
 
2913
    int error = process_io_create_file(mi,(Create_file_log_event*)ev);
 
2914
    delete ev;
 
2915
    mi->master_log_pos += inc_pos;
 
2916
    DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
 
2917
    pthread_mutex_unlock(&mi->data_lock);
 
2918
    my_free((char*)tmp_buf, MYF(0));
 
2919
    DBUG_RETURN(error);
 
2920
  }
 
2921
  default:
 
2922
    inc_pos= event_len;
 
2923
    break;
 
2924
  }
 
2925
  if (likely(!ignore_event))
 
2926
  {
 
2927
    if (ev->log_pos)
 
2928
      /*
 
2929
         Don't do it for fake Rotate events (see comment in
 
2930
      Log_event::Log_event(const char* buf...) in log_event.cc).
 
2931
      */
 
2932
      ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
 
2933
    if (unlikely(rli->relay_log.append(ev)))
 
2934
    {
 
2935
      delete ev;
 
2936
      pthread_mutex_unlock(&mi->data_lock);
 
2937
      DBUG_RETURN(1);
 
2938
    }
 
2939
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
2940
  }
 
2941
  delete ev;
 
2942
  mi->master_log_pos+= inc_pos;
 
2943
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
 
2944
  pthread_mutex_unlock(&mi->data_lock);
 
2945
  DBUG_RETURN(0);
 
2946
}
 
2947
 
 
2948
/*
 
2949
  Reads a 4.0 event and converts it to the slave's format. This code was copied
 
2950
  from queue_binlog_ver_1_event(), with some affordable simplifications.
 
2951
*/
 
2952
static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
 
2953
                           ulong event_len)
 
2954
{
 
2955
  const char *errmsg = 0;
 
2956
  ulong inc_pos;
 
2957
  char *tmp_buf = 0;
 
2958
  Relay_log_info *rli= &mi->rli;
 
2959
  DBUG_ENTER("queue_binlog_ver_3_event");
 
2960
 
 
2961
  /* read_log_event() will adjust log_pos to be end_log_pos */
 
2962
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
 
2963
                                            mi->rli.relay_log.description_event_for_queue);
 
2964
  if (unlikely(!ev))
 
2965
  {
 
2966
    sql_print_error("Read invalid event from master: '%s',\
 
2967
 master could be corrupt but a more likely cause of this is a bug",
 
2968
                    errmsg);
 
2969
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
 
2970
    DBUG_RETURN(1);
 
2971
  }
 
2972
  pthread_mutex_lock(&mi->data_lock);
 
2973
  switch (ev->get_type_code()) {
 
2974
  case STOP_EVENT:
 
2975
    goto err;
 
2976
  case ROTATE_EVENT:
 
2977
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
 
2978
    {
 
2979
      delete ev;
 
2980
      pthread_mutex_unlock(&mi->data_lock);
 
2981
      DBUG_RETURN(1);
 
2982
    }
 
2983
    inc_pos= 0;
 
2984
    break;
 
2985
  default:
 
2986
    inc_pos= event_len;
 
2987
    break;
 
2988
  }
 
2989
  if (unlikely(rli->relay_log.append(ev)))
 
2990
  {
 
2991
    delete ev;
 
2992
    pthread_mutex_unlock(&mi->data_lock);
 
2993
    DBUG_RETURN(1);
 
2994
  }
 
2995
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
2996
  delete ev;
 
2997
  mi->master_log_pos+= inc_pos;
 
2998
err:
 
2999
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
 
3000
  pthread_mutex_unlock(&mi->data_lock);
 
3001
  DBUG_RETURN(0);
 
3002
}
 
3003
 
 
3004
/*
 
3005
  queue_old_event()
 
3006
 
 
3007
  Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
 
3008
  (exactly, slave's) format. To do the conversion, we create a 5.0 event from
 
3009
  the 3.23/4.0 bytes, then write this event to the relay log.
 
3010
 
 
3011
  TODO:
 
3012
    Test this code before release - it has to be tested on a separate
 
3013
    setup with 3.23 master or 4.0 master
 
3014
*/
 
3015
 
 
3016
static int queue_old_event(Master_info *mi, const char *buf,
 
3017
                           ulong event_len)
 
3018
{
 
3019
  DBUG_ENTER("queue_old_event");
 
3020
 
 
3021
  switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
 
3022
  {
 
3023
  case 1:
 
3024
      DBUG_RETURN(queue_binlog_ver_1_event(mi,buf,event_len));
 
3025
  case 3:
 
3026
      DBUG_RETURN(queue_binlog_ver_3_event(mi,buf,event_len));
 
3027
  default: /* unsupported format; eg version 2 */
 
3028
    DBUG_PRINT("info",("unsupported binlog format %d in queue_old_event()",
 
3029
                       mi->rli.relay_log.description_event_for_queue->binlog_version));
 
3030
    DBUG_RETURN(1);
 
3031
  }
 
3032
}
 
3033
 
 
3034
/*
 
3035
  queue_event()
 
3036
 
 
3037
  If the event is 3.23/4.0, passes it to queue_old_event() which will convert
 
3038
  it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
 
3039
  no format conversion, it's pure read/write of bytes.
 
3040
  So a 5.0.0 slave's relay log can contain events in the slave's format or in
 
3041
  any >=5.0.0 format.
 
3042
*/
 
3043
 
 
3044
static int queue_event(Master_info* mi,const char* buf, ulong event_len)
 
3045
{
 
3046
  int error= 0;
 
3047
  String error_msg;
 
3048
  ulong inc_pos;
 
3049
  Relay_log_info *rli= &mi->rli;
 
3050
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
 
3051
  DBUG_ENTER("queue_event");
 
3052
 
 
3053
 
 
3054
  if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
 
3055
      buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
 
3056
    DBUG_RETURN(queue_old_event(mi,buf,event_len));
 
3057
 
 
3058
  pthread_mutex_lock(&mi->data_lock);
 
3059
 
 
3060
  switch (buf[EVENT_TYPE_OFFSET]) {
 
3061
  case STOP_EVENT:
 
3062
    /*
 
3063
      We needn't write this event to the relay log. Indeed, it just indicates a
 
3064
      master server shutdown. The only thing this does is cleaning. But
 
3065
      cleaning is already done on a per-master-thread basis (as the master
 
3066
      server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
 
3067
      prepared statements' deletion are TODO only when we binlog prep stmts).
 
3068
 
 
3069
      We don't even increment mi->master_log_pos, because we may be just after
 
3070
      a Rotate event. Btw, in a few milliseconds we are going to have a Start
 
3071
      event from the next binlog (unless the master is presently running
 
3072
      without --log-bin).
 
3073
    */
 
3074
    goto err;
 
3075
  case ROTATE_EVENT:
 
3076
  {
 
3077
    Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
 
3078
    if (unlikely(process_io_rotate(mi,&rev)))
 
3079
    {
 
3080
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
3081
      goto err;
 
3082
    }
 
3083
    /*
 
3084
      Now the I/O thread has just changed its mi->master_log_name, so
 
3085
      incrementing mi->master_log_pos is nonsense.
 
3086
    */
 
3087
    inc_pos= 0;
 
3088
    break;
 
3089
  }
 
3090
  case FORMAT_DESCRIPTION_EVENT:
 
3091
  {
 
3092
    /*
 
3093
      Create an event, and save it (when we rotate the relay log, we will have
 
3094
      to write this event again).
 
3095
    */
 
3096
    /*
 
3097
      We are the only thread which reads/writes description_event_for_queue.
 
3098
      The relay_log struct does not move (though some members of it can
 
3099
      change), so we needn't any lock (no rli->data_lock, no log lock).
 
3100
    */
 
3101
    Format_description_log_event* tmp;
 
3102
    const char* errmsg;
 
3103
    if (!(tmp= (Format_description_log_event*)
 
3104
          Log_event::read_log_event(buf, event_len, &errmsg,
 
3105
                                    mi->rli.relay_log.description_event_for_queue)))
 
3106
    {
 
3107
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
3108
      goto err;
 
3109
    }
 
3110
    delete mi->rli.relay_log.description_event_for_queue;
 
3111
    mi->rli.relay_log.description_event_for_queue= tmp;
 
3112
    /*
 
3113
       Though this does some conversion to the slave's format, this will
 
3114
       preserve the master's binlog format version, and number of event types.
 
3115
    */
 
3116
    /*
 
3117
       If the event was not requested by the slave (the slave did not ask for
 
3118
       it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
 
3119
    */
 
3120
    inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
 
3121
    DBUG_PRINT("info",("binlog format is now %d",
 
3122
                       mi->rli.relay_log.description_event_for_queue->binlog_version));
 
3123
 
 
3124
  }
 
3125
  break;
 
3126
 
 
3127
  case HEARTBEAT_LOG_EVENT:
 
3128
  {
 
3129
    /*
 
3130
      HB (heartbeat) cannot come before RL (Relay)
 
3131
    */
 
3132
    char  llbuf[22];
 
3133
    Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
 
3134
    if (!hb.is_valid())
 
3135
    {
 
3136
      error= ER_SLAVE_HEARTBEAT_FAILURE;
 
3137
      error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
 
3138
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
 
3139
      error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
 
3140
      error_msg.append(STRING_WITH_LEN(" log_pos "));
 
3141
      llstr(hb.log_pos, llbuf);
 
3142
      error_msg.append(llbuf, strlen(llbuf));
 
3143
      goto err;
 
3144
    }
 
3145
    mi->received_heartbeats++;
 
3146
    /* 
 
3147
       compare local and event's versions of log_file, log_pos.
 
3148
       
 
3149
       Heartbeat is sent only after an event corresponding to the corrdinates
 
3150
       the heartbeat carries.
 
3151
       Slave can not have a difference in coordinates except in the only
 
3152
       special case when mi->master_log_name, master_log_pos have never
 
3153
       been updated by Rotate event i.e when slave does not have any history
 
3154
       with the master (and thereafter mi->master_log_pos is NULL).
 
3155
 
 
3156
       TODO: handling `when' for SHOW SLAVE STATUS' snds behind
 
3157
    */
 
3158
    if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
 
3159
         && mi->master_log_name != NULL)
 
3160
        || mi->master_log_pos != hb.log_pos)
 
3161
    {
 
3162
      /* missed events of heartbeat from the past */
 
3163
      error= ER_SLAVE_HEARTBEAT_FAILURE;
 
3164
      error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
 
3165
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
 
3166
      error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
 
3167
      error_msg.append(STRING_WITH_LEN(" log_pos "));
 
3168
      llstr(hb.log_pos, llbuf);
 
3169
      error_msg.append(llbuf, strlen(llbuf));
 
3170
      goto err;
 
3171
    }
 
3172
    goto skip_relay_logging;
 
3173
  }
 
3174
  break;
 
3175
    
 
3176
  default:
 
3177
    inc_pos= event_len;
 
3178
    break;
 
3179
  }
 
3180
 
 
3181
  /*
 
3182
     If this event is originating from this server, don't queue it.
 
3183
     We don't check this for 3.23 events because it's simpler like this; 3.23
 
3184
     will be filtered anyway by the SQL slave thread which also tests the
 
3185
     server id (we must also keep this test in the SQL thread, in case somebody
 
3186
     upgrades a 4.0 slave which has a not-filtered relay log).
 
3187
 
 
3188
     ANY event coming from ourselves can be ignored: it is obvious for queries;
 
3189
     for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
 
3190
     (--log-slave-updates would not log that) unless this slave is also its
 
3191
     direct master (an unsupported, useless setup!).
 
3192
  */
 
3193
 
 
3194
  pthread_mutex_lock(log_lock);
 
3195
 
 
3196
  if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
 
3197
      !mi->rli.replicate_same_server_id)
 
3198
  {
 
3199
    /*
 
3200
      Do not write it to the relay log.
 
3201
      a) We still want to increment mi->master_log_pos, so that we won't
 
3202
      re-read this event from the master if the slave IO thread is now
 
3203
      stopped/restarted (more efficient if the events we are ignoring are big
 
3204
      LOAD DATA INFILE).
 
3205
      b) We want to record that we are skipping events, for the information of
 
3206
      the slave SQL thread, otherwise that thread may let
 
3207
      rli->group_relay_log_pos stay too small if the last binlog's event is
 
3208
      ignored.
 
3209
      But events which were generated by this slave and which do not exist in
 
3210
      the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
 
3211
      mi->master_log_pos.
 
3212
    */
 
3213
    if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
 
3214
        buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
 
3215
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
 
3216
    {
 
3217
      mi->master_log_pos+= inc_pos;
 
3218
      memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
 
3219
      DBUG_ASSERT(rli->ign_master_log_name_end[0]);
 
3220
      rli->ign_master_log_pos_end= mi->master_log_pos;
 
3221
    }
 
3222
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
 
3223
    DBUG_PRINT("info", ("master_log_pos: %lu, event originating from the same server, ignored",
 
3224
                        (ulong) mi->master_log_pos));
 
3225
  }
 
3226
  else
 
3227
  {
 
3228
    /* write the event to the relay log */
 
3229
    if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
 
3230
    {
 
3231
      mi->master_log_pos+= inc_pos;
 
3232
      DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
 
3233
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3234
    }
 
3235
    else
 
3236
    {
 
3237
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
3238
    }
 
3239
    rli->ign_master_log_name_end[0]= 0; // last event is not ignored
 
3240
  }
 
3241
  pthread_mutex_unlock(log_lock);
 
3242
 
 
3243
skip_relay_logging:
 
3244
  
 
3245
err:
 
3246
  pthread_mutex_unlock(&mi->data_lock);
 
3247
  DBUG_PRINT("info", ("error: %d", error));
 
3248
  if (error)
 
3249
    mi->report(ERROR_LEVEL, error, ER(error), 
 
3250
               (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
 
3251
               "could not queue event from master" :
 
3252
               error_msg.ptr());
 
3253
  DBUG_RETURN(error);
 
3254
}
 
3255
 
 
3256
 
 
3257
void end_relay_log_info(Relay_log_info* rli)
 
3258
{
 
3259
  DBUG_ENTER("end_relay_log_info");
 
3260
 
 
3261
  if (!rli->inited)
 
3262
    DBUG_VOID_RETURN;
 
3263
  if (rli->info_fd >= 0)
 
3264
  {
 
3265
    end_io_cache(&rli->info_file);
 
3266
    (void) my_close(rli->info_fd, MYF(MY_WME));
 
3267
    rli->info_fd = -1;
 
3268
  }
 
3269
  if (rli->cur_log_fd >= 0)
 
3270
  {
 
3271
    end_io_cache(&rli->cache_buf);
 
3272
    (void)my_close(rli->cur_log_fd, MYF(MY_WME));
 
3273
    rli->cur_log_fd = -1;
 
3274
  }
 
3275
  rli->inited = 0;
 
3276
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
3277
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3278
  /*
 
3279
    Delete the slave's temporary tables from memory.
 
3280
    In the future there will be other actions than this, to ensure persistance
 
3281
    of slave's temp tables after shutdown.
 
3282
  */
 
3283
  rli->close_temporary_tables();
 
3284
  DBUG_VOID_RETURN;
 
3285
}
 
3286
 
 
3287
/*
 
3288
  Try to connect until successful or slave killed
 
3289
 
 
3290
  SYNPOSIS
 
3291
    safe_connect()
 
3292
    thd                 Thread handler for slave
 
3293
    mysql               MySQL connection handle
 
3294
    mi                  Replication handle
 
3295
 
 
3296
  RETURN
 
3297
    0   ok
 
3298
    #   Error
 
3299
*/
 
3300
 
 
3301
static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi)
 
3302
{
 
3303
  DBUG_ENTER("safe_connect");
 
3304
 
 
3305
  DBUG_RETURN(connect_to_master(thd, mysql, mi, 0, 0));
 
3306
}
 
3307
 
 
3308
 
 
3309
/*
 
3310
  SYNPOSIS
 
3311
    connect_to_master()
 
3312
 
 
3313
  IMPLEMENTATION
 
3314
    Try to connect until successful or slave killed or we have retried
 
3315
    master_retry_count times
 
3316
*/
 
3317
 
 
3318
static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
 
3319
                             bool reconnect, bool suppress_warnings)
 
3320
{
 
3321
  int slave_was_killed;
 
3322
  int last_errno= -2;                           // impossible error
 
3323
  ulong err_count=0;
 
3324
  char llbuff[22];
 
3325
  DBUG_ENTER("connect_to_master");
 
3326
 
 
3327
#ifndef DBUG_OFF
 
3328
  mi->events_till_disconnect = disconnect_slave_event_count;
 
3329
#endif
 
3330
  ulong client_flag= CLIENT_REMEMBER_OPTIONS;
 
3331
  if (opt_slave_compressed_protocol)
 
3332
    client_flag=CLIENT_COMPRESS;                /* We will use compression */
 
3333
 
 
3334
  mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
 
3335
  mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
 
3336
 
 
3337
  mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
 
3338
  /* This one is not strictly needed but we have it here for completeness */
 
3339
  mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
 
3340
 
 
3341
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
 
3342
         (reconnect ? mysql_reconnect(mysql) != 0 :
 
3343
          mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
 
3344
                             mi->port, 0, client_flag) == 0))
 
3345
  {
 
3346
    /* Don't repeat last error */
 
3347
    if ((int)mysql_errno(mysql) != last_errno)
 
3348
    {
 
3349
      last_errno=mysql_errno(mysql);
 
3350
      suppress_warnings= 0;
 
3351
      mi->report(ERROR_LEVEL, last_errno,
 
3352
                 "error %s to master '%s@%s:%d'"
 
3353
                 " - retry-time: %d  retries: %lu",
 
3354
                 (reconnect ? "reconnecting" : "connecting"),
 
3355
                 mi->user, mi->host, mi->port,
 
3356
                 mi->connect_retry, master_retry_count);
 
3357
    }
 
3358
    /*
 
3359
      By default we try forever. The reason is that failure will trigger
 
3360
      master election, so if the user did not set master_retry_count we
 
3361
      do not want to have election triggered on the first failure to
 
3362
      connect
 
3363
    */
 
3364
    if (++err_count == master_retry_count)
 
3365
    {
 
3366
      slave_was_killed=1;
 
3367
      if (reconnect)
 
3368
        change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
 
3369
      break;
 
3370
    }
 
3371
    safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
 
3372
               (void*)mi);
 
3373
  }
 
3374
 
 
3375
  if (!slave_was_killed)
 
3376
  {
 
3377
    if (reconnect)
 
3378
    {
 
3379
      if (!suppress_warnings && global_system_variables.log_warnings)
 
3380
        sql_print_information("Slave: connected to master '%s@%s:%d',\
 
3381
replication resumed in log '%s' at position %s", mi->user,
 
3382
                        mi->host, mi->port,
 
3383
                        IO_RPL_LOG_NAME,
 
3384
                        llstr(mi->master_log_pos,llbuff));
 
3385
    }
 
3386
    else
 
3387
    {
 
3388
      change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
 
3389
      general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d",
 
3390
                        mi->user, mi->host, mi->port);
 
3391
    }
 
3392
#ifdef SIGNAL_WITH_VIO_CLOSE
 
3393
    thd->set_active_vio(mysql->net.vio);
 
3394
#endif
 
3395
  }
 
3396
  mysql->reconnect= 1;
 
3397
  DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed));
 
3398
  DBUG_RETURN(slave_was_killed);
 
3399
}
 
3400
 
 
3401
 
 
3402
/*
 
3403
  safe_reconnect()
 
3404
 
 
3405
  IMPLEMENTATION
 
3406
    Try to connect until successful or slave killed or we have retried
 
3407
    master_retry_count times
 
3408
*/
 
3409
 
 
3410
static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
 
3411
                          bool suppress_warnings)
 
3412
{
 
3413
  DBUG_ENTER("safe_reconnect");
 
3414
  DBUG_RETURN(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
 
3415
}
 
3416
 
 
3417
 
 
3418
/*
 
3419
  Store the file and position where the execute-slave thread are in the
 
3420
  relay log.
 
3421
 
 
3422
  SYNOPSIS
 
3423
    flush_relay_log_info()
 
3424
    rli                 Relay log information
 
3425
 
 
3426
  NOTES
 
3427
    - As this is only called by the slave thread, we don't need to
 
3428
      have a lock on this.
 
3429
    - If there is an active transaction, then we don't update the position
 
3430
      in the relay log.  This is to ensure that we re-execute statements
 
3431
      if we die in the middle of an transaction that was rolled back.
 
3432
    - As a transaction never spans binary logs, we don't have to handle the
 
3433
      case where we do a relay-log-rotation in the middle of the transaction.
 
3434
      If this would not be the case, we would have to ensure that we
 
3435
      don't delete the relay log file where the transaction started when
 
3436
      we switch to a new relay log file.
 
3437
 
 
3438
  TODO
 
3439
    - Change the log file information to a binary format to avoid calling
 
3440
      longlong2str.
 
3441
 
 
3442
  RETURN VALUES
 
3443
    0   ok
 
3444
    1   write error
 
3445
*/
 
3446
 
 
3447
bool flush_relay_log_info(Relay_log_info* rli)
 
3448
{
 
3449
  bool error=0;
 
3450
  DBUG_ENTER("flush_relay_log_info");
 
3451
 
 
3452
  if (unlikely(rli->no_storage))
 
3453
    DBUG_RETURN(0);
 
3454
 
 
3455
  IO_CACHE *file = &rli->info_file;
 
3456
  char buff[FN_REFLEN*2+22*2+4], *pos;
 
3457
 
 
3458
  my_b_seek(file, 0L);
 
3459
  pos=strmov(buff, rli->group_relay_log_name);
 
3460
  *pos++='\n';
 
3461
  pos=longlong2str(rli->group_relay_log_pos, pos, 10);
 
3462
  *pos++='\n';
 
3463
  pos=strmov(pos, rli->group_master_log_name);
 
3464
  *pos++='\n';
 
3465
  pos=longlong2str(rli->group_master_log_pos, pos, 10);
 
3466
  *pos='\n';
 
3467
  if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
 
3468
    error=1;
 
3469
  if (flush_io_cache(file))
 
3470
    error=1;
 
3471
 
 
3472
  /* Flushing the relay log is done by the slave I/O thread */
 
3473
  DBUG_RETURN(error);
 
3474
}
 
3475
 
 
3476
 
 
3477
/*
 
3478
  Called when we notice that the current "hot" log got rotated under our feet.
 
3479
*/
 
3480
 
 
3481
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
 
3482
{
 
3483
  DBUG_ENTER("reopen_relay_log");
 
3484
  DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
 
3485
  DBUG_ASSERT(rli->cur_log_fd == -1);
 
3486
 
 
3487
  IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
 
3488
  if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
 
3489
                                   errmsg)) <0)
 
3490
    DBUG_RETURN(0);
 
3491
  /*
 
3492
    We want to start exactly where we was before:
 
3493
    relay_log_pos       Current log pos
 
3494
    pending             Number of bytes already processed from the event
 
3495
  */
 
3496
  rli->event_relay_log_pos= max(rli->event_relay_log_pos, BIN_LOG_HEADER_SIZE);
 
3497
  my_b_seek(cur_log,rli->event_relay_log_pos);
 
3498
  DBUG_RETURN(cur_log);
 
3499
}
 
3500
 
 
3501
 
 
3502
static Log_event* next_event(Relay_log_info* rli)
 
3503
{
 
3504
  Log_event* ev;
 
3505
  IO_CACHE* cur_log = rli->cur_log;
 
3506
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
 
3507
  const char* errmsg=0;
 
3508
  THD* thd = rli->sql_thd;
 
3509
  DBUG_ENTER("next_event");
 
3510
 
 
3511
  DBUG_ASSERT(thd != 0);
 
3512
 
 
3513
#ifndef DBUG_OFF
 
3514
  if (abort_slave_event_count && !rli->events_till_abort--)
 
3515
    DBUG_RETURN(0);
 
3516
#endif
 
3517
 
 
3518
  /*
 
3519
    For most operations we need to protect rli members with data_lock,
 
3520
    so we assume calling function acquired this mutex for us and we will
 
3521
    hold it for the most of the loop below However, we will release it
 
3522
    whenever it is worth the hassle,  and in the cases when we go into a
 
3523
    pthread_cond_wait() with the non-data_lock mutex
 
3524
  */
 
3525
  safe_mutex_assert_owner(&rli->data_lock);
 
3526
 
 
3527
  while (!sql_slave_killed(thd,rli))
 
3528
  {
 
3529
    /*
 
3530
      We can have two kinds of log reading:
 
3531
      hot_log:
 
3532
        rli->cur_log points at the IO_CACHE of relay_log, which
 
3533
        is actively being updated by the I/O thread. We need to be careful
 
3534
        in this case and make sure that we are not looking at a stale log that
 
3535
        has already been rotated. If it has been, we reopen the log.
 
3536
 
 
3537
      The other case is much simpler:
 
3538
        We just have a read only log that nobody else will be updating.
 
3539
    */
 
3540
    bool hot_log;
 
3541
    if ((hot_log = (cur_log != &rli->cache_buf)))
 
3542
    {
 
3543
      DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
 
3544
      pthread_mutex_lock(log_lock);
 
3545
 
 
3546
      /*
 
3547
        Reading xxx_file_id is safe because the log will only
 
3548
        be rotated when we hold relay_log.LOCK_log
 
3549
      */
 
3550
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
 
3551
      {
 
3552
        // The master has switched to a new log file; Reopen the old log file
 
3553
        cur_log=reopen_relay_log(rli, &errmsg);
 
3554
        pthread_mutex_unlock(log_lock);
 
3555
        if (!cur_log)                           // No more log files
 
3556
          goto err;
 
3557
        hot_log=0;                              // Using old binary log
 
3558
      }
 
3559
    }
 
3560
    /* 
 
3561
      As there is no guarantee that the relay is open (for example, an I/O
 
3562
      error during a write by the slave I/O thread may have closed it), we
 
3563
      have to test it.
 
3564
    */
 
3565
    if (!my_b_inited(cur_log))
 
3566
      goto err;
 
3567
#ifndef DBUG_OFF
 
3568
    {
 
3569
      /* This is an assertion which sometimes fails, let's try to track it */
 
3570
      char llbuf1[22], llbuf2[22];
 
3571
      DBUG_PRINT("info", ("my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
 
3572
                          llstr(my_b_tell(cur_log),llbuf1),
 
3573
                          llstr(rli->event_relay_log_pos,llbuf2)));
 
3574
      DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
 
3575
      DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
 
3576
    }
 
3577
#endif
 
3578
    /*
 
3579
      Relay log is always in new format - if the master is 3.23, the
 
3580
      I/O thread will convert the format for us.
 
3581
      A problem: the description event may be in a previous relay log. So if
 
3582
      the slave has been shutdown meanwhile, we would have to look in old relay
 
3583
      logs, which may even have been deleted. So we need to write this
 
3584
      description event at the beginning of the relay log.
 
3585
      When the relay log is created when the I/O thread starts, easy: the
 
3586
      master will send the description event and we will queue it.
 
3587
      But if the relay log is created by new_file(): then the solution is:
 
3588
      MYSQL_BIN_LOG::open() will write the buffered description event.
 
3589
    */
 
3590
    if ((ev=Log_event::read_log_event(cur_log,0,
 
3591
                                      rli->relay_log.description_event_for_exec)))
 
3592
 
 
3593
    {
 
3594
      DBUG_ASSERT(thd==rli->sql_thd);
 
3595
      /*
 
3596
        read it while we have a lock, to avoid a mutex lock in
 
3597
        inc_event_relay_log_pos()
 
3598
      */
 
3599
      rli->future_event_relay_log_pos= my_b_tell(cur_log);
 
3600
      if (hot_log)
 
3601
        pthread_mutex_unlock(log_lock);
 
3602
      DBUG_RETURN(ev);
 
3603
    }
 
3604
    DBUG_ASSERT(thd==rli->sql_thd);
 
3605
    if (opt_reckless_slave)                     // For mysql-test
 
3606
      cur_log->error = 0;
 
3607
    if (cur_log->error < 0)
 
3608
    {
 
3609
      errmsg = "slave SQL thread aborted because of I/O error";
 
3610
      if (hot_log)
 
3611
        pthread_mutex_unlock(log_lock);
 
3612
      goto err;
 
3613
    }
 
3614
    if (!cur_log->error) /* EOF */
 
3615
    {
 
3616
      /*
 
3617
        On a hot log, EOF means that there are no more updates to
 
3618
        process and we must block until I/O thread adds some and
 
3619
        signals us to continue
 
3620
      */
 
3621
      if (hot_log)
 
3622
      {
 
3623
        /*
 
3624
          We say in Seconds_Behind_Master that we have "caught up". Note that
 
3625
          for example if network link is broken but I/O slave thread hasn't
 
3626
          noticed it (slave_net_timeout not elapsed), then we'll say "caught
 
3627
          up" whereas we're not really caught up. Fixing that would require
 
3628
          internally cutting timeout in smaller pieces in network read, no
 
3629
          thanks. Another example: SQL has caught up on I/O, now I/O has read
 
3630
          a new event and is queuing it; the false "0" will exist until SQL
 
3631
          finishes executing the new event; it will be look abnormal only if
 
3632
          the events have old timestamps (then you get "many", 0, "many").
 
3633
 
 
3634
          Transient phases like this can be fixed with implemeting
 
3635
          Heartbeat event which provides the slave the status of the
 
3636
          master at time the master does not have any new update to send.
 
3637
          Seconds_Behind_Master would be zero only when master has no
 
3638
          more updates in binlog for slave. The heartbeat can be sent
 
3639
          in a (small) fraction of slave_net_timeout. Until it's done
 
3640
          rli->last_master_timestamp is temporarely (for time of
 
3641
          waiting for the following event) reset whenever EOF is
 
3642
          reached.
 
3643
        */
 
3644
        time_t save_timestamp= rli->last_master_timestamp;
 
3645
        rli->last_master_timestamp= 0;
 
3646
 
 
3647
        DBUG_ASSERT(rli->relay_log.get_open_count() ==
 
3648
                    rli->cur_log_old_open_count);
 
3649
 
 
3650
        if (rli->ign_master_log_name_end[0])
 
3651
        {
 
3652
          /* We generate and return a Rotate, to make our positions advance */
 
3653
          DBUG_PRINT("info",("seeing an ignored end segment"));
 
3654
          ev= new Rotate_log_event(rli->ign_master_log_name_end,
 
3655
                                   0, rli->ign_master_log_pos_end,
 
3656
                                   Rotate_log_event::DUP_NAME);
 
3657
          rli->ign_master_log_name_end[0]= 0;
 
3658
          pthread_mutex_unlock(log_lock);
 
3659
          if (unlikely(!ev))
 
3660
          {
 
3661
            errmsg= "Slave SQL thread failed to create a Rotate event "
 
3662
              "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
 
3663
            goto err;
 
3664
          }
 
3665
          ev->server_id= 0; // don't be ignored by slave SQL thread
 
3666
          DBUG_RETURN(ev);
 
3667
        }
 
3668
 
 
3669
        /*
 
3670
          We can, and should release data_lock while we are waiting for
 
3671
          update. If we do not, show slave status will block
 
3672
        */
 
3673
        pthread_mutex_unlock(&rli->data_lock);
 
3674
 
 
3675
        /*
 
3676
          Possible deadlock :
 
3677
          - the I/O thread has reached log_space_limit
 
3678
          - the SQL thread has read all relay logs, but cannot purge for some
 
3679
          reason:
 
3680
            * it has already purged all logs except the current one
 
3681
            * there are other logs than the current one but they're involved in
 
3682
            a transaction that finishes in the current one (or is not finished)
 
3683
          Solution :
 
3684
          Wake up the possibly waiting I/O thread, and set a boolean asking
 
3685
          the I/O thread to temporarily ignore the log_space_limit
 
3686
          constraint, because we do not want the I/O thread to block because of
 
3687
          space (it's ok if it blocks for any other reason (e.g. because the
 
3688
          master does not send anything). Then the I/O thread stops waiting
 
3689
          and reads more events.
 
3690
          The SQL thread decides when the I/O thread should take log_space_limit
 
3691
          into account again : ignore_log_space_limit is reset to 0
 
3692
          in purge_first_log (when the SQL thread purges the just-read relay
 
3693
          log), and also when the SQL thread starts. We should also reset
 
3694
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
 
3695
          fact, no need as RESET SLAVE requires that the slave
 
3696
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
 
3697
          it stops.
 
3698
        */
 
3699
        pthread_mutex_lock(&rli->log_space_lock);
 
3700
        // prevent the I/O thread from blocking next times
 
3701
        rli->ignore_log_space_limit= 1;
 
3702
        /*
 
3703
          If the I/O thread is blocked, unblock it.  Ok to broadcast
 
3704
          after unlock, because the mutex is only destroyed in
 
3705
          ~Relay_log_info(), i.e. when rli is destroyed, and rli will
 
3706
          not be destroyed before we exit the present function.
 
3707
        */
 
3708
        pthread_mutex_unlock(&rli->log_space_lock);
 
3709
        pthread_cond_broadcast(&rli->log_space_cond);
 
3710
        // Note that wait_for_update_relay_log unlocks lock_log !
 
3711
        rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
 
3712
        // re-acquire data lock since we released it earlier
 
3713
        pthread_mutex_lock(&rli->data_lock);
 
3714
        rli->last_master_timestamp= save_timestamp;
 
3715
        continue;
 
3716
      }
 
3717
      /*
 
3718
        If the log was not hot, we need to move to the next log in
 
3719
        sequence. The next log could be hot or cold, we deal with both
 
3720
        cases separately after doing some common initialization
 
3721
      */
 
3722
      end_io_cache(cur_log);
 
3723
      DBUG_ASSERT(rli->cur_log_fd >= 0);
 
3724
      my_close(rli->cur_log_fd, MYF(MY_WME));
 
3725
      rli->cur_log_fd = -1;
 
3726
 
 
3727
      if (relay_log_purge)
 
3728
      {
 
3729
        /*
 
3730
          purge_first_log will properly set up relay log coordinates in rli.
 
3731
          If the group's coordinates are equal to the event's coordinates
 
3732
          (i.e. the relay log was not rotated in the middle of a group),
 
3733
          we can purge this relay log too.
 
3734
          We do ulonglong and string comparisons, this may be slow but
 
3735
          - purging the last relay log is nice (it can save 1GB of disk), so we
 
3736
          like to detect the case where we can do it, and given this,
 
3737
          - I see no better detection method
 
3738
          - purge_first_log is not called that often
 
3739
        */
 
3740
        if (rli->relay_log.purge_first_log
 
3741
            (rli,
 
3742
             rli->group_relay_log_pos == rli->event_relay_log_pos
 
3743
             && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
 
3744
        {
 
3745
          errmsg = "Error purging processed logs";
 
3746
          goto err;
 
3747
        }
 
3748
      }
 
3749
      else
 
3750
      {
 
3751
        /*
 
3752
          If hot_log is set, then we already have a lock on
 
3753
          LOCK_log.  If not, we have to get the lock.
 
3754
 
 
3755
          According to Sasha, the only time this code will ever be executed
 
3756
          is if we are recovering from a bug.
 
3757
        */
 
3758
        if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
 
3759
        {
 
3760
          errmsg = "error switching to the next log";
 
3761
          goto err;
 
3762
        }
 
3763
        rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
 
3764
        strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
 
3765
                sizeof(rli->event_relay_log_name)-1);
 
3766
        flush_relay_log_info(rli);
 
3767
      }
 
3768
 
 
3769
      /*
 
3770
        Now we want to open this next log. To know if it's a hot log (the one
 
3771
        being written by the I/O thread now) or a cold log, we can use
 
3772
        is_active(); if it is hot, we use the I/O cache; if it's cold we open
 
3773
        the file normally. But if is_active() reports that the log is hot, this
 
3774
        may change between the test and the consequence of the test. So we may
 
3775
        open the I/O cache whereas the log is now cold, which is nonsense.
 
3776
        To guard against this, we need to have LOCK_log.
 
3777
      */
 
3778
 
 
3779
      DBUG_PRINT("info",("hot_log: %d",hot_log));
 
3780
      if (!hot_log) /* if hot_log, we already have this mutex */
 
3781
        pthread_mutex_lock(log_lock);
 
3782
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
 
3783
      {
 
3784
#ifdef EXTRA_DEBUG
 
3785
        if (global_system_variables.log_warnings)
 
3786
          sql_print_information("next log '%s' is currently active",
 
3787
                                rli->linfo.log_file_name);
 
3788
#endif
 
3789
        rli->cur_log= cur_log= rli->relay_log.get_log_file();
 
3790
        rli->cur_log_old_open_count= rli->relay_log.get_open_count();
 
3791
        DBUG_ASSERT(rli->cur_log_fd == -1);
 
3792
 
 
3793
        /*
 
3794
          Read pointer has to be at the start since we are the only
 
3795
          reader.
 
3796
          We must keep the LOCK_log to read the 4 first bytes, as this is a hot
 
3797
          log (same as when we call read_log_event() above: for a hot log we
 
3798
          take the mutex).
 
3799
        */
 
3800
        if (check_binlog_magic(cur_log,&errmsg))
 
3801
        {
 
3802
          if (!hot_log) pthread_mutex_unlock(log_lock);
 
3803
          goto err;
 
3804
        }
 
3805
        if (!hot_log) pthread_mutex_unlock(log_lock);
 
3806
        continue;
 
3807
      }
 
3808
      if (!hot_log) pthread_mutex_unlock(log_lock);
 
3809
      /*
 
3810
        if we get here, the log was not hot, so we will have to open it
 
3811
        ourselves. We are sure that the log is still not hot now (a log can get
 
3812
        from hot to cold, but not from cold to hot). No need for LOCK_log.
 
3813
      */
 
3814
#ifdef EXTRA_DEBUG
 
3815
      if (global_system_variables.log_warnings)
 
3816
        sql_print_information("next log '%s' is not active",
 
3817
                              rli->linfo.log_file_name);
 
3818
#endif
 
3819
      // open_binlog() will check the magic header
 
3820
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
 
3821
                                       &errmsg)) <0)
 
3822
        goto err;
 
3823
    }
 
3824
    else
 
3825
    {
 
3826
      /*
 
3827
        Read failed with a non-EOF error.
 
3828
        TODO: come up with something better to handle this error
 
3829
      */
 
3830
      if (hot_log)
 
3831
        pthread_mutex_unlock(log_lock);
 
3832
      sql_print_error("Slave SQL thread: I/O error reading \
 
3833
event(errno: %d  cur_log->error: %d)",
 
3834
                      my_errno,cur_log->error);
 
3835
      // set read position to the beginning of the event
 
3836
      my_b_seek(cur_log,rli->event_relay_log_pos);
 
3837
      /* otherwise, we have had a partial read */
 
3838
      errmsg = "Aborting slave SQL thread because of partial event read";
 
3839
      break;                                    // To end of function
 
3840
    }
 
3841
  }
 
3842
  if (!errmsg && global_system_variables.log_warnings)
 
3843
  {
 
3844
    sql_print_information("Error reading relay log event: %s",
 
3845
                          "slave SQL thread was killed");
 
3846
    DBUG_RETURN(0);
 
3847
  }
 
3848
 
 
3849
err:
 
3850
  if (errmsg)
 
3851
    sql_print_error("Error reading relay log event: %s", errmsg);
 
3852
  DBUG_RETURN(0);
 
3853
}
 
3854
 
 
3855
/*
 
3856
  Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
 
3857
  because of size is simpler because when we do it we already have all relevant
 
3858
  locks; here we don't, so this function is mainly taking locks).
 
3859
  Returns nothing as we cannot catch any error (MYSQL_BIN_LOG::new_file()
 
3860
  is void).
 
3861
*/
 
3862
 
 
3863
void rotate_relay_log(Master_info* mi)
 
3864
{
 
3865
  DBUG_ENTER("rotate_relay_log");
 
3866
  Relay_log_info* rli= &mi->rli;
 
3867
 
 
3868
  /* We don't lock rli->run_lock. This would lead to deadlocks. */
 
3869
  pthread_mutex_lock(&mi->run_lock);
 
3870
 
 
3871
  /*
 
3872
     We need to test inited because otherwise, new_file() will attempt to lock
 
3873
     LOCK_log, which may not be inited (if we're not a slave).
 
3874
  */
 
3875
  if (!rli->inited)
 
3876
  {
 
3877
    DBUG_PRINT("info", ("rli->inited == 0"));
 
3878
    goto end;
 
3879
  }
 
3880
 
 
3881
  /* If the relay log is closed, new_file() will do nothing. */
 
3882
  rli->relay_log.new_file();
 
3883
 
 
3884
  /*
 
3885
    We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
 
3886
    be counted, so imagine a succession of FLUSH LOGS  and assume the slave
 
3887
    threads are started:
 
3888
    relay_log_space decreases by the size of the deleted relay log, but does
 
3889
    not increase, so flush-after-flush we may become negative, which is wrong.
 
3890
    Even if this will be corrected as soon as a query is replicated on the
 
3891
    slave (because the I/O thread will then call harvest_bytes_written() which
 
3892
    will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
 
3893
    output in SHOW SLAVE STATUS meanwhile. So we harvest now.
 
3894
    If the log is closed, then this will just harvest the last writes, probably
 
3895
    0 as they probably have been harvested.
 
3896
  */
 
3897
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3898
end:
 
3899
  pthread_mutex_unlock(&mi->run_lock);
 
3900
  DBUG_VOID_RETURN;
 
3901
}
 
3902
 
 
3903
 
 
3904
/**
 
3905
   Detects, based on master's version (as found in the relay log), if master
 
3906
   has a certain bug.
 
3907
   @param rli Relay_log_info which tells the master's version
 
3908
   @param bug_id Number of the bug as found in bugs.mysql.com
 
3909
   @param report bool report error message, default TRUE
 
3910
   @return TRUE if master has the bug, FALSE if it does not.
 
3911
*/
 
3912
bool rpl_master_has_bug(Relay_log_info *rli, uint bug_id, bool report)
 
3913
{
 
3914
  struct st_version_range_for_one_bug {
 
3915
    uint        bug_id;
 
3916
    const uchar introduced_in[3]; // first version with bug
 
3917
    const uchar fixed_in[3];      // first version with fix
 
3918
  };
 
3919
  static struct st_version_range_for_one_bug versions_for_all_bugs[]=
 
3920
  {
 
3921
    {24432, { 5, 0, 24 }, { 5, 0, 38 } },
 
3922
    {24432, { 5, 1, 12 }, { 5, 1, 17 } },
 
3923
    {33029, { 5, 0,  0 }, { 5, 0, 58 } },
 
3924
    {33029, { 5, 1,  0 }, { 5, 1, 12 } },
 
3925
  };
 
3926
  const uchar *master_ver=
 
3927
    rli->relay_log.description_event_for_exec->server_version_split;
 
3928
 
 
3929
  DBUG_ASSERT(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
 
3930
 
 
3931
  for (uint i= 0;
 
3932
       i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
 
3933
  {
 
3934
    const uchar *introduced_in= versions_for_all_bugs[i].introduced_in,
 
3935
      *fixed_in= versions_for_all_bugs[i].fixed_in;
 
3936
    if ((versions_for_all_bugs[i].bug_id == bug_id) &&
 
3937
        (memcmp(introduced_in, master_ver, 3) <= 0) &&
 
3938
        (memcmp(fixed_in,      master_ver, 3) >  0))
 
3939
    {
 
3940
      if (!report)
 
3941
        return TRUE;
 
3942
      
 
3943
      // a short message for SHOW SLAVE STATUS (message length constraints)
 
3944
      my_printf_error(ER_UNKNOWN_ERROR, "master may suffer from"
 
3945
                      " http://bugs.mysql.com/bug.php?id=%u"
 
3946
                      " so slave stops; check error log on slave"
 
3947
                      " for more info", MYF(0), bug_id);
 
3948
      // a verbose message for the error log
 
3949
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
 
3950
                  "According to the master's version ('%s'),"
 
3951
                  " it is probable that master suffers from this bug:"
 
3952
                      " http://bugs.mysql.com/bug.php?id=%u"
 
3953
                      " and thus replicating the current binary log event"
 
3954
                      " may make the slave's data become different from the"
 
3955
                      " master's data."
 
3956
                      " To take no risk, slave refuses to replicate"
 
3957
                      " this event and stops."
 
3958
                      " We recommend that all updates be stopped on the"
 
3959
                      " master and slave, that the data of both be"
 
3960
                      " manually synchronized,"
 
3961
                      " that master's binary logs be deleted,"
 
3962
                      " that master be upgraded to a version at least"
 
3963
                      " equal to '%d.%d.%d'. Then replication can be"
 
3964
                      " restarted.",
 
3965
                      rli->relay_log.description_event_for_exec->server_version,
 
3966
                      bug_id,
 
3967
                      fixed_in[0], fixed_in[1], fixed_in[2]);
 
3968
      return TRUE;
 
3969
    }
 
3970
  }
 
3971
  return FALSE;
 
3972
}
 
3973
 
 
3974
/**
 
3975
   BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
 
3976
   exclusive, if one statement in a SP generated AUTO_INCREMENT value
 
3977
   by the top statement, all statements after it would be considered
 
3978
   generated AUTO_INCREMENT value by the top statement, and a
 
3979
   erroneous INSERT_ID value might be associated with these statement,
 
3980
   which could cause duplicate entry error and stop the slave.
 
3981
 
 
3982
   Detect buggy master to work around.
 
3983
 */
 
3984
bool rpl_master_erroneous_autoinc(THD *thd)
 
3985
{
 
3986
  if (active_mi && active_mi->rli.sql_thd == thd)
 
3987
  {
 
3988
    Relay_log_info *rli= &active_mi->rli;
 
3989
    return rpl_master_has_bug(rli, 33029, FALSE);
 
3990
  }
 
3991
  return FALSE;
 
3992
}
 
3993
 
 
3994
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
3995
template class I_List_iterator<i_string>;
 
3996
template class I_List_iterator<i_string_pair>;
 
3997
#endif
 
3998
 
 
3999
/**
 
4000
  @} (end of group Replication)
 
4001
*/
 
4002
 
 
4003
#endif /* HAVE_REPLICATION */