~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/slave.cc

pandora-build v0.71. Added check for avahi.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2000-2003 DRIZZLE AB
2
 
 
3
 
   This program is free software; you can redistribute it and/or modify
4
 
   it under the terms of the GNU General Public License as published by
5
 
   the Free Software Foundation; version 2 of the License.
6
 
 
7
 
   This program is distributed in the hope that it will be useful,
8
 
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 
   GNU General Public License for more details.
11
 
 
12
 
   You should have received a copy of the GNU General Public License
13
 
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
 
 
16
 
 
17
 
/**
18
 
  @addtogroup Replication
19
 
  @{
20
 
 
21
 
  @file
22
 
 
23
 
  @brief Code to run the io thread and the sql thread on the
24
 
  replication slave.
25
 
*/
26
 
#include <drizzled/server_includes.h>
27
 
 
28
 
#include <storage/myisam/myisam.h>
29
 
#include "rpl_mi.h"
30
 
#include "rpl_rli.h"
31
 
#include "sql_repl.h"
32
 
#include "rpl_filter.h"
33
 
#include "repl_failsafe.h"
34
 
#include <mysys/thr_alarm.h>
35
 
#include <libdrizzle/sql_common.h>
36
 
#include <libdrizzle/errmsg.h>
37
 
#include <mysys/mysys_err.h>
38
 
#include <drizzled/drizzled_error_messages.h>
39
 
 
40
 
#ifdef HAVE_REPLICATION
41
 
 
42
 
#include "rpl_tblmap.h"
43
 
 
44
 
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
45
 
 
46
 
#define MAX_SLAVE_RETRY_PAUSE 5
47
 
bool use_slave_mask = 0;
48
 
MY_BITMAP slave_error_mask;
49
 
 
50
 
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
51
 
 
52
 
char* slave_load_tmpdir = 0;
53
 
Master_info *active_mi= 0;
54
 
bool replicate_same_server_id;
55
 
uint64_t relay_log_space_limit = 0;
56
 
 
57
 
/*
58
 
  When slave thread exits, we need to remember the temporary tables so we
59
 
  can re-use them on slave start.
60
 
 
61
 
  TODO: move the vars below under Master_info
62
 
*/
63
 
 
64
 
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
65
 
int32_t events_till_abort = -1;
66
 
 
67
 
enum enum_slave_reconnect_actions
68
 
{
69
 
  SLAVE_RECON_ACT_REG= 0,
70
 
  SLAVE_RECON_ACT_DUMP= 1,
71
 
  SLAVE_RECON_ACT_EVENT= 2,
72
 
  SLAVE_RECON_ACT_MAX
73
 
};
74
 
 
75
 
enum enum_slave_reconnect_messages
76
 
{
77
 
  SLAVE_RECON_MSG_WAIT= 0,
78
 
  SLAVE_RECON_MSG_KILLED_WAITING= 1,
79
 
  SLAVE_RECON_MSG_AFTER= 2,
80
 
  SLAVE_RECON_MSG_FAILED= 3,
81
 
  SLAVE_RECON_MSG_COMMAND= 4,
82
 
  SLAVE_RECON_MSG_KILLED_AFTER= 5,
83
 
  SLAVE_RECON_MSG_MAX
84
 
};
85
 
 
86
 
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
87
 
{
88
 
  {
89
 
    N_("Waiting to reconnect after a failed registration on master"),
90
 
    N_("Slave I/O thread killed while waitnig to reconnect after a "
91
 
                 "failed registration on master"),
92
 
    N_("Reconnecting after a failed registration on master"),
93
 
    N_("failed registering on master, reconnecting to try again, "
94
 
                 "log '%s' at postion %s"),
95
 
    "COM_REGISTER_SLAVE",
96
 
    N_("Slave I/O thread killed during or after reconnect")
97
 
  },
98
 
  {
99
 
    N_("Waiting to reconnect after a failed binlog dump request"),
100
 
    N_("Slave I/O thread killed while retrying master dump"),
101
 
    N_("Reconnecting after a failed binlog dump request"),
102
 
    N_("failed dump request, reconnecting to try again, "
103
 
                 "log '%s' at postion %s"),
104
 
    "COM_BINLOG_DUMP",
105
 
    N_("Slave I/O thread killed during or after reconnect")
106
 
  },
107
 
  {
108
 
    N_("Waiting to reconnect after a failed master event read"),
109
 
    N_("Slave I/O thread killed while waiting to reconnect "
110
 
                 "after a failed read"),
111
 
    N_("Reconnecting after a failed master event read"),
112
 
    N_("Slave I/O thread: Failed reading log event, "
113
 
                 "reconnecting to retry, log '%s' at postion %s"),
114
 
    "",
115
 
    N_("Slave I/O thread killed during or after a "
116
 
                 "reconnect done to recover from failed read")
117
 
  }
118
 
};
119
 
 
120
 
 
121
 
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
122
 
 
123
 
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
124
 
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
125
 
static bool wait_for_relay_log_space(Relay_log_info* rli);
126
 
static inline bool io_slave_killed(THD* thd,Master_info* mi);
127
 
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
128
 
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
129
 
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi);
130
 
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
131
 
                          bool suppress_warnings);
132
 
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
133
 
                             bool reconnect, bool suppress_warnings);
134
 
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
135
 
                      void* thread_killed_arg);
136
 
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
137
 
static Log_event* next_event(Relay_log_info* rli);
138
 
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
139
 
static int32_t terminate_slave_thread(THD *thd,
140
 
                                  pthread_mutex_t* term_lock,
141
 
                                  pthread_cond_t* term_cond,
142
 
                                  volatile uint32_t *slave_running,
143
 
                                  bool skip_lock);
144
 
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
145
 
 
146
 
/*
147
 
  Find out which replications threads are running
148
 
 
149
 
  SYNOPSIS
150
 
    init_thread_mask()
151
 
    mask                Return value here
152
 
    mi                  master_info for slave
153
 
    inverse             If set, returns which threads are not running
154
 
 
155
 
  IMPLEMENTATION
156
 
    Get a bit mask for which threads are running so that we can later restart
157
 
    these threads.
158
 
 
159
 
  RETURN
160
 
    mask        If inverse == 0, running threads
161
 
                If inverse == 1, stopped threads
162
 
*/
163
 
 
164
 
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
165
 
{
166
 
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
167
 
  register int32_t tmp_mask=0;
168
 
 
169
 
  if (set_io)
170
 
    tmp_mask |= SLAVE_IO;
171
 
  if (set_sql)
172
 
    tmp_mask |= SLAVE_SQL;
173
 
  if (inverse)
174
 
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
175
 
  *mask = tmp_mask;
176
 
  return;
177
 
}
178
 
 
179
 
 
180
 
/*
181
 
  lock_slave_threads()
182
 
*/
183
 
 
184
 
void lock_slave_threads(Master_info* mi)
185
 
{
186
 
  //TODO: see if we can do this without dual mutex
187
 
  pthread_mutex_lock(&mi->run_lock);
188
 
  pthread_mutex_lock(&mi->rli.run_lock);
189
 
  return;
190
 
}
191
 
 
192
 
 
193
 
/*
194
 
  unlock_slave_threads()
195
 
*/
196
 
 
197
 
void unlock_slave_threads(Master_info* mi)
198
 
{
199
 
  //TODO: see if we can do this without dual mutex
200
 
  pthread_mutex_unlock(&mi->rli.run_lock);
201
 
  pthread_mutex_unlock(&mi->run_lock);
202
 
  return;
203
 
}
204
 
 
205
 
 
206
 
/* Initialize slave structures */
207
 
 
208
 
int32_t init_slave()
209
 
{
210
 
  /*
211
 
    This is called when mysqld starts. Before client connections are
212
 
    accepted. However bootstrap may conflict with us if it does START SLAVE.
213
 
    So it's safer to take the lock.
214
 
  */
215
 
  pthread_mutex_lock(&LOCK_active_mi);
216
 
  /*
217
 
    TODO: re-write this to interate through the list of files
218
 
    for multi-master
219
 
  */
220
 
  active_mi= new Master_info;
221
 
 
222
 
  /*
223
 
    If master_host is not specified, try to read it from the master_info file.
224
 
    If master_host is specified, create the master_info file if it doesn't
225
 
    exists.
226
 
  */
227
 
  if (!active_mi)
228
 
  {
229
 
    sql_print_error(_("Failed to allocate memory for the master info structure"));
230
 
    goto err;
231
 
  }
232
 
 
233
 
  if (init_master_info(active_mi,master_info_file,relay_log_info_file,
234
 
                       1, (SLAVE_IO | SLAVE_SQL)))
235
 
  {
236
 
    sql_print_error(_("Failed to initialize the master info structure"));
237
 
    goto err;
238
 
  }
239
 
 
240
 
  /* If server id is not set, start_slave_thread() will say it */
241
 
 
242
 
  if (active_mi->host[0] && !opt_skip_slave_start)
243
 
  {
244
 
    if (start_slave_threads(1 /* need mutex */,
245
 
                            0 /* no wait for start*/,
246
 
                            active_mi,
247
 
                            master_info_file,
248
 
                            relay_log_info_file,
249
 
                            SLAVE_IO | SLAVE_SQL))
250
 
    {
251
 
      sql_print_error(_("Failed to create slave threads"));
252
 
      goto err;
253
 
    }
254
 
  }
255
 
  pthread_mutex_unlock(&LOCK_active_mi);
256
 
  return(0);
257
 
 
258
 
err:
259
 
  pthread_mutex_unlock(&LOCK_active_mi);
260
 
  return(1);
261
 
}
262
 
 
263
 
 
264
 
/*
265
 
  Init function to set up array for errors that should be skipped for slave
266
 
 
267
 
  SYNOPSIS
268
 
    init_slave_skip_errors()
269
 
    arg         List of errors numbers to skip, separated with ','
270
 
 
271
 
  NOTES
272
 
    Called from get_options() in mysqld.cc on start-up
273
 
*/
274
 
 
275
 
void init_slave_skip_errors(const char* arg)
276
 
{
277
 
  const char *p;
278
 
 
279
 
  if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
280
 
  {
281
 
    fprintf(stderr, "Badly out of memory, please check your system status\n");
282
 
    exit(1);
283
 
  }
284
 
  use_slave_mask = 1;
285
 
  for (;my_isspace(system_charset_info,*arg);++arg)
286
 
    /* empty */;
287
 
  if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
288
 
  {
289
 
    bitmap_set_all(&slave_error_mask);
290
 
    return;
291
 
  }
292
 
  for (p= arg ; *p; )
293
 
  {
294
 
    long err_code;
295
 
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
296
 
      break;
297
 
    if (err_code < MAX_SLAVE_ERROR)
298
 
       bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
299
 
    while (!my_isdigit(system_charset_info,*p) && *p)
300
 
      p++;
301
 
  }
302
 
  return;
303
 
}
304
 
 
305
 
 
306
 
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
307
 
{
308
 
  if (!mi->inited)
309
 
    return(0); /* successfully do nothing */
310
 
  int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
311
 
  pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
312
 
 
313
 
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
314
 
  {
315
 
    mi->abort_slave=1;
316
 
    if ((error=terminate_slave_thread(mi->io_thd,io_lock,
317
 
                                      &mi->stop_cond,
318
 
                                      &mi->slave_running,
319
 
                                      skip_lock)) &&
320
 
        !force_all)
321
 
      return(error);
322
 
  }
323
 
  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
324
 
  {
325
 
    mi->rli.abort_slave=1;
326
 
    if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
327
 
                                      &mi->rli.stop_cond,
328
 
                                      &mi->rli.slave_running,
329
 
                                      skip_lock)) &&
330
 
        !force_all)
331
 
      return(error);
332
 
  }
333
 
  return(0);
334
 
}
335
 
 
336
 
 
337
 
/**
338
 
   Wait for a slave thread to terminate.
339
 
 
340
 
   This function is called after requesting the thread to terminate
341
 
   (by setting @c abort_slave member of @c Relay_log_info or @c
342
 
   Master_info structure to 1). Termination of the thread is
343
 
   controlled with the the predicate <code>*slave_running</code>.
344
 
 
345
 
   Function will acquire @c term_lock before waiting on the condition
346
 
   unless @c skip_lock is true in which case the mutex should be owned
347
 
   by the caller of this function and will remain acquired after
348
 
   return from the function.
349
 
 
350
 
   @param term_lock
351
 
          Associated lock to use when waiting for @c term_cond
352
 
 
353
 
   @param term_cond
354
 
          Condition that is signalled when the thread has terminated
355
 
 
356
 
   @param slave_running
357
 
          Pointer to predicate to check for slave thread termination
358
 
 
359
 
   @param skip_lock
360
 
          If @c true the lock will not be acquired before waiting on
361
 
          the condition. In this case, it is assumed that the calling
362
 
          function acquires the lock before calling this function.
363
 
 
364
 
   @retval 0 All OK
365
 
 */
366
 
static int32_t
367
 
terminate_slave_thread(THD *thd,
368
 
                       pthread_mutex_t* term_lock,
369
 
                       pthread_cond_t* term_cond,
370
 
                       volatile uint32_t *slave_running,
371
 
                       bool skip_lock)
372
 
{
373
 
  int32_t error;
374
 
 
375
 
  if (!skip_lock)
376
 
    pthread_mutex_lock(term_lock);
377
 
 
378
 
  safe_mutex_assert_owner(term_lock);
379
 
 
380
 
  if (!*slave_running)
381
 
  {
382
 
    if (!skip_lock)
383
 
      pthread_mutex_unlock(term_lock);
384
 
    return(ER_SLAVE_NOT_RUNNING);
385
 
  }
386
 
  assert(thd != 0);
387
 
  THD_CHECK_SENTRY(thd);
388
 
 
389
 
  /*
390
 
    Is is critical to test if the slave is running. Otherwise, we might
391
 
    be referening freed memory trying to kick it
392
 
  */
393
 
 
394
 
  while (*slave_running)                        // Should always be true
395
 
  {
396
 
    pthread_mutex_lock(&thd->LOCK_delete);
397
 
#ifndef DONT_USE_THR_ALARM
398
 
    /*
399
 
      Error codes from pthread_kill are:
400
 
      EINVAL: invalid signal number (can't happen)
401
 
      ESRCH: thread already killed (can happen, should be ignored)
402
 
    */
403
 
    int32_t err= pthread_kill(thd->real_id, thr_client_alarm);
404
 
    assert(err != EINVAL);
405
 
#endif
406
 
    thd->awake(THD::NOT_KILLED);
407
 
    pthread_mutex_unlock(&thd->LOCK_delete);
408
 
 
409
 
    /*
410
 
      There is a small chance that slave thread might miss the first
411
 
      alarm. To protect againts it, resend the signal until it reacts
412
 
    */
413
 
    struct timespec abstime;
414
 
    set_timespec(abstime,2);
415
 
    error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
416
 
    assert(error == ETIMEDOUT || error == 0);
417
 
  }
418
 
 
419
 
  assert(*slave_running == 0);
420
 
 
421
 
  if (!skip_lock)
422
 
    pthread_mutex_unlock(term_lock);
423
 
  return(0);
424
 
}
425
 
 
426
 
 
427
 
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
428
 
                           pthread_mutex_t *cond_lock,
429
 
                           pthread_cond_t *start_cond,
430
 
                           volatile uint32_t *slave_running,
431
 
                           volatile uint32_t *slave_run_id,
432
 
                           Master_info* mi,
433
 
                           bool high_priority)
434
 
{
435
 
  pthread_t th;
436
 
  uint32_t start_id;
437
 
 
438
 
  assert(mi->inited);
439
 
 
440
 
  if (start_lock)
441
 
    pthread_mutex_lock(start_lock);
442
 
  if (!server_id)
443
 
  {
444
 
    if (start_cond)
445
 
      pthread_cond_broadcast(start_cond);
446
 
    if (start_lock)
447
 
      pthread_mutex_unlock(start_lock);
448
 
    sql_print_error(_("Server id not set, will not start slave"));
449
 
    return(ER_BAD_SLAVE);
450
 
  }
451
 
 
452
 
  if (*slave_running)
453
 
  {
454
 
    if (start_cond)
455
 
      pthread_cond_broadcast(start_cond);
456
 
    if (start_lock)
457
 
      pthread_mutex_unlock(start_lock);
458
 
    return(ER_SLAVE_MUST_STOP);
459
 
  }
460
 
  start_id= *slave_run_id;
461
 
  if (high_priority)
462
 
  {
463
 
    struct sched_param tmp_sched_param;
464
 
 
465
 
    memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
466
 
    tmp_sched_param.sched_priority= CONNECT_PRIOR;
467
 
    (void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
468
 
  }
469
 
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
470
 
  {
471
 
    if (start_lock)
472
 
      pthread_mutex_unlock(start_lock);
473
 
    return(ER_SLAVE_THREAD);
474
 
  }
475
 
  if (start_cond && cond_lock) // caller has cond_lock
476
 
  {
477
 
    THD* thd = current_thd;
478
 
    while (start_id == *slave_run_id)
479
 
    {
480
 
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
481
 
                                            "Waiting for slave thread to start");
482
 
      pthread_cond_wait(start_cond,cond_lock);
483
 
      thd->exit_cond(old_msg);
484
 
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
485
 
      if (thd->killed)
486
 
        return(thd->killed_errno());
487
 
    }
488
 
  }
489
 
  if (start_lock)
490
 
    pthread_mutex_unlock(start_lock);
491
 
  return(0);
492
 
}
493
 
 
494
 
 
495
 
/*
496
 
  start_slave_threads()
497
 
 
498
 
  NOTES
499
 
    SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
500
 
    sense to do that for starting a slave--we always care if it actually
501
 
    started the threads that were not previously running
502
 
*/
503
 
 
504
 
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
505
 
                        Master_info* mi,
506
 
                        const char* master_info_fname __attribute__((unused)),
507
 
                        const char* slave_info_fname __attribute__((unused)),
508
 
                        int32_t thread_mask)
509
 
{
510
 
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
511
 
  pthread_cond_t* cond_io=0,*cond_sql=0;
512
 
  int32_t error=0;
513
 
 
514
 
  if (need_slave_mutex)
515
 
  {
516
 
    lock_io = &mi->run_lock;
517
 
    lock_sql = &mi->rli.run_lock;
518
 
  }
519
 
  if (wait_for_start)
520
 
  {
521
 
    cond_io = &mi->start_cond;
522
 
    cond_sql = &mi->rli.start_cond;
523
 
    lock_cond_io = &mi->run_lock;
524
 
    lock_cond_sql = &mi->rli.run_lock;
525
 
  }
526
 
 
527
 
  if (thread_mask & SLAVE_IO)
528
 
    error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
529
 
                              cond_io,
530
 
                              &mi->slave_running, &mi->slave_run_id,
531
 
                              mi, 1); //high priority, to read the most possible
532
 
  if (!error && (thread_mask & SLAVE_SQL))
533
 
  {
534
 
    error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
535
 
                              cond_sql,
536
 
                              &mi->rli.slave_running, &mi->rli.slave_run_id,
537
 
                              mi, 0);
538
 
    if (error)
539
 
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
540
 
  }
541
 
  return(error);
542
 
}
543
 
 
544
 
 
545
 
#ifdef NOT_USED_YET
546
 
static int32_t end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
547
 
{
548
 
  end_master_info(mi);
549
 
  return(0);
550
 
}
551
 
#endif
552
 
 
553
 
 
554
 
/*
555
 
  Free all resources used by slave
556
 
 
557
 
  SYNOPSIS
558
 
    end_slave()
559
 
*/
560
 
 
561
 
void end_slave()
562
 
{
563
 
  /*
564
 
    This is called when the server terminates, in close_connections().
565
 
    It terminates slave threads. However, some CHANGE MASTER etc may still be
566
 
    running presently. If a START SLAVE was in progress, the mutex lock below
567
 
    will make us wait until slave threads have started, and START SLAVE
568
 
    returns, then we terminate them here.
569
 
  */
570
 
  pthread_mutex_lock(&LOCK_active_mi);
571
 
  if (active_mi)
572
 
  {
573
 
    /*
574
 
      TODO: replace the line below with
575
 
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
576
 
      once multi-master code is ready.
577
 
    */
578
 
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
579
 
    end_master_info(active_mi);
580
 
    delete active_mi;
581
 
    active_mi= 0;
582
 
  }
583
 
  pthread_mutex_unlock(&LOCK_active_mi);
584
 
  return;
585
 
}
586
 
 
587
 
 
588
 
static bool io_slave_killed(THD* thd, Master_info* mi)
589
 
{
590
 
  assert(mi->io_thd == thd);
591
 
  assert(mi->slave_running); // tracking buffer overrun
592
 
  return(mi->abort_slave || abort_loop || thd->killed);
593
 
}
594
 
 
595
 
 
596
 
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
597
 
{
598
 
  assert(rli->sql_thd == thd);
599
 
  assert(rli->slave_running == 1);// tracking buffer overrun
600
 
  if (abort_loop || thd->killed || rli->abort_slave)
601
 
  {
602
 
    /*
603
 
      If we are in an unsafe situation (stopping could corrupt replication),
604
 
      we give one minute to the slave SQL thread of grace before really
605
 
      terminating, in the hope that it will be able to read more events and
606
 
      the unsafe situation will soon be left. Note that this one minute starts
607
 
      from the last time anything happened in the slave SQL thread. So it's
608
 
      really one minute of idleness, we don't timeout if the slave SQL thread
609
 
      is actively working.
610
 
    */
611
 
    if (rli->last_event_start_time == 0)
612
 
      return(1);
613
 
    if (difftime(time(0), rli->last_event_start_time) > 60)
614
 
    {
615
 
      rli->report(ERROR_LEVEL, 0,
616
 
                  _("SQL thread had to stop in an unsafe situation, in "
617
 
                  "the middle of applying updates to a "
618
 
                  "non-transactional table without any primary key. "
619
 
                  "There is a risk of duplicate updates when the slave "
620
 
                  "SQL thread is restarted. Please check your tables' "
621
 
                  "contents after restart."));
622
 
      return(1);
623
 
    }
624
 
  }
625
 
  return(0);
626
 
}
627
 
 
628
 
 
629
 
/*
630
 
  skip_load_data_infile()
631
 
 
632
 
  NOTES
633
 
    This is used to tell a 3.23 master to break send_file()
634
 
*/
635
 
 
636
 
void skip_load_data_infile(NET *net)
637
 
{
638
 
  (void)net_request_file(net, "/dev/null");
639
 
  (void)my_net_read(net);                               // discard response
640
 
  (void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok
641
 
  return;
642
 
}
643
 
 
644
 
 
645
 
bool net_request_file(NET* net, const char* fname)
646
 
{
647
 
  return(net_write_command(net, 251, (uchar*) fname, strlen(fname),
648
 
                                (uchar*) "", 0));
649
 
}
650
 
 
651
 
/*
652
 
  From other comments and tests in code, it looks like
653
 
  sometimes Query_log_event and Load_log_event can have db == 0
654
 
  (see rewrite_db() above for example)
655
 
  (cases where this happens are unclear; it may be when the master is 3.23).
656
 
*/
657
 
 
658
 
const char *print_slave_db_safe(const char* db)
659
 
{
660
 
  return((db ? db : ""));
661
 
}
662
 
 
663
 
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
664
 
                                 const char *default_val)
665
 
{
666
 
  uint32_t length;
667
 
 
668
 
  if ((length=my_b_gets(f,var, max_size)))
669
 
  {
670
 
    char* last_p = var + length -1;
671
 
    if (*last_p == '\n')
672
 
      *last_p = 0; // if we stopped on newline, kill it
673
 
    else
674
 
    {
675
 
      /*
676
 
        If we truncated a line or stopped on last char, remove all chars
677
 
        up to and including newline.
678
 
      */
679
 
      int32_t c;
680
 
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
681
 
    }
682
 
    return(0);
683
 
  }
684
 
  else if (default_val)
685
 
  {
686
 
    strmake(var,  default_val, max_size-1);
687
 
    return(0);
688
 
  }
689
 
  return(1);
690
 
}
691
 
 
692
 
 
693
 
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
694
 
{
695
 
  char buf[32];
696
 
 
697
 
 
698
 
  if (my_b_gets(f, buf, sizeof(buf)))
699
 
  {
700
 
    *var = atoi(buf);
701
 
    return(0);
702
 
  }
703
 
  else if (default_val)
704
 
  {
705
 
    *var = default_val;
706
 
    return(0);
707
 
  }
708
 
  return(1);
709
 
}
710
 
 
711
 
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
712
 
{
713
 
  char buf[16];
714
 
 
715
 
 
716
 
  if (my_b_gets(f, buf, sizeof(buf)))
717
 
  {
718
 
    if (sscanf(buf, "%f", var) != 1)
719
 
      return(1);
720
 
    else
721
 
      return(0);
722
 
  }
723
 
  else if (default_val != 0.0)
724
 
  {
725
 
    *var = default_val;
726
 
    return(0);
727
 
  }
728
 
  return(1);
729
 
}
730
 
 
731
 
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
732
 
{
733
 
  if (io_slave_killed(thd, mi))
734
 
  {
735
 
    if (info && global_system_variables.log_warnings)
736
 
      sql_print_information(info);
737
 
    return true;
738
 
  }
739
 
  return false;
740
 
}
741
 
 
742
 
 
743
 
/*
744
 
  Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
745
 
  relying on the binlog's version. This is not perfect: imagine an upgrade
746
 
  of the master without waiting that all slaves are in sync with the master;
747
 
  then a slave could be fooled about the binlog's format. This is what happens
748
 
  when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
749
 
  slaves are fooled. So we do this only to distinguish between 3.23 and more
750
 
  recent masters (it's too late to change things for 3.23).
751
 
 
752
 
  RETURNS
753
 
  0       ok
754
 
  1       error
755
 
*/
756
 
 
757
 
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
758
 
{
759
 
  char error_buf[512];
760
 
  String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
761
 
  char err_buff[MAX_SLAVE_ERRMSG];
762
 
  const char* errmsg= 0;
763
 
  int32_t err_code= 0;
764
 
  DRIZZLE_RES *master_res= 0;
765
 
  DRIZZLE_ROW master_row;
766
 
 
767
 
  err_msg.length(0);
768
 
  /*
769
 
    Free old description_event_for_queue (that is needed if we are in
770
 
    a reconnection).
771
 
  */
772
 
  delete mi->rli.relay_log.description_event_for_queue;
773
 
  mi->rli.relay_log.description_event_for_queue= 0;
774
 
 
775
 
  if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
776
 
  {
777
 
    errmsg = _("Master reported unrecognized DRIZZLE version");
778
 
    err_code= ER_SLAVE_FATAL_ERROR;
779
 
    sprintf(err_buff, ER(err_code), errmsg);
780
 
    err_msg.append(err_buff);
781
 
  }
782
 
  else
783
 
  {
784
 
    /*
785
 
      Note the following switch will bug when we have DRIZZLE branch 30 ;)
786
 
    */
787
 
    switch (*drizzle->server_version)
788
 
    {
789
 
    case '0':
790
 
    case '1':
791
 
    case '2':
792
 
      errmsg = _("Master reported unrecognized DRIZZLE version");
793
 
      err_code= ER_SLAVE_FATAL_ERROR;
794
 
      sprintf(err_buff, ER(err_code), errmsg);
795
 
      err_msg.append(err_buff);
796
 
      break;
797
 
    case '3':
798
 
      mi->rli.relay_log.description_event_for_queue= new
799
 
        Format_description_log_event(1, drizzle->server_version);
800
 
      break;
801
 
    case '4':
802
 
      mi->rli.relay_log.description_event_for_queue= new
803
 
        Format_description_log_event(3, drizzle->server_version);
804
 
      break;
805
 
    default:
806
 
      /*
807
 
        Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
808
 
        take the early steps (like tests for "is this a 3.23 master") which we
809
 
        have to take before we receive the real master's Format_desc which will
810
 
        override this one. Note that the Format_desc we create below is garbage
811
 
        (it has the format of the *slave*); it's only good to help know if the
812
 
        master is 3.23, 4.0, etc.
813
 
      */
814
 
      mi->rli.relay_log.description_event_for_queue= new
815
 
        Format_description_log_event(4, drizzle->server_version);
816
 
      break;
817
 
    }
818
 
  }
819
 
 
820
 
  /*
821
 
     This does not mean that a 5.0 slave will be able to read a 6.0 master; but
822
 
     as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
823
 
     can't read a 6.0 master, this will show up when the slave can't read some
824
 
     events sent by the master, and there will be error messages.
825
 
  */
826
 
 
827
 
  if (err_msg.length() != 0)
828
 
    goto err;
829
 
 
830
 
  /* as we are here, we tried to allocate the event */
831
 
  if (!mi->rli.relay_log.description_event_for_queue)
832
 
  {
833
 
    errmsg= _("default Format_description_log_event");
834
 
    err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
835
 
    sprintf(err_buff, ER(err_code), errmsg);
836
 
    err_msg.append(err_buff);
837
 
    goto err;
838
 
  }
839
 
 
840
 
  /*
841
 
    Compare the master and slave's clock. Do not die if master's clock is
842
 
    unavailable (very old master not supporting UNIX_TIMESTAMP()?).
843
 
  */
844
 
 
845
 
  if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
846
 
      (master_res= drizzle_store_result(drizzle)) &&
847
 
      (master_row= drizzle_fetch_row(master_res)))
848
 
  {
849
 
    mi->clock_diff_with_master=
850
 
      (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
851
 
  }
852
 
  else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
853
 
  {
854
 
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
855
 
    sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
856
 
                        "do not trust column Seconds_Behind_Master of SHOW "
857
 
                        "SLAVE STATUS. Error: %s (%d)"),
858
 
                      drizzle_error(drizzle), drizzle_errno(drizzle));
859
 
  }
860
 
  if (master_res)
861
 
    drizzle_free_result(master_res);
862
 
 
863
 
  /*
864
 
    Check that the master's server id and ours are different. Because if they
865
 
    are equal (which can result from a simple copy of master's datadir to slave,
866
 
    thus copying some my.cnf), replication will work but all events will be
867
 
    skipped.
868
 
    Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
869
 
    master?).
870
 
    Note: we could have put a @@SERVER_ID in the previous SELECT
871
 
    UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
872
 
  */
873
 
  if (!drizzle_real_query(drizzle,
874
 
                        STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
875
 
      (master_res= drizzle_store_result(drizzle)))
876
 
  {
877
 
    if ((master_row= drizzle_fetch_row(master_res)) &&
878
 
        (::server_id == strtoul(master_row[1], 0, 10)) &&
879
 
        !mi->rli.replicate_same_server_id)
880
 
    {
881
 
      errmsg=
882
 
        _("The slave I/O thread stops because master and slave have equal "
883
 
          "DRIZZLE server ids; these ids must be different "
884
 
          "for replication to work (or "
885
 
          "the --replicate-same-server-id option must be used "
886
 
          "on slave but this does"
887
 
          "not always make sense; please check the manual before using it).");
888
 
      err_code= ER_SLAVE_FATAL_ERROR;
889
 
      sprintf(err_buff, ER(err_code), errmsg);
890
 
      err_msg.append(err_buff);
891
 
    }
892
 
    drizzle_free_result(master_res);
893
 
    if (errmsg)
894
 
      goto err;
895
 
  }
896
 
 
897
 
  /*
898
 
    Check that the master's global character_set_server and ours are the same.
899
 
    Not fatal if query fails (old master?).
900
 
    Note that we don't check for equality of global character_set_client and
901
 
    collation_connection (neither do we prevent their setting in
902
 
    set_var.cc). That's because from what I (Guilhem) have tested, the global
903
 
    values of these 2 are never used (new connections don't use them).
904
 
    We don't test equality of global collation_database either as it's is
905
 
    going to be deprecated (made read-only) in 4.1 very soon.
906
 
    The test is only relevant if master < 5.0.3 (we'll test only if it's older
907
 
    than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
908
 
    charset info in each binlog event.
909
 
    We don't do it for 3.23 because masters <3.23.50 hang on
910
 
    SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
911
 
    test only if master is 4.x.
912
 
  */
913
 
 
914
 
  /* redundant with rest of code but safer against later additions */
915
 
  if (*drizzle->server_version == '3')
916
 
    goto err;
917
 
 
918
 
  if ((*drizzle->server_version == '4') &&
919
 
      !drizzle_real_query(drizzle,
920
 
                        STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
921
 
      (master_res= drizzle_store_result(drizzle)))
922
 
  {
923
 
    if ((master_row= drizzle_fetch_row(master_res)) &&
924
 
        strcmp(master_row[0], global_system_variables.collation_server->name))
925
 
    {
926
 
      errmsg=
927
 
        _("The slave I/O thread stops because master and slave have"
928
 
          " different values for the COLLATION_SERVER global variable."
929
 
          " The values must be equal for replication to work");
930
 
      err_code= ER_SLAVE_FATAL_ERROR;
931
 
      sprintf(err_buff, ER(err_code), errmsg);
932
 
      err_msg.append(err_buff);
933
 
    }
934
 
    drizzle_free_result(master_res);
935
 
    if (errmsg)
936
 
      goto err;
937
 
  }
938
 
 
939
 
  /*
940
 
    Perform analogous check for time zone. Theoretically we also should
941
 
    perform check here to verify that SYSTEM time zones are the same on
942
 
    slave and master, but we can't rely on value of @@system_time_zone
943
 
    variable (it is time zone abbreviation) since it determined at start
944
 
    time and so could differ for slave and master even if they are really
945
 
    in the same system time zone. So we are omiting this check and just
946
 
    relying on documentation. Also according to Monty there are many users
947
 
    who are using replication between servers in various time zones. Hence
948
 
    such check will broke everything for them. (And now everything will
949
 
    work for them because by default both their master and slave will have
950
 
    'SYSTEM' time zone).
951
 
    This check is only necessary for 4.x masters (and < 5.0.4 masters but
952
 
    those were alpha).
953
 
  */
954
 
  if ((*drizzle->server_version == '4') &&
955
 
      !drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
956
 
      (master_res= drizzle_store_result(drizzle)))
957
 
  {
958
 
    if ((master_row= drizzle_fetch_row(master_res)) &&
959
 
        strcmp(master_row[0],
960
 
               global_system_variables.time_zone->get_name()->ptr()))
961
 
    {
962
 
      errmsg=
963
 
        _("The slave I/O thread stops because master and slave have"
964
 
          " different values for the TIME_ZONE global variable."
965
 
          " The values must be equal for replication to work");
966
 
      err_code= ER_SLAVE_FATAL_ERROR;
967
 
      sprintf(err_buff, ER(err_code), errmsg);
968
 
      err_msg.append(err_buff);
969
 
    }
970
 
    drizzle_free_result(master_res);
971
 
 
972
 
    if (errmsg)
973
 
      goto err;
974
 
  }
975
 
 
976
 
  if (mi->heartbeat_period != 0.0)
977
 
  {
978
 
    char llbuf[22];
979
 
    const char query_format[]= "SET @master_heartbeat_period= %s";
980
 
    char query[sizeof(query_format) - 2 + sizeof(llbuf)];
981
 
    /* 
982
 
       the period is an uint64_t of nano-secs. 
983
 
    */
984
 
    llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
985
 
    sprintf(query, query_format, llbuf);
986
 
 
987
 
    if (drizzle_real_query(drizzle, query, strlen(query))
988
 
        && !check_io_slave_killed(mi->io_thd, mi, NULL))
989
 
    {
990
 
      err_msg.append("The slave I/O thread stops because querying master with '");
991
 
      err_msg.append(query);
992
 
      err_msg.append("' failed;");
993
 
      err_msg.append(" error: ");
994
 
      err_code= drizzle_errno(drizzle);
995
 
      err_msg.qs_append(err_code);
996
 
      err_msg.append("  '");
997
 
      err_msg.append(drizzle_error(drizzle));
998
 
      err_msg.append("'");
999
 
      drizzle_free_result(drizzle_store_result(drizzle));
1000
 
      goto err;
1001
 
    }
1002
 
    drizzle_free_result(drizzle_store_result(drizzle));
1003
 
  }
1004
 
  
1005
 
err:
1006
 
  if (err_msg.length() != 0)
1007
 
  {
1008
 
    sql_print_error(err_msg.ptr());
1009
 
    assert(err_code != 0);
1010
 
    mi->report(ERROR_LEVEL, err_code, err_msg.ptr());
1011
 
    return(1);
1012
 
  }
1013
 
 
1014
 
  return(0);
1015
 
}
1016
 
 
1017
 
 
1018
 
static bool wait_for_relay_log_space(Relay_log_info* rli)
1019
 
{
1020
 
  bool slave_killed=0;
1021
 
  Master_info* mi = rli->mi;
1022
 
  const char *save_proc_info;
1023
 
  THD* thd = mi->io_thd;
1024
 
 
1025
 
  pthread_mutex_lock(&rli->log_space_lock);
1026
 
  save_proc_info= thd->enter_cond(&rli->log_space_cond,
1027
 
                                  &rli->log_space_lock,
1028
 
                                  _("Waiting for the slave SQL thread "
1029
 
                                    "to free enough relay log space"));
1030
 
  while (rli->log_space_limit < rli->log_space_total &&
1031
 
         !(slave_killed=io_slave_killed(thd,mi)) &&
1032
 
         !rli->ignore_log_space_limit)
1033
 
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1034
 
  thd->exit_cond(save_proc_info);
1035
 
  return(slave_killed);
1036
 
}
1037
 
 
1038
 
 
1039
 
/*
1040
 
  Builds a Rotate from the ignored events' info and writes it to relay log.
1041
 
 
1042
 
  SYNOPSIS
1043
 
  write_ignored_events_info_to_relay_log()
1044
 
    thd             pointer to I/O thread's thd
1045
 
    mi
1046
 
 
1047
 
  DESCRIPTION
1048
 
    Slave I/O thread, going to die, must leave a durable trace of the
1049
 
    ignored events' end position for the use of the slave SQL thread, by
1050
 
    calling this function. Only that thread can call it (see assertion).
1051
 
 */
1052
 
static void write_ignored_events_info_to_relay_log(THD *thd __attribute__((unused)),
1053
 
                                                   Master_info *mi)
1054
 
{
1055
 
  Relay_log_info *rli= &mi->rli;
1056
 
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1057
 
 
1058
 
  assert(thd == mi->io_thd);
1059
 
  pthread_mutex_lock(log_lock);
1060
 
  if (rli->ign_master_log_name_end[0])
1061
 
  {
1062
 
    Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
1063
 
                                               0, rli->ign_master_log_pos_end,
1064
 
                                               Rotate_log_event::DUP_NAME);
1065
 
    rli->ign_master_log_name_end[0]= 0;
1066
 
    /* can unlock before writing as slave SQL thd will soon see our Rotate */
1067
 
    pthread_mutex_unlock(log_lock);
1068
 
    if (likely((bool)ev))
1069
 
    {
1070
 
      ev->server_id= 0; // don't be ignored by slave SQL thread
1071
 
      if (unlikely(rli->relay_log.append(ev)))
1072
 
        mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
1073
 
                   ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
1074
 
                   _("failed to write a Rotate event"
1075
 
                     " to the relay log, SHOW SLAVE STATUS may be"
1076
 
                     " inaccurate"));
1077
 
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1078
 
      if (flush_master_info(mi, 1))
1079
 
        sql_print_error(_("Failed to flush master info file"));
1080
 
      delete ev;
1081
 
    }
1082
 
    else
1083
 
      mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
1084
 
                 ER(ER_SLAVE_CREATE_EVENT_FAILURE),
1085
 
                 _("Rotate_event (out of memory?),"
1086
 
                   " SHOW SLAVE STATUS may be inaccurate"));
1087
 
  }
1088
 
  else
1089
 
    pthread_mutex_unlock(log_lock);
1090
 
  return;
1091
 
}
1092
 
 
1093
 
 
1094
 
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
1095
 
                             bool *suppress_warnings)
1096
 
{
1097
 
  uchar buf[1024], *pos= buf;
1098
 
  uint32_t report_host_len, report_user_len=0, report_password_len=0;
1099
 
 
1100
 
  *suppress_warnings= false;
1101
 
  if (!report_host)
1102
 
    return(0);
1103
 
  report_host_len= strlen(report_host);
1104
 
  if (report_user)
1105
 
    report_user_len= strlen(report_user);
1106
 
  if (report_password)
1107
 
    report_password_len= strlen(report_password);
1108
 
  /* 30 is a good safety margin */
1109
 
  if (report_host_len + report_user_len + report_password_len + 30 >
1110
 
      sizeof(buf))
1111
 
    return(0);                                     // safety
1112
 
 
1113
 
  int4store(pos, server_id); pos+= 4;
1114
 
  pos= net_store_data(pos, (uchar*) report_host, report_host_len);
1115
 
  pos= net_store_data(pos, (uchar*) report_user, report_user_len);
1116
 
  pos= net_store_data(pos, (uchar*) report_password, report_password_len);
1117
 
  int2store(pos, (uint16_t) report_port); pos+= 2;
1118
 
  int4store(pos, rpl_recovery_rank);    pos+= 4;
1119
 
  /* The master will fill in master_id */
1120
 
  int4store(pos, 0);                    pos+= 4;
1121
 
 
1122
 
  if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
1123
 
  {
1124
 
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1125
 
    {
1126
 
      *suppress_warnings= true;                 // Suppress reconnect warning
1127
 
    }
1128
 
    else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
1129
 
    {
1130
 
      char buf[256];
1131
 
      snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle), 
1132
 
               drizzle_errno(drizzle));
1133
 
      mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1134
 
                 ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1135
 
    }
1136
 
    return(1);
1137
 
  }
1138
 
  return(0);
1139
 
}
1140
 
 
1141
 
 
1142
 
bool show_master_info(THD* thd, Master_info* mi)
1143
 
{
1144
 
  // TODO: fix this for multi-master
1145
 
  List<Item> field_list;
1146
 
  Protocol *protocol= thd->protocol;
1147
 
 
1148
 
  field_list.push_back(new Item_empty_string("Slave_IO_State",
1149
 
                                                     14));
1150
 
  field_list.push_back(new Item_empty_string("Master_Host",
1151
 
                                                     sizeof(mi->host)));
1152
 
  field_list.push_back(new Item_empty_string("Master_User",
1153
 
                                                     sizeof(mi->user)));
1154
 
  field_list.push_back(new Item_return_int("Master_Port", 7,
1155
 
                                           DRIZZLE_TYPE_LONG));
1156
 
  field_list.push_back(new Item_return_int("Connect_Retry", 10,
1157
 
                                           DRIZZLE_TYPE_LONG));
1158
 
  field_list.push_back(new Item_empty_string("Master_Log_File",
1159
 
                                             FN_REFLEN));
1160
 
  field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
1161
 
                                           DRIZZLE_TYPE_LONGLONG));
1162
 
  field_list.push_back(new Item_empty_string("Relay_Log_File",
1163
 
                                             FN_REFLEN));
1164
 
  field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
1165
 
                                           DRIZZLE_TYPE_LONGLONG));
1166
 
  field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
1167
 
                                             FN_REFLEN));
1168
 
  field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
1169
 
  field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1170
 
  field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
1171
 
  field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
1172
 
  field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
1173
 
  field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
1174
 
  field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
1175
 
  field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
1176
 
                                             28));
1177
 
  field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
1178
 
  field_list.push_back(new Item_empty_string("Last_Error", 20));
1179
 
  field_list.push_back(new Item_return_int("Skip_Counter", 10,
1180
 
                                           DRIZZLE_TYPE_LONG));
1181
 
  field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
1182
 
                                           DRIZZLE_TYPE_LONGLONG));
1183
 
  field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
1184
 
                                           DRIZZLE_TYPE_LONGLONG));
1185
 
  field_list.push_back(new Item_empty_string("Until_Condition", 6));
1186
 
  field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1187
 
  field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1188
 
                                           DRIZZLE_TYPE_LONGLONG));
1189
 
  field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
1190
 
  field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
1191
 
                                             sizeof(mi->ssl_ca)));
1192
 
  field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
1193
 
                                             sizeof(mi->ssl_capath)));
1194
 
  field_list.push_back(new Item_empty_string("Master_SSL_Cert",
1195
 
                                             sizeof(mi->ssl_cert)));
1196
 
  field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
1197
 
                                             sizeof(mi->ssl_cipher)));
1198
 
  field_list.push_back(new Item_empty_string("Master_SSL_Key",
1199
 
                                             sizeof(mi->ssl_key)));
1200
 
  field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1201
 
                                           DRIZZLE_TYPE_LONGLONG));
1202
 
  field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
1203
 
                                             3));
1204
 
  field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
1205
 
  field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1206
 
  field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
1207
 
  field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
1208
 
 
1209
 
  if (protocol->send_fields(&field_list,
1210
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1211
 
    return(true);
1212
 
 
1213
 
  if (mi->host[0])
1214
 
  {
1215
 
    String *packet= &thd->packet;
1216
 
    protocol->prepare_for_resend();
1217
 
 
1218
 
    /*
1219
 
      slave_running can be accessed without run_lock but not other
1220
 
      non-volotile members like mi->io_thd, which is guarded by the mutex.
1221
 
    */
1222
 
    pthread_mutex_lock(&mi->run_lock);
1223
 
    protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
1224
 
    pthread_mutex_unlock(&mi->run_lock);
1225
 
 
1226
 
    pthread_mutex_lock(&mi->data_lock);
1227
 
    pthread_mutex_lock(&mi->rli.data_lock);
1228
 
    protocol->store(mi->host, &my_charset_bin);
1229
 
    protocol->store(mi->user, &my_charset_bin);
1230
 
    protocol->store((uint32_t) mi->port);
1231
 
    protocol->store((uint32_t) mi->connect_retry);
1232
 
    protocol->store(mi->master_log_name, &my_charset_bin);
1233
 
    protocol->store((uint64_t) mi->master_log_pos);
1234
 
    protocol->store(mi->rli.group_relay_log_name +
1235
 
                    dirname_length(mi->rli.group_relay_log_name),
1236
 
                    &my_charset_bin);
1237
 
    protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1238
 
    protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
1239
 
    protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1240
 
                    "Yes" : "No", &my_charset_bin);
1241
 
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1242
 
    protocol->store(rpl_filter->get_do_db());
1243
 
    protocol->store(rpl_filter->get_ignore_db());
1244
 
 
1245
 
    char buf[256];
1246
 
    String tmp(buf, sizeof(buf), &my_charset_bin);
1247
 
    rpl_filter->get_do_table(&tmp);
1248
 
    protocol->store(&tmp);
1249
 
    rpl_filter->get_ignore_table(&tmp);
1250
 
    protocol->store(&tmp);
1251
 
    rpl_filter->get_wild_do_table(&tmp);
1252
 
    protocol->store(&tmp);
1253
 
    rpl_filter->get_wild_ignore_table(&tmp);
1254
 
    protocol->store(&tmp);
1255
 
 
1256
 
    protocol->store(mi->rli.last_error().number);
1257
 
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
1258
 
    protocol->store((uint32_t) mi->rli.slave_skip_counter);
1259
 
    protocol->store((uint64_t) mi->rli.group_master_log_pos);
1260
 
    protocol->store((uint64_t) mi->rli.log_space_total);
1261
 
 
1262
 
    protocol->store(
1263
 
      mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
1264
 
        ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
1265
 
          "Relay"), &my_charset_bin);
1266
 
    protocol->store(mi->rli.until_log_name, &my_charset_bin);
1267
 
    protocol->store((uint64_t) mi->rli.until_log_pos);
1268
 
 
1269
 
    protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
1270
 
    protocol->store(mi->ssl_ca, &my_charset_bin);
1271
 
    protocol->store(mi->ssl_capath, &my_charset_bin);
1272
 
    protocol->store(mi->ssl_cert, &my_charset_bin);
1273
 
    protocol->store(mi->ssl_cipher, &my_charset_bin);
1274
 
    protocol->store(mi->ssl_key, &my_charset_bin);
1275
 
 
1276
 
    /*
1277
 
      Seconds_Behind_Master: if SQL thread is running and I/O thread is
1278
 
      connected, we can compute it otherwise show NULL (i.e. unknown).
1279
 
    */
1280
 
    if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
1281
 
        mi->rli.slave_running)
1282
 
    {
1283
 
      long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
1284
 
                       - mi->clock_diff_with_master);
1285
 
      /*
1286
 
        Apparently on some systems time_diff can be <0. Here are possible
1287
 
        reasons related to MySQL:
1288
 
        - the master is itself a slave of another master whose time is ahead.
1289
 
        - somebody used an explicit SET TIMESTAMP on the master.
1290
 
        Possible reason related to granularity-to-second of time functions
1291
 
        (nothing to do with MySQL), which can explain a value of -1:
1292
 
        assume the master's and slave's time are perfectly synchronized, and
1293
 
        that at slave's connection time, when the master's timestamp is read,
1294
 
        it is at the very end of second 1, and (a very short time later) when
1295
 
        the slave's timestamp is read it is at the very beginning of second
1296
 
        2. Then the recorded value for master is 1 and the recorded value for
1297
 
        slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1298
 
        between timestamp of slave and rli->last_master_timestamp is 0
1299
 
        (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1300
 
        This confuses users, so we don't go below 0: hence the max().
1301
 
 
1302
 
        last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1303
 
        special marker to say "consider we have caught up".
1304
 
      */
1305
 
      protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1306
 
                                 max((long)0, time_diff) : 0));
1307
 
    }
1308
 
    else
1309
 
    {
1310
 
      protocol->store_null();
1311
 
    }
1312
 
    protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
1313
 
 
1314
 
    // Last_IO_Errno
1315
 
    protocol->store(mi->last_error().number);
1316
 
    // Last_IO_Error
1317
 
    protocol->store(mi->last_error().message, &my_charset_bin);
1318
 
    // Last_SQL_Errno
1319
 
    protocol->store(mi->rli.last_error().number);
1320
 
    // Last_SQL_Error
1321
 
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
1322
 
 
1323
 
    pthread_mutex_unlock(&mi->rli.data_lock);
1324
 
    pthread_mutex_unlock(&mi->data_lock);
1325
 
 
1326
 
    if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
1327
 
      return(true);
1328
 
  }
1329
 
  my_eof(thd);
1330
 
  return(false);
1331
 
}
1332
 
 
1333
 
 
1334
 
void set_slave_thread_options(THD* thd)
1335
 
{
1336
 
  /*
1337
 
     It's nonsense to constrain the slave threads with max_join_size; if a
1338
 
     query succeeded on master, we HAVE to execute it. So set
1339
 
     OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
1340
 
     (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
1341
 
     SELECT examining more than 4 billion rows would still fail (yes, because
1342
 
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1343
 
     only for client threads.
1344
 
  */
1345
 
  uint64_t options= thd->options | OPTION_BIG_SELECTS;
1346
 
  if (opt_log_slave_updates)
1347
 
    options|= OPTION_BIN_LOG;
1348
 
  else
1349
 
    options&= ~OPTION_BIN_LOG;
1350
 
  thd->options= options;
1351
 
  thd->variables.completion_type= 0;
1352
 
  return;
1353
 
}
1354
 
 
1355
 
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
1356
 
{
1357
 
  thd->variables.character_set_client=
1358
 
    global_system_variables.character_set_client;
1359
 
  thd->variables.collation_connection=
1360
 
    global_system_variables.collation_connection;
1361
 
  thd->variables.collation_server=
1362
 
    global_system_variables.collation_server;
1363
 
  thd->update_charset();
1364
 
 
1365
 
  /*
1366
 
    We use a const cast here since the conceptual (and externally
1367
 
    visible) behavior of the function is to set the default charset of
1368
 
    the thread.  That the cache has to be invalidated is a secondary
1369
 
    effect.
1370
 
   */
1371
 
  const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
1372
 
  return;
1373
 
}
1374
 
 
1375
 
/*
1376
 
  init_slave_thread()
1377
 
*/
1378
 
 
1379
 
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
1380
 
{
1381
 
  int32_t simulate_error= 0;
1382
 
  thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
1383
 
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1384
 
  thd->security_ctx->skip_grants();
1385
 
  my_net_init(&thd->net, 0);
1386
 
/*
1387
 
  Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1388
 
  slave threads, since a replication event can become this much larger
1389
 
  than the corresponding packet (query) sent from client to master.
1390
 
*/
1391
 
  thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1392
 
    + MAX_LOG_EVENT_HEADER;  /* note, incr over the global not session var */
1393
 
  thd->slave_thread = 1;
1394
 
  thd->enable_slow_log= opt_log_slow_slave_statements;
1395
 
  set_slave_thread_options(thd);
1396
 
  thd->client_capabilities = CLIENT_LOCAL_FILES;
1397
 
  pthread_mutex_lock(&LOCK_thread_count);
1398
 
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
1399
 
  pthread_mutex_unlock(&LOCK_thread_count);
1400
 
 
1401
 
 simulate_error|= (1 << SLAVE_THD_IO);
1402
 
 simulate_error|= (1 << SLAVE_THD_SQL);
1403
 
  if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
1404
 
  {
1405
 
    thd->cleanup();
1406
 
    return(-1);
1407
 
  }
1408
 
  lex_start(thd);
1409
 
 
1410
 
  if (thd_type == SLAVE_THD_SQL)
1411
 
    thd_proc_info(thd, "Waiting for the next event in relay log");
1412
 
  else
1413
 
    thd_proc_info(thd, "Waiting for master update");
1414
 
  thd->version=refresh_version;
1415
 
  thd->set_time();
1416
 
  return(0);
1417
 
}
1418
 
 
1419
 
 
1420
 
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1421
 
                      void* thread_killed_arg)
1422
 
{
1423
 
  int32_t nap_time;
1424
 
  thr_alarm_t alarmed;
1425
 
 
1426
 
  thr_alarm_init(&alarmed);
1427
 
  time_t start_time= my_time(0);
1428
 
  time_t end_time= start_time+sec;
1429
 
 
1430
 
  while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1431
 
  {
1432
 
    ALARM alarm_buff;
1433
 
    /*
1434
 
      The only reason we are asking for alarm is so that
1435
 
      we will be woken up in case of murder, so if we do not get killed,
1436
 
      set the alarm so it goes off after we wake up naturally
1437
 
    */
1438
 
    thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
1439
 
    sleep(nap_time);
1440
 
    thr_end_alarm(&alarmed);
1441
 
 
1442
 
    if ((*thread_killed)(thd,thread_killed_arg))
1443
 
      return(1);
1444
 
    start_time= my_time(0);
1445
 
  }
1446
 
  return(0);
1447
 
}
1448
 
 
1449
 
 
1450
 
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
1451
 
                        bool *suppress_warnings)
1452
 
{
1453
 
  uchar buf[FN_REFLEN + 10];
1454
 
  int32_t len;
1455
 
  int32_t binlog_flags = 0; // for now
1456
 
  char* logname = mi->master_log_name;
1457
 
  
1458
 
  *suppress_warnings= false;
1459
 
 
1460
 
  // TODO if big log files: Change next to int8store()
1461
 
  int4store(buf, (uint32_t) mi->master_log_pos);
1462
 
  int2store(buf + 4, binlog_flags);
1463
 
  int4store(buf + 6, server_id);
1464
 
  len = (uint32_t) strlen(logname);
1465
 
  memcpy(buf + 10, logname,len);
1466
 
  if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
1467
 
  {
1468
 
    /*
1469
 
      Something went wrong, so we will just reconnect and retry later
1470
 
      in the future, we should do a better error analysis, but for
1471
 
      now we just fill up the error log :-)
1472
 
    */
1473
 
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1474
 
      *suppress_warnings= true;                 // Suppress reconnect warning
1475
 
    else
1476
 
      sql_print_error(_("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs"),
1477
 
                      drizzle_errno(drizzle), drizzle_error(drizzle),
1478
 
                      mi->connect_retry);
1479
 
    return(1);
1480
 
  }
1481
 
 
1482
 
  return(0);
1483
 
}
1484
 
 
1485
 
/*
1486
 
  Read one event from the master
1487
 
 
1488
 
  SYNOPSIS
1489
 
    read_event()
1490
 
    DRIZZLE               DRIZZLE connection
1491
 
    mi                  Master connection information
1492
 
    suppress_warnings   TRUE when a normal net read timeout has caused us to
1493
 
                        try a reconnect.  We do not want to print anything to
1494
 
                        the error log in this case because this a anormal
1495
 
                        event in an idle server.
1496
 
 
1497
 
    RETURN VALUES
1498
 
    'packet_error'      Error
1499
 
    number              Length of packet
1500
 
*/
1501
 
 
1502
 
static uint32_t read_event(DRIZZLE *drizzle,
1503
 
                        Master_info *mi __attribute__((unused)),
1504
 
                        bool* suppress_warnings)
1505
 
{
1506
 
  uint32_t len;
1507
 
 
1508
 
  *suppress_warnings= false;
1509
 
  /*
1510
 
    my_real_read() will time us out
1511
 
    We check if we were told to die, and if not, try reading again
1512
 
  */
1513
 
  if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
1514
 
    return(packet_error);
1515
 
 
1516
 
  len = cli_safe_read(drizzle);
1517
 
  if (len == packet_error || (int32_t) len < 1)
1518
 
  {
1519
 
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1520
 
    {
1521
 
      /*
1522
 
        We are trying a normal reconnect after a read timeout;
1523
 
        we suppress prints to .err file as long as the reconnect
1524
 
        happens without problems
1525
 
      */
1526
 
      *suppress_warnings= true;
1527
 
    }
1528
 
    else
1529
 
      sql_print_error(_("Error reading packet from server: %s ( server_errno=%d)"),
1530
 
                      drizzle_error(drizzle), drizzle_errno(drizzle));
1531
 
    return(packet_error);
1532
 
  }
1533
 
 
1534
 
  /* Check if eof packet */
1535
 
  if (len < 8 && drizzle->net.read_pos[0] == 254)
1536
 
  {
1537
 
    sql_print_information(_("Slave: received end packet from server, apparent "
1538
 
                            "master shutdown: %s"),
1539
 
                     drizzle_error(drizzle));
1540
 
     return(packet_error);
1541
 
  }
1542
 
 
1543
 
  return(len - 1);
1544
 
}
1545
 
 
1546
 
 
1547
 
int32_t check_expected_error(THD* thd __attribute__((unused)),
1548
 
                         Relay_log_info const *rli __attribute__((unused)),
1549
 
                         int32_t expected_error)
1550
 
{
1551
 
  switch (expected_error) {
1552
 
  case ER_NET_READ_ERROR:
1553
 
  case ER_NET_ERROR_ON_WRITE:
1554
 
  case ER_QUERY_INTERRUPTED:
1555
 
  case ER_SERVER_SHUTDOWN:
1556
 
  case ER_NEW_ABORTING_CONNECTION:
1557
 
    return(1);
1558
 
  default:
1559
 
    return(0);
1560
 
  }
1561
 
}
1562
 
 
1563
 
 
1564
 
/*
1565
 
  Check if the current error is of temporary nature of not.
1566
 
  Some errors are temporary in nature, such as
1567
 
  ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT.  Ndb also signals
1568
 
  that the error is temporary by pushing a warning with the error code
1569
 
  ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1570
 
*/
1571
 
static int32_t has_temporary_error(THD *thd)
1572
 
{
1573
 
  if (thd->is_fatal_error)
1574
 
    return(0);
1575
 
 
1576
 
  if (thd->main_da.is_error())
1577
 
  {
1578
 
    thd->clear_error();
1579
 
    my_error(ER_LOCK_DEADLOCK, MYF(0));
1580
 
  }
1581
 
 
1582
 
  /*
1583
 
    If there is no message in THD, we can't say if it's a temporary
1584
 
    error or not. This is currently the case for Incident_log_event,
1585
 
    which sets no message. Return FALSE.
1586
 
  */
1587
 
  if (!thd->is_error())
1588
 
    return(0);
1589
 
 
1590
 
  /*
1591
 
    Temporary error codes:
1592
 
    currently, InnoDB deadlock detected by InnoDB or lock
1593
 
    wait timeout (innodb_lock_wait_timeout exceeded
1594
 
  */
1595
 
  if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
1596
 
      thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
1597
 
    return(1);
1598
 
 
1599
 
  return(0);
1600
 
}
1601
 
 
1602
 
 
1603
 
/**
1604
 
  Applies the given event and advances the relay log position.
1605
 
 
1606
 
  In essence, this function does:
1607
 
 
1608
 
  @code
1609
 
    ev->apply_event(rli);
1610
 
    ev->update_pos(rli);
1611
 
  @endcode
1612
 
 
1613
 
  But it also does some maintainance, such as skipping events if
1614
 
  needed and reporting errors.
1615
 
 
1616
 
  If the @c skip flag is set, then it is tested whether the event
1617
 
  should be skipped, by looking at the slave_skip_counter and the
1618
 
  server id.  The skip flag should be set when calling this from a
1619
 
  replication thread but not set when executing an explicit BINLOG
1620
 
  statement.
1621
 
 
1622
 
  @retval 0 OK.
1623
 
 
1624
 
  @retval 1 Error calling ev->apply_event().
1625
 
 
1626
 
  @retval 2 No error calling ev->apply_event(), but error calling
1627
 
  ev->update_pos().
1628
 
*/
1629
 
int32_t apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
1630
 
                               bool skip)
1631
 
{
1632
 
  int32_t exec_res= 0;
1633
 
 
1634
 
  /*
1635
 
    Execute the event to change the database and update the binary
1636
 
    log coordinates, but first we set some data that is needed for
1637
 
    the thread.
1638
 
 
1639
 
    The event will be executed unless it is supposed to be skipped.
1640
 
 
1641
 
    Queries originating from this server must be skipped.  Low-level
1642
 
    events (Format_description_log_event, Rotate_log_event,
1643
 
    Stop_log_event) from this server must also be skipped. But for
1644
 
    those we don't want to modify 'group_master_log_pos', because
1645
 
    these events did not exist on the master.
1646
 
    Format_description_log_event is not completely skipped.
1647
 
 
1648
 
    Skip queries specified by the user in 'slave_skip_counter'.  We
1649
 
    can't however skip events that has something to do with the log
1650
 
    files themselves.
1651
 
 
1652
 
    Filtering on own server id is extremely important, to ignore
1653
 
    execution of events created by the creation/rotation of the relay
1654
 
    log (remember that now the relay log starts with its Format_desc,
1655
 
    has a Rotate etc).
1656
 
  */
1657
 
 
1658
 
  thd->server_id = ev->server_id; // use the original server id for logging
1659
 
  thd->set_time();                            // time the query
1660
 
  thd->lex->current_select= 0;
1661
 
  if (!ev->when)
1662
 
    ev->when= my_time(0);
1663
 
  ev->thd = thd; // because up to this point, ev->thd == 0
1664
 
 
1665
 
  if (skip)
1666
 
  {
1667
 
    int32_t reason= ev->shall_skip(rli);
1668
 
    if (reason == Log_event::EVENT_SKIP_COUNT)
1669
 
      --rli->slave_skip_counter;
1670
 
    pthread_mutex_unlock(&rli->data_lock);
1671
 
    if (reason == Log_event::EVENT_SKIP_NOT)
1672
 
      exec_res= ev->apply_event(rli);
1673
 
  }
1674
 
  else
1675
 
    exec_res= ev->apply_event(rli);
1676
 
 
1677
 
  if (exec_res == 0)
1678
 
  {
1679
 
    int32_t error= ev->update_pos(rli);
1680
 
    /*
1681
 
      The update should not fail, so print an error message and
1682
 
      return an error code.
1683
 
 
1684
 
      TODO: Replace this with a decent error message when merged
1685
 
      with BUG#24954 (which adds several new error message).
1686
 
    */
1687
 
    if (error)
1688
 
    {
1689
 
      char buf[22];
1690
 
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
1691
 
                  _("It was not possible to update the positions"
1692
 
                  " of the relay log information: the slave may"
1693
 
                  " be in an inconsistent state."
1694
 
                  " Stopped in %s position %s"),
1695
 
                  rli->group_relay_log_name,
1696
 
                  llstr(rli->group_relay_log_pos, buf));
1697
 
      return(2);
1698
 
    }
1699
 
  }
1700
 
 
1701
 
  return(exec_res ? 1 : 0);
1702
 
}
1703
 
 
1704
 
 
1705
 
/**
1706
 
  Top-level function for executing the next event from the relay log.
1707
 
 
1708
 
  This function reads the event from the relay log, executes it, and
1709
 
  advances the relay log position.  It also handles errors, etc.
1710
 
 
1711
 
  This function may fail to apply the event for the following reasons:
1712
 
 
1713
 
   - The position specfied by the UNTIL condition of the START SLAVE
1714
 
     command is reached.
1715
 
 
1716
 
   - It was not possible to read the event from the log.
1717
 
 
1718
 
   - The slave is killed.
1719
 
 
1720
 
   - An error occurred when applying the event, and the event has been
1721
 
     tried slave_trans_retries times.  If the event has been retried
1722
 
     fewer times, 0 is returned.
1723
 
 
1724
 
   - init_master_info or init_relay_log_pos failed. (These are called
1725
 
     if a failure occurs when applying the event.)</li>
1726
 
 
1727
 
   - An error occurred when updating the binlog position.
1728
 
 
1729
 
  @retval 0 The event was applied.
1730
 
 
1731
 
  @retval 1 The event was not applied.
1732
 
*/
1733
 
static int32_t exec_relay_log_event(THD* thd, Relay_log_info* rli)
1734
 
{
1735
 
  /*
1736
 
     We acquire this mutex since we need it for all operations except
1737
 
     event execution. But we will release it in places where we will
1738
 
     wait for something for example inside of next_event().
1739
 
   */
1740
 
  pthread_mutex_lock(&rli->data_lock);
1741
 
 
1742
 
  Log_event * ev = next_event(rli);
1743
 
 
1744
 
  assert(rli->sql_thd==thd);
1745
 
 
1746
 
  if (sql_slave_killed(thd,rli))
1747
 
  {
1748
 
    pthread_mutex_unlock(&rli->data_lock);
1749
 
    delete ev;
1750
 
    return(1);
1751
 
  }
1752
 
  if (ev)
1753
 
  {
1754
 
    int32_t exec_res;
1755
 
 
1756
 
    /*
1757
 
      This tests if the position of the beginning of the current event
1758
 
      hits the UNTIL barrier.
1759
 
    */
1760
 
    if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
1761
 
        rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
1762
 
                                rli->group_master_log_pos :
1763
 
                                ev->log_pos - ev->data_written))
1764
 
    {
1765
 
      char buf[22];
1766
 
      sql_print_information(_("Slave SQL thread stopped because it reached its"
1767
 
                              " UNTIL position %s"),
1768
 
                            llstr(rli->until_pos(), buf));
1769
 
      /*
1770
 
        Setting abort_slave flag because we do not want additional message about
1771
 
        error in query execution to be printed.
1772
 
      */
1773
 
      rli->abort_slave= 1;
1774
 
      pthread_mutex_unlock(&rli->data_lock);
1775
 
      delete ev;
1776
 
      return(1);
1777
 
    }
1778
 
    exec_res= apply_event_and_update_pos(ev, thd, rli, true);
1779
 
 
1780
 
    /*
1781
 
      Format_description_log_event should not be deleted because it will be
1782
 
      used to read info about the relay log's format; it will be deleted when
1783
 
      the SQL thread does not need it, i.e. when this thread terminates.
1784
 
    */
1785
 
    if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
1786
 
    {
1787
 
      delete ev;
1788
 
    }
1789
 
 
1790
 
    /*
1791
 
      update_log_pos failed: this should not happen, so we don't
1792
 
      retry.
1793
 
    */
1794
 
    if (exec_res == 2)
1795
 
      return(1);
1796
 
 
1797
 
    if (slave_trans_retries)
1798
 
    {
1799
 
      int32_t temp_err= 0;
1800
 
      if (exec_res && (temp_err= has_temporary_error(thd)))
1801
 
      {
1802
 
        const char *errmsg;
1803
 
        /*
1804
 
          We were in a transaction which has been rolled back because of a
1805
 
          temporary error;
1806
 
          let's seek back to BEGIN log event and retry it all again.
1807
 
          Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
1808
 
          there is no rollback since 5.0.13 (ref: manual).
1809
 
          We have to not only seek but also
1810
 
          a) init_master_info(), to seek back to hot relay log's start for later
1811
 
          (for when we will come back to this hot log after re-processing the
1812
 
          possibly existing old logs where BEGIN is: check_binlog_magic() will
1813
 
          then need the cache to be at position 0 (see comments at beginning of
1814
 
          init_master_info()).
1815
 
          b) init_relay_log_pos(), because the BEGIN may be an older relay log.
1816
 
        */
1817
 
        if (rli->trans_retries < slave_trans_retries)
1818
 
        {
1819
 
          if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
1820
 
            sql_print_error(_("Failed to initialize the master info structure"));
1821
 
          else if (init_relay_log_pos(rli,
1822
 
                                      rli->group_relay_log_name,
1823
 
                                      rli->group_relay_log_pos,
1824
 
                                      1, &errmsg, 1))
1825
 
            sql_print_error(_("Error initializing relay log position: %s"),
1826
 
                            errmsg);
1827
 
          else
1828
 
          {
1829
 
            exec_res= 0;
1830
 
            end_trans(thd, ROLLBACK);
1831
 
            /* chance for concurrent connection to get more locks */
1832
 
            safe_sleep(thd, min(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1833
 
                       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1834
 
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1835
 
            rli->trans_retries++;
1836
 
            rli->retried_trans++;
1837
 
            pthread_mutex_unlock(&rli->data_lock);
1838
 
          }
1839
 
        }
1840
 
        else
1841
 
          sql_print_error(_("Slave SQL thread retried transaction %lu time(s) "
1842
 
                            "in vain, giving up. Consider raising the value of "
1843
 
                            "the slave_transaction_retries variable."),
1844
 
                          slave_trans_retries);
1845
 
      }
1846
 
      else if ((exec_res && !temp_err) ||
1847
 
               (opt_using_transactions &&
1848
 
                rli->group_relay_log_pos == rli->event_relay_log_pos))
1849
 
      {
1850
 
        /*
1851
 
          Only reset the retry counter if the entire group succeeded
1852
 
          or failed with a non-transient error.  On a successful
1853
 
          event, the execution will proceed as usual; in the case of a
1854
 
          non-transient error, the slave will stop with an error.
1855
 
         */
1856
 
        rli->trans_retries= 0; // restart from fresh
1857
 
      }
1858
 
    }
1859
 
    return(exec_res);
1860
 
  }
1861
 
  pthread_mutex_unlock(&rli->data_lock);
1862
 
  rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
1863
 
              ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
1864
 
              _("Could not parse relay log event entry. The possible reasons "
1865
 
                "are: the master's binary log is corrupted (you can check this "
1866
 
                "by running 'mysqlbinlog' on the binary log), the slave's "
1867
 
                "relay log is corrupted (you can check this by running "
1868
 
                "'mysqlbinlog' on the relay log), a network problem, or a bug "
1869
 
                "in the master's or slave's DRIZZLE code. If you want to check "
1870
 
                "the master's binary log or slave's relay log, you will be "
1871
 
                "able to know their names by issuing 'SHOW SLAVE STATUS' "
1872
 
                "on this slave."));
1873
 
  return(1);
1874
 
}
1875
 
 
1876
 
 
1877
 
/**
1878
 
  @brief Try to reconnect slave IO thread.
1879
 
 
1880
 
  @details Terminates current connection to master, sleeps for
1881
 
  @c mi->connect_retry msecs and initiates new connection with
1882
 
  @c safe_reconnect(). Variable pointed by @c retry_count is increased -
1883
 
  if it exceeds @c master_retry_count then connection is not re-established
1884
 
  and function signals error.
1885
 
  Unless @c suppres_warnings is TRUE, a warning is put in the server error log
1886
 
  when reconnecting. The warning message and messages used to report errors
1887
 
  are taken from @c messages array. In case @c master_retry_count is exceeded,
1888
 
  no messages are added to the log.
1889
 
 
1890
 
  @param[in]     thd                 Thread context.
1891
 
  @param[in]     DRIZZLE               DRIZZLE connection.
1892
 
  @param[in]     mi                  Master connection information.
1893
 
  @param[in,out] retry_count         Number of attempts to reconnect.
1894
 
  @param[in]     suppress_warnings   TRUE when a normal net read timeout 
1895
 
                                     has caused to reconnecting.
1896
 
  @param[in]     messages            Messages to print/log, see 
1897
 
                                     reconnect_messages[] array.
1898
 
 
1899
 
  @retval        0                   OK.
1900
 
  @retval        1                   There was an error.
1901
 
*/
1902
 
 
1903
 
static int32_t try_to_reconnect(THD *thd, DRIZZLE *drizzle, Master_info *mi,
1904
 
                            uint32_t *retry_count, bool suppress_warnings,
1905
 
                            const char *messages[SLAVE_RECON_MSG_MAX])
1906
 
{
1907
 
  mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1908
 
  thd->proc_info= _(messages[SLAVE_RECON_MSG_WAIT]);
1909
 
#ifdef SIGNAL_WITH_VIO_CLOSE
1910
 
  thd->clear_active_vio();
1911
 
#endif
1912
 
  end_server(drizzle);
1913
 
  if ((*retry_count)++)
1914
 
  {
1915
 
    if (*retry_count > master_retry_count)
1916
 
      return 1;                             // Don't retry forever
1917
 
    safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1918
 
               (void *) mi);
1919
 
  }
1920
 
  if (check_io_slave_killed(thd, mi,
1921
 
                            _(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1922
 
    return 1;
1923
 
  thd->proc_info = _(messages[SLAVE_RECON_MSG_AFTER]);
1924
 
  if (!suppress_warnings)
1925
 
  {
1926
 
    char buf[256], llbuff[22];
1927
 
    snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1928
 
             IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
1929
 
    /*
1930
 
      Raise a warining during registering on master/requesting dump.
1931
 
      Log a message reading event.
1932
 
    */
1933
 
    if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
1934
 
    {
1935
 
      mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1936
 
                 ER(ER_SLAVE_MASTER_COM_FAILURE),
1937
 
                 _(messages[SLAVE_RECON_MSG_COMMAND]), buf);
1938
 
    }
1939
 
    else
1940
 
    {
1941
 
      sql_print_information(buf);
1942
 
    }
1943
 
  }
1944
 
  if (safe_reconnect(thd, drizzle, mi, 1) || io_slave_killed(thd, mi))
1945
 
  {
1946
 
    if (global_system_variables.log_warnings)
1947
 
      sql_print_information(_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1948
 
    return 1;
1949
 
  }
1950
 
  return 0;
1951
 
}
1952
 
 
1953
 
 
1954
 
/* Slave I/O Thread entry point */
1955
 
 
1956
 
pthread_handler_t handle_slave_io(void *arg)
1957
 
{
1958
 
  THD *thd; // needs to be first for thread_stack
1959
 
  DRIZZLE *drizzle;
1960
 
  Master_info *mi = (Master_info*)arg;
1961
 
  Relay_log_info *rli= &mi->rli;
1962
 
  char llbuff[22];
1963
 
  uint32_t retry_count;
1964
 
  bool suppress_warnings;
1965
 
  uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
1966
 
  my_thread_init();
1967
 
 
1968
 
  assert(mi->inited);
1969
 
  drizzle= NULL ;
1970
 
  retry_count= 0;
1971
 
 
1972
 
  pthread_mutex_lock(&mi->run_lock);
1973
 
  /* Inform waiting threads that slave has started */
1974
 
  mi->slave_run_id++;
1975
 
 
1976
 
  mi->events_till_disconnect = disconnect_slave_event_count;
1977
 
 
1978
 
  thd= new THD;
1979
 
  THD_CHECK_SENTRY(thd);
1980
 
  mi->io_thd = thd;
1981
 
 
1982
 
  pthread_detach_this_thread();
1983
 
  thd->thread_stack= (char*) &thd; // remember where our stack is
1984
 
  if (init_slave_thread(thd, SLAVE_THD_IO))
1985
 
  {
1986
 
    pthread_cond_broadcast(&mi->start_cond);
1987
 
    pthread_mutex_unlock(&mi->run_lock);
1988
 
    sql_print_error(_("Failed during slave I/O thread initialization"));
1989
 
    goto err;
1990
 
  }
1991
 
  pthread_mutex_lock(&LOCK_thread_count);
1992
 
  threads.append(thd);
1993
 
  pthread_mutex_unlock(&LOCK_thread_count);
1994
 
  mi->slave_running = 1;
1995
 
  mi->abort_slave = 0;
1996
 
  pthread_mutex_unlock(&mi->run_lock);
1997
 
  pthread_cond_broadcast(&mi->start_cond);
1998
 
 
1999
 
  if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
2000
 
  {
2001
 
    mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2002
 
               ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
2003
 
    goto err;
2004
 
  }
2005
 
 
2006
 
  thd_proc_info(thd, "Connecting to master");
2007
 
  // we can get killed during safe_connect
2008
 
  if (!safe_connect(thd, drizzle, mi))
2009
 
  {
2010
 
    sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
2011
 
                            "replication started in log '%s' at position %s"),
2012
 
                          mi->user, mi->host, mi->port,
2013
 
                          IO_RPL_LOG_NAME,
2014
 
                          llstr(mi->master_log_pos,llbuff));
2015
 
  /*
2016
 
    Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
2017
 
    thread, since a replication event can become this much larger than
2018
 
    the corresponding packet (query) sent from client to master.
2019
 
  */
2020
 
    drizzle->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
2021
 
  }
2022
 
  else
2023
 
  {
2024
 
    sql_print_information(_("Slave I/O thread killed while connecting to master"));
2025
 
    goto err;
2026
 
  }
2027
 
 
2028
 
connected:
2029
 
 
2030
 
  // TODO: the assignment below should be under mutex (5.0)
2031
 
  mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
2032
 
  thd->slave_net = &drizzle->net;
2033
 
  thd_proc_info(thd, "Checking master version");
2034
 
  if (get_master_version_and_clock(drizzle, mi))
2035
 
    goto err;
2036
 
  
2037
 
  if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
2038
 
  {
2039
 
    /*
2040
 
      Register ourselves with the master.
2041
 
    */
2042
 
    thd_proc_info(thd, "Registering slave on master");
2043
 
    if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2044
 
    {
2045
 
      if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
2046
 
                                 "while registering slave on master"))
2047
 
      {
2048
 
        sql_print_error(_("Slave I/O thread couldn't register on master"));
2049
 
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2050
 
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
2051
 
          goto err;
2052
 
      }
2053
 
      else
2054
 
        goto err;
2055
 
      goto connected;
2056
 
    }
2057
 
    if (!retry_count_reg)
2058
 
    {
2059
 
      retry_count_reg++;
2060
 
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
2061
 
      if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2062
 
                           reconnect_messages[SLAVE_RECON_ACT_REG]))
2063
 
        goto err;
2064
 
      goto connected;
2065
 
    }
2066
 
  }
2067
 
 
2068
 
  while (!io_slave_killed(thd,mi))
2069
 
  {
2070
 
    thd_proc_info(thd, "Requesting binlog dump");
2071
 
    if (request_dump(drizzle, mi, &suppress_warnings))
2072
 
    {
2073
 
      sql_print_error(_("Failed on request_dump()"));
2074
 
      if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while \
2075
 
requesting master dump")) ||
2076
 
          try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2077
 
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2078
 
        goto err;
2079
 
      goto connected;
2080
 
    }
2081
 
    if (!retry_count_dump)
2082
 
    {
2083
 
      retry_count_dump++;
2084
 
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
2085
 
      if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2086
 
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2087
 
        goto err;
2088
 
      goto connected;
2089
 
    }
2090
 
 
2091
 
    while (!io_slave_killed(thd,mi))
2092
 
    {
2093
 
      uint32_t event_len;
2094
 
      /*
2095
 
        We say "waiting" because read_event() will wait if there's nothing to
2096
 
        read. But if there's something to read, it will not wait. The
2097
 
        important thing is to not confuse users by saying "reading" whereas
2098
 
        we're in fact receiving nothing.
2099
 
      */
2100
 
      thd_proc_info(thd, _("Waiting for master to send event"));
2101
 
      event_len= read_event(drizzle, mi, &suppress_warnings);
2102
 
      if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while "
2103
 
                                           "reading event")))
2104
 
        goto err;
2105
 
      if (!retry_count_event)
2106
 
      {
2107
 
        retry_count_event++;
2108
 
        sql_print_information(_("Forcing to reconnect slave I/O thread"));
2109
 
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2110
 
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2111
 
          goto err;
2112
 
        goto connected;
2113
 
      }
2114
 
 
2115
 
      if (event_len == packet_error)
2116
 
      {
2117
 
        uint32_t drizzle_error_number= drizzle_errno(drizzle);
2118
 
        switch (drizzle_error_number) {
2119
 
        case CR_NET_PACKET_TOO_LARGE:
2120
 
          sql_print_error(_("Log entry on master is longer than "
2121
 
                            "max_allowed_packet (%ld) on "
2122
 
                            "slave. If the entry is correct, restart the "
2123
 
                            "server with a higher value of "
2124
 
                            "max_allowed_packet"),
2125
 
                          thd->variables.max_allowed_packet);
2126
 
          goto err;
2127
 
        case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2128
 
          sql_print_error(ER(drizzle_error_number), drizzle_error_number,
2129
 
                          drizzle_error(drizzle));
2130
 
          goto err;
2131
 
        case EE_OUTOFMEMORY:
2132
 
        case ER_OUTOFMEMORY:
2133
 
          sql_print_error(
2134
 
       _("Stopping slave I/O thread due to out-of-memory error from master"));
2135
 
          goto err;
2136
 
        }
2137
 
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
2138
 
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2139
 
          goto err;
2140
 
        goto connected;
2141
 
      } // if (event_len == packet_error)
2142
 
 
2143
 
      retry_count=0;                    // ok event, reset retry counter
2144
 
      thd_proc_info(thd, _("Queueing master event to the relay log"));
2145
 
      if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2146
 
      {
2147
 
        goto err;
2148
 
      }
2149
 
      if (flush_master_info(mi, 1))
2150
 
      {
2151
 
        sql_print_error(_("Failed to flush master info file"));
2152
 
        goto err;
2153
 
      }
2154
 
      /*
2155
 
        See if the relay logs take too much space.
2156
 
        We don't lock mi->rli.log_space_lock here; this dirty read saves time
2157
 
        and does not introduce any problem:
2158
 
        - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
2159
 
        the clean value is 0), then we are reading only one more event as we
2160
 
        should, and we'll block only at the next event. No big deal.
2161
 
        - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
2162
 
        the clean value is 1), then we are going into wait_for_relay_log_space()
2163
 
        for no reason, but this function will do a clean read, notice the clean
2164
 
        value and exit immediately.
2165
 
      */
2166
 
      if (rli->log_space_limit && rli->log_space_limit <
2167
 
          rli->log_space_total &&
2168
 
          !rli->ignore_log_space_limit)
2169
 
        if (wait_for_relay_log_space(rli))
2170
 
        {
2171
 
          sql_print_error(_("Slave I/O thread aborted while waiting for "
2172
 
                            "relay log space"));
2173
 
          goto err;
2174
 
        }
2175
 
    }
2176
 
  }
2177
 
 
2178
 
// error = 0;
2179
 
err:
2180
 
// print the current replication position
2181
 
  sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2182
 
                          "position %s"),
2183
 
                        IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
2184
 
  VOID(pthread_mutex_lock(&LOCK_thread_count));
2185
 
  thd->query = thd->db = 0; // extra safety
2186
 
  thd->query_length= thd->db_length= 0;
2187
 
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
2188
 
  if (drizzle)
2189
 
  {
2190
 
    /*
2191
 
      Here we need to clear the active VIO before closing the
2192
 
      connection with the master.  The reason is that THD::awake()
2193
 
      might be called from terminate_slave_thread() because somebody
2194
 
      issued a STOP SLAVE.  If that happends, the close_active_vio()
2195
 
      can be called in the middle of closing the VIO associated with
2196
 
      the 'mysql' object, causing a crash.
2197
 
    */
2198
 
#ifdef SIGNAL_WITH_VIO_CLOSE
2199
 
    thd->clear_active_vio();
2200
 
#endif
2201
 
    drizzle_close(drizzle);
2202
 
    mi->drizzle=0;
2203
 
  }
2204
 
  write_ignored_events_info_to_relay_log(thd, mi);
2205
 
  thd_proc_info(thd, _("Waiting for slave mutex on exit"));
2206
 
  pthread_mutex_lock(&mi->run_lock);
2207
 
 
2208
 
  /* Forget the relay log's format */
2209
 
  delete mi->rli.relay_log.description_event_for_queue;
2210
 
  mi->rli.relay_log.description_event_for_queue= 0;
2211
 
  // TODO: make rpl_status part of Master_info
2212
 
  change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2213
 
  assert(thd->net.buff != 0);
2214
 
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
2215
 
  close_thread_tables(thd);
2216
 
  pthread_mutex_lock(&LOCK_thread_count);
2217
 
  THD_CHECK_SENTRY(thd);
2218
 
  delete thd;
2219
 
  pthread_mutex_unlock(&LOCK_thread_count);
2220
 
  mi->abort_slave= 0;
2221
 
  mi->slave_running= 0;
2222
 
  mi->io_thd= 0;
2223
 
  /*
2224
 
    Note: the order of the two following calls (first broadcast, then unlock)
2225
 
    is important. Otherwise a killer_thread can execute between the calls and
2226
 
    delete the mi structure leading to a crash! (see BUG#25306 for details)
2227
 
   */ 
2228
 
  pthread_cond_broadcast(&mi->stop_cond);       // tell the world we are done
2229
 
  pthread_mutex_unlock(&mi->run_lock);
2230
 
  my_thread_end();
2231
 
  pthread_exit(0);
2232
 
  return(0);                               // Can't return anything here
2233
 
}
2234
 
 
2235
 
 
2236
 
/* Slave SQL Thread entry point */
2237
 
 
2238
 
pthread_handler_t handle_slave_sql(void *arg)
2239
 
{
2240
 
  THD *thd;                     /* needs to be first for thread_stack */
2241
 
  char llbuff[22],llbuff1[22];
2242
 
 
2243
 
  Relay_log_info* rli = &((Master_info*)arg)->rli;
2244
 
  const char *errmsg;
2245
 
 
2246
 
  my_thread_init();
2247
 
 
2248
 
  assert(rli->inited);
2249
 
  pthread_mutex_lock(&rli->run_lock);
2250
 
  assert(!rli->slave_running);
2251
 
  errmsg= 0;
2252
 
  rli->events_till_abort = abort_slave_event_count;
2253
 
 
2254
 
  thd = new THD;
2255
 
  thd->thread_stack = (char*)&thd; // remember where our stack is
2256
 
  rli->sql_thd= thd;
2257
 
  
2258
 
  /* Inform waiting threads that slave has started */
2259
 
  rli->slave_run_id++;
2260
 
  rli->slave_running = 1;
2261
 
 
2262
 
  pthread_detach_this_thread();
2263
 
  if (init_slave_thread(thd, SLAVE_THD_SQL))
2264
 
  {
2265
 
    /*
2266
 
      TODO: this is currently broken - slave start and change master
2267
 
      will be stuck if we fail here
2268
 
    */
2269
 
    pthread_cond_broadcast(&rli->start_cond);
2270
 
    pthread_mutex_unlock(&rli->run_lock);
2271
 
    sql_print_error(_("Failed during slave thread initialization"));
2272
 
    goto err;
2273
 
  }
2274
 
  thd->init_for_queries();
2275
 
  thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
2276
 
  pthread_mutex_lock(&LOCK_thread_count);
2277
 
  threads.append(thd);
2278
 
  pthread_mutex_unlock(&LOCK_thread_count);
2279
 
  /*
2280
 
    We are going to set slave_running to 1. Assuming slave I/O thread is
2281
 
    alive and connected, this is going to make Seconds_Behind_Master be 0
2282
 
    i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
2283
 
    the moment we start we can think we are caught up, and the next second we
2284
 
    start receiving data so we realize we are not caught up and
2285
 
    Seconds_Behind_Master grows. No big deal.
2286
 
  */
2287
 
  rli->abort_slave = 0;
2288
 
  pthread_mutex_unlock(&rli->run_lock);
2289
 
  pthread_cond_broadcast(&rli->start_cond);
2290
 
 
2291
 
  /*
2292
 
    Reset errors for a clean start (otherwise, if the master is idle, the SQL
2293
 
    thread may execute no Query_log_event, so the error will remain even
2294
 
    though there's no problem anymore). Do not reset the master timestamp
2295
 
    (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
2296
 
    as we are not sure that we are going to receive a query, we want to
2297
 
    remember the last master timestamp (to say how many seconds behind we are
2298
 
    now.
2299
 
    But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
2300
 
  */
2301
 
  rli->clear_error();
2302
 
 
2303
 
  //tell the I/O thread to take relay_log_space_limit into account from now on
2304
 
  pthread_mutex_lock(&rli->log_space_lock);
2305
 
  rli->ignore_log_space_limit= 0;
2306
 
  pthread_mutex_unlock(&rli->log_space_lock);
2307
 
  rli->trans_retries= 0; // start from "no error"
2308
 
 
2309
 
  if (init_relay_log_pos(rli,
2310
 
                         rli->group_relay_log_name,
2311
 
                         rli->group_relay_log_pos,
2312
 
                         1 /*need data lock*/, &errmsg,
2313
 
                         1 /*look for a description_event*/))
2314
 
  {
2315
 
    sql_print_error(_("Error initializing relay log position: %s"),
2316
 
                    errmsg);
2317
 
    goto err;
2318
 
  }
2319
 
  THD_CHECK_SENTRY(thd);
2320
 
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2321
 
  /*
2322
 
    Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2323
 
    correct position when it's called just after my_b_seek() (the questionable
2324
 
    stuff is those "seek is done on next read" comments in the my_b_seek()
2325
 
    source code).
2326
 
    The crude reality is that this assertion randomly fails whereas
2327
 
    replication seems to work fine. And there is no easy explanation why it
2328
 
    fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
2329
 
    init_relay_log_pos() called above). Maybe the assertion would be
2330
 
    meaningful if we held rli->data_lock between the my_b_seek() and the
2331
 
    assert().
2332
 
  */
2333
 
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2334
 
  assert(rli->sql_thd == thd);
2335
 
 
2336
 
  if (global_system_variables.log_warnings)
2337
 
    sql_print_information(_("Slave SQL thread initialized, "
2338
 
                            "starting replication in log '%s' at "
2339
 
                            "position %s, relay log '%s' position: %s"),
2340
 
                            RPL_LOG_NAME,
2341
 
                          llstr(rli->group_master_log_pos,llbuff),
2342
 
                          rli->group_relay_log_name,
2343
 
                          llstr(rli->group_relay_log_pos,llbuff1));
2344
 
 
2345
 
  /* execute init_slave variable */
2346
 
  if (sys_init_slave.value_length)
2347
 
  {
2348
 
    execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
2349
 
    if (thd->is_slave_error)
2350
 
    {
2351
 
      sql_print_error(_("Slave SQL thread aborted. "
2352
 
                        "Can't execute init_slave query"));
2353
 
      goto err;
2354
 
    }
2355
 
  }
2356
 
 
2357
 
  /*
2358
 
    First check until condition - probably there is nothing to execute. We
2359
 
    do not want to wait for next event in this case.
2360
 
  */
2361
 
  pthread_mutex_lock(&rli->data_lock);
2362
 
  if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
2363
 
      rli->is_until_satisfied(rli->group_master_log_pos))
2364
 
  {
2365
 
    char buf[22];
2366
 
    sql_print_information(_("Slave SQL thread stopped because it reached its"
2367
 
                            " UNTIL position %s"), llstr(rli->until_pos(), buf));
2368
 
    pthread_mutex_unlock(&rli->data_lock);
2369
 
    goto err;
2370
 
  }
2371
 
  pthread_mutex_unlock(&rli->data_lock);
2372
 
 
2373
 
  /* Read queries from the IO/THREAD until this thread is killed */
2374
 
 
2375
 
  while (!sql_slave_killed(thd,rli))
2376
 
  {
2377
 
    thd_proc_info(thd, _("Reading event from the relay log"));
2378
 
    assert(rli->sql_thd == thd);
2379
 
    THD_CHECK_SENTRY(thd);
2380
 
    if (exec_relay_log_event(thd,rli))
2381
 
    {
2382
 
      // do not scare the user if SQL thread was simply killed or stopped
2383
 
      if (!sql_slave_killed(thd,rli))
2384
 
      {
2385
 
        /*
2386
 
          retrieve as much info as possible from the thd and, error
2387
 
          codes and warnings and print this to the error log as to
2388
 
          allow the user to locate the error
2389
 
        */
2390
 
        uint32_t const last_errno= rli->last_error().number;
2391
 
 
2392
 
        if (thd->is_error())
2393
 
        {
2394
 
          char const *const errmsg= thd->main_da.message();
2395
 
 
2396
 
          if (last_errno == 0)
2397
 
          {
2398
 
            rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), errmsg);
2399
 
          }
2400
 
          else if (last_errno != thd->main_da.sql_errno())
2401
 
          {
2402
 
            sql_print_error(_("Slave (additional info): %s Error_code: %d"),
2403
 
                            errmsg, thd->main_da.sql_errno());
2404
 
          }
2405
 
        }
2406
 
 
2407
 
        /* Print any warnings issued */
2408
 
        List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
2409
 
        DRIZZLE_ERROR *err;
2410
 
        /*
2411
 
          Added controlled slave thread cancel for replication
2412
 
          of user-defined variables.
2413
 
        */
2414
 
        bool udf_error = false;
2415
 
        while ((err= it++))
2416
 
        {
2417
 
          if (err->code == ER_CANT_OPEN_LIBRARY)
2418
 
            udf_error = true;
2419
 
          sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
2420
 
        }
2421
 
        if (udf_error)
2422
 
          sql_print_error(_("Error loading user-defined library, slave SQL "
2423
 
                            "thread aborted. Install the missing library, "
2424
 
                            "and restart the slave SQL thread with "
2425
 
                            "\"SLAVE START\". We stopped at log '%s' "
2426
 
                            "position %s"),
2427
 
                          RPL_LOG_NAME, llstr(rli->group_master_log_pos,
2428
 
            llbuff));
2429
 
        else
2430
 
          sql_print_error(_("Error running query, slave SQL thread aborted. "
2431
 
                            "Fix the problem, and restart "
2432
 
                            "the slave SQL thread with \"SLAVE START\". "
2433
 
                            "We stopped at log '%s' position %s"),
2434
 
                          RPL_LOG_NAME,
2435
 
                          llstr(rli->group_master_log_pos, llbuff));
2436
 
      }
2437
 
      goto err;
2438
 
    }
2439
 
  }
2440
 
 
2441
 
  /* Thread stopped. Print the current replication position to the log */
2442
 
  sql_print_information(_("Slave SQL thread exiting, replication stopped in "
2443
 
                          "log '%s' at position %s"),
2444
 
                        RPL_LOG_NAME,
2445
 
                        llstr(rli->group_master_log_pos,llbuff));
2446
 
 
2447
 
 err:
2448
 
 
2449
 
  /*
2450
 
    Some events set some playgrounds, which won't be cleared because thread
2451
 
    stops. Stopping of this thread may not be known to these events ("stop"
2452
 
    request is detected only by the present function, not by events), so we
2453
 
    must "proactively" clear playgrounds:
2454
 
  */
2455
 
  rli->cleanup_context(thd, 1);
2456
 
  VOID(pthread_mutex_lock(&LOCK_thread_count));
2457
 
  /*
2458
 
    Some extra safety, which should not been needed (normally, event deletion
2459
 
    should already have done these assignments (each event which sets these
2460
 
    variables is supposed to set them to 0 before terminating)).
2461
 
  */
2462
 
  thd->query= thd->db= thd->catalog= 0;
2463
 
  thd->query_length= thd->db_length= 0;
2464
 
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
2465
 
  thd_proc_info(thd, "Waiting for slave mutex on exit");
2466
 
  pthread_mutex_lock(&rli->run_lock);
2467
 
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
2468
 
  pthread_mutex_lock(&rli->data_lock);
2469
 
  assert(rli->slave_running == 1); // tracking buffer overrun
2470
 
  /* When master_pos_wait() wakes up it will check this and terminate */
2471
 
  rli->slave_running= 0;
2472
 
  /* Forget the relay log's format */
2473
 
  delete rli->relay_log.description_event_for_exec;
2474
 
  rli->relay_log.description_event_for_exec= 0;
2475
 
  /* Wake up master_pos_wait() */
2476
 
  pthread_mutex_unlock(&rli->data_lock);
2477
 
  pthread_cond_broadcast(&rli->data_cond);
2478
 
  rli->ignore_log_space_limit= 0; /* don't need any lock */
2479
 
  /* we die so won't remember charset - re-update them on next thread start */
2480
 
  rli->cached_charset_invalidate();
2481
 
  rli->save_temporary_tables = thd->temporary_tables;
2482
 
 
2483
 
  /*
2484
 
    TODO: see if we can do this conditionally in next_event() instead
2485
 
    to avoid unneeded position re-init
2486
 
  */
2487
 
  thd->temporary_tables = 0; // remove tempation from destructor to close them
2488
 
  assert(thd->net.buff != 0);
2489
 
  net_end(&thd->net); // destructor will not free it, because we are weird
2490
 
  assert(rli->sql_thd == thd);
2491
 
  THD_CHECK_SENTRY(thd);
2492
 
  rli->sql_thd= 0;
2493
 
  pthread_mutex_lock(&LOCK_thread_count);
2494
 
  THD_CHECK_SENTRY(thd);
2495
 
  delete thd;
2496
 
  pthread_mutex_unlock(&LOCK_thread_count);
2497
 
 /*
2498
 
  Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
2499
 
  is important. Otherwise a killer_thread can execute between the calls and
2500
 
  delete the mi structure leading to a crash! (see BUG#25306 for details)
2501
 
 */ 
2502
 
  pthread_cond_broadcast(&rli->stop_cond);
2503
 
  pthread_mutex_unlock(&rli->run_lock);  // tell the world we are done
2504
 
  
2505
 
  my_thread_end();
2506
 
  pthread_exit(0);
2507
 
  return(0);                               // Can't return anything here
2508
 
}
2509
 
 
2510
 
 
2511
 
/*
2512
 
  process_io_create_file()
2513
 
*/
2514
 
 
2515
 
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
2516
 
{
2517
 
  int32_t error = 1;
2518
 
  uint32_t num_bytes;
2519
 
  bool cev_not_written;
2520
 
  THD *thd = mi->io_thd;
2521
 
  NET *net = &mi->drizzle->net;
2522
 
 
2523
 
  if (unlikely(!cev->is_valid()))
2524
 
    return(1);
2525
 
 
2526
 
  if (!rpl_filter->db_ok(cev->db))
2527
 
  {
2528
 
    skip_load_data_infile(net);
2529
 
    return(0);
2530
 
  }
2531
 
  assert(cev->inited_from_old);
2532
 
  thd->file_id = cev->file_id = mi->file_id++;
2533
 
  thd->server_id = cev->server_id;
2534
 
  cev_not_written = 1;
2535
 
 
2536
 
  if (unlikely(net_request_file(net,cev->fname)))
2537
 
  {
2538
 
    sql_print_error(_("Slave I/O: failed requesting download of '%s'"),
2539
 
                    cev->fname);
2540
 
    goto err;
2541
 
  }
2542
 
 
2543
 
  /*
2544
 
    This dummy block is so we could instantiate Append_block_log_event
2545
 
    once and then modify it slightly instead of doing it multiple times
2546
 
    in the loop
2547
 
  */
2548
 
  {
2549
 
    Append_block_log_event aev(thd,0,0,0,0);
2550
 
 
2551
 
    for (;;)
2552
 
    {
2553
 
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
2554
 
      {
2555
 
        sql_print_error(_("Network read error downloading '%s' from master"),
2556
 
                        cev->fname);
2557
 
        goto err;
2558
 
      }
2559
 
      if (unlikely(!num_bytes)) /* eof */
2560
 
      {
2561
 
        /* 3.23 master wants it */
2562
 
        net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0);
2563
 
        /*
2564
 
          If we wrote Create_file_log_event, then we need to write
2565
 
          Execute_load_log_event. If we did not write Create_file_log_event,
2566
 
          then this is an empty file and we can just do as if the LOAD DATA
2567
 
          INFILE had not existed, i.e. write nothing.
2568
 
        */
2569
 
        if (unlikely(cev_not_written))
2570
 
          break;
2571
 
        Execute_load_log_event xev(thd,0,0);
2572
 
        xev.log_pos = cev->log_pos;
2573
 
        if (unlikely(mi->rli.relay_log.append(&xev)))
2574
 
        {
2575
 
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2576
 
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2577
 
                     _("error writing Exec_load event to relay log"));
2578
 
          goto err;
2579
 
        }
2580
 
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2581
 
        break;
2582
 
      }
2583
 
      if (unlikely(cev_not_written))
2584
 
      {
2585
 
        cev->block = net->read_pos;
2586
 
        cev->block_len = num_bytes;
2587
 
        if (unlikely(mi->rli.relay_log.append(cev)))
2588
 
        {
2589
 
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2590
 
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2591
 
                     _("error writing Create_file event to relay log"));
2592
 
          goto err;
2593
 
        }
2594
 
        cev_not_written=0;
2595
 
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2596
 
      }
2597
 
      else
2598
 
      {
2599
 
        aev.block = net->read_pos;
2600
 
        aev.block_len = num_bytes;
2601
 
        aev.log_pos = cev->log_pos;
2602
 
        if (unlikely(mi->rli.relay_log.append(&aev)))
2603
 
        {
2604
 
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2605
 
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2606
 
                     _("error writing Append_block event to relay log"));
2607
 
          goto err;
2608
 
        }
2609
 
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
2610
 
      }
2611
 
    }
2612
 
  }
2613
 
  error=0;
2614
 
err:
2615
 
  return(error);
2616
 
}
2617
 
 
2618
 
 
2619
 
/*
2620
 
  Start using a new binary log on the master
2621
 
 
2622
 
  SYNOPSIS
2623
 
    process_io_rotate()
2624
 
    mi                  master_info for the slave
2625
 
    rev                 The rotate log event read from the binary log
2626
 
 
2627
 
  DESCRIPTION
2628
 
    Updates the master info with the place in the next binary
2629
 
    log where we should start reading.
2630
 
    Rotate the relay log to avoid mixed-format relay logs.
2631
 
 
2632
 
  NOTES
2633
 
    We assume we already locked mi->data_lock
2634
 
 
2635
 
  RETURN VALUES
2636
 
    0           ok
2637
 
    1           Log event is illegal
2638
 
 
2639
 
*/
2640
 
 
2641
 
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
2642
 
{
2643
 
  safe_mutex_assert_owner(&mi->data_lock);
2644
 
 
2645
 
  if (unlikely(!rev->is_valid()))
2646
 
    return(1);
2647
 
 
2648
 
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2649
 
  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
2650
 
  mi->master_log_pos= rev->pos;
2651
 
  /*
2652
 
    If we do not do this, we will be getting the first
2653
 
    rotate event forever, so we need to not disconnect after one.
2654
 
  */
2655
 
  if (disconnect_slave_event_count)
2656
 
    mi->events_till_disconnect++;
2657
 
 
2658
 
  /*
2659
 
    If description_event_for_queue is format <4, there is conversion in the
2660
 
    relay log to the slave's format (4). And Rotate can mean upgrade or
2661
 
    nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
2662
 
    no need to reset description_event_for_queue now. And if it's nothing (same
2663
 
    master version as before), no need (still using the slave's format).
2664
 
  */
2665
 
  if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
2666
 
  {
2667
 
    delete mi->rli.relay_log.description_event_for_queue;
2668
 
    /* start from format 3 (DRIZZLE 4.0) again */
2669
 
    mi->rli.relay_log.description_event_for_queue= new
2670
 
      Format_description_log_event(3);
2671
 
  }
2672
 
  /*
2673
 
    Rotate the relay log makes binlog format detection easier (at next slave
2674
 
    start or mysqlbinlog)
2675
 
  */
2676
 
  rotate_relay_log(mi); /* will take the right mutexes */
2677
 
  return(0);
2678
 
}
2679
 
 
2680
 
/*
2681
 
  Reads a 3.23 event and converts it to the slave's format. This code was
2682
 
  copied from DRIZZLE 4.0.
2683
 
*/
2684
 
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
2685
 
                           uint32_t event_len)
2686
 
{
2687
 
  const char *errmsg = 0;
2688
 
  uint32_t inc_pos;
2689
 
  bool ignore_event= 0;
2690
 
  char *tmp_buf = 0;
2691
 
  Relay_log_info *rli= &mi->rli;
2692
 
 
2693
 
  /*
2694
 
    If we get Load event, we need to pass a non-reusable buffer
2695
 
    to read_log_event, so we do a trick
2696
 
  */
2697
 
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
2698
 
  {
2699
 
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
2700
 
    {
2701
 
      mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2702
 
                 ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
2703
 
      return(1);
2704
 
    }
2705
 
    memcpy(tmp_buf,buf,event_len);
2706
 
    /*
2707
 
      Create_file constructor wants a 0 as last char of buffer, this 0 will
2708
 
      serve as the string-termination char for the file's name (which is at the
2709
 
      end of the buffer)
2710
 
      We must increment event_len, otherwise the event constructor will not see
2711
 
      this end 0, which leads to segfault.
2712
 
    */
2713
 
    tmp_buf[event_len++]=0;
2714
 
    int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
2715
 
    buf = (const char*)tmp_buf;
2716
 
  }
2717
 
  /*
2718
 
    This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
2719
 
    send the loaded file, and write it to the relay log in the form of
2720
 
    Append_block/Exec_load (the SQL thread needs the data, as that thread is not
2721
 
    connected to the master).
2722
 
  */
2723
 
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2724
 
                                            mi->rli.relay_log.description_event_for_queue);
2725
 
  if (unlikely(!ev))
2726
 
  {
2727
 
    sql_print_error(_("Read invalid event from master: '%s', "
2728
 
                      "master could be corrupt but a more likely cause "
2729
 
                      "of this is a bug"),
2730
 
                    errmsg);
2731
 
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
2732
 
    return(1);
2733
 
  }
2734
 
 
2735
 
  pthread_mutex_lock(&mi->data_lock);
2736
 
  ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
2737
 
  switch (ev->get_type_code()) {
2738
 
  case STOP_EVENT:
2739
 
    ignore_event= 1;
2740
 
    inc_pos= event_len;
2741
 
    break;
2742
 
  case ROTATE_EVENT:
2743
 
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2744
 
    {
2745
 
      delete ev;
2746
 
      pthread_mutex_unlock(&mi->data_lock);
2747
 
      return(1);
2748
 
    }
2749
 
    inc_pos= 0;
2750
 
    break;
2751
 
  case CREATE_FILE_EVENT:
2752
 
    /*
2753
 
      Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
2754
 
      queue_old_event() which is for 3.23 events which don't comprise
2755
 
      CREATE_FILE_EVENT. This is because read_log_event() above has just
2756
 
      transformed LOAD_EVENT into CREATE_FILE_EVENT.
2757
 
    */
2758
 
  {
2759
 
    /* We come here when and only when tmp_buf != 0 */
2760
 
    assert(tmp_buf != 0);
2761
 
    inc_pos=event_len;
2762
 
    ev->log_pos+= inc_pos;
2763
 
    int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2764
 
    delete ev;
2765
 
    mi->master_log_pos += inc_pos;
2766
 
    pthread_mutex_unlock(&mi->data_lock);
2767
 
    my_free((char*)tmp_buf, MYF(0));
2768
 
    return(error);
2769
 
  }
2770
 
  default:
2771
 
    inc_pos= event_len;
2772
 
    break;
2773
 
  }
2774
 
  if (likely(!ignore_event))
2775
 
  {
2776
 
    if (ev->log_pos)
2777
 
      /*
2778
 
         Don't do it for fake Rotate events (see comment in
2779
 
      Log_event::Log_event(const char* buf...) in log_event.cc).
2780
 
      */
2781
 
      ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
2782
 
    if (unlikely(rli->relay_log.append(ev)))
2783
 
    {
2784
 
      delete ev;
2785
 
      pthread_mutex_unlock(&mi->data_lock);
2786
 
      return(1);
2787
 
    }
2788
 
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2789
 
  }
2790
 
  delete ev;
2791
 
  mi->master_log_pos+= inc_pos;
2792
 
  pthread_mutex_unlock(&mi->data_lock);
2793
 
  return(0);
2794
 
}
2795
 
 
2796
 
/*
2797
 
  Reads a 4.0 event and converts it to the slave's format. This code was copied
2798
 
  from queue_binlog_ver_1_event(), with some affordable simplifications.
2799
 
*/
2800
 
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
2801
 
                           uint32_t event_len)
2802
 
{
2803
 
  const char *errmsg = 0;
2804
 
  uint32_t inc_pos;
2805
 
  char *tmp_buf = 0;
2806
 
  Relay_log_info *rli= &mi->rli;
2807
 
 
2808
 
  /* read_log_event() will adjust log_pos to be end_log_pos */
2809
 
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2810
 
                                            mi->rli.relay_log.description_event_for_queue);
2811
 
  if (unlikely(!ev))
2812
 
  {
2813
 
    sql_print_error(_("Read invalid event from master: '%s', "
2814
 
                      "master could be corrupt but a more likely cause of "
2815
 
                      "this is a bug"),
2816
 
                    errmsg);
2817
 
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
2818
 
    return(1);
2819
 
  }
2820
 
  pthread_mutex_lock(&mi->data_lock);
2821
 
  switch (ev->get_type_code()) {
2822
 
  case STOP_EVENT:
2823
 
    goto err;
2824
 
  case ROTATE_EVENT:
2825
 
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2826
 
    {
2827
 
      delete ev;
2828
 
      pthread_mutex_unlock(&mi->data_lock);
2829
 
      return(1);
2830
 
    }
2831
 
    inc_pos= 0;
2832
 
    break;
2833
 
  default:
2834
 
    inc_pos= event_len;
2835
 
    break;
2836
 
  }
2837
 
  if (unlikely(rli->relay_log.append(ev)))
2838
 
  {
2839
 
    delete ev;
2840
 
    pthread_mutex_unlock(&mi->data_lock);
2841
 
    return(1);
2842
 
  }
2843
 
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2844
 
  delete ev;
2845
 
  mi->master_log_pos+= inc_pos;
2846
 
err:
2847
 
  pthread_mutex_unlock(&mi->data_lock);
2848
 
  return(0);
2849
 
}
2850
 
 
2851
 
/*
2852
 
  queue_old_event()
2853
 
 
2854
 
  Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
2855
 
  (exactly, slave's) format. To do the conversion, we create a 5.0 event from
2856
 
  the 3.23/4.0 bytes, then write this event to the relay log.
2857
 
 
2858
 
  TODO:
2859
 
    Test this code before release - it has to be tested on a separate
2860
 
    setup with 3.23 master or 4.0 master
2861
 
*/
2862
 
 
2863
 
static int32_t queue_old_event(Master_info *mi, const char *buf,
2864
 
                           uint32_t event_len)
2865
 
{
2866
 
  switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
2867
 
  {
2868
 
  case 1:
2869
 
      return(queue_binlog_ver_1_event(mi,buf,event_len));
2870
 
  case 3:
2871
 
      return(queue_binlog_ver_3_event(mi,buf,event_len));
2872
 
  default: /* unsupported format; eg version 2 */
2873
 
    return(1);
2874
 
  }
2875
 
}
2876
 
 
2877
 
/*
2878
 
  queue_event()
2879
 
 
2880
 
  If the event is 3.23/4.0, passes it to queue_old_event() which will convert
2881
 
  it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
2882
 
  no format conversion, it's pure read/write of bytes.
2883
 
  So a 5.0.0 slave's relay log can contain events in the slave's format or in
2884
 
  any >=5.0.0 format.
2885
 
*/
2886
 
 
2887
 
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
2888
 
{
2889
 
  int32_t error= 0;
2890
 
  String error_msg;
2891
 
  uint32_t inc_pos= 0;
2892
 
  Relay_log_info *rli= &mi->rli;
2893
 
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
2894
 
 
2895
 
 
2896
 
  if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
2897
 
      buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
2898
 
    return(queue_old_event(mi,buf,event_len));
2899
 
 
2900
 
  pthread_mutex_lock(&mi->data_lock);
2901
 
 
2902
 
  switch (buf[EVENT_TYPE_OFFSET]) {
2903
 
  case STOP_EVENT:
2904
 
    /*
2905
 
      We needn't write this event to the relay log. Indeed, it just indicates a
2906
 
      master server shutdown. The only thing this does is cleaning. But
2907
 
      cleaning is already done on a per-master-thread basis (as the master
2908
 
      server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
2909
 
      prepared statements' deletion are TODO only when we binlog prep stmts).
2910
 
 
2911
 
      We don't even increment mi->master_log_pos, because we may be just after
2912
 
      a Rotate event. Btw, in a few milliseconds we are going to have a Start
2913
 
      event from the next binlog (unless the master is presently running
2914
 
      without --log-bin).
2915
 
    */
2916
 
    goto err;
2917
 
  case ROTATE_EVENT:
2918
 
  {
2919
 
    Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
2920
 
    if (unlikely(process_io_rotate(mi,&rev)))
2921
 
    {
2922
 
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2923
 
      goto err;
2924
 
    }
2925
 
    /*
2926
 
      Now the I/O thread has just changed its mi->master_log_name, so
2927
 
      incrementing mi->master_log_pos is nonsense.
2928
 
    */
2929
 
    inc_pos= 0;
2930
 
    break;
2931
 
  }
2932
 
  case FORMAT_DESCRIPTION_EVENT:
2933
 
  {
2934
 
    /*
2935
 
      Create an event, and save it (when we rotate the relay log, we will have
2936
 
      to write this event again).
2937
 
    */
2938
 
    /*
2939
 
      We are the only thread which reads/writes description_event_for_queue.
2940
 
      The relay_log struct does not move (though some members of it can
2941
 
      change), so we needn't any lock (no rli->data_lock, no log lock).
2942
 
    */
2943
 
    Format_description_log_event* tmp;
2944
 
    const char* errmsg;
2945
 
    if (!(tmp= (Format_description_log_event*)
2946
 
          Log_event::read_log_event(buf, event_len, &errmsg,
2947
 
                                    mi->rli.relay_log.description_event_for_queue)))
2948
 
    {
2949
 
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2950
 
      goto err;
2951
 
    }
2952
 
    delete mi->rli.relay_log.description_event_for_queue;
2953
 
    mi->rli.relay_log.description_event_for_queue= tmp;
2954
 
    /*
2955
 
       Though this does some conversion to the slave's format, this will
2956
 
       preserve the master's binlog format version, and number of event types.
2957
 
    */
2958
 
    /*
2959
 
       If the event was not requested by the slave (the slave did not ask for
2960
 
       it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
2961
 
    */
2962
 
    inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
2963
 
  }
2964
 
  break;
2965
 
 
2966
 
  case HEARTBEAT_LOG_EVENT:
2967
 
  {
2968
 
    /*
2969
 
      HB (heartbeat) cannot come before RL (Relay)
2970
 
    */
2971
 
    char  llbuf[22];
2972
 
    Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
2973
 
    if (!hb.is_valid())
2974
 
    {
2975
 
      error= ER_SLAVE_HEARTBEAT_FAILURE;
2976
 
      error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2977
 
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2978
 
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2979
 
      error_msg.append(STRING_WITH_LEN(" log_pos "));
2980
 
      llstr(hb.log_pos, llbuf);
2981
 
      error_msg.append(llbuf, strlen(llbuf));
2982
 
      goto err;
2983
 
    }
2984
 
    mi->received_heartbeats++;
2985
 
    /* 
2986
 
       compare local and event's versions of log_file, log_pos.
2987
 
       
2988
 
       Heartbeat is sent only after an event corresponding to the corrdinates
2989
 
       the heartbeat carries.
2990
 
       Slave can not have a difference in coordinates except in the only
2991
 
       special case when mi->master_log_name, master_log_pos have never
2992
 
       been updated by Rotate event i.e when slave does not have any history
2993
 
       with the master (and thereafter mi->master_log_pos is NULL).
2994
 
 
2995
 
       TODO: handling `when' for SHOW SLAVE STATUS' snds behind
2996
 
    */
2997
 
    if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
2998
 
         && mi->master_log_name != NULL)
2999
 
        || mi->master_log_pos != hb.log_pos)
3000
 
    {
3001
 
      /* missed events of heartbeat from the past */
3002
 
      error= ER_SLAVE_HEARTBEAT_FAILURE;
3003
 
      error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
3004
 
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
3005
 
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
3006
 
      error_msg.append(STRING_WITH_LEN(" log_pos "));
3007
 
      llstr(hb.log_pos, llbuf);
3008
 
      error_msg.append(llbuf, strlen(llbuf));
3009
 
      goto err;
3010
 
    }
3011
 
    goto skip_relay_logging;
3012
 
  }
3013
 
  break;
3014
 
    
3015
 
  default:
3016
 
    inc_pos= event_len;
3017
 
    break;
3018
 
  }
3019
 
 
3020
 
  /*
3021
 
     If this event is originating from this server, don't queue it.
3022
 
     We don't check this for 3.23 events because it's simpler like this; 3.23
3023
 
     will be filtered anyway by the SQL slave thread which also tests the
3024
 
     server id (we must also keep this test in the SQL thread, in case somebody
3025
 
     upgrades a 4.0 slave which has a not-filtered relay log).
3026
 
 
3027
 
     ANY event coming from ourselves can be ignored: it is obvious for queries;
3028
 
     for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
3029
 
     (--log-slave-updates would not log that) unless this slave is also its
3030
 
     direct master (an unsupported, useless setup!).
3031
 
  */
3032
 
 
3033
 
  pthread_mutex_lock(log_lock);
3034
 
 
3035
 
  if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
3036
 
      !mi->rli.replicate_same_server_id)
3037
 
  {
3038
 
    /*
3039
 
      Do not write it to the relay log.
3040
 
      a) We still want to increment mi->master_log_pos, so that we won't
3041
 
      re-read this event from the master if the slave IO thread is now
3042
 
      stopped/restarted (more efficient if the events we are ignoring are big
3043
 
      LOAD DATA INFILE).
3044
 
      b) We want to record that we are skipping events, for the information of
3045
 
      the slave SQL thread, otherwise that thread may let
3046
 
      rli->group_relay_log_pos stay too small if the last binlog's event is
3047
 
      ignored.
3048
 
      But events which were generated by this slave and which do not exist in
3049
 
      the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
3050
 
      mi->master_log_pos.
3051
 
    */
3052
 
    if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
3053
 
        buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3054
 
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3055
 
    {
3056
 
      mi->master_log_pos+= inc_pos;
3057
 
      memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
3058
 
      assert(rli->ign_master_log_name_end[0]);
3059
 
      rli->ign_master_log_pos_end= mi->master_log_pos;
3060
 
    }
3061
 
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3062
 
  }
3063
 
  else
3064
 
  {
3065
 
    /* write the event to the relay log */
3066
 
    if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3067
 
    {
3068
 
      mi->master_log_pos+= inc_pos;
3069
 
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3070
 
    }
3071
 
    else
3072
 
    {
3073
 
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
3074
 
    }
3075
 
    rli->ign_master_log_name_end[0]= 0; // last event is not ignored
3076
 
  }
3077
 
  pthread_mutex_unlock(log_lock);
3078
 
 
3079
 
skip_relay_logging:
3080
 
  
3081
 
err:
3082
 
  pthread_mutex_unlock(&mi->data_lock);
3083
 
  if (error)
3084
 
    mi->report(ERROR_LEVEL, error, ER(error),
3085
 
               (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
3086
 
               _("could not queue event from master") :
3087
 
               error_msg.ptr());
3088
 
  return(error);
3089
 
}
3090
 
 
3091
 
 
3092
 
void end_relay_log_info(Relay_log_info* rli)
3093
 
{
3094
 
  if (!rli->inited)
3095
 
    return;
3096
 
  if (rli->info_fd >= 0)
3097
 
  {
3098
 
    end_io_cache(&rli->info_file);
3099
 
    (void) my_close(rli->info_fd, MYF(MY_WME));
3100
 
    rli->info_fd = -1;
3101
 
  }
3102
 
  if (rli->cur_log_fd >= 0)
3103
 
  {
3104
 
    end_io_cache(&rli->cache_buf);
3105
 
    (void)my_close(rli->cur_log_fd, MYF(MY_WME));
3106
 
    rli->cur_log_fd = -1;
3107
 
  }
3108
 
  rli->inited = 0;
3109
 
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
3110
 
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3111
 
  /*
3112
 
    Delete the slave's temporary tables from memory.
3113
 
    In the future there will be other actions than this, to ensure persistance
3114
 
    of slave's temp tables after shutdown.
3115
 
  */
3116
 
  rli->close_temporary_tables();
3117
 
  return;
3118
 
}
3119
 
 
3120
 
/*
3121
 
  Try to connect until successful or slave killed
3122
 
 
3123
 
  SYNPOSIS
3124
 
    safe_connect()
3125
 
    thd                 Thread handler for slave
3126
 
    DRIZZLE               DRIZZLE connection handle
3127
 
    mi                  Replication handle
3128
 
 
3129
 
  RETURN
3130
 
    0   ok
3131
 
    #   Error
3132
 
*/
3133
 
 
3134
 
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi)
3135
 
{
3136
 
  return(connect_to_master(thd, drizzle, mi, 0, 0));
3137
 
}
3138
 
 
3139
 
 
3140
 
/*
3141
 
  SYNPOSIS
3142
 
    connect_to_master()
3143
 
 
3144
 
  IMPLEMENTATION
3145
 
    Try to connect until successful or slave killed or we have retried
3146
 
    master_retry_count times
3147
 
*/
3148
 
 
3149
 
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3150
 
                             bool reconnect, bool suppress_warnings)
3151
 
{
3152
 
  int32_t slave_was_killed;
3153
 
  int32_t last_errno= -2;                           // impossible error
3154
 
  uint32_t err_count=0;
3155
 
  char llbuff[22];
3156
 
 
3157
 
  mi->events_till_disconnect = disconnect_slave_event_count;
3158
 
  uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
3159
 
  if (opt_slave_compressed_protocol)
3160
 
    client_flag=CLIENT_COMPRESS;                /* We will use compression */
3161
 
 
3162
 
  drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3163
 
  drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3164
 
 
3165
 
  drizzle_options(drizzle, DRIZZLE_SET_CHARSET_NAME, default_charset_info->csname);
3166
 
  /* This one is not strictly needed but we have it here for completeness */
3167
 
  drizzle_options(drizzle, DRIZZLE_SET_CHARSET_DIR, (char *) charsets_dir);
3168
 
 
3169
 
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3170
 
         (reconnect ? drizzle_reconnect(drizzle) != 0 :
3171
 
          drizzle_connect(drizzle, mi->host, mi->user, mi->password, 0,
3172
 
                             mi->port, 0, client_flag) == 0))
3173
 
  {
3174
 
    /* Don't repeat last error */
3175
 
    if ((int32_t)drizzle_errno(drizzle) != last_errno)
3176
 
    {
3177
 
      last_errno=drizzle_errno(drizzle);
3178
 
      suppress_warnings= 0;
3179
 
      mi->report(ERROR_LEVEL, last_errno,
3180
 
                 _("error %s to master '%s@%s:%d'"
3181
 
                   " - retry-time: %d  retries: %u"),
3182
 
                 (reconnect ? _("reconnecting") : _("connecting")),
3183
 
                 mi->user, mi->host, mi->port,
3184
 
                 mi->connect_retry, master_retry_count);
3185
 
    }
3186
 
    /*
3187
 
      By default we try forever. The reason is that failure will trigger
3188
 
      master election, so if the user did not set master_retry_count we
3189
 
      do not want to have election triggered on the first failure to
3190
 
      connect
3191
 
    */
3192
 
    if (++err_count == master_retry_count)
3193
 
    {
3194
 
      slave_was_killed=1;
3195
 
      if (reconnect)
3196
 
        change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
3197
 
      break;
3198
 
    }
3199
 
    safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3200
 
               (void*)mi);
3201
 
  }
3202
 
 
3203
 
  if (!slave_was_killed)
3204
 
  {
3205
 
    if (reconnect)
3206
 
    {
3207
 
      if (!suppress_warnings && global_system_variables.log_warnings)
3208
 
        sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3209
 
                                "replication resumed in log '%s' at "
3210
 
                                "position %s"), mi->user,
3211
 
                                mi->host, mi->port,
3212
 
                                IO_RPL_LOG_NAME,
3213
 
                                llstr(mi->master_log_pos,llbuff));
3214
 
    }
3215
 
    else
3216
 
    {
3217
 
      change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
3218
 
      general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d",
3219
 
                        mi->user, mi->host, mi->port);
3220
 
    }
3221
 
#ifdef SIGNAL_WITH_VIO_CLOSE
3222
 
    thd->set_active_vio(drizzle->net.vio);
3223
 
#endif
3224
 
  }
3225
 
  drizzle->reconnect= 1;
3226
 
  return(slave_was_killed);
3227
 
}
3228
 
 
3229
 
 
3230
 
/*
3231
 
  safe_reconnect()
3232
 
 
3233
 
  IMPLEMENTATION
3234
 
    Try to connect until successful or slave killed or we have retried
3235
 
    master_retry_count times
3236
 
*/
3237
 
 
3238
 
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3239
 
                          bool suppress_warnings)
3240
 
{
3241
 
  return(connect_to_master(thd, drizzle, mi, 1, suppress_warnings));
3242
 
}
3243
 
 
3244
 
 
3245
 
/*
3246
 
  Store the file and position where the execute-slave thread are in the
3247
 
  relay log.
3248
 
 
3249
 
  SYNOPSIS
3250
 
    flush_relay_log_info()
3251
 
    rli                 Relay log information
3252
 
 
3253
 
  NOTES
3254
 
    - As this is only called by the slave thread, we don't need to
3255
 
      have a lock on this.
3256
 
    - If there is an active transaction, then we don't update the position
3257
 
      in the relay log.  This is to ensure that we re-execute statements
3258
 
      if we die in the middle of an transaction that was rolled back.
3259
 
    - As a transaction never spans binary logs, we don't have to handle the
3260
 
      case where we do a relay-log-rotation in the middle of the transaction.
3261
 
      If this would not be the case, we would have to ensure that we
3262
 
      don't delete the relay log file where the transaction started when
3263
 
      we switch to a new relay log file.
3264
 
 
3265
 
  TODO
3266
 
    - Change the log file information to a binary format to avoid calling
3267
 
      int64_t2str.
3268
 
 
3269
 
  RETURN VALUES
3270
 
    0   ok
3271
 
    1   write error
3272
 
*/
3273
 
 
3274
 
bool flush_relay_log_info(Relay_log_info* rli)
3275
 
{
3276
 
  bool error=0;
3277
 
 
3278
 
  if (unlikely(rli->no_storage))
3279
 
    return(0);
3280
 
 
3281
 
  IO_CACHE *file = &rli->info_file;
3282
 
  char buff[FN_REFLEN*2+22*2+4], *pos;
3283
 
 
3284
 
  my_b_seek(file, 0L);
3285
 
  pos=stpcpy(buff, rli->group_relay_log_name);
3286
 
  *pos++='\n';
3287
 
  pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
3288
 
  *pos++='\n';
3289
 
  pos=stpcpy(pos, rli->group_master_log_name);
3290
 
  *pos++='\n';
3291
 
  pos=int64_t2str(rli->group_master_log_pos, pos, 10);
3292
 
  *pos='\n';
3293
 
  if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
3294
 
    error=1;
3295
 
  if (flush_io_cache(file))
3296
 
    error=1;
3297
 
 
3298
 
  /* Flushing the relay log is done by the slave I/O thread */
3299
 
  return(error);
3300
 
}
3301
 
 
3302
 
 
3303
 
/*
3304
 
  Called when we notice that the current "hot" log got rotated under our feet.
3305
 
*/
3306
 
 
3307
 
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
3308
 
{
3309
 
  assert(rli->cur_log != &rli->cache_buf);
3310
 
  assert(rli->cur_log_fd == -1);
3311
 
 
3312
 
  IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3313
 
  if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
3314
 
                                   errmsg)) <0)
3315
 
    return(0);
3316
 
  /*
3317
 
    We want to start exactly where we was before:
3318
 
    relay_log_pos       Current log pos
3319
 
    pending             Number of bytes already processed from the event
3320
 
  */
3321
 
  rli->event_relay_log_pos= max(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3322
 
  my_b_seek(cur_log,rli->event_relay_log_pos);
3323
 
  return(cur_log);
3324
 
}
3325
 
 
3326
 
 
3327
 
static Log_event* next_event(Relay_log_info* rli)
3328
 
{
3329
 
  Log_event* ev;
3330
 
  IO_CACHE* cur_log = rli->cur_log;
3331
 
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3332
 
  const char* errmsg=0;
3333
 
  THD* thd = rli->sql_thd;
3334
 
 
3335
 
  assert(thd != 0);
3336
 
 
3337
 
  if (abort_slave_event_count && !rli->events_till_abort--)
3338
 
    return(0);
3339
 
 
3340
 
  /*
3341
 
    For most operations we need to protect rli members with data_lock,
3342
 
    so we assume calling function acquired this mutex for us and we will
3343
 
    hold it for the most of the loop below However, we will release it
3344
 
    whenever it is worth the hassle,  and in the cases when we go into a
3345
 
    pthread_cond_wait() with the non-data_lock mutex
3346
 
  */
3347
 
  safe_mutex_assert_owner(&rli->data_lock);
3348
 
 
3349
 
  while (!sql_slave_killed(thd,rli))
3350
 
  {
3351
 
    /*
3352
 
      We can have two kinds of log reading:
3353
 
      hot_log:
3354
 
        rli->cur_log points at the IO_CACHE of relay_log, which
3355
 
        is actively being updated by the I/O thread. We need to be careful
3356
 
        in this case and make sure that we are not looking at a stale log that
3357
 
        has already been rotated. If it has been, we reopen the log.
3358
 
 
3359
 
      The other case is much simpler:
3360
 
        We just have a read only log that nobody else will be updating.
3361
 
    */
3362
 
    bool hot_log;
3363
 
    if ((hot_log = (cur_log != &rli->cache_buf)))
3364
 
    {
3365
 
      assert(rli->cur_log_fd == -1); // foreign descriptor
3366
 
      pthread_mutex_lock(log_lock);
3367
 
 
3368
 
      /*
3369
 
        Reading xxx_file_id is safe because the log will only
3370
 
        be rotated when we hold relay_log.LOCK_log
3371
 
      */
3372
 
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
3373
 
      {
3374
 
        // The master has switched to a new log file; Reopen the old log file
3375
 
        cur_log=reopen_relay_log(rli, &errmsg);
3376
 
        pthread_mutex_unlock(log_lock);
3377
 
        if (!cur_log)                           // No more log files
3378
 
          goto err;
3379
 
        hot_log=0;                              // Using old binary log
3380
 
      }
3381
 
    }
3382
 
    /* 
3383
 
      As there is no guarantee that the relay is open (for example, an I/O
3384
 
      error during a write by the slave I/O thread may have closed it), we
3385
 
      have to test it.
3386
 
    */
3387
 
    if (!my_b_inited(cur_log))
3388
 
      goto err;
3389
 
    assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3390
 
    assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
3391
 
 
3392
 
    /*
3393
 
      Relay log is always in new format - if the master is 3.23, the
3394
 
      I/O thread will convert the format for us.
3395
 
      A problem: the description event may be in a previous relay log. So if
3396
 
      the slave has been shutdown meanwhile, we would have to look in old relay
3397
 
      logs, which may even have been deleted. So we need to write this
3398
 
      description event at the beginning of the relay log.
3399
 
      When the relay log is created when the I/O thread starts, easy: the
3400
 
      master will send the description event and we will queue it.
3401
 
      But if the relay log is created by new_file(): then the solution is:
3402
 
      DRIZZLE_BIN_LOG::open() will write the buffered description event.
3403
 
    */
3404
 
    if ((ev=Log_event::read_log_event(cur_log,0,
3405
 
                                      rli->relay_log.description_event_for_exec)))
3406
 
 
3407
 
    {
3408
 
      assert(thd==rli->sql_thd);
3409
 
      /*
3410
 
        read it while we have a lock, to avoid a mutex lock in
3411
 
        inc_event_relay_log_pos()
3412
 
      */
3413
 
      rli->future_event_relay_log_pos= my_b_tell(cur_log);
3414
 
      if (hot_log)
3415
 
        pthread_mutex_unlock(log_lock);
3416
 
      return(ev);
3417
 
    }
3418
 
    assert(thd==rli->sql_thd);
3419
 
    if (opt_reckless_slave)                     // For mysql-test
3420
 
      cur_log->error = 0;
3421
 
    if (cur_log->error < 0)
3422
 
    {
3423
 
      errmsg = "slave SQL thread aborted because of I/O error";
3424
 
      if (hot_log)
3425
 
        pthread_mutex_unlock(log_lock);
3426
 
      goto err;
3427
 
    }
3428
 
    if (!cur_log->error) /* EOF */
3429
 
    {
3430
 
      /*
3431
 
        On a hot log, EOF means that there are no more updates to
3432
 
        process and we must block until I/O thread adds some and
3433
 
        signals us to continue
3434
 
      */
3435
 
      if (hot_log)
3436
 
      {
3437
 
        /*
3438
 
          We say in Seconds_Behind_Master that we have "caught up". Note that
3439
 
          for example if network link is broken but I/O slave thread hasn't
3440
 
          noticed it (slave_net_timeout not elapsed), then we'll say "caught
3441
 
          up" whereas we're not really caught up. Fixing that would require
3442
 
          internally cutting timeout in smaller pieces in network read, no
3443
 
          thanks. Another example: SQL has caught up on I/O, now I/O has read
3444
 
          a new event and is queuing it; the false "0" will exist until SQL
3445
 
          finishes executing the new event; it will be look abnormal only if
3446
 
          the events have old timestamps (then you get "many", 0, "many").
3447
 
 
3448
 
          Transient phases like this can be fixed with implemeting
3449
 
          Heartbeat event which provides the slave the status of the
3450
 
          master at time the master does not have any new update to send.
3451
 
          Seconds_Behind_Master would be zero only when master has no
3452
 
          more updates in binlog for slave. The heartbeat can be sent
3453
 
          in a (small) fraction of slave_net_timeout. Until it's done
3454
 
          rli->last_master_timestamp is temporarely (for time of
3455
 
          waiting for the following event) reset whenever EOF is
3456
 
          reached.
3457
 
        */
3458
 
        time_t save_timestamp= rli->last_master_timestamp;
3459
 
        rli->last_master_timestamp= 0;
3460
 
 
3461
 
        assert(rli->relay_log.get_open_count() ==
3462
 
                    rli->cur_log_old_open_count);
3463
 
 
3464
 
        if (rli->ign_master_log_name_end[0])
3465
 
        {
3466
 
          /* We generate and return a Rotate, to make our positions advance */
3467
 
          ev= new Rotate_log_event(rli->ign_master_log_name_end,
3468
 
                                   0, rli->ign_master_log_pos_end,
3469
 
                                   Rotate_log_event::DUP_NAME);
3470
 
          rli->ign_master_log_name_end[0]= 0;
3471
 
          pthread_mutex_unlock(log_lock);
3472
 
          if (unlikely(!ev))
3473
 
          {
3474
 
            errmsg= "Slave SQL thread failed to create a Rotate event "
3475
 
              "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
3476
 
            goto err;
3477
 
          }
3478
 
          ev->server_id= 0; // don't be ignored by slave SQL thread
3479
 
          return(ev);
3480
 
        }
3481
 
 
3482
 
        /*
3483
 
          We can, and should release data_lock while we are waiting for
3484
 
          update. If we do not, show slave status will block
3485
 
        */
3486
 
        pthread_mutex_unlock(&rli->data_lock);
3487
 
 
3488
 
        /*
3489
 
          Possible deadlock :
3490
 
          - the I/O thread has reached log_space_limit
3491
 
          - the SQL thread has read all relay logs, but cannot purge for some
3492
 
          reason:
3493
 
            * it has already purged all logs except the current one
3494
 
            * there are other logs than the current one but they're involved in
3495
 
            a transaction that finishes in the current one (or is not finished)
3496
 
          Solution :
3497
 
          Wake up the possibly waiting I/O thread, and set a boolean asking
3498
 
          the I/O thread to temporarily ignore the log_space_limit
3499
 
          constraint, because we do not want the I/O thread to block because of
3500
 
          space (it's ok if it blocks for any other reason (e.g. because the
3501
 
          master does not send anything). Then the I/O thread stops waiting
3502
 
          and reads more events.
3503
 
          The SQL thread decides when the I/O thread should take log_space_limit
3504
 
          into account again : ignore_log_space_limit is reset to 0
3505
 
          in purge_first_log (when the SQL thread purges the just-read relay
3506
 
          log), and also when the SQL thread starts. We should also reset
3507
 
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
3508
 
          fact, no need as RESET SLAVE requires that the slave
3509
 
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
3510
 
          it stops.
3511
 
        */
3512
 
        pthread_mutex_lock(&rli->log_space_lock);
3513
 
        // prevent the I/O thread from blocking next times
3514
 
        rli->ignore_log_space_limit= 1;
3515
 
        /*
3516
 
          If the I/O thread is blocked, unblock it.  Ok to broadcast
3517
 
          after unlock, because the mutex is only destroyed in
3518
 
          ~Relay_log_info(), i.e. when rli is destroyed, and rli will
3519
 
          not be destroyed before we exit the present function.
3520
 
        */
3521
 
        pthread_mutex_unlock(&rli->log_space_lock);
3522
 
        pthread_cond_broadcast(&rli->log_space_cond);
3523
 
        // Note that wait_for_update_relay_log unlocks lock_log !
3524
 
        rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
3525
 
        // re-acquire data lock since we released it earlier
3526
 
        pthread_mutex_lock(&rli->data_lock);
3527
 
        rli->last_master_timestamp= save_timestamp;
3528
 
        continue;
3529
 
      }
3530
 
      /*
3531
 
        If the log was not hot, we need to move to the next log in
3532
 
        sequence. The next log could be hot or cold, we deal with both
3533
 
        cases separately after doing some common initialization
3534
 
      */
3535
 
      end_io_cache(cur_log);
3536
 
      assert(rli->cur_log_fd >= 0);
3537
 
      my_close(rli->cur_log_fd, MYF(MY_WME));
3538
 
      rli->cur_log_fd = -1;
3539
 
 
3540
 
      if (relay_log_purge)
3541
 
      {
3542
 
        /*
3543
 
          purge_first_log will properly set up relay log coordinates in rli.
3544
 
          If the group's coordinates are equal to the event's coordinates
3545
 
          (i.e. the relay log was not rotated in the middle of a group),
3546
 
          we can purge this relay log too.
3547
 
          We do uint64_t and string comparisons, this may be slow but
3548
 
          - purging the last relay log is nice (it can save 1GB of disk), so we
3549
 
          like to detect the case where we can do it, and given this,
3550
 
          - I see no better detection method
3551
 
          - purge_first_log is not called that often
3552
 
        */
3553
 
        if (rli->relay_log.purge_first_log
3554
 
            (rli,
3555
 
             rli->group_relay_log_pos == rli->event_relay_log_pos
3556
 
             && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
3557
 
        {
3558
 
          errmsg = "Error purging processed logs";
3559
 
          goto err;
3560
 
        }
3561
 
      }
3562
 
      else
3563
 
      {
3564
 
        /*
3565
 
          If hot_log is set, then we already have a lock on
3566
 
          LOCK_log.  If not, we have to get the lock.
3567
 
 
3568
 
          According to Sasha, the only time this code will ever be executed
3569
 
          is if we are recovering from a bug.
3570
 
        */
3571
 
        if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
3572
 
        {
3573
 
          errmsg = "error switching to the next log";
3574
 
          goto err;
3575
 
        }
3576
 
        rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3577
 
        strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
3578
 
                sizeof(rli->event_relay_log_name)-1);
3579
 
        flush_relay_log_info(rli);
3580
 
      }
3581
 
 
3582
 
      /*
3583
 
        Now we want to open this next log. To know if it's a hot log (the one
3584
 
        being written by the I/O thread now) or a cold log, we can use
3585
 
        is_active(); if it is hot, we use the I/O cache; if it's cold we open
3586
 
        the file normally. But if is_active() reports that the log is hot, this
3587
 
        may change between the test and the consequence of the test. So we may
3588
 
        open the I/O cache whereas the log is now cold, which is nonsense.
3589
 
        To guard against this, we need to have LOCK_log.
3590
 
      */
3591
 
 
3592
 
      if (!hot_log) /* if hot_log, we already have this mutex */
3593
 
        pthread_mutex_lock(log_lock);
3594
 
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
3595
 
      {
3596
 
#ifdef EXTRA_DEBUG
3597
 
        if (global_system_variables.log_warnings)
3598
 
          sql_print_information(_("next log '%s' is currently active"),
3599
 
                                rli->linfo.log_file_name);
3600
 
#endif
3601
 
        rli->cur_log= cur_log= rli->relay_log.get_log_file();
3602
 
        rli->cur_log_old_open_count= rli->relay_log.get_open_count();
3603
 
        assert(rli->cur_log_fd == -1);
3604
 
 
3605
 
        /*
3606
 
          Read pointer has to be at the start since we are the only
3607
 
          reader.
3608
 
          We must keep the LOCK_log to read the 4 first bytes, as this is a hot
3609
 
          log (same as when we call read_log_event() above: for a hot log we
3610
 
          take the mutex).
3611
 
        */
3612
 
        if (check_binlog_magic(cur_log,&errmsg))
3613
 
        {
3614
 
          if (!hot_log) pthread_mutex_unlock(log_lock);
3615
 
          goto err;
3616
 
        }
3617
 
        if (!hot_log) pthread_mutex_unlock(log_lock);
3618
 
        continue;
3619
 
      }
3620
 
      if (!hot_log) pthread_mutex_unlock(log_lock);
3621
 
      /*
3622
 
        if we get here, the log was not hot, so we will have to open it
3623
 
        ourselves. We are sure that the log is still not hot now (a log can get
3624
 
        from hot to cold, but not from cold to hot). No need for LOCK_log.
3625
 
      */
3626
 
#ifdef EXTRA_DEBUG
3627
 
      if (global_system_variables.log_warnings)
3628
 
        sql_print_information(_("next log '%s' is not active"),
3629
 
                              rli->linfo.log_file_name);
3630
 
#endif
3631
 
      // open_binlog() will check the magic header
3632
 
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
3633
 
                                       &errmsg)) <0)
3634
 
        goto err;
3635
 
    }
3636
 
    else
3637
 
    {
3638
 
      /*
3639
 
        Read failed with a non-EOF error.
3640
 
        TODO: come up with something better to handle this error
3641
 
      */
3642
 
      if (hot_log)
3643
 
        pthread_mutex_unlock(log_lock);
3644
 
      sql_print_error(_("Slave SQL thread: I/O error reading "
3645
 
                        "event(errno: %d  cur_log->error: %d)"),
3646
 
                      my_errno,cur_log->error);
3647
 
      // set read position to the beginning of the event
3648
 
      my_b_seek(cur_log,rli->event_relay_log_pos);
3649
 
      /* otherwise, we have had a partial read */
3650
 
      errmsg = _("Aborting slave SQL thread because of partial event read");
3651
 
      break;                                    // To end of function
3652
 
    }
3653
 
  }
3654
 
  if (!errmsg && global_system_variables.log_warnings)
3655
 
  {
3656
 
    sql_print_information(_("Error reading relay log event: %s"),
3657
 
                          _("slave SQL thread was killed"));
3658
 
    return(0);
3659
 
  }
3660
 
 
3661
 
err:
3662
 
  if (errmsg)
3663
 
    sql_print_error(_("Error reading relay log event: %s"), errmsg);
3664
 
  return(0);
3665
 
}
3666
 
 
3667
 
/*
3668
 
  Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
3669
 
  because of size is simpler because when we do it we already have all relevant
3670
 
  locks; here we don't, so this function is mainly taking locks).
3671
 
  Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
3672
 
  is void).
3673
 
*/
3674
 
 
3675
 
void rotate_relay_log(Master_info* mi)
3676
 
{
3677
 
  Relay_log_info* rli= &mi->rli;
3678
 
 
3679
 
  /* We don't lock rli->run_lock. This would lead to deadlocks. */
3680
 
  pthread_mutex_lock(&mi->run_lock);
3681
 
 
3682
 
  /*
3683
 
     We need to test inited because otherwise, new_file() will attempt to lock
3684
 
     LOCK_log, which may not be inited (if we're not a slave).
3685
 
  */
3686
 
  if (!rli->inited)
3687
 
  {
3688
 
    goto end;
3689
 
  }
3690
 
 
3691
 
  /* If the relay log is closed, new_file() will do nothing. */
3692
 
  rli->relay_log.new_file();
3693
 
 
3694
 
  /*
3695
 
    We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
3696
 
    be counted, so imagine a succession of FLUSH LOGS  and assume the slave
3697
 
    threads are started:
3698
 
    relay_log_space decreases by the size of the deleted relay log, but does
3699
 
    not increase, so flush-after-flush we may become negative, which is wrong.
3700
 
    Even if this will be corrected as soon as a query is replicated on the
3701
 
    slave (because the I/O thread will then call harvest_bytes_written() which
3702
 
    will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
3703
 
    output in SHOW SLAVE STATUS meanwhile. So we harvest now.
3704
 
    If the log is closed, then this will just harvest the last writes, probably
3705
 
    0 as they probably have been harvested.
3706
 
  */
3707
 
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3708
 
end:
3709
 
  pthread_mutex_unlock(&mi->run_lock);
3710
 
  return;
3711
 
}
3712
 
 
3713
 
 
3714
 
/**
3715
 
   Detects, based on master's version (as found in the relay log), if master
3716
 
   has a certain bug.
3717
 
   @param rli Relay_log_info which tells the master's version
3718
 
   @param bug_id Number of the bug as found in bugs.mysql.com
3719
 
   @param report bool report error message, default TRUE
3720
 
   @return true if master has the bug, FALSE if it does not.
3721
 
*/
3722
 
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
3723
 
{
3724
 
  struct st_version_range_for_one_bug {
3725
 
    uint32_t        bug_id;
3726
 
    const uchar introduced_in[3]; // first version with bug
3727
 
    const uchar fixed_in[3];      // first version with fix
3728
 
  };
3729
 
  static struct st_version_range_for_one_bug versions_for_all_bugs[]=
3730
 
  {
3731
 
    {24432, { 5, 0, 24 }, { 5, 0, 38 } },
3732
 
    {24432, { 5, 1, 12 }, { 5, 1, 17 } },
3733
 
    {33029, { 5, 0,  0 }, { 5, 0, 58 } },
3734
 
    {33029, { 5, 1,  0 }, { 5, 1, 12 } },
3735
 
  };
3736
 
  const uchar *master_ver=
3737
 
    rli->relay_log.description_event_for_exec->server_version_split;
3738
 
 
3739
 
  assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3740
 
 
3741
 
  for (uint32_t i= 0;
3742
 
       i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3743
 
  {
3744
 
    const uchar *introduced_in= versions_for_all_bugs[i].introduced_in,
3745
 
      *fixed_in= versions_for_all_bugs[i].fixed_in;
3746
 
    if ((versions_for_all_bugs[i].bug_id == bug_id) &&
3747
 
        (memcmp(introduced_in, master_ver, 3) <= 0) &&
3748
 
        (memcmp(fixed_in,      master_ver, 3) >  0))
3749
 
    {
3750
 
      if (!report)
3751
 
        return true;
3752
 
 
3753
 
      // a short message for SHOW SLAVE STATUS (message length constraints)
3754
 
      my_printf_error(ER_UNKNOWN_ERROR,
3755
 
                      _("master may suffer from"
3756
 
                        " http://bugs.mysql.com/bug.php?id=%u"
3757
 
                        " so slave stops; check error log on slave"
3758
 
                        " for more info"), MYF(0), bug_id);
3759
 
      // a verbose message for the error log
3760
 
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
3761
 
                  _("According to the master's version ('%s'),"
3762
 
                    " it is probable that master suffers from this bug:"
3763
 
                    " http://bugs.mysql.com/bug.php?id=%u"
3764
 
                    " and thus replicating the current binary log event"
3765
 
                    " may make the slave's data become different from the"
3766
 
                    " master's data."
3767
 
                    " To take no risk, slave refuses to replicate"
3768
 
                    " this event and stops."
3769
 
                    " We recommend that all updates be stopped on the"
3770
 
                    " master and slave, that the data of both be"
3771
 
                    " manually synchronized,"
3772
 
                    " that master's binary logs be deleted,"
3773
 
                    " that master be upgraded to a version at least"
3774
 
                    " equal to '%d.%d.%d'. Then replication can be"
3775
 
                    " restarted."),
3776
 
                  rli->relay_log.description_event_for_exec->server_version,
3777
 
                  bug_id,
3778
 
                  fixed_in[0], fixed_in[1], fixed_in[2]);
3779
 
      return true;
3780
 
    }
3781
 
  }
3782
 
  return false;
3783
 
}
3784
 
 
3785
 
/**
3786
 
   BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
3787
 
   exclusive, if one statement in a SP generated AUTO_INCREMENT value
3788
 
   by the top statement, all statements after it would be considered
3789
 
   generated AUTO_INCREMENT value by the top statement, and a
3790
 
   erroneous INSERT_ID value might be associated with these statement,
3791
 
   which could cause duplicate entry error and stop the slave.
3792
 
 
3793
 
   Detect buggy master to work around.
3794
 
 */
3795
 
bool rpl_master_erroneous_autoinc(THD *thd)
3796
 
{
3797
 
  if (active_mi && active_mi->rli.sql_thd == thd)
3798
 
  {
3799
 
    Relay_log_info *rli= &active_mi->rli;
3800
 
    return rpl_master_has_bug(rli, 33029, false);
3801
 
  }
3802
 
  return false;
3803
 
}
3804
 
 
3805
 
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
3806
 
template class I_List_iterator<i_string>;
3807
 
template class I_List_iterator<i_string_pair>;
3808
 
#endif
3809
 
 
3810
 
/**
3811
 
  @} (end of group Replication)
3812
 
*/
3813
 
 
3814
 
#endif /* HAVE_REPLICATION */