~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/slave.cc

  • Committer: Brian Aker
  • Date: 2009-05-11 17:50:22 UTC
  • Revision ID: brian@gaz-20090511175022-y35q9ky6uh9ldcjt
Replacing Sun employee copyright headers (aka... anything done by a Sun
employee is copyright by Sun).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2000-2003 DRIZZLE AB
2
 
 
3
 
   This program is free software; you can redistribute it and/or modify
4
 
   it under the terms of the GNU General Public License as published by
5
 
   the Free Software Foundation; version 2 of the License.
6
 
 
7
 
   This program is distributed in the hope that it will be useful,
8
 
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 
   GNU General Public License for more details.
11
 
 
12
 
   You should have received a copy of the GNU General Public License
13
 
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
 
 
16
 
 
17
 
/**
18
 
  @addtogroup Replication
19
 
  @{
20
 
 
21
 
  @file
22
 
 
23
 
  @brief Code to run the io thread and the sql thread on the
24
 
  replication slave.
25
 
*/
26
 
#include <drizzled/server_includes.h>
27
 
 
28
 
#include <storage/myisam/myisam.h>
29
 
#include "rpl_mi.h"
30
 
#include "rpl_rli.h"
31
 
#include "sql_repl.h"
32
 
#include "rpl_filter.h"
33
 
#include "repl_failsafe.h"
34
 
#include <mysys/thr_alarm.h>
35
 
#include <libdrizzle/sql_common.h>
36
 
#include <libdrizzle/errmsg.h>
37
 
#include <mysys/mysys_err.h>
38
 
#include <drizzled/drizzled_error_messages.h>
39
 
 
40
 
#include "rpl_tblmap.h"
41
 
 
42
 
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
43
 
 
44
 
#define MAX_SLAVE_RETRY_PAUSE 5
45
 
bool use_slave_mask = 0;
46
 
MY_BITMAP slave_error_mask;
47
 
 
48
 
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
49
 
 
50
 
char* slave_load_tmpdir = 0;
51
 
Master_info *active_mi= 0;
52
 
bool replicate_same_server_id;
53
 
uint64_t relay_log_space_limit = 0;
54
 
 
55
 
/*
56
 
  When slave thread exits, we need to remember the temporary tables so we
57
 
  can re-use them on slave start.
58
 
 
59
 
  TODO: move the vars below under Master_info
60
 
*/
61
 
 
62
 
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
63
 
int32_t events_till_abort = -1;
64
 
 
65
 
enum enum_slave_reconnect_actions
66
 
{
67
 
  SLAVE_RECON_ACT_REG= 0,
68
 
  SLAVE_RECON_ACT_DUMP= 1,
69
 
  SLAVE_RECON_ACT_EVENT= 2,
70
 
  SLAVE_RECON_ACT_MAX
71
 
};
72
 
 
73
 
enum enum_slave_reconnect_messages
74
 
{
75
 
  SLAVE_RECON_MSG_WAIT= 0,
76
 
  SLAVE_RECON_MSG_KILLED_WAITING= 1,
77
 
  SLAVE_RECON_MSG_AFTER= 2,
78
 
  SLAVE_RECON_MSG_FAILED= 3,
79
 
  SLAVE_RECON_MSG_COMMAND= 4,
80
 
  SLAVE_RECON_MSG_KILLED_AFTER= 5,
81
 
  SLAVE_RECON_MSG_MAX
82
 
};
83
 
 
84
 
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
85
 
{
86
 
  {
87
 
    N_("Waiting to reconnect after a failed registration on master"),
88
 
    N_("Slave I/O thread killed while waitnig to reconnect after a "
89
 
                 "failed registration on master"),
90
 
    N_("Reconnecting after a failed registration on master"),
91
 
    N_("failed registering on master, reconnecting to try again, "
92
 
                 "log '%s' at postion %s"),
93
 
    "COM_REGISTER_SLAVE",
94
 
    N_("Slave I/O thread killed during or after reconnect")
95
 
  },
96
 
  {
97
 
    N_("Waiting to reconnect after a failed binlog dump request"),
98
 
    N_("Slave I/O thread killed while retrying master dump"),
99
 
    N_("Reconnecting after a failed binlog dump request"),
100
 
    N_("failed dump request, reconnecting to try again, "
101
 
                 "log '%s' at postion %s"),
102
 
    "COM_BINLOG_DUMP",
103
 
    N_("Slave I/O thread killed during or after reconnect")
104
 
  },
105
 
  {
106
 
    N_("Waiting to reconnect after a failed master event read"),
107
 
    N_("Slave I/O thread killed while waiting to reconnect "
108
 
                 "after a failed read"),
109
 
    N_("Reconnecting after a failed master event read"),
110
 
    N_("Slave I/O thread: Failed reading log event, "
111
 
                 "reconnecting to retry, log '%s' at postion %s"),
112
 
    "",
113
 
    N_("Slave I/O thread killed during or after a "
114
 
                 "reconnect done to recover from failed read")
115
 
  }
116
 
};
117
 
 
118
 
 
119
 
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
120
 
 
121
 
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
122
 
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
123
 
static bool wait_for_relay_log_space(Relay_log_info* rli);
124
 
static inline bool io_slave_killed(THD* thd,Master_info* mi);
125
 
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
126
 
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
127
 
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi);
128
 
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
129
 
                          bool suppress_warnings);
130
 
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
131
 
                             bool reconnect, bool suppress_warnings);
132
 
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
133
 
                      void* thread_killed_arg);
134
 
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
135
 
static Log_event* next_event(Relay_log_info* rli);
136
 
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
137
 
static int32_t terminate_slave_thread(THD *thd,
138
 
                                  pthread_mutex_t* term_lock,
139
 
                                  pthread_cond_t* term_cond,
140
 
                                  volatile uint32_t *slave_running,
141
 
                                  bool skip_lock);
142
 
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
143
 
 
144
 
/*
145
 
  Find out which replications threads are running
146
 
 
147
 
  SYNOPSIS
148
 
    init_thread_mask()
149
 
    mask                Return value here
150
 
    mi                  master_info for slave
151
 
    inverse             If set, returns which threads are not running
152
 
 
153
 
  IMPLEMENTATION
154
 
    Get a bit mask for which threads are running so that we can later restart
155
 
    these threads.
156
 
 
157
 
  RETURN
158
 
    mask        If inverse == 0, running threads
159
 
                If inverse == 1, stopped threads
160
 
*/
161
 
 
162
 
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
163
 
{
164
 
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
165
 
  register int32_t tmp_mask=0;
166
 
 
167
 
  if (set_io)
168
 
    tmp_mask |= SLAVE_IO;
169
 
  if (set_sql)
170
 
    tmp_mask |= SLAVE_SQL;
171
 
  if (inverse)
172
 
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
173
 
  *mask = tmp_mask;
174
 
  return;
175
 
}
176
 
 
177
 
 
178
 
/*
179
 
  lock_slave_threads()
180
 
*/
181
 
 
182
 
void lock_slave_threads(Master_info* mi)
183
 
{
184
 
  //TODO: see if we can do this without dual mutex
185
 
  pthread_mutex_lock(&mi->run_lock);
186
 
  pthread_mutex_lock(&mi->rli.run_lock);
187
 
  return;
188
 
}
189
 
 
190
 
 
191
 
/*
192
 
  unlock_slave_threads()
193
 
*/
194
 
 
195
 
void unlock_slave_threads(Master_info* mi)
196
 
{
197
 
  //TODO: see if we can do this without dual mutex
198
 
  pthread_mutex_unlock(&mi->rli.run_lock);
199
 
  pthread_mutex_unlock(&mi->run_lock);
200
 
  return;
201
 
}
202
 
 
203
 
 
204
 
/* Initialize slave structures */
205
 
 
206
 
int32_t init_slave()
207
 
{
208
 
  /*
209
 
    This is called when mysqld starts. Before client connections are
210
 
    accepted. However bootstrap may conflict with us if it does START SLAVE.
211
 
    So it's safer to take the lock.
212
 
  */
213
 
  pthread_mutex_lock(&LOCK_active_mi);
214
 
  /*
215
 
    TODO: re-write this to interate through the list of files
216
 
    for multi-master
217
 
  */
218
 
  active_mi= new Master_info;
219
 
 
220
 
  /*
221
 
    If master_host is not specified, try to read it from the master_info file.
222
 
    If master_host is specified, create the master_info file if it doesn't
223
 
    exists.
224
 
  */
225
 
  if (!active_mi)
226
 
  {
227
 
    sql_print_error(_("Failed to allocate memory for the master info structure"));
228
 
    goto err;
229
 
  }
230
 
 
231
 
  if (init_master_info(active_mi,master_info_file,relay_log_info_file,
232
 
                       1, (SLAVE_IO | SLAVE_SQL)))
233
 
  {
234
 
    sql_print_error(_("Failed to initialize the master info structure"));
235
 
    goto err;
236
 
  }
237
 
 
238
 
  /* If server id is not set, start_slave_thread() will say it */
239
 
 
240
 
  if (active_mi->host[0] && !opt_skip_slave_start)
241
 
  {
242
 
    if (start_slave_threads(1 /* need mutex */,
243
 
                            0 /* no wait for start*/,
244
 
                            active_mi,
245
 
                            master_info_file,
246
 
                            relay_log_info_file,
247
 
                            SLAVE_IO | SLAVE_SQL))
248
 
    {
249
 
      sql_print_error(_("Failed to create slave threads"));
250
 
      goto err;
251
 
    }
252
 
  }
253
 
  pthread_mutex_unlock(&LOCK_active_mi);
254
 
  return(0);
255
 
 
256
 
err:
257
 
  pthread_mutex_unlock(&LOCK_active_mi);
258
 
  return(1);
259
 
}
260
 
 
261
 
 
262
 
/*
263
 
  Init function to set up array for errors that should be skipped for slave
264
 
 
265
 
  SYNOPSIS
266
 
    init_slave_skip_errors()
267
 
    arg         List of errors numbers to skip, separated with ','
268
 
 
269
 
  NOTES
270
 
    Called from get_options() in mysqld.cc on start-up
271
 
*/
272
 
 
273
 
void init_slave_skip_errors(const char* arg)
274
 
{
275
 
  const char *p;
276
 
 
277
 
  if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
278
 
  {
279
 
    fprintf(stderr, "Badly out of memory, please check your system status\n");
280
 
    exit(1);
281
 
  }
282
 
  use_slave_mask = 1;
283
 
  for (;my_isspace(system_charset_info,*arg);++arg)
284
 
    /* empty */;
285
 
  if (!my_strnncoll(system_charset_info,(unsigned char*)arg,4,(const unsigned char*)"all",4))
286
 
  {
287
 
    bitmap_set_all(&slave_error_mask);
288
 
    return;
289
 
  }
290
 
  for (p= arg ; *p; )
291
 
  {
292
 
    long err_code;
293
 
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
294
 
      break;
295
 
    if (err_code < MAX_SLAVE_ERROR)
296
 
       bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
297
 
    while (!my_isdigit(system_charset_info,*p) && *p)
298
 
      p++;
299
 
  }
300
 
  return;
301
 
}
302
 
 
303
 
 
304
 
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
305
 
{
306
 
  if (!mi->inited)
307
 
    return(0); /* successfully do nothing */
308
 
  int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
309
 
  pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
310
 
 
311
 
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
312
 
  {
313
 
    mi->abort_slave=1;
314
 
    if ((error=terminate_slave_thread(mi->io_thd,io_lock,
315
 
                                      &mi->stop_cond,
316
 
                                      &mi->slave_running,
317
 
                                      skip_lock)) &&
318
 
        !force_all)
319
 
      return(error);
320
 
  }
321
 
  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
322
 
  {
323
 
    mi->rli.abort_slave=1;
324
 
    if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
325
 
                                      &mi->rli.stop_cond,
326
 
                                      &mi->rli.slave_running,
327
 
                                      skip_lock)) &&
328
 
        !force_all)
329
 
      return(error);
330
 
  }
331
 
  return(0);
332
 
}
333
 
 
334
 
 
335
 
/**
336
 
   Wait for a slave thread to terminate.
337
 
 
338
 
   This function is called after requesting the thread to terminate
339
 
   (by setting @c abort_slave member of @c Relay_log_info or @c
340
 
   Master_info structure to 1). Termination of the thread is
341
 
   controlled with the the predicate <code>*slave_running</code>.
342
 
 
343
 
   Function will acquire @c term_lock before waiting on the condition
344
 
   unless @c skip_lock is true in which case the mutex should be owned
345
 
   by the caller of this function and will remain acquired after
346
 
   return from the function.
347
 
 
348
 
   @param term_lock
349
 
          Associated lock to use when waiting for @c term_cond
350
 
 
351
 
   @param term_cond
352
 
          Condition that is signalled when the thread has terminated
353
 
 
354
 
   @param slave_running
355
 
          Pointer to predicate to check for slave thread termination
356
 
 
357
 
   @param skip_lock
358
 
          If @c true the lock will not be acquired before waiting on
359
 
          the condition. In this case, it is assumed that the calling
360
 
          function acquires the lock before calling this function.
361
 
 
362
 
   @retval 0 All OK
363
 
 */
364
 
static int32_t
365
 
terminate_slave_thread(THD *thd,
366
 
                       pthread_mutex_t* term_lock,
367
 
                       pthread_cond_t* term_cond,
368
 
                       volatile uint32_t *slave_running,
369
 
                       bool skip_lock)
370
 
{
371
 
  int32_t error;
372
 
 
373
 
  if (!skip_lock)
374
 
    pthread_mutex_lock(term_lock);
375
 
 
376
 
  safe_mutex_assert_owner(term_lock);
377
 
 
378
 
  if (!*slave_running)
379
 
  {
380
 
    if (!skip_lock)
381
 
      pthread_mutex_unlock(term_lock);
382
 
    return(ER_SLAVE_NOT_RUNNING);
383
 
  }
384
 
  assert(thd != 0);
385
 
  THD_CHECK_SENTRY(thd);
386
 
 
387
 
  /*
388
 
    Is is critical to test if the slave is running. Otherwise, we might
389
 
    be referening freed memory trying to kick it
390
 
  */
391
 
 
392
 
  while (*slave_running)                        // Should always be true
393
 
  {
394
 
    pthread_mutex_lock(&thd->LOCK_delete);
395
 
#ifndef DONT_USE_THR_ALARM
396
 
    /*
397
 
      Error codes from pthread_kill are:
398
 
      EINVAL: invalid signal number (can't happen)
399
 
      ESRCH: thread already killed (can happen, should be ignored)
400
 
    */
401
 
    int32_t err= pthread_kill(thd->real_id, thr_client_alarm);
402
 
    assert(err != EINVAL);
403
 
#endif
404
 
    thd->awake(THD::NOT_KILLED);
405
 
    pthread_mutex_unlock(&thd->LOCK_delete);
406
 
 
407
 
    /*
408
 
      There is a small chance that slave thread might miss the first
409
 
      alarm. To protect againts it, resend the signal until it reacts
410
 
    */
411
 
    struct timespec abstime;
412
 
    set_timespec(abstime,2);
413
 
    error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
414
 
    assert(error == ETIMEDOUT || error == 0);
415
 
  }
416
 
 
417
 
  assert(*slave_running == 0);
418
 
 
419
 
  if (!skip_lock)
420
 
    pthread_mutex_unlock(term_lock);
421
 
  return(0);
422
 
}
423
 
 
424
 
 
425
 
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
426
 
                           pthread_mutex_t *cond_lock,
427
 
                           pthread_cond_t *start_cond,
428
 
                           volatile uint32_t *slave_running,
429
 
                           volatile uint32_t *slave_run_id,
430
 
                           Master_info* mi,
431
 
                           bool high_priority)
432
 
{
433
 
  pthread_t th;
434
 
  uint32_t start_id;
435
 
 
436
 
  assert(mi->inited);
437
 
 
438
 
  if (start_lock)
439
 
    pthread_mutex_lock(start_lock);
440
 
  if (!server_id)
441
 
  {
442
 
    if (start_cond)
443
 
      pthread_cond_broadcast(start_cond);
444
 
    if (start_lock)
445
 
      pthread_mutex_unlock(start_lock);
446
 
    sql_print_error(_("Server id not set, will not start slave"));
447
 
    return(ER_BAD_SLAVE);
448
 
  }
449
 
 
450
 
  if (*slave_running)
451
 
  {
452
 
    if (start_cond)
453
 
      pthread_cond_broadcast(start_cond);
454
 
    if (start_lock)
455
 
      pthread_mutex_unlock(start_lock);
456
 
    return(ER_SLAVE_MUST_STOP);
457
 
  }
458
 
  start_id= *slave_run_id;
459
 
  if (high_priority)
460
 
  {
461
 
    struct sched_param tmp_sched_param;
462
 
 
463
 
    memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
464
 
    tmp_sched_param.sched_priority= CONNECT_PRIOR;
465
 
    (void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
466
 
  }
467
 
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
468
 
  {
469
 
    if (start_lock)
470
 
      pthread_mutex_unlock(start_lock);
471
 
    return(ER_SLAVE_THREAD);
472
 
  }
473
 
  if (start_cond && cond_lock) // caller has cond_lock
474
 
  {
475
 
    THD* thd = current_thd;
476
 
    while (start_id == *slave_run_id)
477
 
    {
478
 
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
479
 
                                            "Waiting for slave thread to start");
480
 
      pthread_cond_wait(start_cond,cond_lock);
481
 
      thd->exit_cond(old_msg);
482
 
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
483
 
      if (thd->killed)
484
 
        return(thd->killed_errno());
485
 
    }
486
 
  }
487
 
  if (start_lock)
488
 
    pthread_mutex_unlock(start_lock);
489
 
  return(0);
490
 
}
491
 
 
492
 
 
493
 
/*
494
 
  start_slave_threads()
495
 
 
496
 
  NOTES
497
 
    SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
498
 
    sense to do that for starting a slave--we always care if it actually
499
 
    started the threads that were not previously running
500
 
*/
501
 
 
502
 
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
503
 
                        Master_info* mi,
504
 
                        const char* master_info_fname __attribute__((unused)),
505
 
                        const char* slave_info_fname __attribute__((unused)),
506
 
                        int32_t thread_mask)
507
 
{
508
 
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
509
 
  pthread_cond_t* cond_io=0,*cond_sql=0;
510
 
  int32_t error=0;
511
 
 
512
 
  if (need_slave_mutex)
513
 
  {
514
 
    lock_io = &mi->run_lock;
515
 
    lock_sql = &mi->rli.run_lock;
516
 
  }
517
 
  if (wait_for_start)
518
 
  {
519
 
    cond_io = &mi->start_cond;
520
 
    cond_sql = &mi->rli.start_cond;
521
 
    lock_cond_io = &mi->run_lock;
522
 
    lock_cond_sql = &mi->rli.run_lock;
523
 
  }
524
 
 
525
 
  if (thread_mask & SLAVE_IO)
526
 
    error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
527
 
                              cond_io,
528
 
                              &mi->slave_running, &mi->slave_run_id,
529
 
                              mi, 1); //high priority, to read the most possible
530
 
  if (!error && (thread_mask & SLAVE_SQL))
531
 
  {
532
 
    error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
533
 
                              cond_sql,
534
 
                              &mi->rli.slave_running, &mi->rli.slave_run_id,
535
 
                              mi, 0);
536
 
    if (error)
537
 
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
538
 
  }
539
 
  return(error);
540
 
}
541
 
 
542
 
 
543
 
#ifdef NOT_USED_YET
544
 
static int32_t end_slave_on_walk(Master_info* mi, unsigned char* /*unused*/)
545
 
{
546
 
  end_master_info(mi);
547
 
  return(0);
548
 
}
549
 
#endif
550
 
 
551
 
 
552
 
/*
553
 
  Free all resources used by slave
554
 
 
555
 
  SYNOPSIS
556
 
    end_slave()
557
 
*/
558
 
 
559
 
void end_slave()
560
 
{
561
 
  /*
562
 
    This is called when the server terminates, in close_connections().
563
 
    It terminates slave threads. However, some CHANGE MASTER etc may still be
564
 
    running presently. If a START SLAVE was in progress, the mutex lock below
565
 
    will make us wait until slave threads have started, and START SLAVE
566
 
    returns, then we terminate them here.
567
 
  */
568
 
  pthread_mutex_lock(&LOCK_active_mi);
569
 
  if (active_mi)
570
 
  {
571
 
    /*
572
 
      TODO: replace the line below with
573
 
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
574
 
      once multi-master code is ready.
575
 
    */
576
 
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
577
 
    end_master_info(active_mi);
578
 
    delete active_mi;
579
 
    active_mi= 0;
580
 
  }
581
 
  pthread_mutex_unlock(&LOCK_active_mi);
582
 
  return;
583
 
}
584
 
 
585
 
 
586
 
static bool io_slave_killed(THD* thd, Master_info* mi)
587
 
{
588
 
  assert(mi->io_thd == thd);
589
 
  assert(mi->slave_running); // tracking buffer overrun
590
 
  return(mi->abort_slave || abort_loop || thd->killed);
591
 
}
592
 
 
593
 
 
594
 
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
595
 
{
596
 
  assert(rli->sql_thd == thd);
597
 
  assert(rli->slave_running == 1);// tracking buffer overrun
598
 
  if (abort_loop || thd->killed || rli->abort_slave)
599
 
  {
600
 
    /*
601
 
      If we are in an unsafe situation (stopping could corrupt replication),
602
 
      we give one minute to the slave SQL thread of grace before really
603
 
      terminating, in the hope that it will be able to read more events and
604
 
      the unsafe situation will soon be left. Note that this one minute starts
605
 
      from the last time anything happened in the slave SQL thread. So it's
606
 
      really one minute of idleness, we don't timeout if the slave SQL thread
607
 
      is actively working.
608
 
    */
609
 
    if (rli->last_event_start_time == 0)
610
 
      return(1);
611
 
    if (difftime(time(0), rli->last_event_start_time) > 60)
612
 
    {
613
 
      rli->report(ERROR_LEVEL, 0,
614
 
                  _("SQL thread had to stop in an unsafe situation, in "
615
 
                  "the middle of applying updates to a "
616
 
                  "non-transactional table without any primary key. "
617
 
                  "There is a risk of duplicate updates when the slave "
618
 
                  "SQL thread is restarted. Please check your tables' "
619
 
                  "contents after restart."));
620
 
      return(1);
621
 
    }
622
 
  }
623
 
  return(0);
624
 
}
625
 
 
626
 
 
627
 
/*
628
 
  skip_load_data_infile()
629
 
 
630
 
  NOTES
631
 
    This is used to tell a 3.23 master to break send_file()
632
 
*/
633
 
 
634
 
void skip_load_data_infile(NET *net)
635
 
{
636
 
  (void)net_request_file(net, "/dev/null");
637
 
  (void)my_net_read(net);                               // discard response
638
 
  (void)net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0); // ok
639
 
  return;
640
 
}
641
 
 
642
 
 
643
 
bool net_request_file(NET* net, const char* fname)
644
 
{
645
 
  return(net_write_command(net, 251, (unsigned char*) fname, strlen(fname),
646
 
                                (unsigned char*) "", 0));
647
 
}
648
 
 
649
 
/*
650
 
  From other comments and tests in code, it looks like
651
 
  sometimes Query_log_event and Load_log_event can have db == 0
652
 
  (see rewrite_db() above for example)
653
 
  (cases where this happens are unclear; it may be when the master is 3.23).
654
 
*/
655
 
 
656
 
const char *print_slave_db_safe(const char* db)
657
 
{
658
 
  return((db ? db : ""));
659
 
}
660
 
 
661
 
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
662
 
                                 const char *default_val)
663
 
{
664
 
  uint32_t length;
665
 
 
666
 
  if ((length=my_b_gets(f,var, max_size)))
667
 
  {
668
 
    char* last_p = var + length -1;
669
 
    if (*last_p == '\n')
670
 
      *last_p = 0; // if we stopped on newline, kill it
671
 
    else
672
 
    {
673
 
      /*
674
 
        If we truncated a line or stopped on last char, remove all chars
675
 
        up to and including newline.
676
 
      */
677
 
      int32_t c;
678
 
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
679
 
    }
680
 
    return(0);
681
 
  }
682
 
  else if (default_val)
683
 
  {
684
 
    strmake(var,  default_val, max_size-1);
685
 
    return(0);
686
 
  }
687
 
  return(1);
688
 
}
689
 
 
690
 
 
691
 
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
692
 
{
693
 
  char buf[32];
694
 
 
695
 
 
696
 
  if (my_b_gets(f, buf, sizeof(buf)))
697
 
  {
698
 
    *var = atoi(buf);
699
 
    return(0);
700
 
  }
701
 
  else if (default_val)
702
 
  {
703
 
    *var = default_val;
704
 
    return(0);
705
 
  }
706
 
  return(1);
707
 
}
708
 
 
709
 
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
710
 
{
711
 
  char buf[16];
712
 
 
713
 
 
714
 
  if (my_b_gets(f, buf, sizeof(buf)))
715
 
  {
716
 
    if (sscanf(buf, "%f", var) != 1)
717
 
      return(1);
718
 
    else
719
 
      return(0);
720
 
  }
721
 
  else if (default_val != 0.0)
722
 
  {
723
 
    *var = default_val;
724
 
    return(0);
725
 
  }
726
 
  return(1);
727
 
}
728
 
 
729
 
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
730
 
{
731
 
  if (io_slave_killed(thd, mi))
732
 
  {
733
 
    if (info && global_system_variables.log_warnings)
734
 
      sql_print_information(info);
735
 
    return true;
736
 
  }
737
 
  return false;
738
 
}
739
 
 
740
 
 
741
 
/*
742
 
  Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
743
 
  relying on the binlog's version. This is not perfect: imagine an upgrade
744
 
  of the master without waiting that all slaves are in sync with the master;
745
 
  then a slave could be fooled about the binlog's format. This is what happens
746
 
  when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
747
 
  slaves are fooled. So we do this only to distinguish between 3.23 and more
748
 
  recent masters (it's too late to change things for 3.23).
749
 
 
750
 
  RETURNS
751
 
  0       ok
752
 
  1       error
753
 
*/
754
 
 
755
 
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
756
 
{
757
 
  char error_buf[512];
758
 
  String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
759
 
  char err_buff[MAX_SLAVE_ERRMSG];
760
 
  const char* errmsg= 0;
761
 
  int32_t err_code= 0;
762
 
  DRIZZLE_RES *master_res= 0;
763
 
  DRIZZLE_ROW master_row;
764
 
 
765
 
  err_msg.length(0);
766
 
  /*
767
 
    Free old description_event_for_queue (that is needed if we are in
768
 
    a reconnection).
769
 
  */
770
 
  delete mi->rli.relay_log.description_event_for_queue;
771
 
  mi->rli.relay_log.description_event_for_queue= 0;
772
 
 
773
 
  if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
774
 
  {
775
 
    errmsg = _("Master reported unrecognized DRIZZLE version");
776
 
    err_code= ER_SLAVE_FATAL_ERROR;
777
 
    sprintf(err_buff, ER(err_code), errmsg);
778
 
    err_msg.append(err_buff);
779
 
  }
780
 
  else
781
 
  {
782
 
    /*
783
 
      Note the following switch will bug when we have DRIZZLE branch 30 ;)
784
 
    */
785
 
    switch (*drizzle->server_version)
786
 
    {
787
 
    case '0':
788
 
    case '1':
789
 
    case '2':
790
 
      errmsg = _("Master reported unrecognized DRIZZLE version");
791
 
      err_code= ER_SLAVE_FATAL_ERROR;
792
 
      sprintf(err_buff, ER(err_code), errmsg);
793
 
      err_msg.append(err_buff);
794
 
      break;
795
 
    case '3':
796
 
      mi->rli.relay_log.description_event_for_queue= new
797
 
        Format_description_log_event(1, drizzle->server_version);
798
 
      break;
799
 
    case '4':
800
 
      mi->rli.relay_log.description_event_for_queue= new
801
 
        Format_description_log_event(3, drizzle->server_version);
802
 
      break;
803
 
    default:
804
 
      /*
805
 
        Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
806
 
        take the early steps (like tests for "is this a 3.23 master") which we
807
 
        have to take before we receive the real master's Format_desc which will
808
 
        override this one. Note that the Format_desc we create below is garbage
809
 
        (it has the format of the *slave*); it's only good to help know if the
810
 
        master is 3.23, 4.0, etc.
811
 
      */
812
 
      mi->rli.relay_log.description_event_for_queue= new
813
 
        Format_description_log_event(4, drizzle->server_version);
814
 
      break;
815
 
    }
816
 
  }
817
 
 
818
 
  /*
819
 
     This does not mean that a 5.0 slave will be able to read a 6.0 master; but
820
 
     as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
821
 
     can't read a 6.0 master, this will show up when the slave can't read some
822
 
     events sent by the master, and there will be error messages.
823
 
  */
824
 
 
825
 
  if (err_msg.length() != 0)
826
 
    goto err;
827
 
 
828
 
  /* as we are here, we tried to allocate the event */
829
 
  if (!mi->rli.relay_log.description_event_for_queue)
830
 
  {
831
 
    errmsg= _("default Format_description_log_event");
832
 
    err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
833
 
    sprintf(err_buff, ER(err_code), errmsg);
834
 
    err_msg.append(err_buff);
835
 
    goto err;
836
 
  }
837
 
 
838
 
  /*
839
 
    Compare the master and slave's clock. Do not die if master's clock is
840
 
    unavailable (very old master not supporting UNIX_TIMESTAMP()?).
841
 
  */
842
 
 
843
 
  if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
844
 
      (master_res= drizzle_store_result(drizzle)) &&
845
 
      (master_row= drizzle_fetch_row(master_res)))
846
 
  {
847
 
    mi->clock_diff_with_master=
848
 
      (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
849
 
  }
850
 
  else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
851
 
  {
852
 
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
853
 
    sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
854
 
                        "do not trust column Seconds_Behind_Master of SHOW "
855
 
                        "SLAVE STATUS. Error: %s (%d)"),
856
 
                      drizzle_error(drizzle), drizzle_errno(drizzle));
857
 
  }
858
 
  if (master_res)
859
 
    drizzle_free_result(master_res);
860
 
 
861
 
  /*
862
 
    Check that the master's server id and ours are different. Because if they
863
 
    are equal (which can result from a simple copy of master's datadir to slave,
864
 
    thus copying some my.cnf), replication will work but all events will be
865
 
    skipped.
866
 
    Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
867
 
    master?).
868
 
    Note: we could have put a @@SERVER_ID in the previous SELECT
869
 
    UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
870
 
  */
871
 
  if (!drizzle_real_query(drizzle,
872
 
                        STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
873
 
      (master_res= drizzle_store_result(drizzle)))
874
 
  {
875
 
    if ((master_row= drizzle_fetch_row(master_res)) &&
876
 
        (::server_id == strtoul(master_row[1], 0, 10)) &&
877
 
        !mi->rli.replicate_same_server_id)
878
 
    {
879
 
      errmsg=
880
 
        _("The slave I/O thread stops because master and slave have equal "
881
 
          "DRIZZLE server ids; these ids must be different "
882
 
          "for replication to work (or "
883
 
          "the --replicate-same-server-id option must be used "
884
 
          "on slave but this does"
885
 
          "not always make sense; please check the manual before using it).");
886
 
      err_code= ER_SLAVE_FATAL_ERROR;
887
 
      sprintf(err_buff, ER(err_code), errmsg);
888
 
      err_msg.append(err_buff);
889
 
    }
890
 
    drizzle_free_result(master_res);
891
 
    if (errmsg)
892
 
      goto err;
893
 
  }
894
 
 
895
 
  /*
896
 
    Check that the master's global character_set_server and ours are the same.
897
 
    Not fatal if query fails (old master?).
898
 
    Note that we don't check for equality of global character_set_client and
899
 
    collation_connection (neither do we prevent their setting in
900
 
    set_var.cc). That's because from what I (Guilhem) have tested, the global
901
 
    values of these 2 are never used (new connections don't use them).
902
 
    We don't test equality of global collation_database either as it's is
903
 
    going to be deprecated (made read-only) in 4.1 very soon.
904
 
    The test is only relevant if master < 5.0.3 (we'll test only if it's older
905
 
    than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
906
 
    charset info in each binlog event.
907
 
    We don't do it for 3.23 because masters <3.23.50 hang on
908
 
    SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
909
 
    test only if master is 4.x.
910
 
  */
911
 
 
912
 
  /* redundant with rest of code but safer against later additions */
913
 
  if (*drizzle->server_version == '3')
914
 
    goto err;
915
 
 
916
 
  if ((*drizzle->server_version == '4') &&
917
 
      !drizzle_real_query(drizzle,
918
 
                        STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
919
 
      (master_res= drizzle_store_result(drizzle)))
920
 
  {
921
 
    if ((master_row= drizzle_fetch_row(master_res)) &&
922
 
        strcmp(master_row[0], global_system_variables.collation_server->name))
923
 
    {
924
 
      errmsg=
925
 
        _("The slave I/O thread stops because master and slave have"
926
 
          " different values for the COLLATION_SERVER global variable."
927
 
          " The values must be equal for replication to work");
928
 
      err_code= ER_SLAVE_FATAL_ERROR;
929
 
      sprintf(err_buff, ER(err_code), errmsg);
930
 
      err_msg.append(err_buff);
931
 
    }
932
 
    drizzle_free_result(master_res);
933
 
    if (errmsg)
934
 
      goto err;
935
 
  }
936
 
 
937
 
  /*
938
 
    Perform analogous check for time zone. Theoretically we also should
939
 
    perform check here to verify that SYSTEM time zones are the same on
940
 
    slave and master, but we can't rely on value of @@system_time_zone
941
 
    variable (it is time zone abbreviation) since it determined at start
942
 
    time and so could differ for slave and master even if they are really
943
 
    in the same system time zone. So we are omiting this check and just
944
 
    relying on documentation. Also according to Monty there are many users
945
 
    who are using replication between servers in various time zones. Hence
946
 
    such check will broke everything for them. (And now everything will
947
 
    work for them because by default both their master and slave will have
948
 
    'SYSTEM' time zone).
949
 
    This check is only necessary for 4.x masters (and < 5.0.4 masters but
950
 
    those were alpha).
951
 
  */
952
 
  if ((*drizzle->server_version == '4') &&
953
 
      !drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
954
 
      (master_res= drizzle_store_result(drizzle)))
955
 
  {
956
 
    if ((master_row= drizzle_fetch_row(master_res)) &&
957
 
        strcmp(master_row[0],
958
 
               global_system_variables.time_zone->get_name()->ptr()))
959
 
    {
960
 
      errmsg=
961
 
        _("The slave I/O thread stops because master and slave have"
962
 
          " different values for the TIME_ZONE global variable."
963
 
          " The values must be equal for replication to work");
964
 
      err_code= ER_SLAVE_FATAL_ERROR;
965
 
      sprintf(err_buff, ER(err_code), errmsg);
966
 
      err_msg.append(err_buff);
967
 
    }
968
 
    drizzle_free_result(master_res);
969
 
 
970
 
    if (errmsg)
971
 
      goto err;
972
 
  }
973
 
 
974
 
  if (mi->heartbeat_period != 0.0)
975
 
  {
976
 
    char llbuf[22];
977
 
    const char query_format[]= "SET @master_heartbeat_period= %s";
978
 
    char query[sizeof(query_format) - 2 + sizeof(llbuf)];
979
 
    /* 
980
 
       the period is an uint64_t of nano-secs. 
981
 
    */
982
 
    llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
983
 
    sprintf(query, query_format, llbuf);
984
 
 
985
 
    if (drizzle_real_query(drizzle, query, strlen(query))
986
 
        && !check_io_slave_killed(mi->io_thd, mi, NULL))
987
 
    {
988
 
      err_msg.append("The slave I/O thread stops because querying master with '");
989
 
      err_msg.append(query);
990
 
      err_msg.append("' failed;");
991
 
      err_msg.append(" error: ");
992
 
      err_code= drizzle_errno(drizzle);
993
 
      err_msg.qs_append(err_code);
994
 
      err_msg.append("  '");
995
 
      err_msg.append(drizzle_error(drizzle));
996
 
      err_msg.append("'");
997
 
      drizzle_free_result(drizzle_store_result(drizzle));
998
 
      goto err;
999
 
    }
1000
 
    drizzle_free_result(drizzle_store_result(drizzle));
1001
 
  }
1002
 
  
1003
 
err:
1004
 
  if (err_msg.length() != 0)
1005
 
  {
1006
 
    sql_print_error(err_msg.ptr());
1007
 
    assert(err_code != 0);
1008
 
    mi->report(ERROR_LEVEL, err_code, err_msg.ptr());
1009
 
    return(1);
1010
 
  }
1011
 
 
1012
 
  return(0);
1013
 
}
1014
 
 
1015
 
 
1016
 
static bool wait_for_relay_log_space(Relay_log_info* rli)
1017
 
{
1018
 
  bool slave_killed=0;
1019
 
  Master_info* mi = rli->mi;
1020
 
  const char *save_proc_info;
1021
 
  THD* thd = mi->io_thd;
1022
 
 
1023
 
  pthread_mutex_lock(&rli->log_space_lock);
1024
 
  save_proc_info= thd->enter_cond(&rli->log_space_cond,
1025
 
                                  &rli->log_space_lock,
1026
 
                                  _("Waiting for the slave SQL thread "
1027
 
                                    "to free enough relay log space"));
1028
 
  while (rli->log_space_limit < rli->log_space_total &&
1029
 
         !(slave_killed=io_slave_killed(thd,mi)) &&
1030
 
         !rli->ignore_log_space_limit)
1031
 
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1032
 
  thd->exit_cond(save_proc_info);
1033
 
  return(slave_killed);
1034
 
}
1035
 
 
1036
 
 
1037
 
/*
1038
 
  Builds a Rotate from the ignored events' info and writes it to relay log.
1039
 
 
1040
 
  SYNOPSIS
1041
 
  write_ignored_events_info_to_relay_log()
1042
 
    thd             pointer to I/O thread's thd
1043
 
    mi
1044
 
 
1045
 
  DESCRIPTION
1046
 
    Slave I/O thread, going to die, must leave a durable trace of the
1047
 
    ignored events' end position for the use of the slave SQL thread, by
1048
 
    calling this function. Only that thread can call it (see assertion).
1049
 
 */
1050
 
static void write_ignored_events_info_to_relay_log(THD *thd __attribute__((unused)),
1051
 
                                                   Master_info *mi)
1052
 
{
1053
 
  Relay_log_info *rli= &mi->rli;
1054
 
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1055
 
 
1056
 
  assert(thd == mi->io_thd);
1057
 
  pthread_mutex_lock(log_lock);
1058
 
  if (rli->ign_master_log_name_end[0])
1059
 
  {
1060
 
    Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
1061
 
                                               0, rli->ign_master_log_pos_end,
1062
 
                                               Rotate_log_event::DUP_NAME);
1063
 
    rli->ign_master_log_name_end[0]= 0;
1064
 
    /* can unlock before writing as slave SQL thd will soon see our Rotate */
1065
 
    pthread_mutex_unlock(log_lock);
1066
 
    if (likely((bool)ev))
1067
 
    {
1068
 
      ev->server_id= 0; // don't be ignored by slave SQL thread
1069
 
      if (unlikely(rli->relay_log.append(ev)))
1070
 
        mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
1071
 
                   ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
1072
 
                   _("failed to write a Rotate event"
1073
 
                     " to the relay log, SHOW SLAVE STATUS may be"
1074
 
                     " inaccurate"));
1075
 
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1076
 
      if (flush_master_info(mi, 1))
1077
 
        sql_print_error(_("Failed to flush master info file"));
1078
 
      delete ev;
1079
 
    }
1080
 
    else
1081
 
      mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
1082
 
                 ER(ER_SLAVE_CREATE_EVENT_FAILURE),
1083
 
                 _("Rotate_event (out of memory?),"
1084
 
                   " SHOW SLAVE STATUS may be inaccurate"));
1085
 
  }
1086
 
  else
1087
 
    pthread_mutex_unlock(log_lock);
1088
 
  return;
1089
 
}
1090
 
 
1091
 
 
1092
 
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
1093
 
                             bool *suppress_warnings)
1094
 
{
1095
 
  unsigned char buf[1024], *pos= buf;
1096
 
  uint32_t report_host_len, report_user_len=0, report_password_len=0;
1097
 
 
1098
 
  *suppress_warnings= false;
1099
 
  if (!report_host)
1100
 
    return(0);
1101
 
  report_host_len= strlen(report_host);
1102
 
  if (report_user)
1103
 
    report_user_len= strlen(report_user);
1104
 
  if (report_password)
1105
 
    report_password_len= strlen(report_password);
1106
 
  /* 30 is a good safety margin */
1107
 
  if (report_host_len + report_user_len + report_password_len + 30 >
1108
 
      sizeof(buf))
1109
 
    return(0);                                     // safety
1110
 
 
1111
 
  int4store(pos, server_id); pos+= 4;
1112
 
  pos= net_store_data(pos, (unsigned char*) report_host, report_host_len);
1113
 
  pos= net_store_data(pos, (unsigned char*) report_user, report_user_len);
1114
 
  pos= net_store_data(pos, (unsigned char*) report_password, report_password_len);
1115
 
  int2store(pos, (uint16_t) report_port); pos+= 2;
1116
 
  int4store(pos, rpl_recovery_rank);    pos+= 4;
1117
 
  /* The master will fill in master_id */
1118
 
  int4store(pos, 0);                    pos+= 4;
1119
 
 
1120
 
  if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
1121
 
  {
1122
 
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1123
 
    {
1124
 
      *suppress_warnings= true;                 // Suppress reconnect warning
1125
 
    }
1126
 
    else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
1127
 
    {
1128
 
      char buf[256];
1129
 
      snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle), 
1130
 
               drizzle_errno(drizzle));
1131
 
      mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1132
 
                 ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1133
 
    }
1134
 
    return(1);
1135
 
  }
1136
 
  return(0);
1137
 
}
1138
 
 
1139
 
 
1140
 
bool show_master_info(THD* thd, Master_info* mi)
1141
 
{
1142
 
  // TODO: fix this for multi-master
1143
 
  List<Item> field_list;
1144
 
  Protocol *protocol= thd->protocol;
1145
 
 
1146
 
  field_list.push_back(new Item_empty_string("Slave_IO_State",
1147
 
                                                     14));
1148
 
  field_list.push_back(new Item_empty_string("Master_Host",
1149
 
                                                     sizeof(mi->host)));
1150
 
  field_list.push_back(new Item_empty_string("Master_User",
1151
 
                                                     sizeof(mi->user)));
1152
 
  field_list.push_back(new Item_return_int("Master_Port", 7,
1153
 
                                           DRIZZLE_TYPE_LONG));
1154
 
  field_list.push_back(new Item_return_int("Connect_Retry", 10,
1155
 
                                           DRIZZLE_TYPE_LONG));
1156
 
  field_list.push_back(new Item_empty_string("Master_Log_File",
1157
 
                                             FN_REFLEN));
1158
 
  field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
1159
 
                                           DRIZZLE_TYPE_LONGLONG));
1160
 
  field_list.push_back(new Item_empty_string("Relay_Log_File",
1161
 
                                             FN_REFLEN));
1162
 
  field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
1163
 
                                           DRIZZLE_TYPE_LONGLONG));
1164
 
  field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
1165
 
                                             FN_REFLEN));
1166
 
  field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
1167
 
  field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1168
 
  field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
1169
 
  field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
1170
 
  field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
1171
 
  field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
1172
 
  field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
1173
 
  field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
1174
 
                                             28));
1175
 
  field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
1176
 
  field_list.push_back(new Item_empty_string("Last_Error", 20));
1177
 
  field_list.push_back(new Item_return_int("Skip_Counter", 10,
1178
 
                                           DRIZZLE_TYPE_LONG));
1179
 
  field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
1180
 
                                           DRIZZLE_TYPE_LONGLONG));
1181
 
  field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
1182
 
                                           DRIZZLE_TYPE_LONGLONG));
1183
 
  field_list.push_back(new Item_empty_string("Until_Condition", 6));
1184
 
  field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1185
 
  field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1186
 
                                           DRIZZLE_TYPE_LONGLONG));
1187
 
  field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
1188
 
  field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
1189
 
                                             sizeof(mi->ssl_ca)));
1190
 
  field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
1191
 
                                             sizeof(mi->ssl_capath)));
1192
 
  field_list.push_back(new Item_empty_string("Master_SSL_Cert",
1193
 
                                             sizeof(mi->ssl_cert)));
1194
 
  field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
1195
 
                                             sizeof(mi->ssl_cipher)));
1196
 
  field_list.push_back(new Item_empty_string("Master_SSL_Key",
1197
 
                                             sizeof(mi->ssl_key)));
1198
 
  field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1199
 
                                           DRIZZLE_TYPE_LONGLONG));
1200
 
  field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
1201
 
                                             3));
1202
 
  field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
1203
 
  field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1204
 
  field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
1205
 
  field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
1206
 
 
1207
 
  if (protocol->send_fields(&field_list,
1208
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1209
 
    return(true);
1210
 
 
1211
 
  if (mi->host[0])
1212
 
  {
1213
 
    String *packet= &thd->packet;
1214
 
    protocol->prepare_for_resend();
1215
 
 
1216
 
    /*
1217
 
      slave_running can be accessed without run_lock but not other
1218
 
      non-volotile members like mi->io_thd, which is guarded by the mutex.
1219
 
    */
1220
 
    pthread_mutex_lock(&mi->run_lock);
1221
 
    protocol->store(mi->io_thd ? mi->io_thd->get_proc_info() : "", &my_charset_bin);
1222
 
    pthread_mutex_unlock(&mi->run_lock);
1223
 
 
1224
 
    pthread_mutex_lock(&mi->data_lock);
1225
 
    pthread_mutex_lock(&mi->rli.data_lock);
1226
 
    protocol->store(mi->host, &my_charset_bin);
1227
 
    protocol->store(mi->user, &my_charset_bin);
1228
 
    protocol->store((uint32_t) mi->port);
1229
 
    protocol->store((uint32_t) mi->connect_retry);
1230
 
    protocol->store(mi->master_log_name, &my_charset_bin);
1231
 
    protocol->store((uint64_t) mi->master_log_pos);
1232
 
    protocol->store(mi->rli.group_relay_log_name +
1233
 
                    dirname_length(mi->rli.group_relay_log_name),
1234
 
                    &my_charset_bin);
1235
 
    protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1236
 
    protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
1237
 
    protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1238
 
                    "Yes" : "No", &my_charset_bin);
1239
 
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1240
 
    protocol->store(rpl_filter->get_do_db());
1241
 
    protocol->store(rpl_filter->get_ignore_db());
1242
 
 
1243
 
    char buf[256];
1244
 
    String tmp(buf, sizeof(buf), &my_charset_bin);
1245
 
    rpl_filter->get_do_table(&tmp);
1246
 
    protocol->store(&tmp);
1247
 
    rpl_filter->get_ignore_table(&tmp);
1248
 
    protocol->store(&tmp);
1249
 
    rpl_filter->get_wild_do_table(&tmp);
1250
 
    protocol->store(&tmp);
1251
 
    rpl_filter->get_wild_ignore_table(&tmp);
1252
 
    protocol->store(&tmp);
1253
 
 
1254
 
    protocol->store(mi->rli.last_error().number);
1255
 
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
1256
 
    protocol->store((uint32_t) mi->rli.slave_skip_counter);
1257
 
    protocol->store((uint64_t) mi->rli.group_master_log_pos);
1258
 
    protocol->store((uint64_t) mi->rli.log_space_total);
1259
 
 
1260
 
    protocol->store(
1261
 
      mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
1262
 
        ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
1263
 
          "Relay"), &my_charset_bin);
1264
 
    protocol->store(mi->rli.until_log_name, &my_charset_bin);
1265
 
    protocol->store((uint64_t) mi->rli.until_log_pos);
1266
 
 
1267
 
    protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
1268
 
    protocol->store(mi->ssl_ca, &my_charset_bin);
1269
 
    protocol->store(mi->ssl_capath, &my_charset_bin);
1270
 
    protocol->store(mi->ssl_cert, &my_charset_bin);
1271
 
    protocol->store(mi->ssl_cipher, &my_charset_bin);
1272
 
    protocol->store(mi->ssl_key, &my_charset_bin);
1273
 
 
1274
 
    /*
1275
 
      Seconds_Behind_Master: if SQL thread is running and I/O thread is
1276
 
      connected, we can compute it otherwise show NULL (i.e. unknown).
1277
 
    */
1278
 
    if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
1279
 
        mi->rli.slave_running)
1280
 
    {
1281
 
      long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
1282
 
                       - mi->clock_diff_with_master);
1283
 
      /*
1284
 
        Apparently on some systems time_diff can be <0. Here are possible
1285
 
        reasons related to MySQL:
1286
 
        - the master is itself a slave of another master whose time is ahead.
1287
 
        - somebody used an explicit SET TIMESTAMP on the master.
1288
 
        Possible reason related to granularity-to-second of time functions
1289
 
        (nothing to do with MySQL), which can explain a value of -1:
1290
 
        assume the master's and slave's time are perfectly synchronized, and
1291
 
        that at slave's connection time, when the master's timestamp is read,
1292
 
        it is at the very end of second 1, and (a very short time later) when
1293
 
        the slave's timestamp is read it is at the very beginning of second
1294
 
        2. Then the recorded value for master is 1 and the recorded value for
1295
 
        slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1296
 
        between timestamp of slave and rli->last_master_timestamp is 0
1297
 
        (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1298
 
        This confuses users, so we don't go below 0: hence the cmax().
1299
 
 
1300
 
        last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1301
 
        special marker to say "consider we have caught up".
1302
 
      */
1303
 
      protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1304
 
                                 cmax((long)0, time_diff) : 0));
1305
 
    }
1306
 
    else
1307
 
    {
1308
 
      protocol->store_null();
1309
 
    }
1310
 
    protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
1311
 
 
1312
 
    // Last_IO_Errno
1313
 
    protocol->store(mi->last_error().number);
1314
 
    // Last_IO_Error
1315
 
    protocol->store(mi->last_error().message, &my_charset_bin);
1316
 
    // Last_SQL_Errno
1317
 
    protocol->store(mi->rli.last_error().number);
1318
 
    // Last_SQL_Error
1319
 
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
1320
 
 
1321
 
    pthread_mutex_unlock(&mi->rli.data_lock);
1322
 
    pthread_mutex_unlock(&mi->data_lock);
1323
 
 
1324
 
    if (my_net_write(&thd->net, (unsigned char*) thd->packet.ptr(), packet->length()))
1325
 
      return(true);
1326
 
  }
1327
 
  my_eof(thd);
1328
 
  return(false);
1329
 
}
1330
 
 
1331
 
 
1332
 
void set_slave_thread_options(THD* thd)
1333
 
{
1334
 
  /*
1335
 
     It's nonsense to constrain the slave threads with max_join_size; if a
1336
 
     query succeeded on master, we HAVE to execute it. So set
1337
 
     OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
1338
 
     (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
1339
 
     SELECT examining more than 4 billion rows would still fail (yes, because
1340
 
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1341
 
     only for client threads.
1342
 
  */
1343
 
  uint64_t options= thd->options | OPTION_BIG_SELECTS;
1344
 
  if (opt_log_slave_updates)
1345
 
    options|= OPTION_BIN_LOG;
1346
 
  else
1347
 
    options&= ~OPTION_BIN_LOG;
1348
 
  thd->options= options;
1349
 
  thd->variables.completion_type= 0;
1350
 
  return;
1351
 
}
1352
 
 
1353
 
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
1354
 
{
1355
 
  thd->variables.character_set_client=
1356
 
    global_system_variables.character_set_client;
1357
 
  thd->variables.collation_connection=
1358
 
    global_system_variables.collation_connection;
1359
 
  thd->variables.collation_server=
1360
 
    global_system_variables.collation_server;
1361
 
  thd->update_charset();
1362
 
 
1363
 
  /*
1364
 
    We use a const cast here since the conceptual (and externally
1365
 
    visible) behavior of the function is to set the default charset of
1366
 
    the thread.  That the cache has to be invalidated is a secondary
1367
 
    effect.
1368
 
   */
1369
 
  const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
1370
 
  return;
1371
 
}
1372
 
 
1373
 
/*
1374
 
  init_slave_thread()
1375
 
*/
1376
 
 
1377
 
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
1378
 
{
1379
 
  int32_t simulate_error= 0;
1380
 
  thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
1381
 
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1382
 
  thd->security_ctx->skip_grants();
1383
 
  my_net_init(&thd->net, 0);
1384
 
/*
1385
 
  Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1386
 
  slave threads, since a replication event can become this much larger
1387
 
  than the corresponding packet (query) sent from client to master.
1388
 
*/
1389
 
  thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1390
 
    + MAX_LOG_EVENT_HEADER;  /* note, incr over the global not session var */
1391
 
  thd->slave_thread = 1;
1392
 
  thd->enable_slow_log= opt_log_slow_slave_statements;
1393
 
  set_slave_thread_options(thd);
1394
 
  thd->client_capabilities = CLIENT_LOCAL_FILES;
1395
 
  pthread_mutex_lock(&LOCK_thread_count);
1396
 
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
1397
 
  pthread_mutex_unlock(&LOCK_thread_count);
1398
 
 
1399
 
 simulate_error|= (1 << SLAVE_THD_IO);
1400
 
 simulate_error|= (1 << SLAVE_THD_SQL);
1401
 
  if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
1402
 
  {
1403
 
    thd->cleanup();
1404
 
    return(-1);
1405
 
  }
1406
 
  lex_start(thd);
1407
 
 
1408
 
  if (thd_type == SLAVE_THD_SQL)
1409
 
    thd_proc_info(thd, "Waiting for the next event in relay log");
1410
 
  else
1411
 
    thd_proc_info(thd, "Waiting for master update");
1412
 
  thd->version=refresh_version;
1413
 
  thd->set_time();
1414
 
  return(0);
1415
 
}
1416
 
 
1417
 
 
1418
 
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1419
 
                      void* thread_killed_arg)
1420
 
{
1421
 
  int32_t nap_time;
1422
 
  thr_alarm_t alarmed;
1423
 
 
1424
 
  thr_alarm_init(&alarmed);
1425
 
  time_t start_time= my_time(0);
1426
 
  time_t end_time= start_time+sec;
1427
 
 
1428
 
  while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1429
 
  {
1430
 
    ALARM alarm_buff;
1431
 
    /*
1432
 
      The only reason we are asking for alarm is so that
1433
 
      we will be woken up in case of murder, so if we do not get killed,
1434
 
      set the alarm so it goes off after we wake up naturally
1435
 
    */
1436
 
    thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
1437
 
    sleep(nap_time);
1438
 
    thr_end_alarm(&alarmed);
1439
 
 
1440
 
    if ((*thread_killed)(thd,thread_killed_arg))
1441
 
      return(1);
1442
 
    start_time= my_time(0);
1443
 
  }
1444
 
  return(0);
1445
 
}
1446
 
 
1447
 
 
1448
 
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
1449
 
                        bool *suppress_warnings)
1450
 
{
1451
 
  unsigned char buf[FN_REFLEN + 10];
1452
 
  int32_t len;
1453
 
  int32_t binlog_flags = 0; // for now
1454
 
  char* logname = mi->master_log_name;
1455
 
  
1456
 
  *suppress_warnings= false;
1457
 
 
1458
 
  // TODO if big log files: Change next to int8store()
1459
 
  int4store(buf, (uint32_t) mi->master_log_pos);
1460
 
  int2store(buf + 4, binlog_flags);
1461
 
  int4store(buf + 6, server_id);
1462
 
  len = (uint32_t) strlen(logname);
1463
 
  memcpy(buf + 10, logname,len);
1464
 
  if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
1465
 
  {
1466
 
    /*
1467
 
      Something went wrong, so we will just reconnect and retry later
1468
 
      in the future, we should do a better error analysis, but for
1469
 
      now we just fill up the error log :-)
1470
 
    */
1471
 
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1472
 
      *suppress_warnings= true;                 // Suppress reconnect warning
1473
 
    else
1474
 
      sql_print_error(_("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs"),
1475
 
                      drizzle_errno(drizzle), drizzle_error(drizzle),
1476
 
                      mi->connect_retry);
1477
 
    return(1);
1478
 
  }
1479
 
 
1480
 
  return(0);
1481
 
}
1482
 
 
1483
 
/*
1484
 
  Read one event from the master
1485
 
 
1486
 
  SYNOPSIS
1487
 
    read_event()
1488
 
    DRIZZLE               DRIZZLE connection
1489
 
    mi                  Master connection information
1490
 
    suppress_warnings   TRUE when a normal net read timeout has caused us to
1491
 
                        try a reconnect.  We do not want to print anything to
1492
 
                        the error log in this case because this a anormal
1493
 
                        event in an idle server.
1494
 
 
1495
 
    RETURN VALUES
1496
 
    'packet_error'      Error
1497
 
    number              Length of packet
1498
 
*/
1499
 
 
1500
 
static uint32_t read_event(DRIZZLE *drizzle,
1501
 
                        Master_info *mi __attribute__((unused)),
1502
 
                        bool* suppress_warnings)
1503
 
{
1504
 
  uint32_t len;
1505
 
 
1506
 
  *suppress_warnings= false;
1507
 
  /*
1508
 
    my_real_read() will time us out
1509
 
    We check if we were told to die, and if not, try reading again
1510
 
  */
1511
 
  if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
1512
 
    return(packet_error);
1513
 
 
1514
 
  len = cli_safe_read(drizzle);
1515
 
  if (len == packet_error || (int32_t) len < 1)
1516
 
  {
1517
 
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1518
 
    {
1519
 
      /*
1520
 
        We are trying a normal reconnect after a read timeout;
1521
 
        we suppress prints to .err file as long as the reconnect
1522
 
        happens without problems
1523
 
      */
1524
 
      *suppress_warnings= true;
1525
 
    }
1526
 
    else
1527
 
      sql_print_error(_("Error reading packet from server: %s ( server_errno=%d)"),
1528
 
                      drizzle_error(drizzle), drizzle_errno(drizzle));
1529
 
    return(packet_error);
1530
 
  }
1531
 
 
1532
 
  /* Check if eof packet */
1533
 
  if (len < 8 && drizzle->net.read_pos[0] == 254)
1534
 
  {
1535
 
    sql_print_information(_("Slave: received end packet from server, apparent "
1536
 
                            "master shutdown: %s"),
1537
 
                     drizzle_error(drizzle));
1538
 
     return(packet_error);
1539
 
  }
1540
 
 
1541
 
  return(len - 1);
1542
 
}
1543
 
 
1544
 
 
1545
 
int32_t check_expected_error(THD* thd __attribute__((unused)),
1546
 
                         Relay_log_info const *rli __attribute__((unused)),
1547
 
                         int32_t expected_error)
1548
 
{
1549
 
  switch (expected_error) {
1550
 
  case ER_NET_READ_ERROR:
1551
 
  case ER_NET_ERROR_ON_WRITE:
1552
 
  case ER_QUERY_INTERRUPTED:
1553
 
  case ER_SERVER_SHUTDOWN:
1554
 
  case ER_NEW_ABORTING_CONNECTION:
1555
 
    return(1);
1556
 
  default:
1557
 
    return(0);
1558
 
  }
1559
 
}
1560
 
 
1561
 
 
1562
 
/*
1563
 
  Check if the current error is of temporary nature of not.
1564
 
  Some errors are temporary in nature, such as
1565
 
  ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT.  Ndb also signals
1566
 
  that the error is temporary by pushing a warning with the error code
1567
 
  ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1568
 
*/
1569
 
static int32_t has_temporary_error(THD *thd)
1570
 
{
1571
 
  if (thd->is_fatal_error)
1572
 
    return(0);
1573
 
 
1574
 
  if (thd->main_da.is_error())
1575
 
  {
1576
 
    thd->clear_error();
1577
 
    my_error(ER_LOCK_DEADLOCK, MYF(0));
1578
 
  }
1579
 
 
1580
 
  /*
1581
 
    If there is no message in THD, we can't say if it's a temporary
1582
 
    error or not. This is currently the case for Incident_log_event,
1583
 
    which sets no message. Return FALSE.
1584
 
  */
1585
 
  if (!thd->is_error())
1586
 
    return(0);
1587
 
 
1588
 
  /*
1589
 
    Temporary error codes:
1590
 
    currently, InnoDB deadlock detected by InnoDB or lock
1591
 
    wait timeout (innodb_lock_wait_timeout exceeded
1592
 
  */
1593
 
  if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
1594
 
      thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
1595
 
    return(1);
1596
 
 
1597
 
  return(0);
1598
 
}
1599
 
 
1600
 
 
1601
 
/**
1602
 
  Applies the given event and advances the relay log position.
1603
 
 
1604
 
  In essence, this function does:
1605
 
 
1606
 
  @code
1607
 
    ev->apply_event(rli);
1608
 
    ev->update_pos(rli);
1609
 
  @endcode
1610
 
 
1611
 
  But it also does some maintainance, such as skipping events if
1612
 
  needed and reporting errors.
1613
 
 
1614
 
  If the @c skip flag is set, then it is tested whether the event
1615
 
  should be skipped, by looking at the slave_skip_counter and the
1616
 
  server id.  The skip flag should be set when calling this from a
1617
 
  replication thread but not set when executing an explicit BINLOG
1618
 
  statement.
1619
 
 
1620
 
  @retval 0 OK.
1621
 
 
1622
 
  @retval 1 Error calling ev->apply_event().
1623
 
 
1624
 
  @retval 2 No error calling ev->apply_event(), but error calling
1625
 
  ev->update_pos().
1626
 
*/
1627
 
int32_t apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
1628
 
                               bool skip)
1629
 
{
1630
 
  int32_t exec_res= 0;
1631
 
 
1632
 
  /*
1633
 
    Execute the event to change the database and update the binary
1634
 
    log coordinates, but first we set some data that is needed for
1635
 
    the thread.
1636
 
 
1637
 
    The event will be executed unless it is supposed to be skipped.
1638
 
 
1639
 
    Queries originating from this server must be skipped.  Low-level
1640
 
    events (Format_description_log_event, Rotate_log_event,
1641
 
    Stop_log_event) from this server must also be skipped. But for
1642
 
    those we don't want to modify 'group_master_log_pos', because
1643
 
    these events did not exist on the master.
1644
 
    Format_description_log_event is not completely skipped.
1645
 
 
1646
 
    Skip queries specified by the user in 'slave_skip_counter'.  We
1647
 
    can't however skip events that has something to do with the log
1648
 
    files themselves.
1649
 
 
1650
 
    Filtering on own server id is extremely important, to ignore
1651
 
    execution of events created by the creation/rotation of the relay
1652
 
    log (remember that now the relay log starts with its Format_desc,
1653
 
    has a Rotate etc).
1654
 
  */
1655
 
 
1656
 
  thd->server_id = ev->server_id; // use the original server id for logging
1657
 
  thd->set_time();                            // time the query
1658
 
  thd->lex->current_select= 0;
1659
 
  if (!ev->when)
1660
 
    ev->when= my_time(0);
1661
 
  ev->thd = thd; // because up to this point, ev->thd == 0
1662
 
 
1663
 
  if (skip)
1664
 
  {
1665
 
    int32_t reason= ev->shall_skip(rli);
1666
 
    if (reason == Log_event::EVENT_SKIP_COUNT)
1667
 
      --rli->slave_skip_counter;
1668
 
    pthread_mutex_unlock(&rli->data_lock);
1669
 
    if (reason == Log_event::EVENT_SKIP_NOT)
1670
 
      exec_res= ev->apply_event(rli);
1671
 
  }
1672
 
  else
1673
 
    exec_res= ev->apply_event(rli);
1674
 
 
1675
 
  if (exec_res == 0)
1676
 
  {
1677
 
    int32_t error= ev->update_pos(rli);
1678
 
    /*
1679
 
      The update should not fail, so print an error message and
1680
 
      return an error code.
1681
 
 
1682
 
      TODO: Replace this with a decent error message when merged
1683
 
      with BUG#24954 (which adds several new error message).
1684
 
    */
1685
 
    if (error)
1686
 
    {
1687
 
      char buf[22];
1688
 
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
1689
 
                  _("It was not possible to update the positions"
1690
 
                  " of the relay log information: the slave may"
1691
 
                  " be in an inconsistent state."
1692
 
                  " Stopped in %s position %s"),
1693
 
                  rli->group_relay_log_name,
1694
 
                  llstr(rli->group_relay_log_pos, buf));
1695
 
      return(2);
1696
 
    }
1697
 
  }
1698
 
 
1699
 
  return(exec_res ? 1 : 0);
1700
 
}
1701
 
 
1702
 
 
1703
 
/**
1704
 
  Top-level function for executing the next event from the relay log.
1705
 
 
1706
 
  This function reads the event from the relay log, executes it, and
1707
 
  advances the relay log position.  It also handles errors, etc.
1708
 
 
1709
 
  This function may fail to apply the event for the following reasons:
1710
 
 
1711
 
   - The position specfied by the UNTIL condition of the START SLAVE
1712
 
     command is reached.
1713
 
 
1714
 
   - It was not possible to read the event from the log.
1715
 
 
1716
 
   - The slave is killed.
1717
 
 
1718
 
   - An error occurred when applying the event, and the event has been
1719
 
     tried slave_trans_retries times.  If the event has been retried
1720
 
     fewer times, 0 is returned.
1721
 
 
1722
 
   - init_master_info or init_relay_log_pos failed. (These are called
1723
 
     if a failure occurs when applying the event.)</li>
1724
 
 
1725
 
   - An error occurred when updating the binlog position.
1726
 
 
1727
 
  @retval 0 The event was applied.
1728
 
 
1729
 
  @retval 1 The event was not applied.
1730
 
*/
1731
 
static int32_t exec_relay_log_event(THD* thd, Relay_log_info* rli)
1732
 
{
1733
 
  /*
1734
 
     We acquire this mutex since we need it for all operations except
1735
 
     event execution. But we will release it in places where we will
1736
 
     wait for something for example inside of next_event().
1737
 
   */
1738
 
  pthread_mutex_lock(&rli->data_lock);
1739
 
 
1740
 
  Log_event * ev = next_event(rli);
1741
 
 
1742
 
  assert(rli->sql_thd==thd);
1743
 
 
1744
 
  if (sql_slave_killed(thd,rli))
1745
 
  {
1746
 
    pthread_mutex_unlock(&rli->data_lock);
1747
 
    delete ev;
1748
 
    return(1);
1749
 
  }
1750
 
  if (ev)
1751
 
  {
1752
 
    int32_t exec_res;
1753
 
 
1754
 
    /*
1755
 
      This tests if the position of the beginning of the current event
1756
 
      hits the UNTIL barrier.
1757
 
    */
1758
 
    if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
1759
 
        rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
1760
 
                                rli->group_master_log_pos :
1761
 
                                ev->log_pos - ev->data_written))
1762
 
    {
1763
 
      char buf[22];
1764
 
      sql_print_information(_("Slave SQL thread stopped because it reached its"
1765
 
                              " UNTIL position %s"),
1766
 
                            llstr(rli->until_pos(), buf));
1767
 
      /*
1768
 
        Setting abort_slave flag because we do not want additional message about
1769
 
        error in query execution to be printed.
1770
 
      */
1771
 
      rli->abort_slave= 1;
1772
 
      pthread_mutex_unlock(&rli->data_lock);
1773
 
      delete ev;
1774
 
      return(1);
1775
 
    }
1776
 
    exec_res= apply_event_and_update_pos(ev, thd, rli, true);
1777
 
 
1778
 
    /*
1779
 
      Format_description_log_event should not be deleted because it will be
1780
 
      used to read info about the relay log's format; it will be deleted when
1781
 
      the SQL thread does not need it, i.e. when this thread terminates.
1782
 
    */
1783
 
    if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
1784
 
    {
1785
 
      delete ev;
1786
 
    }
1787
 
 
1788
 
    /*
1789
 
      update_log_pos failed: this should not happen, so we don't
1790
 
      retry.
1791
 
    */
1792
 
    if (exec_res == 2)
1793
 
      return(1);
1794
 
 
1795
 
    if (slave_trans_retries)
1796
 
    {
1797
 
      int32_t temp_err= 0;
1798
 
      if (exec_res && (temp_err= has_temporary_error(thd)))
1799
 
      {
1800
 
        const char *errmsg;
1801
 
        /*
1802
 
          We were in a transaction which has been rolled back because of a
1803
 
          temporary error;
1804
 
          let's seek back to BEGIN log event and retry it all again.
1805
 
          Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
1806
 
          there is no rollback since 5.0.13 (ref: manual).
1807
 
          We have to not only seek but also
1808
 
          a) init_master_info(), to seek back to hot relay log's start for later
1809
 
          (for when we will come back to this hot log after re-processing the
1810
 
          possibly existing old logs where BEGIN is: check_binlog_magic() will
1811
 
          then need the cache to be at position 0 (see comments at beginning of
1812
 
          init_master_info()).
1813
 
          b) init_relay_log_pos(), because the BEGIN may be an older relay log.
1814
 
        */
1815
 
        if (rli->trans_retries < slave_trans_retries)
1816
 
        {
1817
 
          if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
1818
 
            sql_print_error(_("Failed to initialize the master info structure"));
1819
 
          else if (init_relay_log_pos(rli,
1820
 
                                      rli->group_relay_log_name,
1821
 
                                      rli->group_relay_log_pos,
1822
 
                                      1, &errmsg, 1))
1823
 
            sql_print_error(_("Error initializing relay log position: %s"),
1824
 
                            errmsg);
1825
 
          else
1826
 
          {
1827
 
            exec_res= 0;
1828
 
            end_trans(thd, ROLLBACK);
1829
 
            /* chance for concurrent connection to get more locks */
1830
 
            safe_sleep(thd, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1831
 
                       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1832
 
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1833
 
            rli->trans_retries++;
1834
 
            rli->retried_trans++;
1835
 
            pthread_mutex_unlock(&rli->data_lock);
1836
 
          }
1837
 
        }
1838
 
        else
1839
 
          sql_print_error(_("Slave SQL thread retried transaction %lu time(s) "
1840
 
                            "in vain, giving up. Consider raising the value of "
1841
 
                            "the slave_transaction_retries variable."),
1842
 
                          slave_trans_retries);
1843
 
      }
1844
 
      else if ((exec_res && !temp_err) ||
1845
 
               (opt_using_transactions &&
1846
 
                rli->group_relay_log_pos == rli->event_relay_log_pos))
1847
 
      {
1848
 
        /*
1849
 
          Only reset the retry counter if the entire group succeeded
1850
 
          or failed with a non-transient error.  On a successful
1851
 
          event, the execution will proceed as usual; in the case of a
1852
 
          non-transient error, the slave will stop with an error.
1853
 
         */
1854
 
        rli->trans_retries= 0; // restart from fresh
1855
 
      }
1856
 
    }
1857
 
    return(exec_res);
1858
 
  }
1859
 
  pthread_mutex_unlock(&rli->data_lock);
1860
 
  rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
1861
 
              ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
1862
 
              _("Could not parse relay log event entry. The possible reasons "
1863
 
                "are: the master's binary log is corrupted (you can check this "
1864
 
                "by running 'mysqlbinlog' on the binary log), the slave's "
1865
 
                "relay log is corrupted (you can check this by running "
1866
 
                "'mysqlbinlog' on the relay log), a network problem, or a bug "
1867
 
                "in the master's or slave's DRIZZLE code. If you want to check "
1868
 
                "the master's binary log or slave's relay log, you will be "
1869
 
                "able to know their names by issuing 'SHOW SLAVE STATUS' "
1870
 
                "on this slave."));
1871
 
  return(1);
1872
 
}
1873
 
 
1874
 
 
1875
 
/**
1876
 
  @brief Try to reconnect slave IO thread.
1877
 
 
1878
 
  @details Terminates current connection to master, sleeps for
1879
 
  @c mi->connect_retry msecs and initiates new connection with
1880
 
  @c safe_reconnect(). Variable pointed by @c retry_count is increased -
1881
 
  if it exceeds @c master_retry_count then connection is not re-established
1882
 
  and function signals error.
1883
 
  Unless @c suppres_warnings is TRUE, a warning is put in the server error log
1884
 
  when reconnecting. The warning message and messages used to report errors
1885
 
  are taken from @c messages array. In case @c master_retry_count is exceeded,
1886
 
  no messages are added to the log.
1887
 
 
1888
 
  @param[in]     thd                 Thread context.
1889
 
  @param[in]     DRIZZLE               DRIZZLE connection.
1890
 
  @param[in]     mi                  Master connection information.
1891
 
  @param[in,out] retry_count         Number of attempts to reconnect.
1892
 
  @param[in]     suppress_warnings   TRUE when a normal net read timeout 
1893
 
                                     has caused to reconnecting.
1894
 
  @param[in]     messages            Messages to print/log, see 
1895
 
                                     reconnect_messages[] array.
1896
 
 
1897
 
  @retval        0                   OK.
1898
 
  @retval        1                   There was an error.
1899
 
*/
1900
 
 
1901
 
static int32_t try_to_reconnect(THD *thd, DRIZZLE *drizzle, Master_info *mi,
1902
 
                                uint32_t *retry_count, bool suppress_warnings,
1903
 
                                const char *messages[SLAVE_RECON_MSG_MAX])
1904
 
{
1905
 
  mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1906
 
  thd->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
1907
 
  drizzle_disconnect(drizzle);
1908
 
  if ((*retry_count)++)
1909
 
  {
1910
 
    if (*retry_count > master_retry_count)
1911
 
      return 1;                             // Don't retry forever
1912
 
    safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1913
 
               (void *) mi);
1914
 
  }
1915
 
  if (check_io_slave_killed(thd, mi,
1916
 
                            _(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1917
 
    return 1;
1918
 
  thd->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
1919
 
  if (!suppress_warnings)
1920
 
  {
1921
 
    char buf[256], llbuff[22];
1922
 
    snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1923
 
             IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
1924
 
    /*
1925
 
      Raise a warining during registering on master/requesting dump.
1926
 
      Log a message reading event.
1927
 
    */
1928
 
    if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
1929
 
    {
1930
 
      mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1931
 
                 ER(ER_SLAVE_MASTER_COM_FAILURE),
1932
 
                 _(messages[SLAVE_RECON_MSG_COMMAND]), buf);
1933
 
    }
1934
 
    else
1935
 
    {
1936
 
      sql_print_information(buf);
1937
 
    }
1938
 
  }
1939
 
  if (safe_reconnect(thd, drizzle, mi, 1) || io_slave_killed(thd, mi))
1940
 
  {
1941
 
    if (global_system_variables.log_warnings)
1942
 
      sql_print_information(_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1943
 
    return 1;
1944
 
  }
1945
 
  return 0;
1946
 
}
1947
 
 
1948
 
 
1949
 
/* Slave I/O Thread entry point */
1950
 
 
1951
 
pthread_handler_t handle_slave_io(void *arg)
1952
 
{
1953
 
  THD *thd; // needs to be first for thread_stack
1954
 
  DRIZZLE *drizzle;
1955
 
  Master_info *mi = (Master_info*)arg;
1956
 
  Relay_log_info *rli= &mi->rli;
1957
 
  char llbuff[22];
1958
 
  uint32_t retry_count;
1959
 
  bool suppress_warnings;
1960
 
  uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
1961
 
  my_thread_init();
1962
 
 
1963
 
  assert(mi->inited);
1964
 
  drizzle= NULL ;
1965
 
  retry_count= 0;
1966
 
 
1967
 
  pthread_mutex_lock(&mi->run_lock);
1968
 
  /* Inform waiting threads that slave has started */
1969
 
  mi->slave_run_id++;
1970
 
 
1971
 
  mi->events_till_disconnect = disconnect_slave_event_count;
1972
 
 
1973
 
  thd= new THD;
1974
 
  THD_CHECK_SENTRY(thd);
1975
 
  mi->io_thd = thd;
1976
 
 
1977
 
  pthread_detach_this_thread();
1978
 
  thd->thread_stack= (char*) &thd; // remember where our stack is
1979
 
  if (init_slave_thread(thd, SLAVE_THD_IO))
1980
 
  {
1981
 
    pthread_cond_broadcast(&mi->start_cond);
1982
 
    pthread_mutex_unlock(&mi->run_lock);
1983
 
    sql_print_error(_("Failed during slave I/O thread initialization"));
1984
 
    goto err;
1985
 
  }
1986
 
  pthread_mutex_lock(&LOCK_thread_count);
1987
 
  threads.append(thd);
1988
 
  pthread_mutex_unlock(&LOCK_thread_count);
1989
 
  mi->slave_running = 1;
1990
 
  mi->abort_slave = 0;
1991
 
  pthread_mutex_unlock(&mi->run_lock);
1992
 
  pthread_cond_broadcast(&mi->start_cond);
1993
 
 
1994
 
  if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
1995
 
  {
1996
 
    mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
1997
 
               ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
1998
 
    goto err;
1999
 
  }
2000
 
 
2001
 
  thd_proc_info(thd, "Connecting to master");
2002
 
  // we can get killed during safe_connect
2003
 
  if (!safe_connect(thd, drizzle, mi))
2004
 
  {
2005
 
    sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
2006
 
                            "replication started in log '%s' at position %s"),
2007
 
                          mi->user, mi->host, mi->port,
2008
 
                          IO_RPL_LOG_NAME,
2009
 
                          llstr(mi->master_log_pos,llbuff));
2010
 
  /*
2011
 
    Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
2012
 
    thread, since a replication event can become this much larger than
2013
 
    the corresponding packet (query) sent from client to master.
2014
 
  */
2015
 
    drizzle->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
2016
 
  }
2017
 
  else
2018
 
  {
2019
 
    sql_print_information(_("Slave I/O thread killed while connecting to master"));
2020
 
    goto err;
2021
 
  }
2022
 
 
2023
 
connected:
2024
 
 
2025
 
  // TODO: the assignment below should be under mutex (5.0)
2026
 
  mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
2027
 
  thd->slave_net = &drizzle->net;
2028
 
  thd_proc_info(thd, "Checking master version");
2029
 
  if (get_master_version_and_clock(drizzle, mi))
2030
 
    goto err;
2031
 
  
2032
 
  if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
2033
 
  {
2034
 
    /*
2035
 
      Register ourselves with the master.
2036
 
    */
2037
 
    thd_proc_info(thd, "Registering slave on master");
2038
 
    if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2039
 
    {
2040
 
      if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
2041
 
                                 "while registering slave on master"))
2042
 
      {
2043
 
        sql_print_error(_("Slave I/O thread couldn't register on master"));
2044
 
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2045
 
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
2046
 
          goto err;
2047
 
      }
2048
 
      else
2049
 
        goto err;
2050
 
      goto connected;
2051
 
    }
2052
 
    if (!retry_count_reg)
2053
 
    {
2054
 
      retry_count_reg++;
2055
 
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
2056
 
      if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2057
 
                           reconnect_messages[SLAVE_RECON_ACT_REG]))
2058
 
        goto err;
2059
 
      goto connected;
2060
 
    }
2061
 
  }
2062
 
 
2063
 
  while (!io_slave_killed(thd,mi))
2064
 
  {
2065
 
    thd_proc_info(thd, "Requesting binlog dump");
2066
 
    if (request_dump(drizzle, mi, &suppress_warnings))
2067
 
    {
2068
 
      sql_print_error(_("Failed on request_dump()"));
2069
 
      if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while \
2070
 
requesting master dump")) ||
2071
 
          try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2072
 
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2073
 
        goto err;
2074
 
      goto connected;
2075
 
    }
2076
 
    if (!retry_count_dump)
2077
 
    {
2078
 
      retry_count_dump++;
2079
 
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
2080
 
      if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2081
 
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2082
 
        goto err;
2083
 
      goto connected;
2084
 
    }
2085
 
 
2086
 
    while (!io_slave_killed(thd,mi))
2087
 
    {
2088
 
      uint32_t event_len;
2089
 
      /*
2090
 
        We say "waiting" because read_event() will wait if there's nothing to
2091
 
        read. But if there's something to read, it will not wait. The
2092
 
        important thing is to not confuse users by saying "reading" whereas
2093
 
        we're in fact receiving nothing.
2094
 
      */
2095
 
      thd_proc_info(thd, _("Waiting for master to send event"));
2096
 
      event_len= read_event(drizzle, mi, &suppress_warnings);
2097
 
      if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while "
2098
 
                                           "reading event")))
2099
 
        goto err;
2100
 
      if (!retry_count_event)
2101
 
      {
2102
 
        retry_count_event++;
2103
 
        sql_print_information(_("Forcing to reconnect slave I/O thread"));
2104
 
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2105
 
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2106
 
          goto err;
2107
 
        goto connected;
2108
 
      }
2109
 
 
2110
 
      if (event_len == packet_error)
2111
 
      {
2112
 
        uint32_t drizzle_error_number= drizzle_errno(drizzle);
2113
 
        switch (drizzle_error_number) {
2114
 
        case CR_NET_PACKET_TOO_LARGE:
2115
 
          sql_print_error(_("Log entry on master is longer than "
2116
 
                            "max_allowed_packet (%ld) on "
2117
 
                            "slave. If the entry is correct, restart the "
2118
 
                            "server with a higher value of "
2119
 
                            "max_allowed_packet"),
2120
 
                          thd->variables.max_allowed_packet);
2121
 
          goto err;
2122
 
        case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2123
 
          sql_print_error(ER(drizzle_error_number), drizzle_error_number,
2124
 
                          drizzle_error(drizzle));
2125
 
          goto err;
2126
 
        case EE_OUTOFMEMORY:
2127
 
        case ER_OUTOFMEMORY:
2128
 
          sql_print_error(
2129
 
       _("Stopping slave I/O thread due to out-of-memory error from master"));
2130
 
          goto err;
2131
 
        }
2132
 
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2133
 
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2134
 
          goto err;
2135
 
        goto connected;
2136
 
      } // if (event_len == packet_error)
2137
 
 
2138
 
      retry_count=0;                    // ok event, reset retry counter
2139
 
      thd_proc_info(thd, _("Queueing master event to the relay log"));
2140
 
      if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2141
 
      {
2142
 
        goto err;
2143
 
      }
2144
 
      if (flush_master_info(mi, 1))
2145
 
      {
2146
 
        sql_print_error(_("Failed to flush master info file"));
2147
 
        goto err;
2148
 
      }
2149
 
      /*
2150
 
        See if the relay logs take too much space.
2151
 
        We don't lock mi->rli.log_space_lock here; this dirty read saves time
2152
 
        and does not introduce any problem:
2153
 
        - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
2154
 
        the clean value is 0), then we are reading only one more event as we
2155
 
        should, and we'll block only at the next event. No big deal.
2156
 
        - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
2157
 
        the clean value is 1), then we are going into wait_for_relay_log_space()
2158
 
        for no reason, but this function will do a clean read, notice the clean
2159
 
        value and exit immediately.
2160
 
      */
2161
 
      if (rli->log_space_limit && rli->log_space_limit <
2162
 
          rli->log_space_total &&
2163
 
          !rli->ignore_log_space_limit)
2164
 
        if (wait_for_relay_log_space(rli))
2165
 
        {
2166
 
          sql_print_error(_("Slave I/O thread aborted while waiting for "
2167
 
                            "relay log space"));
2168
 
          goto err;
2169
 
        }
2170
 
    }
2171
 
  }
2172
 
 
2173
 
// error = 0;
2174
 
err:
2175
 
// print the current replication position
2176
 
  sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2177
 
                          "position %s"),
2178
 
                        IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
2179
 
  pthread_mutex_lock(&LOCK_thread_count);
2180
 
  thd->query = thd->db = 0; // extra safety
2181
 
  thd->query_length= thd->db_length= 0;
2182
 
  pthread_mutex_unlock(&LOCK_thread_count);
2183
 
  if (drizzle)
2184
 
  {
2185
 
    /*
2186
 
      Here we need to clear the active VIO before closing the
2187
 
      connection with the master.  The reason is that THD::awake()
2188
 
      might be called from terminate_slave_thread() because somebody
2189
 
      issued a STOP SLAVE.  If that happends, the close_active_vio()
2190
 
      can be called in the middle of closing the VIO associated with
2191
 
      the 'mysql' object, causing a crash.
2192
 
    */
2193
 
    drizzle_close(drizzle);
2194
 
    mi->drizzle=0;
2195
 
  }
2196
 
  write_ignored_events_info_to_relay_log(thd, mi);
2197
 
  thd_proc_info(thd, _("Waiting for slave mutex on exit"));
2198
 
  pthread_mutex_lock(&mi->run_lock);
2199
 
 
2200
 
  /* Forget the relay log's format */
2201
 
  delete mi->rli.relay_log.description_event_for_queue;
2202
 
  mi->rli.relay_log.description_event_for_queue= 0;
2203
 
  // TODO: make rpl_status part of Master_info
2204
 
  change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2205
 
  assert(thd->net.buff != 0);
2206
 
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
2207
 
  close_thread_tables(thd);
2208
 
  pthread_mutex_lock(&LOCK_thread_count);
2209
 
  THD_CHECK_SENTRY(thd);
2210
 
  delete thd;
2211
 
  pthread_mutex_unlock(&LOCK_thread_count);
2212
 
  mi->abort_slave= 0;
2213
 
  mi->slave_running= 0;
2214
 
  mi->io_thd= 0;
2215
 
  /*
2216
 
    Note: the order of the two following calls (first broadcast, then unlock)
2217
 
    is important. Otherwise a killer_thread can execute between the calls and
2218
 
    delete the mi structure leading to a crash! (see BUG#25306 for details)
2219
 
   */ 
2220
 
  pthread_cond_broadcast(&mi->stop_cond);       // tell the world we are done
2221
 
  pthread_mutex_unlock(&mi->run_lock);
2222
 
  my_thread_end();
2223
 
  pthread_exit(0);
2224
 
  return(0);                               // Can't return anything here
2225
 
}
2226
 
 
2227
 
 
2228
 
/* Slave SQL Thread entry point */
2229
 
 
2230
 
pthread_handler_t handle_slave_sql(void *arg)
2231
 
{
2232
 
  THD *thd;                     /* needs to be first for thread_stack */
2233
 
  char llbuff[22],llbuff1[22];
2234
 
 
2235
 
  Relay_log_info* rli = &((Master_info*)arg)->rli;
2236
 
  const char *errmsg;
2237
 
 
2238
 
  my_thread_init();
2239
 
 
2240
 
  assert(rli->inited);
2241
 
  pthread_mutex_lock(&rli->run_lock);
2242
 
  assert(!rli->slave_running);
2243
 
  errmsg= 0;
2244
 
  rli->events_till_abort = abort_slave_event_count;
2245
 
 
2246
 
  thd = new THD;
2247
 
  thd->thread_stack = (char*)&thd; // remember where our stack is
2248
 
  rli->sql_thd= thd;
2249
 
  
2250
 
  /* Inform waiting threads that slave has started */
2251
 
  rli->slave_run_id++;
2252
 
  rli->slave_running = 1;
2253
 
 
2254
 
  pthread_detach_this_thread();
2255
 
  if (init_slave_thread(thd, SLAVE_THD_SQL))
2256
 
  {
2257
 
    /*
2258
 
      TODO: this is currently broken - slave start and change master
2259
 
      will be stuck if we fail here
2260
 
    */
2261
 
    pthread_cond_broadcast(&rli->start_cond);
2262
 
    pthread_mutex_unlock(&rli->run_lock);
2263
 
    sql_print_error(_("Failed during slave thread initialization"));
2264
 
    goto err;
2265
 
  }
2266
 
  thd->init_for_queries();
2267
 
  thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
2268
 
  pthread_mutex_lock(&LOCK_thread_count);
2269
 
  threads.append(thd);
2270
 
  pthread_mutex_unlock(&LOCK_thread_count);
2271
 
  /*
2272
 
    We are going to set slave_running to 1. Assuming slave I/O thread is
2273
 
    alive and connected, this is going to make Seconds_Behind_Master be 0
2274
 
    i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
2275
 
    the moment we start we can think we are caught up, and the next second we
2276
 
    start receiving data so we realize we are not caught up and
2277
 
    Seconds_Behind_Master grows. No big deal.
2278
 
  */
2279
 
  rli->abort_slave = 0;
2280
 
  pthread_mutex_unlock(&rli->run_lock);
2281
 
  pthread_cond_broadcast(&rli->start_cond);
2282
 
 
2283
 
  /*
2284
 
    Reset errors for a clean start (otherwise, if the master is idle, the SQL
2285
 
    thread may execute no Query_log_event, so the error will remain even
2286
 
    though there's no problem anymore). Do not reset the master timestamp
2287
 
    (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
2288
 
    as we are not sure that we are going to receive a query, we want to
2289
 
    remember the last master timestamp (to say how many seconds behind we are
2290
 
    now.
2291
 
    But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
2292
 
  */
2293
 
  rli->clear_error();
2294
 
 
2295
 
  //tell the I/O thread to take relay_log_space_limit into account from now on
2296
 
  pthread_mutex_lock(&rli->log_space_lock);
2297
 
  rli->ignore_log_space_limit= 0;
2298
 
  pthread_mutex_unlock(&rli->log_space_lock);
2299
 
  rli->trans_retries= 0; // start from "no error"
2300
 
 
2301
 
  if (init_relay_log_pos(rli,
2302
 
                         rli->group_relay_log_name,
2303
 
                         rli->group_relay_log_pos,
2304
 
                         1 /*need data lock*/, &errmsg,
2305
 
                         1 /*look for a description_event*/))
2306
 
  {
2307
 
    sql_print_error(_("Error initializing relay log position: %s"),
2308
 
                    errmsg);
2309
 
    goto err;
2310
 
  }
2311
 
  THD_CHECK_SENTRY(thd);
2312
 
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2313
 
  /*
2314
 
    Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2315
 
    correct position when it's called just after my_b_seek() (the questionable
2316
 
    stuff is those "seek is done on next read" comments in the my_b_seek()
2317
 
    source code).
2318
 
    The crude reality is that this assertion randomly fails whereas
2319
 
    replication seems to work fine. And there is no easy explanation why it
2320
 
    fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
2321
 
    init_relay_log_pos() called above). Maybe the assertion would be
2322
 
    meaningful if we held rli->data_lock between the my_b_seek() and the
2323
 
    assert().
2324
 
  */
2325
 
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2326
 
  assert(rli->sql_thd == thd);
2327
 
 
2328
 
  if (global_system_variables.log_warnings)
2329
 
    sql_print_information(_("Slave SQL thread initialized, "
2330
 
                            "starting replication in log '%s' at "
2331
 
                            "position %s, relay log '%s' position: %s"),
2332
 
                            RPL_LOG_NAME,
2333
 
                          llstr(rli->group_master_log_pos,llbuff),
2334
 
                          rli->group_relay_log_name,
2335
 
                          llstr(rli->group_relay_log_pos,llbuff1));
2336
 
 
2337
 
  /* execute init_slave variable */
2338
 
  if (sys_init_slave.value_length)
2339
 
  {
2340
 
    execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
2341
 
    if (thd->is_slave_error)
2342
 
    {
2343
 
      sql_print_error(_("Slave SQL thread aborted. "
2344
 
                        "Can't execute init_slave query"));
2345
 
      goto err;
2346
 
    }
2347
 
  }
2348
 
 
2349
 
  /*
2350
 
    First check until condition - probably there is nothing to execute. We
2351
 
    do not want to wait for next event in this case.
2352
 
  */
2353
 
  pthread_mutex_lock(&rli->data_lock);
2354
 
  if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
2355
 
      rli->is_until_satisfied(rli->group_master_log_pos))
2356
 
  {
2357
 
    char buf[22];
2358
 
    sql_print_information(_("Slave SQL thread stopped because it reached its"
2359
 
                            " UNTIL position %s"), llstr(rli->until_pos(), buf));
2360
 
    pthread_mutex_unlock(&rli->data_lock);
2361
 
    goto err;
2362
 
  }
2363
 
  pthread_mutex_unlock(&rli->data_lock);
2364
 
 
2365
 
  /* Read queries from the IO/THREAD until this thread is killed */
2366
 
 
2367
 
  while (!sql_slave_killed(thd,rli))
2368
 
  {
2369
 
    thd_proc_info(thd, _("Reading event from the relay log"));
2370
 
    assert(rli->sql_thd == thd);
2371
 
    THD_CHECK_SENTRY(thd);
2372
 
    if (exec_relay_log_event(thd,rli))
2373
 
    {
2374
 
      // do not scare the user if SQL thread was simply killed or stopped
2375
 
      if (!sql_slave_killed(thd,rli))
2376
 
      {
2377
 
        /*
2378
 
          retrieve as much info as possible from the thd and, error
2379
 
          codes and warnings and print this to the error log as to
2380
 
          allow the user to locate the error
2381
 
        */
2382
 
        uint32_t const last_errno= rli->last_error().number;
2383
 
 
2384
 
        if (thd->is_error())
2385
 
        {
2386
 
          char const *const errmsg= thd->main_da.message();
2387
 
 
2388
 
          if (last_errno == 0)
2389
 
          {
2390
 
            rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), errmsg);
2391
 
          }
2392
 
          else if (last_errno != thd->main_da.sql_errno())
2393
 
          {
2394
 
            sql_print_error(_("Slave (additional info): %s Error_code: %d"),
2395
 
                            errmsg, thd->main_da.sql_errno());
2396
 
          }
2397
 
        }
2398
 
 
2399
 
        /* Print any warnings issued */
2400
 
        List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
2401
 
        DRIZZLE_ERROR *err;
2402
 
        /*
2403
 
          Added controlled slave thread cancel for replication
2404
 
          of user-defined variables.
2405
 
        */
2406
 
        bool udf_error = false;
2407
 
        while ((err= it++))
2408
 
        {
2409
 
          if (err->code == ER_CANT_OPEN_LIBRARY)
2410
 
            udf_error = true;
2411
 
          sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
2412
 
        }
2413
 
        if (udf_error)
2414
 
          sql_print_error(_("Error loading user-defined library, slave SQL "
2415
 
                            "thread aborted. Install the missing library, "
2416
 
                            "and restart the slave SQL thread with "
2417
 
                            "\"SLAVE START\". We stopped at log '%s' "
2418
 
                            "position %s"),
2419
 
                          RPL_LOG_NAME, llstr(rli->group_master_log_pos,
2420
 
            llbuff));
2421
 
        else
2422
 
          sql_print_error(_("Error running query, slave SQL thread aborted. "
2423
 
                            "Fix the problem, and restart "
2424
 
                            "the slave SQL thread with \"SLAVE START\". "
2425
 
                            "We stopped at log '%s' position %s"),
2426
 
                          RPL_LOG_NAME,
2427
 
                          llstr(rli->group_master_log_pos, llbuff));
2428
 
      }
2429
 
      goto err;
2430
 
    }
2431
 
  }
2432
 
 
2433
 
  /* Thread stopped. Print the current replication position to the log */
2434
 
  sql_print_information(_("Slave SQL thread exiting, replication stopped in "
2435
 
                          "log '%s' at position %s"),
2436
 
                        RPL_LOG_NAME,
2437
 
                        llstr(rli->group_master_log_pos,llbuff));
2438
 
 
2439
 
 err:
2440
 
 
2441
 
  /*
2442
 
    Some events set some playgrounds, which won't be cleared because thread
2443
 
    stops. Stopping of this thread may not be known to these events ("stop"
2444
 
    request is detected only by the present function, not by events), so we
2445
 
    must "proactively" clear playgrounds:
2446
 
  */
2447
 
  rli->cleanup_context(thd, 1);
2448
 
  pthread_mutex_lock(&LOCK_thread_count);
2449
 
  /*
2450
 
    Some extra safety, which should not been needed (normally, event deletion
2451
 
    should already have done these assignments (each event which sets these
2452
 
    variables is supposed to set them to 0 before terminating)).
2453
 
  */
2454
 
  thd->query= thd->db= thd->catalog= 0;
2455
 
  thd->query_length= thd->db_length= 0;
2456
 
  pthread_mutex_unlock(&LOCK_thread_count);
2457
 
  thd_proc_info(thd, "Waiting for slave mutex on exit");
2458
 
  pthread_mutex_lock(&rli->run_lock);
2459
 
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
2460
 
  pthread_mutex_lock(&rli->data_lock);
2461
 
  assert(rli->slave_running == 1); // tracking buffer overrun
2462
 
  /* When master_pos_wait() wakes up it will check this and terminate */
2463
 
  rli->slave_running= 0;
2464
 
  /* Forget the relay log's format */
2465
 
  delete rli->relay_log.description_event_for_exec;
2466
 
  rli->relay_log.description_event_for_exec= 0;
2467
 
  /* Wake up master_pos_wait() */
2468
 
  pthread_mutex_unlock(&rli->data_lock);
2469
 
  pthread_cond_broadcast(&rli->data_cond);
2470
 
  rli->ignore_log_space_limit= 0; /* don't need any lock */
2471
 
  /* we die so won't remember charset - re-update them on next thread start */
2472
 
  rli->cached_charset_invalidate();
2473
 
  rli->save_temporary_tables = thd->temporary_tables;
2474
 
 
2475
 
  /*
2476
 
    TODO: see if we can do this conditionally in next_event() instead
2477
 
    to avoid unneeded position re-init
2478
 
  */
2479
 
  thd->temporary_tables = 0; // remove tempation from destructor to close them
2480
 
  assert(thd->net.buff != 0);
2481
 
  net_end(&thd->net); // destructor will not free it, because we are weird
2482
 
  assert(rli->sql_thd == thd);
2483
 
  THD_CHECK_SENTRY(thd);
2484
 
  rli->sql_thd= 0;
2485
 
  pthread_mutex_lock(&LOCK_thread_count);
2486
 
  THD_CHECK_SENTRY(thd);
2487
 
  delete thd;
2488
 
  pthread_mutex_unlock(&LOCK_thread_count);
2489
 
 /*
2490
 
  Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
2491
 
  is important. Otherwise a killer_thread can execute between the calls and
2492
 
  delete the mi structure leading to a crash! (see BUG#25306 for details)
2493
 
 */ 
2494
 
  pthread_cond_broadcast(&rli->stop_cond);
2495
 
  pthread_mutex_unlock(&rli->run_lock);  // tell the world we are done
2496
 
  
2497
 
  my_thread_end();
2498
 
  pthread_exit(0);
2499
 
  return(0);                               // Can't return anything here
2500
 
}
2501
 
 
2502
 
 
2503
 
/*
2504
 
  process_io_create_file()
2505
 
*/
2506
 
 
2507
 
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
2508
 
{
2509
 
  int32_t error = 1;
2510
 
  uint32_t num_bytes;
2511
 
  bool cev_not_written;
2512
 
  THD *thd = mi->io_thd;
2513
 
  NET *net = &mi->drizzle->net;
2514
 
 
2515
 
  if (unlikely(!cev->is_valid()))
2516
 
    return(1);
2517
 
 
2518
 
  if (!rpl_filter->db_ok(cev->db))
2519
 
  {
2520
 
    skip_load_data_infile(net);
2521
 
    return(0);
2522
 
  }
2523
 
  assert(cev->inited_from_old);
2524
 
  thd->file_id = cev->file_id = mi->file_id++;
2525
 
  thd->server_id = cev->server_id;
2526
 
  cev_not_written = 1;
2527
 
 
2528
 
  if (unlikely(net_request_file(net,cev->fname)))
2529
 
  {
2530
 
    sql_print_error(_("Slave I/O: failed requesting download of '%s'"),
2531
 
                    cev->fname);
2532
 
    goto err;
2533
 
  }
2534
 
 
2535
 
  /*
2536
 
    This dummy block is so we could instantiate Append_block_log_event
2537
 
    once and then modify it slightly instead of doing it multiple times
2538
 
    in the loop
2539
 
  */
2540
 
  {
2541
 
    Append_block_log_event aev(thd,0,0,0,0);
2542
 
 
2543
 
    for (;;)
2544
 
    {
2545
 
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
2546
 
      {
2547
 
        sql_print_error(_("Network read error downloading '%s' from master"),
2548
 
                        cev->fname);
2549
 
        goto err;
2550
 
      }
2551
 
      if (unlikely(!num_bytes)) /* eof */
2552
 
      {
2553
 
        /* 3.23 master wants it */
2554
 
        net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0);
2555
 
        /*
2556
 
          If we wrote Create_file_log_event, then we need to write
2557
 
          Execute_load_log_event. If we did not write Create_file_log_event,
2558
 
          then this is an empty file and we can just do as if the LOAD DATA
2559
 
          INFILE had not existed, i.e. write nothing.
2560
 
        */
2561
 
        if (unlikely(cev_not_written))
2562
 
          break;
2563
 
        Execute_load_log_event xev(thd,0,0);
2564
 
        xev.log_pos = cev->log_pos;
2565
 
        if (unlikely(mi->rli.relay_log.append(&xev)))
2566
 
        {
2567
 
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2568
 
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2569
 
                     _("error writing Exec_load event to relay log"));
2570
 
          goto err;
2571
 
        }
2572
 
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2573
 
        break;
2574
 
      }
2575
 
      if (unlikely(cev_not_written))
2576
 
      {
2577
 
        cev->block = net->read_pos;
2578
 
        cev->block_len = num_bytes;
2579
 
        if (unlikely(mi->rli.relay_log.append(cev)))
2580
 
        {
2581
 
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2582
 
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2583
 
                     _("error writing Create_file event to relay log"));
2584
 
          goto err;
2585
 
        }
2586
 
        cev_not_written=0;
2587
 
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2588
 
      }
2589
 
      else
2590
 
      {
2591
 
        aev.block = net->read_pos;
2592
 
        aev.block_len = num_bytes;
2593
 
        aev.log_pos = cev->log_pos;
2594
 
        if (unlikely(mi->rli.relay_log.append(&aev)))
2595
 
        {
2596
 
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2597
 
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2598
 
                     _("error writing Append_block event to relay log"));
2599
 
          goto err;
2600
 
        }
2601
 
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
2602
 
      }
2603
 
    }
2604
 
  }
2605
 
  error=0;
2606
 
err:
2607
 
  return(error);
2608
 
}
2609
 
 
2610
 
 
2611
 
/*
2612
 
  Start using a new binary log on the master
2613
 
 
2614
 
  SYNOPSIS
2615
 
    process_io_rotate()
2616
 
    mi                  master_info for the slave
2617
 
    rev                 The rotate log event read from the binary log
2618
 
 
2619
 
  DESCRIPTION
2620
 
    Updates the master info with the place in the next binary
2621
 
    log where we should start reading.
2622
 
    Rotate the relay log to avoid mixed-format relay logs.
2623
 
 
2624
 
  NOTES
2625
 
    We assume we already locked mi->data_lock
2626
 
 
2627
 
  RETURN VALUES
2628
 
    0           ok
2629
 
    1           Log event is illegal
2630
 
 
2631
 
*/
2632
 
 
2633
 
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
2634
 
{
2635
 
  safe_mutex_assert_owner(&mi->data_lock);
2636
 
 
2637
 
  if (unlikely(!rev->is_valid()))
2638
 
    return(1);
2639
 
 
2640
 
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2641
 
  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
2642
 
  mi->master_log_pos= rev->pos;
2643
 
  /*
2644
 
    If we do not do this, we will be getting the first
2645
 
    rotate event forever, so we need to not disconnect after one.
2646
 
  */
2647
 
  if (disconnect_slave_event_count)
2648
 
    mi->events_till_disconnect++;
2649
 
 
2650
 
  /*
2651
 
    If description_event_for_queue is format <4, there is conversion in the
2652
 
    relay log to the slave's format (4). And Rotate can mean upgrade or
2653
 
    nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
2654
 
    no need to reset description_event_for_queue now. And if it's nothing (same
2655
 
    master version as before), no need (still using the slave's format).
2656
 
  */
2657
 
  if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
2658
 
  {
2659
 
    delete mi->rli.relay_log.description_event_for_queue;
2660
 
    /* start from format 3 (DRIZZLE 4.0) again */
2661
 
    mi->rli.relay_log.description_event_for_queue= new
2662
 
      Format_description_log_event(3);
2663
 
  }
2664
 
  /*
2665
 
    Rotate the relay log makes binlog format detection easier (at next slave
2666
 
    start or mysqlbinlog)
2667
 
  */
2668
 
  rotate_relay_log(mi); /* will take the right mutexes */
2669
 
  return(0);
2670
 
}
2671
 
 
2672
 
/*
2673
 
  Reads a 3.23 event and converts it to the slave's format. This code was
2674
 
  copied from DRIZZLE 4.0.
2675
 
*/
2676
 
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
2677
 
                           uint32_t event_len)
2678
 
{
2679
 
  const char *errmsg = 0;
2680
 
  uint32_t inc_pos;
2681
 
  bool ignore_event= 0;
2682
 
  char *tmp_buf = 0;
2683
 
  Relay_log_info *rli= &mi->rli;
2684
 
 
2685
 
  /*
2686
 
    If we get Load event, we need to pass a non-reusable buffer
2687
 
    to read_log_event, so we do a trick
2688
 
  */
2689
 
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
2690
 
  {
2691
 
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
2692
 
    {
2693
 
      mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2694
 
                 ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
2695
 
      return(1);
2696
 
    }
2697
 
    memcpy(tmp_buf,buf,event_len);
2698
 
    /*
2699
 
      Create_file constructor wants a 0 as last char of buffer, this 0 will
2700
 
      serve as the string-termination char for the file's name (which is at the
2701
 
      end of the buffer)
2702
 
      We must increment event_len, otherwise the event constructor will not see
2703
 
      this end 0, which leads to segfault.
2704
 
    */
2705
 
    tmp_buf[event_len++]=0;
2706
 
    int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
2707
 
    buf = (const char*)tmp_buf;
2708
 
  }
2709
 
  /*
2710
 
    This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
2711
 
    send the loaded file, and write it to the relay log in the form of
2712
 
    Append_block/Exec_load (the SQL thread needs the data, as that thread is not
2713
 
    connected to the master).
2714
 
  */
2715
 
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2716
 
                                            mi->rli.relay_log.description_event_for_queue);
2717
 
  if (unlikely(!ev))
2718
 
  {
2719
 
    sql_print_error(_("Read invalid event from master: '%s', "
2720
 
                      "master could be corrupt but a more likely cause "
2721
 
                      "of this is a bug"),
2722
 
                    errmsg);
2723
 
    free((char*) tmp_buf);
2724
 
    return(1);
2725
 
  }
2726
 
 
2727
 
  pthread_mutex_lock(&mi->data_lock);
2728
 
  ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
2729
 
  switch (ev->get_type_code()) {
2730
 
  case STOP_EVENT:
2731
 
    ignore_event= 1;
2732
 
    inc_pos= event_len;
2733
 
    break;
2734
 
  case ROTATE_EVENT:
2735
 
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2736
 
    {
2737
 
      delete ev;
2738
 
      pthread_mutex_unlock(&mi->data_lock);
2739
 
      return(1);
2740
 
    }
2741
 
    inc_pos= 0;
2742
 
    break;
2743
 
  case CREATE_FILE_EVENT:
2744
 
    /*
2745
 
      Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
2746
 
      queue_old_event() which is for 3.23 events which don't comprise
2747
 
      CREATE_FILE_EVENT. This is because read_log_event() above has just
2748
 
      transformed LOAD_EVENT into CREATE_FILE_EVENT.
2749
 
    */
2750
 
  {
2751
 
    /* We come here when and only when tmp_buf != 0 */
2752
 
    assert(tmp_buf != 0);
2753
 
    inc_pos=event_len;
2754
 
    ev->log_pos+= inc_pos;
2755
 
    int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2756
 
    delete ev;
2757
 
    mi->master_log_pos += inc_pos;
2758
 
    pthread_mutex_unlock(&mi->data_lock);
2759
 
    free((char*)tmp_buf);
2760
 
    return(error);
2761
 
  }
2762
 
  default:
2763
 
    inc_pos= event_len;
2764
 
    break;
2765
 
  }
2766
 
  if (likely(!ignore_event))
2767
 
  {
2768
 
    if (ev->log_pos)
2769
 
      /*
2770
 
         Don't do it for fake Rotate events (see comment in
2771
 
      Log_event::Log_event(const char* buf...) in log_event.cc).
2772
 
      */
2773
 
      ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
2774
 
    if (unlikely(rli->relay_log.append(ev)))
2775
 
    {
2776
 
      delete ev;
2777
 
      pthread_mutex_unlock(&mi->data_lock);
2778
 
      return(1);
2779
 
    }
2780
 
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2781
 
  }
2782
 
  delete ev;
2783
 
  mi->master_log_pos+= inc_pos;
2784
 
  pthread_mutex_unlock(&mi->data_lock);
2785
 
  return(0);
2786
 
}
2787
 
 
2788
 
/*
2789
 
  Reads a 4.0 event and converts it to the slave's format. This code was copied
2790
 
  from queue_binlog_ver_1_event(), with some affordable simplifications.
2791
 
*/
2792
 
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
2793
 
                           uint32_t event_len)
2794
 
{
2795
 
  const char *errmsg = 0;
2796
 
  uint32_t inc_pos;
2797
 
  char *tmp_buf = 0;
2798
 
  Relay_log_info *rli= &mi->rli;
2799
 
 
2800
 
  /* read_log_event() will adjust log_pos to be end_log_pos */
2801
 
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2802
 
                                            mi->rli.relay_log.description_event_for_queue);
2803
 
  if (unlikely(!ev))
2804
 
  {
2805
 
    sql_print_error(_("Read invalid event from master: '%s', "
2806
 
                      "master could be corrupt but a more likely cause of "
2807
 
                      "this is a bug"),
2808
 
                    errmsg);
2809
 
    free((char*) tmp_buf);
2810
 
    return(1);
2811
 
  }
2812
 
  pthread_mutex_lock(&mi->data_lock);
2813
 
  switch (ev->get_type_code()) {
2814
 
  case STOP_EVENT:
2815
 
    goto err;
2816
 
  case ROTATE_EVENT:
2817
 
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2818
 
    {
2819
 
      delete ev;
2820
 
      pthread_mutex_unlock(&mi->data_lock);
2821
 
      return(1);
2822
 
    }
2823
 
    inc_pos= 0;
2824
 
    break;
2825
 
  default:
2826
 
    inc_pos= event_len;
2827
 
    break;
2828
 
  }
2829
 
  if (unlikely(rli->relay_log.append(ev)))
2830
 
  {
2831
 
    delete ev;
2832
 
    pthread_mutex_unlock(&mi->data_lock);
2833
 
    return(1);
2834
 
  }
2835
 
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2836
 
  delete ev;
2837
 
  mi->master_log_pos+= inc_pos;
2838
 
err:
2839
 
  pthread_mutex_unlock(&mi->data_lock);
2840
 
  return(0);
2841
 
}
2842
 
 
2843
 
/*
2844
 
  queue_old_event()
2845
 
 
2846
 
  Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
2847
 
  (exactly, slave's) format. To do the conversion, we create a 5.0 event from
2848
 
  the 3.23/4.0 bytes, then write this event to the relay log.
2849
 
 
2850
 
  TODO:
2851
 
    Test this code before release - it has to be tested on a separate
2852
 
    setup with 3.23 master or 4.0 master
2853
 
*/
2854
 
 
2855
 
static int32_t queue_old_event(Master_info *mi, const char *buf,
2856
 
                           uint32_t event_len)
2857
 
{
2858
 
  switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
2859
 
  {
2860
 
  case 1:
2861
 
      return(queue_binlog_ver_1_event(mi,buf,event_len));
2862
 
  case 3:
2863
 
      return(queue_binlog_ver_3_event(mi,buf,event_len));
2864
 
  default: /* unsupported format; eg version 2 */
2865
 
    return(1);
2866
 
  }
2867
 
}
2868
 
 
2869
 
/*
2870
 
  queue_event()
2871
 
 
2872
 
  If the event is 3.23/4.0, passes it to queue_old_event() which will convert
2873
 
  it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
2874
 
  no format conversion, it's pure read/write of bytes.
2875
 
  So a 5.0.0 slave's relay log can contain events in the slave's format or in
2876
 
  any >=5.0.0 format.
2877
 
*/
2878
 
 
2879
 
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
2880
 
{
2881
 
  int32_t error= 0;
2882
 
  String error_msg;
2883
 
  uint32_t inc_pos= 0;
2884
 
  Relay_log_info *rli= &mi->rli;
2885
 
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
2886
 
 
2887
 
 
2888
 
  if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
2889
 
      buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
2890
 
    return(queue_old_event(mi,buf,event_len));
2891
 
 
2892
 
  pthread_mutex_lock(&mi->data_lock);
2893
 
 
2894
 
  switch (buf[EVENT_TYPE_OFFSET]) {
2895
 
  case STOP_EVENT:
2896
 
    /*
2897
 
      We needn't write this event to the relay log. Indeed, it just indicates a
2898
 
      master server shutdown. The only thing this does is cleaning. But
2899
 
      cleaning is already done on a per-master-thread basis (as the master
2900
 
      server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
2901
 
      prepared statements' deletion are TODO only when we binlog prep stmts).
2902
 
 
2903
 
      We don't even increment mi->master_log_pos, because we may be just after
2904
 
      a Rotate event. Btw, in a few milliseconds we are going to have a Start
2905
 
      event from the next binlog (unless the master is presently running
2906
 
      without --log-bin).
2907
 
    */
2908
 
    goto err;
2909
 
  case ROTATE_EVENT:
2910
 
  {
2911
 
    Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
2912
 
    if (unlikely(process_io_rotate(mi,&rev)))
2913
 
    {
2914
 
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2915
 
      goto err;
2916
 
    }
2917
 
    /*
2918
 
      Now the I/O thread has just changed its mi->master_log_name, so
2919
 
      incrementing mi->master_log_pos is nonsense.
2920
 
    */
2921
 
    inc_pos= 0;
2922
 
    break;
2923
 
  }
2924
 
  case FORMAT_DESCRIPTION_EVENT:
2925
 
  {
2926
 
    /*
2927
 
      Create an event, and save it (when we rotate the relay log, we will have
2928
 
      to write this event again).
2929
 
    */
2930
 
    /*
2931
 
      We are the only thread which reads/writes description_event_for_queue.
2932
 
      The relay_log struct does not move (though some members of it can
2933
 
      change), so we needn't any lock (no rli->data_lock, no log lock).
2934
 
    */
2935
 
    Format_description_log_event* tmp;
2936
 
    const char* errmsg;
2937
 
    if (!(tmp= (Format_description_log_event*)
2938
 
          Log_event::read_log_event(buf, event_len, &errmsg,
2939
 
                                    mi->rli.relay_log.description_event_for_queue)))
2940
 
    {
2941
 
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2942
 
      goto err;
2943
 
    }
2944
 
    delete mi->rli.relay_log.description_event_for_queue;
2945
 
    mi->rli.relay_log.description_event_for_queue= tmp;
2946
 
    /*
2947
 
       Though this does some conversion to the slave's format, this will
2948
 
       preserve the master's binlog format version, and number of event types.
2949
 
    */
2950
 
    /*
2951
 
       If the event was not requested by the slave (the slave did not ask for
2952
 
       it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
2953
 
    */
2954
 
    inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
2955
 
  }
2956
 
  break;
2957
 
 
2958
 
  case HEARTBEAT_LOG_EVENT:
2959
 
  {
2960
 
    /*
2961
 
      HB (heartbeat) cannot come before RL (Relay)
2962
 
    */
2963
 
    char  llbuf[22];
2964
 
    Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
2965
 
    if (!hb.is_valid())
2966
 
    {
2967
 
      error= ER_SLAVE_HEARTBEAT_FAILURE;
2968
 
      error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2969
 
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2970
 
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2971
 
      error_msg.append(STRING_WITH_LEN(" log_pos "));
2972
 
      llstr(hb.log_pos, llbuf);
2973
 
      error_msg.append(llbuf, strlen(llbuf));
2974
 
      goto err;
2975
 
    }
2976
 
    mi->received_heartbeats++;
2977
 
    /* 
2978
 
       compare local and event's versions of log_file, log_pos.
2979
 
       
2980
 
       Heartbeat is sent only after an event corresponding to the corrdinates
2981
 
       the heartbeat carries.
2982
 
       Slave can not have a difference in coordinates except in the only
2983
 
       special case when mi->master_log_name, master_log_pos have never
2984
 
       been updated by Rotate event i.e when slave does not have any history
2985
 
       with the master (and thereafter mi->master_log_pos is NULL).
2986
 
 
2987
 
       TODO: handling `when' for SHOW SLAVE STATUS' snds behind
2988
 
    */
2989
 
    if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
2990
 
         && mi->master_log_name != NULL)
2991
 
        || mi->master_log_pos != hb.log_pos)
2992
 
    {
2993
 
      /* missed events of heartbeat from the past */
2994
 
      error= ER_SLAVE_HEARTBEAT_FAILURE;
2995
 
      error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
2996
 
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2997
 
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2998
 
      error_msg.append(STRING_WITH_LEN(" log_pos "));
2999
 
      llstr(hb.log_pos, llbuf);
3000
 
      error_msg.append(llbuf, strlen(llbuf));
3001
 
      goto err;
3002
 
    }
3003
 
    goto skip_relay_logging;
3004
 
  }
3005
 
  break;
3006
 
    
3007
 
  default:
3008
 
    inc_pos= event_len;
3009
 
    break;
3010
 
  }
3011
 
 
3012
 
  /*
3013
 
     If this event is originating from this server, don't queue it.
3014
 
     We don't check this for 3.23 events because it's simpler like this; 3.23
3015
 
     will be filtered anyway by the SQL slave thread which also tests the
3016
 
     server id (we must also keep this test in the SQL thread, in case somebody
3017
 
     upgrades a 4.0 slave which has a not-filtered relay log).
3018
 
 
3019
 
     ANY event coming from ourselves can be ignored: it is obvious for queries;
3020
 
     for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
3021
 
     (--log-slave-updates would not log that) unless this slave is also its
3022
 
     direct master (an unsupported, useless setup!).
3023
 
  */
3024
 
 
3025
 
  pthread_mutex_lock(log_lock);
3026
 
 
3027
 
  if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
3028
 
      !mi->rli.replicate_same_server_id)
3029
 
  {
3030
 
    /*
3031
 
      Do not write it to the relay log.
3032
 
      a) We still want to increment mi->master_log_pos, so that we won't
3033
 
      re-read this event from the master if the slave IO thread is now
3034
 
      stopped/restarted (more efficient if the events we are ignoring are big
3035
 
      LOAD DATA INFILE).
3036
 
      b) We want to record that we are skipping events, for the information of
3037
 
      the slave SQL thread, otherwise that thread may let
3038
 
      rli->group_relay_log_pos stay too small if the last binlog's event is
3039
 
      ignored.
3040
 
      But events which were generated by this slave and which do not exist in
3041
 
      the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
3042
 
      mi->master_log_pos.
3043
 
    */
3044
 
    if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
3045
 
        buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3046
 
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3047
 
    {
3048
 
      mi->master_log_pos+= inc_pos;
3049
 
      memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
3050
 
      assert(rli->ign_master_log_name_end[0]);
3051
 
      rli->ign_master_log_pos_end= mi->master_log_pos;
3052
 
    }
3053
 
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3054
 
  }
3055
 
  else
3056
 
  {
3057
 
    /* write the event to the relay log */
3058
 
    if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3059
 
    {
3060
 
      mi->master_log_pos+= inc_pos;
3061
 
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3062
 
    }
3063
 
    else
3064
 
    {
3065
 
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
3066
 
    }
3067
 
    rli->ign_master_log_name_end[0]= 0; // last event is not ignored
3068
 
  }
3069
 
  pthread_mutex_unlock(log_lock);
3070
 
 
3071
 
skip_relay_logging:
3072
 
  
3073
 
err:
3074
 
  pthread_mutex_unlock(&mi->data_lock);
3075
 
  if (error)
3076
 
    mi->report(ERROR_LEVEL, error, ER(error),
3077
 
               (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
3078
 
               _("could not queue event from master") :
3079
 
               error_msg.ptr());
3080
 
  return(error);
3081
 
}
3082
 
 
3083
 
 
3084
 
void end_relay_log_info(Relay_log_info* rli)
3085
 
{
3086
 
  if (!rli->inited)
3087
 
    return;
3088
 
  if (rli->info_fd >= 0)
3089
 
  {
3090
 
    end_io_cache(&rli->info_file);
3091
 
    (void) my_close(rli->info_fd, MYF(MY_WME));
3092
 
    rli->info_fd = -1;
3093
 
  }
3094
 
  if (rli->cur_log_fd >= 0)
3095
 
  {
3096
 
    end_io_cache(&rli->cache_buf);
3097
 
    (void)my_close(rli->cur_log_fd, MYF(MY_WME));
3098
 
    rli->cur_log_fd = -1;
3099
 
  }
3100
 
  rli->inited = 0;
3101
 
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
3102
 
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3103
 
  /*
3104
 
    Delete the slave's temporary tables from memory.
3105
 
    In the future there will be other actions than this, to ensure persistance
3106
 
    of slave's temp tables after shutdown.
3107
 
  */
3108
 
  rli->close_temporary_tables();
3109
 
  return;
3110
 
}
3111
 
 
3112
 
/*
3113
 
  Try to connect until successful or slave killed
3114
 
 
3115
 
  SYNPOSIS
3116
 
    safe_connect()
3117
 
    thd                 Thread handler for slave
3118
 
    DRIZZLE               DRIZZLE connection handle
3119
 
    mi                  Replication handle
3120
 
 
3121
 
  RETURN
3122
 
    0   ok
3123
 
    #   Error
3124
 
*/
3125
 
 
3126
 
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi)
3127
 
{
3128
 
  return(connect_to_master(thd, drizzle, mi, 0, 0));
3129
 
}
3130
 
 
3131
 
 
3132
 
/*
3133
 
  SYNPOSIS
3134
 
    connect_to_master()
3135
 
 
3136
 
  IMPLEMENTATION
3137
 
    Try to connect until successful or slave killed or we have retried
3138
 
    master_retry_count times
3139
 
*/
3140
 
 
3141
 
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3142
 
                                 bool reconnect, bool suppress_warnings)
3143
 
{
3144
 
  int32_t slave_was_killed;
3145
 
  int32_t last_errno= -2;                           // impossible error
3146
 
  uint32_t err_count=0;
3147
 
  char llbuff[22];
3148
 
 
3149
 
  mi->events_till_disconnect = disconnect_slave_event_count;
3150
 
  uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
3151
 
  if (opt_slave_compressed_protocol)
3152
 
    client_flag=CLIENT_COMPRESS;                /* We will use compression */
3153
 
 
3154
 
  drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3155
 
  drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3156
 
 
3157
 
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3158
 
         (reconnect ? drizzle_reconnect(drizzle) != 0 :
3159
 
          drizzle_connect(drizzle, mi->host, mi->user, mi->password, 0,
3160
 
                             mi->port, 0, client_flag) == 0))
3161
 
  {
3162
 
    /* Don't repeat last error */
3163
 
    if ((int32_t)drizzle_errno(drizzle) != last_errno)
3164
 
    {
3165
 
      last_errno=drizzle_errno(drizzle);
3166
 
      suppress_warnings= 0;
3167
 
      mi->report(ERROR_LEVEL, last_errno,
3168
 
                 _("error %s to master '%s@%s:%d'"
3169
 
                   " - retry-time: %d  retries: %u"),
3170
 
                 (reconnect ? _("reconnecting") : _("connecting")),
3171
 
                 mi->user, mi->host, mi->port,
3172
 
                 mi->connect_retry, master_retry_count);
3173
 
    }
3174
 
    /*
3175
 
      By default we try forever. The reason is that failure will trigger
3176
 
      master election, so if the user did not set master_retry_count we
3177
 
      do not want to have election triggered on the first failure to
3178
 
      connect
3179
 
    */
3180
 
    if (++err_count == master_retry_count)
3181
 
    {
3182
 
      slave_was_killed=1;
3183
 
      if (reconnect)
3184
 
        change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
3185
 
      break;
3186
 
    }
3187
 
    safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3188
 
               (void*)mi);
3189
 
  }
3190
 
 
3191
 
  if (!slave_was_killed)
3192
 
  {
3193
 
    if (reconnect)
3194
 
    {
3195
 
      if (!suppress_warnings && global_system_variables.log_warnings)
3196
 
        sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3197
 
                                "replication resumed in log '%s' at "
3198
 
                                "position %s"), mi->user,
3199
 
                                mi->host, mi->port,
3200
 
                                IO_RPL_LOG_NAME,
3201
 
                                llstr(mi->master_log_pos,llbuff));
3202
 
    }
3203
 
    else
3204
 
    {
3205
 
      change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
3206
 
    }
3207
 
  }
3208
 
  drizzle->reconnect= 1;
3209
 
  return(slave_was_killed);
3210
 
}
3211
 
 
3212
 
 
3213
 
/*
3214
 
  safe_reconnect()
3215
 
 
3216
 
  IMPLEMENTATION
3217
 
    Try to connect until successful or slave killed or we have retried
3218
 
    master_retry_count times
3219
 
*/
3220
 
 
3221
 
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3222
 
                          bool suppress_warnings)
3223
 
{
3224
 
  return(connect_to_master(thd, drizzle, mi, 1, suppress_warnings));
3225
 
}
3226
 
 
3227
 
 
3228
 
/*
3229
 
  Store the file and position where the execute-slave thread are in the
3230
 
  relay log.
3231
 
 
3232
 
  SYNOPSIS
3233
 
    flush_relay_log_info()
3234
 
    rli                 Relay log information
3235
 
 
3236
 
  NOTES
3237
 
    - As this is only called by the slave thread, we don't need to
3238
 
      have a lock on this.
3239
 
    - If there is an active transaction, then we don't update the position
3240
 
      in the relay log.  This is to ensure that we re-execute statements
3241
 
      if we die in the middle of an transaction that was rolled back.
3242
 
    - As a transaction never spans binary logs, we don't have to handle the
3243
 
      case where we do a relay-log-rotation in the middle of the transaction.
3244
 
      If this would not be the case, we would have to ensure that we
3245
 
      don't delete the relay log file where the transaction started when
3246
 
      we switch to a new relay log file.
3247
 
 
3248
 
  TODO
3249
 
    - Change the log file information to a binary format to avoid calling
3250
 
      int64_t2str.
3251
 
 
3252
 
  RETURN VALUES
3253
 
    0   ok
3254
 
    1   write error
3255
 
*/
3256
 
 
3257
 
bool flush_relay_log_info(Relay_log_info* rli)
3258
 
{
3259
 
  bool error=0;
3260
 
 
3261
 
  if (unlikely(rli->no_storage))
3262
 
    return(0);
3263
 
 
3264
 
  IO_CACHE *file = &rli->info_file;
3265
 
  char buff[FN_REFLEN*2+22*2+4], *pos;
3266
 
 
3267
 
  my_b_seek(file, 0L);
3268
 
  pos=my_stpcpy(buff, rli->group_relay_log_name);
3269
 
  *pos++='\n';
3270
 
  pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
3271
 
  *pos++='\n';
3272
 
  pos=my_stpcpy(pos, rli->group_master_log_name);
3273
 
  *pos++='\n';
3274
 
  pos=int64_t2str(rli->group_master_log_pos, pos, 10);
3275
 
  *pos='\n';
3276
 
  if (my_b_write(file, (unsigned char*) buff, (size_t) (pos-buff)+1))
3277
 
    error=1;
3278
 
  if (flush_io_cache(file))
3279
 
    error=1;
3280
 
 
3281
 
  /* Flushing the relay log is done by the slave I/O thread */
3282
 
  return(error);
3283
 
}
3284
 
 
3285
 
 
3286
 
/*
3287
 
  Called when we notice that the current "hot" log got rotated under our feet.
3288
 
*/
3289
 
 
3290
 
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
3291
 
{
3292
 
  assert(rli->cur_log != &rli->cache_buf);
3293
 
  assert(rli->cur_log_fd == -1);
3294
 
 
3295
 
  IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3296
 
  if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
3297
 
                                   errmsg)) <0)
3298
 
    return(0);
3299
 
  /*
3300
 
    We want to start exactly where we was before:
3301
 
    relay_log_pos       Current log pos
3302
 
    pending             Number of bytes already processed from the event
3303
 
  */
3304
 
  rli->event_relay_log_pos= cmax(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3305
 
  my_b_seek(cur_log,rli->event_relay_log_pos);
3306
 
  return(cur_log);
3307
 
}
3308
 
 
3309
 
 
3310
 
static Log_event* next_event(Relay_log_info* rli)
3311
 
{
3312
 
  Log_event* ev;
3313
 
  IO_CACHE* cur_log = rli->cur_log;
3314
 
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3315
 
  const char* errmsg=0;
3316
 
  THD* thd = rli->sql_thd;
3317
 
 
3318
 
  assert(thd != 0);
3319
 
 
3320
 
  if (abort_slave_event_count && !rli->events_till_abort--)
3321
 
    return(0);
3322
 
 
3323
 
  /*
3324
 
    For most operations we need to protect rli members with data_lock,
3325
 
    so we assume calling function acquired this mutex for us and we will
3326
 
    hold it for the most of the loop below However, we will release it
3327
 
    whenever it is worth the hassle,  and in the cases when we go into a
3328
 
    pthread_cond_wait() with the non-data_lock mutex
3329
 
  */
3330
 
  safe_mutex_assert_owner(&rli->data_lock);
3331
 
 
3332
 
  while (!sql_slave_killed(thd,rli))
3333
 
  {
3334
 
    /*
3335
 
      We can have two kinds of log reading:
3336
 
      hot_log:
3337
 
        rli->cur_log points at the IO_CACHE of relay_log, which
3338
 
        is actively being updated by the I/O thread. We need to be careful
3339
 
        in this case and make sure that we are not looking at a stale log that
3340
 
        has already been rotated. If it has been, we reopen the log.
3341
 
 
3342
 
      The other case is much simpler:
3343
 
        We just have a read only log that nobody else will be updating.
3344
 
    */
3345
 
    bool hot_log;
3346
 
    if ((hot_log = (cur_log != &rli->cache_buf)))
3347
 
    {
3348
 
      assert(rli->cur_log_fd == -1); // foreign descriptor
3349
 
      pthread_mutex_lock(log_lock);
3350
 
 
3351
 
      /*
3352
 
        Reading xxx_file_id is safe because the log will only
3353
 
        be rotated when we hold relay_log.LOCK_log
3354
 
      */
3355
 
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
3356
 
      {
3357
 
        // The master has switched to a new log file; Reopen the old log file
3358
 
        cur_log=reopen_relay_log(rli, &errmsg);
3359
 
        pthread_mutex_unlock(log_lock);
3360
 
        if (!cur_log)                           // No more log files
3361
 
          goto err;
3362
 
        hot_log=0;                              // Using old binary log
3363
 
      }
3364
 
    }
3365
 
    /* 
3366
 
      As there is no guarantee that the relay is open (for example, an I/O
3367
 
      error during a write by the slave I/O thread may have closed it), we
3368
 
      have to test it.
3369
 
    */
3370
 
    if (!my_b_inited(cur_log))
3371
 
      goto err;
3372
 
    assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3373
 
    assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
3374
 
 
3375
 
    /*
3376
 
      Relay log is always in new format - if the master is 3.23, the
3377
 
      I/O thread will convert the format for us.
3378
 
      A problem: the description event may be in a previous relay log. So if
3379
 
      the slave has been shutdown meanwhile, we would have to look in old relay
3380
 
      logs, which may even have been deleted. So we need to write this
3381
 
      description event at the beginning of the relay log.
3382
 
      When the relay log is created when the I/O thread starts, easy: the
3383
 
      master will send the description event and we will queue it.
3384
 
      But if the relay log is created by new_file(): then the solution is:
3385
 
      DRIZZLE_BIN_LOG::open() will write the buffered description event.
3386
 
    */
3387
 
    if ((ev=Log_event::read_log_event(cur_log,0,
3388
 
                                      rli->relay_log.description_event_for_exec)))
3389
 
 
3390
 
    {
3391
 
      assert(thd==rli->sql_thd);
3392
 
      /*
3393
 
        read it while we have a lock, to avoid a mutex lock in
3394
 
        inc_event_relay_log_pos()
3395
 
      */
3396
 
      rli->future_event_relay_log_pos= my_b_tell(cur_log);
3397
 
      if (hot_log)
3398
 
        pthread_mutex_unlock(log_lock);
3399
 
      return(ev);
3400
 
    }
3401
 
    assert(thd==rli->sql_thd);
3402
 
    if (opt_reckless_slave)                     // For mysql-test
3403
 
      cur_log->error = 0;
3404
 
    if (cur_log->error < 0)
3405
 
    {
3406
 
      errmsg = "slave SQL thread aborted because of I/O error";
3407
 
      if (hot_log)
3408
 
        pthread_mutex_unlock(log_lock);
3409
 
      goto err;
3410
 
    }
3411
 
    if (!cur_log->error) /* EOF */
3412
 
    {
3413
 
      /*
3414
 
        On a hot log, EOF means that there are no more updates to
3415
 
        process and we must block until I/O thread adds some and
3416
 
        signals us to continue
3417
 
      */
3418
 
      if (hot_log)
3419
 
      {
3420
 
        /*
3421
 
          We say in Seconds_Behind_Master that we have "caught up". Note that
3422
 
          for example if network link is broken but I/O slave thread hasn't
3423
 
          noticed it (slave_net_timeout not elapsed), then we'll say "caught
3424
 
          up" whereas we're not really caught up. Fixing that would require
3425
 
          internally cutting timeout in smaller pieces in network read, no
3426
 
          thanks. Another example: SQL has caught up on I/O, now I/O has read
3427
 
          a new event and is queuing it; the false "0" will exist until SQL
3428
 
          finishes executing the new event; it will be look abnormal only if
3429
 
          the events have old timestamps (then you get "many", 0, "many").
3430
 
 
3431
 
          Transient phases like this can be fixed with implemeting
3432
 
          Heartbeat event which provides the slave the status of the
3433
 
          master at time the master does not have any new update to send.
3434
 
          Seconds_Behind_Master would be zero only when master has no
3435
 
          more updates in binlog for slave. The heartbeat can be sent
3436
 
          in a (small) fraction of slave_net_timeout. Until it's done
3437
 
          rli->last_master_timestamp is temporarely (for time of
3438
 
          waiting for the following event) reset whenever EOF is
3439
 
          reached.
3440
 
        */
3441
 
        time_t save_timestamp= rli->last_master_timestamp;
3442
 
        rli->last_master_timestamp= 0;
3443
 
 
3444
 
        assert(rli->relay_log.get_open_count() ==
3445
 
                    rli->cur_log_old_open_count);
3446
 
 
3447
 
        if (rli->ign_master_log_name_end[0])
3448
 
        {
3449
 
          /* We generate and return a Rotate, to make our positions advance */
3450
 
          ev= new Rotate_log_event(rli->ign_master_log_name_end,
3451
 
                                   0, rli->ign_master_log_pos_end,
3452
 
                                   Rotate_log_event::DUP_NAME);
3453
 
          rli->ign_master_log_name_end[0]= 0;
3454
 
          pthread_mutex_unlock(log_lock);
3455
 
          if (unlikely(!ev))
3456
 
          {
3457
 
            errmsg= "Slave SQL thread failed to create a Rotate event "
3458
 
              "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
3459
 
            goto err;
3460
 
          }
3461
 
          ev->server_id= 0; // don't be ignored by slave SQL thread
3462
 
          return(ev);
3463
 
        }
3464
 
 
3465
 
        /*
3466
 
          We can, and should release data_lock while we are waiting for
3467
 
          update. If we do not, show slave status will block
3468
 
        */
3469
 
        pthread_mutex_unlock(&rli->data_lock);
3470
 
 
3471
 
        /*
3472
 
          Possible deadlock :
3473
 
          - the I/O thread has reached log_space_limit
3474
 
          - the SQL thread has read all relay logs, but cannot purge for some
3475
 
          reason:
3476
 
            * it has already purged all logs except the current one
3477
 
            * there are other logs than the current one but they're involved in
3478
 
            a transaction that finishes in the current one (or is not finished)
3479
 
          Solution :
3480
 
          Wake up the possibly waiting I/O thread, and set a boolean asking
3481
 
          the I/O thread to temporarily ignore the log_space_limit
3482
 
          constraint, because we do not want the I/O thread to block because of
3483
 
          space (it's ok if it blocks for any other reason (e.g. because the
3484
 
          master does not send anything). Then the I/O thread stops waiting
3485
 
          and reads more events.
3486
 
          The SQL thread decides when the I/O thread should take log_space_limit
3487
 
          into account again : ignore_log_space_limit is reset to 0
3488
 
          in purge_first_log (when the SQL thread purges the just-read relay
3489
 
          log), and also when the SQL thread starts. We should also reset
3490
 
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
3491
 
          fact, no need as RESET SLAVE requires that the slave
3492
 
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
3493
 
          it stops.
3494
 
        */
3495
 
        pthread_mutex_lock(&rli->log_space_lock);
3496
 
        // prevent the I/O thread from blocking next times
3497
 
        rli->ignore_log_space_limit= 1;
3498
 
        /*
3499
 
          If the I/O thread is blocked, unblock it.  Ok to broadcast
3500
 
          after unlock, because the mutex is only destroyed in
3501
 
          ~Relay_log_info(), i.e. when rli is destroyed, and rli will
3502
 
          not be destroyed before we exit the present function.
3503
 
        */
3504
 
        pthread_mutex_unlock(&rli->log_space_lock);
3505
 
        pthread_cond_broadcast(&rli->log_space_cond);
3506
 
        // Note that wait_for_update_relay_log unlocks lock_log !
3507
 
        rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
3508
 
        // re-acquire data lock since we released it earlier
3509
 
        pthread_mutex_lock(&rli->data_lock);
3510
 
        rli->last_master_timestamp= save_timestamp;
3511
 
        continue;
3512
 
      }
3513
 
      /*
3514
 
        If the log was not hot, we need to move to the next log in
3515
 
        sequence. The next log could be hot or cold, we deal with both
3516
 
        cases separately after doing some common initialization
3517
 
      */
3518
 
      end_io_cache(cur_log);
3519
 
      assert(rli->cur_log_fd >= 0);
3520
 
      my_close(rli->cur_log_fd, MYF(MY_WME));
3521
 
      rli->cur_log_fd = -1;
3522
 
 
3523
 
      if (relay_log_purge)
3524
 
      {
3525
 
        /*
3526
 
          purge_first_log will properly set up relay log coordinates in rli.
3527
 
          If the group's coordinates are equal to the event's coordinates
3528
 
          (i.e. the relay log was not rotated in the middle of a group),
3529
 
          we can purge this relay log too.
3530
 
          We do uint64_t and string comparisons, this may be slow but
3531
 
          - purging the last relay log is nice (it can save 1GB of disk), so we
3532
 
          like to detect the case where we can do it, and given this,
3533
 
          - I see no better detection method
3534
 
          - purge_first_log is not called that often
3535
 
        */
3536
 
        if (rli->relay_log.purge_first_log
3537
 
            (rli,
3538
 
             rli->group_relay_log_pos == rli->event_relay_log_pos
3539
 
             && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
3540
 
        {
3541
 
          errmsg = "Error purging processed logs";
3542
 
          goto err;
3543
 
        }
3544
 
      }
3545
 
      else
3546
 
      {
3547
 
        /*
3548
 
          If hot_log is set, then we already have a lock on
3549
 
          LOCK_log.  If not, we have to get the lock.
3550
 
 
3551
 
          According to Sasha, the only time this code will ever be executed
3552
 
          is if we are recovering from a bug.
3553
 
        */
3554
 
        if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
3555
 
        {
3556
 
          errmsg = "error switching to the next log";
3557
 
          goto err;
3558
 
        }
3559
 
        rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3560
 
        strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
3561
 
                sizeof(rli->event_relay_log_name)-1);
3562
 
        flush_relay_log_info(rli);
3563
 
      }
3564
 
 
3565
 
      /*
3566
 
        Now we want to open this next log. To know if it's a hot log (the one
3567
 
        being written by the I/O thread now) or a cold log, we can use
3568
 
        is_active(); if it is hot, we use the I/O cache; if it's cold we open
3569
 
        the file normally. But if is_active() reports that the log is hot, this
3570
 
        may change between the test and the consequence of the test. So we may
3571
 
        open the I/O cache whereas the log is now cold, which is nonsense.
3572
 
        To guard against this, we need to have LOCK_log.
3573
 
      */
3574
 
 
3575
 
      if (!hot_log) /* if hot_log, we already have this mutex */
3576
 
        pthread_mutex_lock(log_lock);
3577
 
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
3578
 
      {
3579
 
#ifdef EXTRA_DEBUG
3580
 
        if (global_system_variables.log_warnings)
3581
 
          sql_print_information(_("next log '%s' is currently active"),
3582
 
                                rli->linfo.log_file_name);
3583
 
#endif
3584
 
        rli->cur_log= cur_log= rli->relay_log.get_log_file();
3585
 
        rli->cur_log_old_open_count= rli->relay_log.get_open_count();
3586
 
        assert(rli->cur_log_fd == -1);
3587
 
 
3588
 
        /*
3589
 
          Read pointer has to be at the start since we are the only
3590
 
          reader.
3591
 
          We must keep the LOCK_log to read the 4 first bytes, as this is a hot
3592
 
          log (same as when we call read_log_event() above: for a hot log we
3593
 
          take the mutex).
3594
 
        */
3595
 
        if (check_binlog_magic(cur_log,&errmsg))
3596
 
        {
3597
 
          if (!hot_log) pthread_mutex_unlock(log_lock);
3598
 
          goto err;
3599
 
        }
3600
 
        if (!hot_log) pthread_mutex_unlock(log_lock);
3601
 
        continue;
3602
 
      }
3603
 
      if (!hot_log) pthread_mutex_unlock(log_lock);
3604
 
      /*
3605
 
        if we get here, the log was not hot, so we will have to open it
3606
 
        ourselves. We are sure that the log is still not hot now (a log can get
3607
 
        from hot to cold, but not from cold to hot). No need for LOCK_log.
3608
 
      */
3609
 
#ifdef EXTRA_DEBUG
3610
 
      if (global_system_variables.log_warnings)
3611
 
        sql_print_information(_("next log '%s' is not active"),
3612
 
                              rli->linfo.log_file_name);
3613
 
#endif
3614
 
      // open_binlog() will check the magic header
3615
 
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
3616
 
                                       &errmsg)) <0)
3617
 
        goto err;
3618
 
    }
3619
 
    else
3620
 
    {
3621
 
      /*
3622
 
        Read failed with a non-EOF error.
3623
 
        TODO: come up with something better to handle this error
3624
 
      */
3625
 
      if (hot_log)
3626
 
        pthread_mutex_unlock(log_lock);
3627
 
      sql_print_error(_("Slave SQL thread: I/O error reading "
3628
 
                        "event(errno: %d  cur_log->error: %d)"),
3629
 
                      my_errno,cur_log->error);
3630
 
      // set read position to the beginning of the event
3631
 
      my_b_seek(cur_log,rli->event_relay_log_pos);
3632
 
      /* otherwise, we have had a partial read */
3633
 
      errmsg = _("Aborting slave SQL thread because of partial event read");
3634
 
      break;                                    // To end of function
3635
 
    }
3636
 
  }
3637
 
  if (!errmsg && global_system_variables.log_warnings)
3638
 
  {
3639
 
    sql_print_information(_("Error reading relay log event: %s"),
3640
 
                          _("slave SQL thread was killed"));
3641
 
    return(0);
3642
 
  }
3643
 
 
3644
 
err:
3645
 
  if (errmsg)
3646
 
    sql_print_error(_("Error reading relay log event: %s"), errmsg);
3647
 
  return(0);
3648
 
}
3649
 
 
3650
 
/*
3651
 
  Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
3652
 
  because of size is simpler because when we do it we already have all relevant
3653
 
  locks; here we don't, so this function is mainly taking locks).
3654
 
  Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
3655
 
  is void).
3656
 
*/
3657
 
 
3658
 
void rotate_relay_log(Master_info* mi)
3659
 
{
3660
 
  Relay_log_info* rli= &mi->rli;
3661
 
 
3662
 
  /* We don't lock rli->run_lock. This would lead to deadlocks. */
3663
 
  pthread_mutex_lock(&mi->run_lock);
3664
 
 
3665
 
  /*
3666
 
     We need to test inited because otherwise, new_file() will attempt to lock
3667
 
     LOCK_log, which may not be inited (if we're not a slave).
3668
 
  */
3669
 
  if (!rli->inited)
3670
 
  {
3671
 
    goto end;
3672
 
  }
3673
 
 
3674
 
  /* If the relay log is closed, new_file() will do nothing. */
3675
 
  rli->relay_log.new_file();
3676
 
 
3677
 
  /*
3678
 
    We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
3679
 
    be counted, so imagine a succession of FLUSH LOGS  and assume the slave
3680
 
    threads are started:
3681
 
    relay_log_space decreases by the size of the deleted relay log, but does
3682
 
    not increase, so flush-after-flush we may become negative, which is wrong.
3683
 
    Even if this will be corrected as soon as a query is replicated on the
3684
 
    slave (because the I/O thread will then call harvest_bytes_written() which
3685
 
    will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
3686
 
    output in SHOW SLAVE STATUS meanwhile. So we harvest now.
3687
 
    If the log is closed, then this will just harvest the last writes, probably
3688
 
    0 as they probably have been harvested.
3689
 
  */
3690
 
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3691
 
end:
3692
 
  pthread_mutex_unlock(&mi->run_lock);
3693
 
  return;
3694
 
}
3695
 
 
3696
 
 
3697
 
/**
3698
 
   Detects, based on master's version (as found in the relay log), if master
3699
 
   has a certain bug.
3700
 
   @param rli Relay_log_info which tells the master's version
3701
 
   @param bug_id Number of the bug as found in bugs.mysql.com
3702
 
   @param report bool report error message, default TRUE
3703
 
   @return true if master has the bug, FALSE if it does not.
3704
 
*/
3705
 
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
3706
 
{
3707
 
  struct st_version_range_for_one_bug {
3708
 
    uint32_t        bug_id;
3709
 
    const unsigned char introduced_in[3]; // first version with bug
3710
 
    const unsigned char fixed_in[3];      // first version with fix
3711
 
  };
3712
 
  static struct st_version_range_for_one_bug versions_for_all_bugs[]=
3713
 
  {
3714
 
    {24432, { 5, 0, 24 }, { 5, 0, 38 } },
3715
 
    {24432, { 5, 1, 12 }, { 5, 1, 17 } },
3716
 
    {33029, { 5, 0,  0 }, { 5, 0, 58 } },
3717
 
    {33029, { 5, 1,  0 }, { 5, 1, 12 } },
3718
 
  };
3719
 
  const unsigned char *master_ver=
3720
 
    rli->relay_log.description_event_for_exec->server_version_split;
3721
 
 
3722
 
  assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3723
 
 
3724
 
  for (uint32_t i= 0;
3725
 
       i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3726
 
  {
3727
 
    const unsigned char *introduced_in= versions_for_all_bugs[i].introduced_in,
3728
 
      *fixed_in= versions_for_all_bugs[i].fixed_in;
3729
 
    if ((versions_for_all_bugs[i].bug_id == bug_id) &&
3730
 
        (memcmp(introduced_in, master_ver, 3) <= 0) &&
3731
 
        (memcmp(fixed_in,      master_ver, 3) >  0))
3732
 
    {
3733
 
      if (!report)
3734
 
        return true;
3735
 
 
3736
 
      // a short message for SHOW SLAVE STATUS (message length constraints)
3737
 
      my_printf_error(ER_UNKNOWN_ERROR,
3738
 
                      _("master may suffer from"
3739
 
                        " http://bugs.mysql.com/bug.php?id=%u"
3740
 
                        " so slave stops; check error log on slave"
3741
 
                        " for more info"), MYF(0), bug_id);
3742
 
      // a verbose message for the error log
3743
 
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
3744
 
                  _("According to the master's version ('%s'),"
3745
 
                    " it is probable that master suffers from this bug:"
3746
 
                    " http://bugs.mysql.com/bug.php?id=%u"
3747
 
                    " and thus replicating the current binary log event"
3748
 
                    " may make the slave's data become different from the"
3749
 
                    " master's data."
3750
 
                    " To take no risk, slave refuses to replicate"
3751
 
                    " this event and stops."
3752
 
                    " We recommend that all updates be stopped on the"
3753
 
                    " master and slave, that the data of both be"
3754
 
                    " manually synchronized,"
3755
 
                    " that master's binary logs be deleted,"
3756
 
                    " that master be upgraded to a version at least"
3757
 
                    " equal to '%d.%d.%d'. Then replication can be"
3758
 
                    " restarted."),
3759
 
                  rli->relay_log.description_event_for_exec->server_version,
3760
 
                  bug_id,
3761
 
                  fixed_in[0], fixed_in[1], fixed_in[2]);
3762
 
      return true;
3763
 
    }
3764
 
  }
3765
 
  return false;
3766
 
}
3767
 
 
3768
 
/**
3769
 
   BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
3770
 
   exclusive, if one statement in a SP generated AUTO_INCREMENT value
3771
 
   by the top statement, all statements after it would be considered
3772
 
   generated AUTO_INCREMENT value by the top statement, and a
3773
 
   erroneous INSERT_ID value might be associated with these statement,
3774
 
   which could cause duplicate entry error and stop the slave.
3775
 
 
3776
 
   Detect buggy master to work around.
3777
 
 */
3778
 
bool rpl_master_erroneous_autoinc(THD *thd)
3779
 
{
3780
 
  if (active_mi && active_mi->rli.sql_thd == thd)
3781
 
  {
3782
 
    Relay_log_info *rli= &active_mi->rli;
3783
 
    return rpl_master_has_bug(rli, 33029, false);
3784
 
  }
3785
 
  return false;
3786
 
}
3787
 
 
3788
 
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
3789
 
template class I_List_iterator<i_string>;
3790
 
template class I_List_iterator<i_string_pair>;
3791
 
#endif
3792
 
 
3793
 
/**
3794
 
  @} (end of group Replication)
3795
 
*/