~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/slave.cc

  • Committer: Monty Taylor
  • Date: 2008-09-16 00:00:48 UTC
  • mto: This revision was merged to the branch mainline in revision 391.
  • Revision ID: monty@inaugust.com-20080916000048-3rvrv3gv9l0ad3gs
Fixed copyright headers in drizzled/

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2003 DRIZZLE AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/**
 
18
  @addtogroup Replication
 
19
  @{
 
20
 
 
21
  @file
 
22
 
 
23
  @brief Code to run the io thread and the sql thread on the
 
24
  replication slave.
 
25
*/
 
26
#include <drizzled/server_includes.h>
 
27
 
 
28
#include <storage/myisam/myisam.h>
 
29
#include "rpl_mi.h"
 
30
#include "rpl_rli.h"
 
31
#include "sql_repl.h"
 
32
#include "rpl_filter.h"
 
33
#include "repl_failsafe.h"
 
34
#include <mysys/thr_alarm.h>
 
35
#include <libdrizzle/sql_common.h>
 
36
#include <libdrizzle/errmsg.h>
 
37
#include <mysys/mysys_err.h>
 
38
#include <drizzled/drizzled_error_messages.h>
 
39
 
 
40
#ifdef HAVE_REPLICATION
 
41
 
 
42
#include "rpl_tblmap.h"
 
43
 
 
44
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
 
45
 
 
46
#define MAX_SLAVE_RETRY_PAUSE 5
 
47
bool use_slave_mask = 0;
 
48
MY_BITMAP slave_error_mask;
 
49
 
 
50
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
 
51
 
 
52
char* slave_load_tmpdir = 0;
 
53
Master_info *active_mi= 0;
 
54
bool replicate_same_server_id;
 
55
uint64_t relay_log_space_limit = 0;
 
56
 
 
57
/*
 
58
  When slave thread exits, we need to remember the temporary tables so we
 
59
  can re-use them on slave start.
 
60
 
 
61
  TODO: move the vars below under Master_info
 
62
*/
 
63
 
 
64
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
 
65
int32_t events_till_abort = -1;
 
66
 
 
67
enum enum_slave_reconnect_actions
 
68
{
 
69
  SLAVE_RECON_ACT_REG= 0,
 
70
  SLAVE_RECON_ACT_DUMP= 1,
 
71
  SLAVE_RECON_ACT_EVENT= 2,
 
72
  SLAVE_RECON_ACT_MAX
 
73
};
 
74
 
 
75
enum enum_slave_reconnect_messages
 
76
{
 
77
  SLAVE_RECON_MSG_WAIT= 0,
 
78
  SLAVE_RECON_MSG_KILLED_WAITING= 1,
 
79
  SLAVE_RECON_MSG_AFTER= 2,
 
80
  SLAVE_RECON_MSG_FAILED= 3,
 
81
  SLAVE_RECON_MSG_COMMAND= 4,
 
82
  SLAVE_RECON_MSG_KILLED_AFTER= 5,
 
83
  SLAVE_RECON_MSG_MAX
 
84
};
 
85
 
 
86
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
 
87
{
 
88
  {
 
89
    N_("Waiting to reconnect after a failed registration on master"),
 
90
    N_("Slave I/O thread killed while waitnig to reconnect after a "
 
91
                 "failed registration on master"),
 
92
    N_("Reconnecting after a failed registration on master"),
 
93
    N_("failed registering on master, reconnecting to try again, "
 
94
                 "log '%s' at postion %s"),
 
95
    "COM_REGISTER_SLAVE",
 
96
    N_("Slave I/O thread killed during or after reconnect")
 
97
  },
 
98
  {
 
99
    N_("Waiting to reconnect after a failed binlog dump request"),
 
100
    N_("Slave I/O thread killed while retrying master dump"),
 
101
    N_("Reconnecting after a failed binlog dump request"),
 
102
    N_("failed dump request, reconnecting to try again, "
 
103
                 "log '%s' at postion %s"),
 
104
    "COM_BINLOG_DUMP",
 
105
    N_("Slave I/O thread killed during or after reconnect")
 
106
  },
 
107
  {
 
108
    N_("Waiting to reconnect after a failed master event read"),
 
109
    N_("Slave I/O thread killed while waiting to reconnect "
 
110
                 "after a failed read"),
 
111
    N_("Reconnecting after a failed master event read"),
 
112
    N_("Slave I/O thread: Failed reading log event, "
 
113
                 "reconnecting to retry, log '%s' at postion %s"),
 
114
    "",
 
115
    N_("Slave I/O thread killed during or after a "
 
116
                 "reconnect done to recover from failed read")
 
117
  }
 
118
};
 
119
 
 
120
 
 
121
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
 
122
 
 
123
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
 
124
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
 
125
static bool wait_for_relay_log_space(Relay_log_info* rli);
 
126
static inline bool io_slave_killed(THD* thd,Master_info* mi);
 
127
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
 
128
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
 
129
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi);
 
130
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
131
                          bool suppress_warnings);
 
132
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
133
                             bool reconnect, bool suppress_warnings);
 
134
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
135
                      void* thread_killed_arg);
 
136
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
 
137
static Log_event* next_event(Relay_log_info* rli);
 
138
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
 
139
static int32_t terminate_slave_thread(THD *thd,
 
140
                                  pthread_mutex_t* term_lock,
 
141
                                  pthread_cond_t* term_cond,
 
142
                                  volatile uint32_t *slave_running,
 
143
                                  bool skip_lock);
 
144
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
 
145
 
 
146
/*
 
147
  Find out which replications threads are running
 
148
 
 
149
  SYNOPSIS
 
150
    init_thread_mask()
 
151
    mask                Return value here
 
152
    mi                  master_info for slave
 
153
    inverse             If set, returns which threads are not running
 
154
 
 
155
  IMPLEMENTATION
 
156
    Get a bit mask for which threads are running so that we can later restart
 
157
    these threads.
 
158
 
 
159
  RETURN
 
160
    mask        If inverse == 0, running threads
 
161
                If inverse == 1, stopped threads
 
162
*/
 
163
 
 
164
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
 
165
{
 
166
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
 
167
  register int32_t tmp_mask=0;
 
168
 
 
169
  if (set_io)
 
170
    tmp_mask |= SLAVE_IO;
 
171
  if (set_sql)
 
172
    tmp_mask |= SLAVE_SQL;
 
173
  if (inverse)
 
174
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
 
175
  *mask = tmp_mask;
 
176
  return;
 
177
}
 
178
 
 
179
 
 
180
/*
 
181
  lock_slave_threads()
 
182
*/
 
183
 
 
184
void lock_slave_threads(Master_info* mi)
 
185
{
 
186
  //TODO: see if we can do this without dual mutex
 
187
  pthread_mutex_lock(&mi->run_lock);
 
188
  pthread_mutex_lock(&mi->rli.run_lock);
 
189
  return;
 
190
}
 
191
 
 
192
 
 
193
/*
 
194
  unlock_slave_threads()
 
195
*/
 
196
 
 
197
void unlock_slave_threads(Master_info* mi)
 
198
{
 
199
  //TODO: see if we can do this without dual mutex
 
200
  pthread_mutex_unlock(&mi->rli.run_lock);
 
201
  pthread_mutex_unlock(&mi->run_lock);
 
202
  return;
 
203
}
 
204
 
 
205
 
 
206
/* Initialize slave structures */
 
207
 
 
208
int32_t init_slave()
 
209
{
 
210
  /*
 
211
    This is called when mysqld starts. Before client connections are
 
212
    accepted. However bootstrap may conflict with us if it does START SLAVE.
 
213
    So it's safer to take the lock.
 
214
  */
 
215
  pthread_mutex_lock(&LOCK_active_mi);
 
216
  /*
 
217
    TODO: re-write this to interate through the list of files
 
218
    for multi-master
 
219
  */
 
220
  active_mi= new Master_info;
 
221
 
 
222
  /*
 
223
    If master_host is not specified, try to read it from the master_info file.
 
224
    If master_host is specified, create the master_info file if it doesn't
 
225
    exists.
 
226
  */
 
227
  if (!active_mi)
 
228
  {
 
229
    sql_print_error(_("Failed to allocate memory for the master info structure"));
 
230
    goto err;
 
231
  }
 
232
 
 
233
  if (init_master_info(active_mi,master_info_file,relay_log_info_file,
 
234
                       1, (SLAVE_IO | SLAVE_SQL)))
 
235
  {
 
236
    sql_print_error(_("Failed to initialize the master info structure"));
 
237
    goto err;
 
238
  }
 
239
 
 
240
  /* If server id is not set, start_slave_thread() will say it */
 
241
 
 
242
  if (active_mi->host[0] && !opt_skip_slave_start)
 
243
  {
 
244
    if (start_slave_threads(1 /* need mutex */,
 
245
                            0 /* no wait for start*/,
 
246
                            active_mi,
 
247
                            master_info_file,
 
248
                            relay_log_info_file,
 
249
                            SLAVE_IO | SLAVE_SQL))
 
250
    {
 
251
      sql_print_error(_("Failed to create slave threads"));
 
252
      goto err;
 
253
    }
 
254
  }
 
255
  pthread_mutex_unlock(&LOCK_active_mi);
 
256
  return(0);
 
257
 
 
258
err:
 
259
  pthread_mutex_unlock(&LOCK_active_mi);
 
260
  return(1);
 
261
}
 
262
 
 
263
 
 
264
/*
 
265
  Init function to set up array for errors that should be skipped for slave
 
266
 
 
267
  SYNOPSIS
 
268
    init_slave_skip_errors()
 
269
    arg         List of errors numbers to skip, separated with ','
 
270
 
 
271
  NOTES
 
272
    Called from get_options() in mysqld.cc on start-up
 
273
*/
 
274
 
 
275
void init_slave_skip_errors(const char* arg)
 
276
{
 
277
  const char *p;
 
278
 
 
279
  if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
 
280
  {
 
281
    fprintf(stderr, "Badly out of memory, please check your system status\n");
 
282
    exit(1);
 
283
  }
 
284
  use_slave_mask = 1;
 
285
  for (;my_isspace(system_charset_info,*arg);++arg)
 
286
    /* empty */;
 
287
  if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
 
288
  {
 
289
    bitmap_set_all(&slave_error_mask);
 
290
    return;
 
291
  }
 
292
  for (p= arg ; *p; )
 
293
  {
 
294
    long err_code;
 
295
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
 
296
      break;
 
297
    if (err_code < MAX_SLAVE_ERROR)
 
298
       bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
 
299
    while (!my_isdigit(system_charset_info,*p) && *p)
 
300
      p++;
 
301
  }
 
302
  return;
 
303
}
 
304
 
 
305
 
 
306
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
 
307
{
 
308
  if (!mi->inited)
 
309
    return(0); /* successfully do nothing */
 
310
  int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
 
311
  pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
 
312
 
 
313
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
 
314
  {
 
315
    mi->abort_slave=1;
 
316
    if ((error=terminate_slave_thread(mi->io_thd,io_lock,
 
317
                                      &mi->stop_cond,
 
318
                                      &mi->slave_running,
 
319
                                      skip_lock)) &&
 
320
        !force_all)
 
321
      return(error);
 
322
  }
 
323
  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
 
324
  {
 
325
    mi->rli.abort_slave=1;
 
326
    if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
 
327
                                      &mi->rli.stop_cond,
 
328
                                      &mi->rli.slave_running,
 
329
                                      skip_lock)) &&
 
330
        !force_all)
 
331
      return(error);
 
332
  }
 
333
  return(0);
 
334
}
 
335
 
 
336
 
 
337
/**
 
338
   Wait for a slave thread to terminate.
 
339
 
 
340
   This function is called after requesting the thread to terminate
 
341
   (by setting @c abort_slave member of @c Relay_log_info or @c
 
342
   Master_info structure to 1). Termination of the thread is
 
343
   controlled with the the predicate <code>*slave_running</code>.
 
344
 
 
345
   Function will acquire @c term_lock before waiting on the condition
 
346
   unless @c skip_lock is true in which case the mutex should be owned
 
347
   by the caller of this function and will remain acquired after
 
348
   return from the function.
 
349
 
 
350
   @param term_lock
 
351
          Associated lock to use when waiting for @c term_cond
 
352
 
 
353
   @param term_cond
 
354
          Condition that is signalled when the thread has terminated
 
355
 
 
356
   @param slave_running
 
357
          Pointer to predicate to check for slave thread termination
 
358
 
 
359
   @param skip_lock
 
360
          If @c true the lock will not be acquired before waiting on
 
361
          the condition. In this case, it is assumed that the calling
 
362
          function acquires the lock before calling this function.
 
363
 
 
364
   @retval 0 All OK
 
365
 */
 
366
static int32_t
 
367
terminate_slave_thread(THD *thd,
 
368
                       pthread_mutex_t* term_lock,
 
369
                       pthread_cond_t* term_cond,
 
370
                       volatile uint32_t *slave_running,
 
371
                       bool skip_lock)
 
372
{
 
373
  int32_t error;
 
374
 
 
375
  if (!skip_lock)
 
376
    pthread_mutex_lock(term_lock);
 
377
 
 
378
  safe_mutex_assert_owner(term_lock);
 
379
 
 
380
  if (!*slave_running)
 
381
  {
 
382
    if (!skip_lock)
 
383
      pthread_mutex_unlock(term_lock);
 
384
    return(ER_SLAVE_NOT_RUNNING);
 
385
  }
 
386
  assert(thd != 0);
 
387
  THD_CHECK_SENTRY(thd);
 
388
 
 
389
  /*
 
390
    Is is critical to test if the slave is running. Otherwise, we might
 
391
    be referening freed memory trying to kick it
 
392
  */
 
393
 
 
394
  while (*slave_running)                        // Should always be true
 
395
  {
 
396
    pthread_mutex_lock(&thd->LOCK_delete);
 
397
#ifndef DONT_USE_THR_ALARM
 
398
    /*
 
399
      Error codes from pthread_kill are:
 
400
      EINVAL: invalid signal number (can't happen)
 
401
      ESRCH: thread already killed (can happen, should be ignored)
 
402
    */
 
403
    int32_t err= pthread_kill(thd->real_id, thr_client_alarm);
 
404
    assert(err != EINVAL);
 
405
#endif
 
406
    thd->awake(THD::NOT_KILLED);
 
407
    pthread_mutex_unlock(&thd->LOCK_delete);
 
408
 
 
409
    /*
 
410
      There is a small chance that slave thread might miss the first
 
411
      alarm. To protect againts it, resend the signal until it reacts
 
412
    */
 
413
    struct timespec abstime;
 
414
    set_timespec(abstime,2);
 
415
    error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
 
416
    assert(error == ETIMEDOUT || error == 0);
 
417
  }
 
418
 
 
419
  assert(*slave_running == 0);
 
420
 
 
421
  if (!skip_lock)
 
422
    pthread_mutex_unlock(term_lock);
 
423
  return(0);
 
424
}
 
425
 
 
426
 
 
427
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
 
428
                           pthread_mutex_t *cond_lock,
 
429
                           pthread_cond_t *start_cond,
 
430
                           volatile uint32_t *slave_running,
 
431
                           volatile uint32_t *slave_run_id,
 
432
                           Master_info* mi,
 
433
                           bool high_priority)
 
434
{
 
435
  pthread_t th;
 
436
  uint32_t start_id;
 
437
 
 
438
  assert(mi->inited);
 
439
 
 
440
  if (start_lock)
 
441
    pthread_mutex_lock(start_lock);
 
442
  if (!server_id)
 
443
  {
 
444
    if (start_cond)
 
445
      pthread_cond_broadcast(start_cond);
 
446
    if (start_lock)
 
447
      pthread_mutex_unlock(start_lock);
 
448
    sql_print_error(_("Server id not set, will not start slave"));
 
449
    return(ER_BAD_SLAVE);
 
450
  }
 
451
 
 
452
  if (*slave_running)
 
453
  {
 
454
    if (start_cond)
 
455
      pthread_cond_broadcast(start_cond);
 
456
    if (start_lock)
 
457
      pthread_mutex_unlock(start_lock);
 
458
    return(ER_SLAVE_MUST_STOP);
 
459
  }
 
460
  start_id= *slave_run_id;
 
461
  if (high_priority)
 
462
  {
 
463
    struct sched_param tmp_sched_param;
 
464
 
 
465
    memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
 
466
    tmp_sched_param.sched_priority= CONNECT_PRIOR;
 
467
    (void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
 
468
  }
 
469
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
 
470
  {
 
471
    if (start_lock)
 
472
      pthread_mutex_unlock(start_lock);
 
473
    return(ER_SLAVE_THREAD);
 
474
  }
 
475
  if (start_cond && cond_lock) // caller has cond_lock
 
476
  {
 
477
    THD* thd = current_thd;
 
478
    while (start_id == *slave_run_id)
 
479
    {
 
480
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
 
481
                                            "Waiting for slave thread to start");
 
482
      pthread_cond_wait(start_cond,cond_lock);
 
483
      thd->exit_cond(old_msg);
 
484
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
 
485
      if (thd->killed)
 
486
        return(thd->killed_errno());
 
487
    }
 
488
  }
 
489
  if (start_lock)
 
490
    pthread_mutex_unlock(start_lock);
 
491
  return(0);
 
492
}
 
493
 
 
494
 
 
495
/*
 
496
  start_slave_threads()
 
497
 
 
498
  NOTES
 
499
    SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
 
500
    sense to do that for starting a slave--we always care if it actually
 
501
    started the threads that were not previously running
 
502
*/
 
503
 
 
504
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
 
505
                        Master_info* mi,
 
506
                        const char* master_info_fname __attribute__((unused)),
 
507
                        const char* slave_info_fname __attribute__((unused)),
 
508
                        int32_t thread_mask)
 
509
{
 
510
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
 
511
  pthread_cond_t* cond_io=0,*cond_sql=0;
 
512
  int32_t error=0;
 
513
 
 
514
  if (need_slave_mutex)
 
515
  {
 
516
    lock_io = &mi->run_lock;
 
517
    lock_sql = &mi->rli.run_lock;
 
518
  }
 
519
  if (wait_for_start)
 
520
  {
 
521
    cond_io = &mi->start_cond;
 
522
    cond_sql = &mi->rli.start_cond;
 
523
    lock_cond_io = &mi->run_lock;
 
524
    lock_cond_sql = &mi->rli.run_lock;
 
525
  }
 
526
 
 
527
  if (thread_mask & SLAVE_IO)
 
528
    error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
 
529
                              cond_io,
 
530
                              &mi->slave_running, &mi->slave_run_id,
 
531
                              mi, 1); //high priority, to read the most possible
 
532
  if (!error && (thread_mask & SLAVE_SQL))
 
533
  {
 
534
    error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
 
535
                              cond_sql,
 
536
                              &mi->rli.slave_running, &mi->rli.slave_run_id,
 
537
                              mi, 0);
 
538
    if (error)
 
539
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
 
540
  }
 
541
  return(error);
 
542
}
 
543
 
 
544
 
 
545
#ifdef NOT_USED_YET
 
546
static int32_t end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
 
547
{
 
548
  end_master_info(mi);
 
549
  return(0);
 
550
}
 
551
#endif
 
552
 
 
553
 
 
554
/*
 
555
  Free all resources used by slave
 
556
 
 
557
  SYNOPSIS
 
558
    end_slave()
 
559
*/
 
560
 
 
561
void end_slave()
 
562
{
 
563
  /*
 
564
    This is called when the server terminates, in close_connections().
 
565
    It terminates slave threads. However, some CHANGE MASTER etc may still be
 
566
    running presently. If a START SLAVE was in progress, the mutex lock below
 
567
    will make us wait until slave threads have started, and START SLAVE
 
568
    returns, then we terminate them here.
 
569
  */
 
570
  pthread_mutex_lock(&LOCK_active_mi);
 
571
  if (active_mi)
 
572
  {
 
573
    /*
 
574
      TODO: replace the line below with
 
575
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
 
576
      once multi-master code is ready.
 
577
    */
 
578
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
 
579
    end_master_info(active_mi);
 
580
    delete active_mi;
 
581
    active_mi= 0;
 
582
  }
 
583
  pthread_mutex_unlock(&LOCK_active_mi);
 
584
  return;
 
585
}
 
586
 
 
587
 
 
588
static bool io_slave_killed(THD* thd, Master_info* mi)
 
589
{
 
590
  assert(mi->io_thd == thd);
 
591
  assert(mi->slave_running); // tracking buffer overrun
 
592
  return(mi->abort_slave || abort_loop || thd->killed);
 
593
}
 
594
 
 
595
 
 
596
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
 
597
{
 
598
  assert(rli->sql_thd == thd);
 
599
  assert(rli->slave_running == 1);// tracking buffer overrun
 
600
  if (abort_loop || thd->killed || rli->abort_slave)
 
601
  {
 
602
    /*
 
603
      If we are in an unsafe situation (stopping could corrupt replication),
 
604
      we give one minute to the slave SQL thread of grace before really
 
605
      terminating, in the hope that it will be able to read more events and
 
606
      the unsafe situation will soon be left. Note that this one minute starts
 
607
      from the last time anything happened in the slave SQL thread. So it's
 
608
      really one minute of idleness, we don't timeout if the slave SQL thread
 
609
      is actively working.
 
610
    */
 
611
    if (rli->last_event_start_time == 0)
 
612
      return(1);
 
613
    if (difftime(time(0), rli->last_event_start_time) > 60)
 
614
    {
 
615
      rli->report(ERROR_LEVEL, 0,
 
616
                  _("SQL thread had to stop in an unsafe situation, in "
 
617
                  "the middle of applying updates to a "
 
618
                  "non-transactional table without any primary key. "
 
619
                  "There is a risk of duplicate updates when the slave "
 
620
                  "SQL thread is restarted. Please check your tables' "
 
621
                  "contents after restart."));
 
622
      return(1);
 
623
    }
 
624
  }
 
625
  return(0);
 
626
}
 
627
 
 
628
 
 
629
/*
 
630
  skip_load_data_infile()
 
631
 
 
632
  NOTES
 
633
    This is used to tell a 3.23 master to break send_file()
 
634
*/
 
635
 
 
636
void skip_load_data_infile(NET *net)
 
637
{
 
638
  (void)net_request_file(net, "/dev/null");
 
639
  (void)my_net_read(net);                               // discard response
 
640
  (void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok
 
641
  return;
 
642
}
 
643
 
 
644
 
 
645
bool net_request_file(NET* net, const char* fname)
 
646
{
 
647
  return(net_write_command(net, 251, (uchar*) fname, strlen(fname),
 
648
                                (uchar*) "", 0));
 
649
}
 
650
 
 
651
/*
 
652
  From other comments and tests in code, it looks like
 
653
  sometimes Query_log_event and Load_log_event can have db == 0
 
654
  (see rewrite_db() above for example)
 
655
  (cases where this happens are unclear; it may be when the master is 3.23).
 
656
*/
 
657
 
 
658
const char *print_slave_db_safe(const char* db)
 
659
{
 
660
  return((db ? db : ""));
 
661
}
 
662
 
 
663
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
 
664
                                 const char *default_val)
 
665
{
 
666
  uint32_t length;
 
667
 
 
668
  if ((length=my_b_gets(f,var, max_size)))
 
669
  {
 
670
    char* last_p = var + length -1;
 
671
    if (*last_p == '\n')
 
672
      *last_p = 0; // if we stopped on newline, kill it
 
673
    else
 
674
    {
 
675
      /*
 
676
        If we truncated a line or stopped on last char, remove all chars
 
677
        up to and including newline.
 
678
      */
 
679
      int32_t c;
 
680
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
 
681
    }
 
682
    return(0);
 
683
  }
 
684
  else if (default_val)
 
685
  {
 
686
    strmake(var,  default_val, max_size-1);
 
687
    return(0);
 
688
  }
 
689
  return(1);
 
690
}
 
691
 
 
692
 
 
693
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
 
694
{
 
695
  char buf[32];
 
696
 
 
697
 
 
698
  if (my_b_gets(f, buf, sizeof(buf)))
 
699
  {
 
700
    *var = atoi(buf);
 
701
    return(0);
 
702
  }
 
703
  else if (default_val)
 
704
  {
 
705
    *var = default_val;
 
706
    return(0);
 
707
  }
 
708
  return(1);
 
709
}
 
710
 
 
711
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
 
712
{
 
713
  char buf[16];
 
714
 
 
715
 
 
716
  if (my_b_gets(f, buf, sizeof(buf)))
 
717
  {
 
718
    if (sscanf(buf, "%f", var) != 1)
 
719
      return(1);
 
720
    else
 
721
      return(0);
 
722
  }
 
723
  else if (default_val != 0.0)
 
724
  {
 
725
    *var = default_val;
 
726
    return(0);
 
727
  }
 
728
  return(1);
 
729
}
 
730
 
 
731
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
 
732
{
 
733
  if (io_slave_killed(thd, mi))
 
734
  {
 
735
    if (info && global_system_variables.log_warnings)
 
736
      sql_print_information(info);
 
737
    return true;
 
738
  }
 
739
  return false;
 
740
}
 
741
 
 
742
 
 
743
/*
 
744
  Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
 
745
  relying on the binlog's version. This is not perfect: imagine an upgrade
 
746
  of the master without waiting that all slaves are in sync with the master;
 
747
  then a slave could be fooled about the binlog's format. This is what happens
 
748
  when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
 
749
  slaves are fooled. So we do this only to distinguish between 3.23 and more
 
750
  recent masters (it's too late to change things for 3.23).
 
751
 
 
752
  RETURNS
 
753
  0       ok
 
754
  1       error
 
755
*/
 
756
 
 
757
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
 
758
{
 
759
  char error_buf[512];
 
760
  String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
 
761
  char err_buff[MAX_SLAVE_ERRMSG];
 
762
  const char* errmsg= 0;
 
763
  int32_t err_code= 0;
 
764
  DRIZZLE_RES *master_res= 0;
 
765
  DRIZZLE_ROW master_row;
 
766
 
 
767
  err_msg.length(0);
 
768
  /*
 
769
    Free old description_event_for_queue (that is needed if we are in
 
770
    a reconnection).
 
771
  */
 
772
  delete mi->rli.relay_log.description_event_for_queue;
 
773
  mi->rli.relay_log.description_event_for_queue= 0;
 
774
 
 
775
  if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
 
776
  {
 
777
    errmsg = _("Master reported unrecognized DRIZZLE version");
 
778
    err_code= ER_SLAVE_FATAL_ERROR;
 
779
    sprintf(err_buff, ER(err_code), errmsg);
 
780
    err_msg.append(err_buff);
 
781
  }
 
782
  else
 
783
  {
 
784
    /*
 
785
      Note the following switch will bug when we have DRIZZLE branch 30 ;)
 
786
    */
 
787
    switch (*drizzle->server_version)
 
788
    {
 
789
    case '0':
 
790
    case '1':
 
791
    case '2':
 
792
      errmsg = _("Master reported unrecognized DRIZZLE version");
 
793
      err_code= ER_SLAVE_FATAL_ERROR;
 
794
      sprintf(err_buff, ER(err_code), errmsg);
 
795
      err_msg.append(err_buff);
 
796
      break;
 
797
    case '3':
 
798
      mi->rli.relay_log.description_event_for_queue= new
 
799
        Format_description_log_event(1, drizzle->server_version);
 
800
      break;
 
801
    case '4':
 
802
      mi->rli.relay_log.description_event_for_queue= new
 
803
        Format_description_log_event(3, drizzle->server_version);
 
804
      break;
 
805
    default:
 
806
      /*
 
807
        Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
 
808
        take the early steps (like tests for "is this a 3.23 master") which we
 
809
        have to take before we receive the real master's Format_desc which will
 
810
        override this one. Note that the Format_desc we create below is garbage
 
811
        (it has the format of the *slave*); it's only good to help know if the
 
812
        master is 3.23, 4.0, etc.
 
813
      */
 
814
      mi->rli.relay_log.description_event_for_queue= new
 
815
        Format_description_log_event(4, drizzle->server_version);
 
816
      break;
 
817
    }
 
818
  }
 
819
 
 
820
  /*
 
821
     This does not mean that a 5.0 slave will be able to read a 6.0 master; but
 
822
     as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
 
823
     can't read a 6.0 master, this will show up when the slave can't read some
 
824
     events sent by the master, and there will be error messages.
 
825
  */
 
826
 
 
827
  if (err_msg.length() != 0)
 
828
    goto err;
 
829
 
 
830
  /* as we are here, we tried to allocate the event */
 
831
  if (!mi->rli.relay_log.description_event_for_queue)
 
832
  {
 
833
    errmsg= _("default Format_description_log_event");
 
834
    err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
 
835
    sprintf(err_buff, ER(err_code), errmsg);
 
836
    err_msg.append(err_buff);
 
837
    goto err;
 
838
  }
 
839
 
 
840
  /*
 
841
    Compare the master and slave's clock. Do not die if master's clock is
 
842
    unavailable (very old master not supporting UNIX_TIMESTAMP()?).
 
843
  */
 
844
 
 
845
  if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
 
846
      (master_res= drizzle_store_result(drizzle)) &&
 
847
      (master_row= drizzle_fetch_row(master_res)))
 
848
  {
 
849
    mi->clock_diff_with_master=
 
850
      (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
 
851
  }
 
852
  else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
 
853
  {
 
854
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
 
855
    sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
 
856
                        "do not trust column Seconds_Behind_Master of SHOW "
 
857
                        "SLAVE STATUS. Error: %s (%d)"),
 
858
                      drizzle_error(drizzle), drizzle_errno(drizzle));
 
859
  }
 
860
  if (master_res)
 
861
    drizzle_free_result(master_res);
 
862
 
 
863
  /*
 
864
    Check that the master's server id and ours are different. Because if they
 
865
    are equal (which can result from a simple copy of master's datadir to slave,
 
866
    thus copying some my.cnf), replication will work but all events will be
 
867
    skipped.
 
868
    Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
 
869
    master?).
 
870
    Note: we could have put a @@SERVER_ID in the previous SELECT
 
871
    UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
 
872
  */
 
873
  if (!drizzle_real_query(drizzle,
 
874
                        STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
 
875
      (master_res= drizzle_store_result(drizzle)))
 
876
  {
 
877
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
878
        (::server_id == strtoul(master_row[1], 0, 10)) &&
 
879
        !mi->rli.replicate_same_server_id)
 
880
    {
 
881
      errmsg=
 
882
        _("The slave I/O thread stops because master and slave have equal "
 
883
          "DRIZZLE server ids; these ids must be different "
 
884
          "for replication to work (or "
 
885
          "the --replicate-same-server-id option must be used "
 
886
          "on slave but this does"
 
887
          "not always make sense; please check the manual before using it).");
 
888
      err_code= ER_SLAVE_FATAL_ERROR;
 
889
      sprintf(err_buff, ER(err_code), errmsg);
 
890
      err_msg.append(err_buff);
 
891
    }
 
892
    drizzle_free_result(master_res);
 
893
    if (errmsg)
 
894
      goto err;
 
895
  }
 
896
 
 
897
  /*
 
898
    Check that the master's global character_set_server and ours are the same.
 
899
    Not fatal if query fails (old master?).
 
900
    Note that we don't check for equality of global character_set_client and
 
901
    collation_connection (neither do we prevent their setting in
 
902
    set_var.cc). That's because from what I (Guilhem) have tested, the global
 
903
    values of these 2 are never used (new connections don't use them).
 
904
    We don't test equality of global collation_database either as it's is
 
905
    going to be deprecated (made read-only) in 4.1 very soon.
 
906
    The test is only relevant if master < 5.0.3 (we'll test only if it's older
 
907
    than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
 
908
    charset info in each binlog event.
 
909
    We don't do it for 3.23 because masters <3.23.50 hang on
 
910
    SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
 
911
    test only if master is 4.x.
 
912
  */
 
913
 
 
914
  /* redundant with rest of code but safer against later additions */
 
915
  if (*drizzle->server_version == '3')
 
916
    goto err;
 
917
 
 
918
  if ((*drizzle->server_version == '4') &&
 
919
      !drizzle_real_query(drizzle,
 
920
                        STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
 
921
      (master_res= drizzle_store_result(drizzle)))
 
922
  {
 
923
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
924
        strcmp(master_row[0], global_system_variables.collation_server->name))
 
925
    {
 
926
      errmsg=
 
927
        _("The slave I/O thread stops because master and slave have"
 
928
          " different values for the COLLATION_SERVER global variable."
 
929
          " The values must be equal for replication to work");
 
930
      err_code= ER_SLAVE_FATAL_ERROR;
 
931
      sprintf(err_buff, ER(err_code), errmsg);
 
932
      err_msg.append(err_buff);
 
933
    }
 
934
    drizzle_free_result(master_res);
 
935
    if (errmsg)
 
936
      goto err;
 
937
  }
 
938
 
 
939
  /*
 
940
    Perform analogous check for time zone. Theoretically we also should
 
941
    perform check here to verify that SYSTEM time zones are the same on
 
942
    slave and master, but we can't rely on value of @@system_time_zone
 
943
    variable (it is time zone abbreviation) since it determined at start
 
944
    time and so could differ for slave and master even if they are really
 
945
    in the same system time zone. So we are omiting this check and just
 
946
    relying on documentation. Also according to Monty there are many users
 
947
    who are using replication between servers in various time zones. Hence
 
948
    such check will broke everything for them. (And now everything will
 
949
    work for them because by default both their master and slave will have
 
950
    'SYSTEM' time zone).
 
951
    This check is only necessary for 4.x masters (and < 5.0.4 masters but
 
952
    those were alpha).
 
953
  */
 
954
  if ((*drizzle->server_version == '4') &&
 
955
      !drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
 
956
      (master_res= drizzle_store_result(drizzle)))
 
957
  {
 
958
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
959
        strcmp(master_row[0],
 
960
               global_system_variables.time_zone->get_name()->ptr()))
 
961
    {
 
962
      errmsg=
 
963
        _("The slave I/O thread stops because master and slave have"
 
964
          " different values for the TIME_ZONE global variable."
 
965
          " The values must be equal for replication to work");
 
966
      err_code= ER_SLAVE_FATAL_ERROR;
 
967
      sprintf(err_buff, ER(err_code), errmsg);
 
968
      err_msg.append(err_buff);
 
969
    }
 
970
    drizzle_free_result(master_res);
 
971
 
 
972
    if (errmsg)
 
973
      goto err;
 
974
  }
 
975
 
 
976
  if (mi->heartbeat_period != 0.0)
 
977
  {
 
978
    char llbuf[22];
 
979
    const char query_format[]= "SET @master_heartbeat_period= %s";
 
980
    char query[sizeof(query_format) - 2 + sizeof(llbuf)];
 
981
    /* 
 
982
       the period is an uint64_t of nano-secs. 
 
983
    */
 
984
    llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
 
985
    sprintf(query, query_format, llbuf);
 
986
 
 
987
    if (drizzle_real_query(drizzle, query, strlen(query))
 
988
        && !check_io_slave_killed(mi->io_thd, mi, NULL))
 
989
    {
 
990
      err_msg.append("The slave I/O thread stops because querying master with '");
 
991
      err_msg.append(query);
 
992
      err_msg.append("' failed;");
 
993
      err_msg.append(" error: ");
 
994
      err_code= drizzle_errno(drizzle);
 
995
      err_msg.qs_append(err_code);
 
996
      err_msg.append("  '");
 
997
      err_msg.append(drizzle_error(drizzle));
 
998
      err_msg.append("'");
 
999
      drizzle_free_result(drizzle_store_result(drizzle));
 
1000
      goto err;
 
1001
    }
 
1002
    drizzle_free_result(drizzle_store_result(drizzle));
 
1003
  }
 
1004
  
 
1005
err:
 
1006
  if (err_msg.length() != 0)
 
1007
  {
 
1008
    sql_print_error(err_msg.ptr());
 
1009
    assert(err_code != 0);
 
1010
    mi->report(ERROR_LEVEL, err_code, err_msg.ptr());
 
1011
    return(1);
 
1012
  }
 
1013
 
 
1014
  return(0);
 
1015
}
 
1016
 
 
1017
 
 
1018
static bool wait_for_relay_log_space(Relay_log_info* rli)
 
1019
{
 
1020
  bool slave_killed=0;
 
1021
  Master_info* mi = rli->mi;
 
1022
  const char *save_proc_info;
 
1023
  THD* thd = mi->io_thd;
 
1024
 
 
1025
  pthread_mutex_lock(&rli->log_space_lock);
 
1026
  save_proc_info= thd->enter_cond(&rli->log_space_cond,
 
1027
                                  &rli->log_space_lock,
 
1028
                                  _("Waiting for the slave SQL thread "
 
1029
                                    "to free enough relay log space"));
 
1030
  while (rli->log_space_limit < rli->log_space_total &&
 
1031
         !(slave_killed=io_slave_killed(thd,mi)) &&
 
1032
         !rli->ignore_log_space_limit)
 
1033
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
 
1034
  thd->exit_cond(save_proc_info);
 
1035
  return(slave_killed);
 
1036
}
 
1037
 
 
1038
 
 
1039
/*
 
1040
  Builds a Rotate from the ignored events' info and writes it to relay log.
 
1041
 
 
1042
  SYNOPSIS
 
1043
  write_ignored_events_info_to_relay_log()
 
1044
    thd             pointer to I/O thread's thd
 
1045
    mi
 
1046
 
 
1047
  DESCRIPTION
 
1048
    Slave I/O thread, going to die, must leave a durable trace of the
 
1049
    ignored events' end position for the use of the slave SQL thread, by
 
1050
    calling this function. Only that thread can call it (see assertion).
 
1051
 */
 
1052
static void write_ignored_events_info_to_relay_log(THD *thd __attribute__((unused)),
 
1053
                                                   Master_info *mi)
 
1054
{
 
1055
  Relay_log_info *rli= &mi->rli;
 
1056
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
 
1057
 
 
1058
  assert(thd == mi->io_thd);
 
1059
  pthread_mutex_lock(log_lock);
 
1060
  if (rli->ign_master_log_name_end[0])
 
1061
  {
 
1062
    Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
 
1063
                                               0, rli->ign_master_log_pos_end,
 
1064
                                               Rotate_log_event::DUP_NAME);
 
1065
    rli->ign_master_log_name_end[0]= 0;
 
1066
    /* can unlock before writing as slave SQL thd will soon see our Rotate */
 
1067
    pthread_mutex_unlock(log_lock);
 
1068
    if (likely((bool)ev))
 
1069
    {
 
1070
      ev->server_id= 0; // don't be ignored by slave SQL thread
 
1071
      if (unlikely(rli->relay_log.append(ev)))
 
1072
        mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
1073
                   ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
1074
                   _("failed to write a Rotate event"
 
1075
                     " to the relay log, SHOW SLAVE STATUS may be"
 
1076
                     " inaccurate"));
 
1077
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
1078
      if (flush_master_info(mi, 1))
 
1079
        sql_print_error(_("Failed to flush master info file"));
 
1080
      delete ev;
 
1081
    }
 
1082
    else
 
1083
      mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
 
1084
                 ER(ER_SLAVE_CREATE_EVENT_FAILURE),
 
1085
                 _("Rotate_event (out of memory?),"
 
1086
                   " SHOW SLAVE STATUS may be inaccurate"));
 
1087
  }
 
1088
  else
 
1089
    pthread_mutex_unlock(log_lock);
 
1090
  return;
 
1091
}
 
1092
 
 
1093
 
 
1094
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
 
1095
                             bool *suppress_warnings)
 
1096
{
 
1097
  uchar buf[1024], *pos= buf;
 
1098
  uint32_t report_host_len, report_user_len=0, report_password_len=0;
 
1099
 
 
1100
  *suppress_warnings= false;
 
1101
  if (!report_host)
 
1102
    return(0);
 
1103
  report_host_len= strlen(report_host);
 
1104
  if (report_user)
 
1105
    report_user_len= strlen(report_user);
 
1106
  if (report_password)
 
1107
    report_password_len= strlen(report_password);
 
1108
  /* 30 is a good safety margin */
 
1109
  if (report_host_len + report_user_len + report_password_len + 30 >
 
1110
      sizeof(buf))
 
1111
    return(0);                                     // safety
 
1112
 
 
1113
  int4store(pos, server_id); pos+= 4;
 
1114
  pos= net_store_data(pos, (uchar*) report_host, report_host_len);
 
1115
  pos= net_store_data(pos, (uchar*) report_user, report_user_len);
 
1116
  pos= net_store_data(pos, (uchar*) report_password, report_password_len);
 
1117
  int2store(pos, (uint16_t) report_port); pos+= 2;
 
1118
  int4store(pos, rpl_recovery_rank);    pos+= 4;
 
1119
  /* The master will fill in master_id */
 
1120
  int4store(pos, 0);                    pos+= 4;
 
1121
 
 
1122
  if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
 
1123
  {
 
1124
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1125
    {
 
1126
      *suppress_warnings= true;                 // Suppress reconnect warning
 
1127
    }
 
1128
    else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
 
1129
    {
 
1130
      char buf[256];
 
1131
      snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle), 
 
1132
               drizzle_errno(drizzle));
 
1133
      mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
 
1134
                 ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
 
1135
    }
 
1136
    return(1);
 
1137
  }
 
1138
  return(0);
 
1139
}
 
1140
 
 
1141
 
 
1142
bool show_master_info(THD* thd, Master_info* mi)
 
1143
{
 
1144
  // TODO: fix this for multi-master
 
1145
  List<Item> field_list;
 
1146
  Protocol *protocol= thd->protocol;
 
1147
 
 
1148
  field_list.push_back(new Item_empty_string("Slave_IO_State",
 
1149
                                                     14));
 
1150
  field_list.push_back(new Item_empty_string("Master_Host",
 
1151
                                                     sizeof(mi->host)));
 
1152
  field_list.push_back(new Item_empty_string("Master_User",
 
1153
                                                     sizeof(mi->user)));
 
1154
  field_list.push_back(new Item_return_int("Master_Port", 7,
 
1155
                                           DRIZZLE_TYPE_LONG));
 
1156
  field_list.push_back(new Item_return_int("Connect_Retry", 10,
 
1157
                                           DRIZZLE_TYPE_LONG));
 
1158
  field_list.push_back(new Item_empty_string("Master_Log_File",
 
1159
                                             FN_REFLEN));
 
1160
  field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
 
1161
                                           DRIZZLE_TYPE_LONGLONG));
 
1162
  field_list.push_back(new Item_empty_string("Relay_Log_File",
 
1163
                                             FN_REFLEN));
 
1164
  field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
 
1165
                                           DRIZZLE_TYPE_LONGLONG));
 
1166
  field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
 
1167
                                             FN_REFLEN));
 
1168
  field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
 
1169
  field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
 
1170
  field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
 
1171
  field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
 
1172
  field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
 
1173
  field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
 
1174
  field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
 
1175
  field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
 
1176
                                             28));
 
1177
  field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
 
1178
  field_list.push_back(new Item_empty_string("Last_Error", 20));
 
1179
  field_list.push_back(new Item_return_int("Skip_Counter", 10,
 
1180
                                           DRIZZLE_TYPE_LONG));
 
1181
  field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
 
1182
                                           DRIZZLE_TYPE_LONGLONG));
 
1183
  field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
 
1184
                                           DRIZZLE_TYPE_LONGLONG));
 
1185
  field_list.push_back(new Item_empty_string("Until_Condition", 6));
 
1186
  field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
 
1187
  field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
 
1188
                                           DRIZZLE_TYPE_LONGLONG));
 
1189
  field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
 
1190
  field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
 
1191
                                             sizeof(mi->ssl_ca)));
 
1192
  field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
 
1193
                                             sizeof(mi->ssl_capath)));
 
1194
  field_list.push_back(new Item_empty_string("Master_SSL_Cert",
 
1195
                                             sizeof(mi->ssl_cert)));
 
1196
  field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
 
1197
                                             sizeof(mi->ssl_cipher)));
 
1198
  field_list.push_back(new Item_empty_string("Master_SSL_Key",
 
1199
                                             sizeof(mi->ssl_key)));
 
1200
  field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
 
1201
                                           DRIZZLE_TYPE_LONGLONG));
 
1202
  field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
 
1203
                                             3));
 
1204
  field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
 
1205
  field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
 
1206
  field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
 
1207
  field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
 
1208
 
 
1209
  if (protocol->send_fields(&field_list,
 
1210
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1211
    return(true);
 
1212
 
 
1213
  if (mi->host[0])
 
1214
  {
 
1215
    String *packet= &thd->packet;
 
1216
    protocol->prepare_for_resend();
 
1217
 
 
1218
    /*
 
1219
      slave_running can be accessed without run_lock but not other
 
1220
      non-volotile members like mi->io_thd, which is guarded by the mutex.
 
1221
    */
 
1222
    pthread_mutex_lock(&mi->run_lock);
 
1223
    protocol->store(mi->io_thd ? mi->io_thd->get_proc_info() : "", &my_charset_bin);
 
1224
    pthread_mutex_unlock(&mi->run_lock);
 
1225
 
 
1226
    pthread_mutex_lock(&mi->data_lock);
 
1227
    pthread_mutex_lock(&mi->rli.data_lock);
 
1228
    protocol->store(mi->host, &my_charset_bin);
 
1229
    protocol->store(mi->user, &my_charset_bin);
 
1230
    protocol->store((uint32_t) mi->port);
 
1231
    protocol->store((uint32_t) mi->connect_retry);
 
1232
    protocol->store(mi->master_log_name, &my_charset_bin);
 
1233
    protocol->store((uint64_t) mi->master_log_pos);
 
1234
    protocol->store(mi->rli.group_relay_log_name +
 
1235
                    dirname_length(mi->rli.group_relay_log_name),
 
1236
                    &my_charset_bin);
 
1237
    protocol->store((uint64_t) mi->rli.group_relay_log_pos);
 
1238
    protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
 
1239
    protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
 
1240
                    "Yes" : "No", &my_charset_bin);
 
1241
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
 
1242
    protocol->store(rpl_filter->get_do_db());
 
1243
    protocol->store(rpl_filter->get_ignore_db());
 
1244
 
 
1245
    char buf[256];
 
1246
    String tmp(buf, sizeof(buf), &my_charset_bin);
 
1247
    rpl_filter->get_do_table(&tmp);
 
1248
    protocol->store(&tmp);
 
1249
    rpl_filter->get_ignore_table(&tmp);
 
1250
    protocol->store(&tmp);
 
1251
    rpl_filter->get_wild_do_table(&tmp);
 
1252
    protocol->store(&tmp);
 
1253
    rpl_filter->get_wild_ignore_table(&tmp);
 
1254
    protocol->store(&tmp);
 
1255
 
 
1256
    protocol->store(mi->rli.last_error().number);
 
1257
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
 
1258
    protocol->store((uint32_t) mi->rli.slave_skip_counter);
 
1259
    protocol->store((uint64_t) mi->rli.group_master_log_pos);
 
1260
    protocol->store((uint64_t) mi->rli.log_space_total);
 
1261
 
 
1262
    protocol->store(
 
1263
      mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
 
1264
        ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
 
1265
          "Relay"), &my_charset_bin);
 
1266
    protocol->store(mi->rli.until_log_name, &my_charset_bin);
 
1267
    protocol->store((uint64_t) mi->rli.until_log_pos);
 
1268
 
 
1269
    protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
 
1270
    protocol->store(mi->ssl_ca, &my_charset_bin);
 
1271
    protocol->store(mi->ssl_capath, &my_charset_bin);
 
1272
    protocol->store(mi->ssl_cert, &my_charset_bin);
 
1273
    protocol->store(mi->ssl_cipher, &my_charset_bin);
 
1274
    protocol->store(mi->ssl_key, &my_charset_bin);
 
1275
 
 
1276
    /*
 
1277
      Seconds_Behind_Master: if SQL thread is running and I/O thread is
 
1278
      connected, we can compute it otherwise show NULL (i.e. unknown).
 
1279
    */
 
1280
    if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
 
1281
        mi->rli.slave_running)
 
1282
    {
 
1283
      long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
 
1284
                       - mi->clock_diff_with_master);
 
1285
      /*
 
1286
        Apparently on some systems time_diff can be <0. Here are possible
 
1287
        reasons related to MySQL:
 
1288
        - the master is itself a slave of another master whose time is ahead.
 
1289
        - somebody used an explicit SET TIMESTAMP on the master.
 
1290
        Possible reason related to granularity-to-second of time functions
 
1291
        (nothing to do with MySQL), which can explain a value of -1:
 
1292
        assume the master's and slave's time are perfectly synchronized, and
 
1293
        that at slave's connection time, when the master's timestamp is read,
 
1294
        it is at the very end of second 1, and (a very short time later) when
 
1295
        the slave's timestamp is read it is at the very beginning of second
 
1296
        2. Then the recorded value for master is 1 and the recorded value for
 
1297
        slave is 2. At SHOW SLAVE STATUS time, assume that the difference
 
1298
        between timestamp of slave and rli->last_master_timestamp is 0
 
1299
        (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
 
1300
        This confuses users, so we don't go below 0: hence the max().
 
1301
 
 
1302
        last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
 
1303
        special marker to say "consider we have caught up".
 
1304
      */
 
1305
      protocol->store((int64_t)(mi->rli.last_master_timestamp ?
 
1306
                                 max((long)0, time_diff) : 0));
 
1307
    }
 
1308
    else
 
1309
    {
 
1310
      protocol->store_null();
 
1311
    }
 
1312
    protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
 
1313
 
 
1314
    // Last_IO_Errno
 
1315
    protocol->store(mi->last_error().number);
 
1316
    // Last_IO_Error
 
1317
    protocol->store(mi->last_error().message, &my_charset_bin);
 
1318
    // Last_SQL_Errno
 
1319
    protocol->store(mi->rli.last_error().number);
 
1320
    // Last_SQL_Error
 
1321
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
 
1322
 
 
1323
    pthread_mutex_unlock(&mi->rli.data_lock);
 
1324
    pthread_mutex_unlock(&mi->data_lock);
 
1325
 
 
1326
    if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
 
1327
      return(true);
 
1328
  }
 
1329
  my_eof(thd);
 
1330
  return(false);
 
1331
}
 
1332
 
 
1333
 
 
1334
void set_slave_thread_options(THD* thd)
 
1335
{
 
1336
  /*
 
1337
     It's nonsense to constrain the slave threads with max_join_size; if a
 
1338
     query succeeded on master, we HAVE to execute it. So set
 
1339
     OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
 
1340
     (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
 
1341
     SELECT examining more than 4 billion rows would still fail (yes, because
 
1342
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
 
1343
     only for client threads.
 
1344
  */
 
1345
  uint64_t options= thd->options | OPTION_BIG_SELECTS;
 
1346
  if (opt_log_slave_updates)
 
1347
    options|= OPTION_BIN_LOG;
 
1348
  else
 
1349
    options&= ~OPTION_BIN_LOG;
 
1350
  thd->options= options;
 
1351
  thd->variables.completion_type= 0;
 
1352
  return;
 
1353
}
 
1354
 
 
1355
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
 
1356
{
 
1357
  thd->variables.character_set_client=
 
1358
    global_system_variables.character_set_client;
 
1359
  thd->variables.collation_connection=
 
1360
    global_system_variables.collation_connection;
 
1361
  thd->variables.collation_server=
 
1362
    global_system_variables.collation_server;
 
1363
  thd->update_charset();
 
1364
 
 
1365
  /*
 
1366
    We use a const cast here since the conceptual (and externally
 
1367
    visible) behavior of the function is to set the default charset of
 
1368
    the thread.  That the cache has to be invalidated is a secondary
 
1369
    effect.
 
1370
   */
 
1371
  const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
 
1372
  return;
 
1373
}
 
1374
 
 
1375
/*
 
1376
  init_slave_thread()
 
1377
*/
 
1378
 
 
1379
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
 
1380
{
 
1381
  int32_t simulate_error= 0;
 
1382
  thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
 
1383
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
 
1384
  thd->security_ctx->skip_grants();
 
1385
  my_net_init(&thd->net, 0);
 
1386
/*
 
1387
  Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
 
1388
  slave threads, since a replication event can become this much larger
 
1389
  than the corresponding packet (query) sent from client to master.
 
1390
*/
 
1391
  thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
 
1392
    + MAX_LOG_EVENT_HEADER;  /* note, incr over the global not session var */
 
1393
  thd->slave_thread = 1;
 
1394
  thd->enable_slow_log= opt_log_slow_slave_statements;
 
1395
  set_slave_thread_options(thd);
 
1396
  thd->client_capabilities = CLIENT_LOCAL_FILES;
 
1397
  pthread_mutex_lock(&LOCK_thread_count);
 
1398
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
 
1399
  pthread_mutex_unlock(&LOCK_thread_count);
 
1400
 
 
1401
 simulate_error|= (1 << SLAVE_THD_IO);
 
1402
 simulate_error|= (1 << SLAVE_THD_SQL);
 
1403
  if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
 
1404
  {
 
1405
    thd->cleanup();
 
1406
    return(-1);
 
1407
  }
 
1408
  lex_start(thd);
 
1409
 
 
1410
  if (thd_type == SLAVE_THD_SQL)
 
1411
    thd_proc_info(thd, "Waiting for the next event in relay log");
 
1412
  else
 
1413
    thd_proc_info(thd, "Waiting for master update");
 
1414
  thd->version=refresh_version;
 
1415
  thd->set_time();
 
1416
  return(0);
 
1417
}
 
1418
 
 
1419
 
 
1420
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
1421
                      void* thread_killed_arg)
 
1422
{
 
1423
  int32_t nap_time;
 
1424
  thr_alarm_t alarmed;
 
1425
 
 
1426
  thr_alarm_init(&alarmed);
 
1427
  time_t start_time= my_time(0);
 
1428
  time_t end_time= start_time+sec;
 
1429
 
 
1430
  while ((nap_time= (int32_t) (end_time - start_time)) > 0)
 
1431
  {
 
1432
    ALARM alarm_buff;
 
1433
    /*
 
1434
      The only reason we are asking for alarm is so that
 
1435
      we will be woken up in case of murder, so if we do not get killed,
 
1436
      set the alarm so it goes off after we wake up naturally
 
1437
    */
 
1438
    thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
 
1439
    sleep(nap_time);
 
1440
    thr_end_alarm(&alarmed);
 
1441
 
 
1442
    if ((*thread_killed)(thd,thread_killed_arg))
 
1443
      return(1);
 
1444
    start_time= my_time(0);
 
1445
  }
 
1446
  return(0);
 
1447
}
 
1448
 
 
1449
 
 
1450
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
 
1451
                        bool *suppress_warnings)
 
1452
{
 
1453
  uchar buf[FN_REFLEN + 10];
 
1454
  int32_t len;
 
1455
  int32_t binlog_flags = 0; // for now
 
1456
  char* logname = mi->master_log_name;
 
1457
  
 
1458
  *suppress_warnings= false;
 
1459
 
 
1460
  // TODO if big log files: Change next to int8store()
 
1461
  int4store(buf, (uint32_t) mi->master_log_pos);
 
1462
  int2store(buf + 4, binlog_flags);
 
1463
  int4store(buf + 6, server_id);
 
1464
  len = (uint32_t) strlen(logname);
 
1465
  memcpy(buf + 10, logname,len);
 
1466
  if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
 
1467
  {
 
1468
    /*
 
1469
      Something went wrong, so we will just reconnect and retry later
 
1470
      in the future, we should do a better error analysis, but for
 
1471
      now we just fill up the error log :-)
 
1472
    */
 
1473
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1474
      *suppress_warnings= true;                 // Suppress reconnect warning
 
1475
    else
 
1476
      sql_print_error(_("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs"),
 
1477
                      drizzle_errno(drizzle), drizzle_error(drizzle),
 
1478
                      mi->connect_retry);
 
1479
    return(1);
 
1480
  }
 
1481
 
 
1482
  return(0);
 
1483
}
 
1484
 
 
1485
/*
 
1486
  Read one event from the master
 
1487
 
 
1488
  SYNOPSIS
 
1489
    read_event()
 
1490
    DRIZZLE               DRIZZLE connection
 
1491
    mi                  Master connection information
 
1492
    suppress_warnings   TRUE when a normal net read timeout has caused us to
 
1493
                        try a reconnect.  We do not want to print anything to
 
1494
                        the error log in this case because this a anormal
 
1495
                        event in an idle server.
 
1496
 
 
1497
    RETURN VALUES
 
1498
    'packet_error'      Error
 
1499
    number              Length of packet
 
1500
*/
 
1501
 
 
1502
static uint32_t read_event(DRIZZLE *drizzle,
 
1503
                        Master_info *mi __attribute__((unused)),
 
1504
                        bool* suppress_warnings)
 
1505
{
 
1506
  uint32_t len;
 
1507
 
 
1508
  *suppress_warnings= false;
 
1509
  /*
 
1510
    my_real_read() will time us out
 
1511
    We check if we were told to die, and if not, try reading again
 
1512
  */
 
1513
  if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
 
1514
    return(packet_error);
 
1515
 
 
1516
  len = cli_safe_read(drizzle);
 
1517
  if (len == packet_error || (int32_t) len < 1)
 
1518
  {
 
1519
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1520
    {
 
1521
      /*
 
1522
        We are trying a normal reconnect after a read timeout;
 
1523
        we suppress prints to .err file as long as the reconnect
 
1524
        happens without problems
 
1525
      */
 
1526
      *suppress_warnings= true;
 
1527
    }
 
1528
    else
 
1529
      sql_print_error(_("Error reading packet from server: %s ( server_errno=%d)"),
 
1530
                      drizzle_error(drizzle), drizzle_errno(drizzle));
 
1531
    return(packet_error);
 
1532
  }
 
1533
 
 
1534
  /* Check if eof packet */
 
1535
  if (len < 8 && drizzle->net.read_pos[0] == 254)
 
1536
  {
 
1537
    sql_print_information(_("Slave: received end packet from server, apparent "
 
1538
                            "master shutdown: %s"),
 
1539
                     drizzle_error(drizzle));
 
1540
     return(packet_error);
 
1541
  }
 
1542
 
 
1543
  return(len - 1);
 
1544
}
 
1545
 
 
1546
 
 
1547
int32_t check_expected_error(THD* thd __attribute__((unused)),
 
1548
                         Relay_log_info const *rli __attribute__((unused)),
 
1549
                         int32_t expected_error)
 
1550
{
 
1551
  switch (expected_error) {
 
1552
  case ER_NET_READ_ERROR:
 
1553
  case ER_NET_ERROR_ON_WRITE:
 
1554
  case ER_QUERY_INTERRUPTED:
 
1555
  case ER_SERVER_SHUTDOWN:
 
1556
  case ER_NEW_ABORTING_CONNECTION:
 
1557
    return(1);
 
1558
  default:
 
1559
    return(0);
 
1560
  }
 
1561
}
 
1562
 
 
1563
 
 
1564
/*
 
1565
  Check if the current error is of temporary nature of not.
 
1566
  Some errors are temporary in nature, such as
 
1567
  ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT.  Ndb also signals
 
1568
  that the error is temporary by pushing a warning with the error code
 
1569
  ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
 
1570
*/
 
1571
static int32_t has_temporary_error(THD *thd)
 
1572
{
 
1573
  if (thd->is_fatal_error)
 
1574
    return(0);
 
1575
 
 
1576
  if (thd->main_da.is_error())
 
1577
  {
 
1578
    thd->clear_error();
 
1579
    my_error(ER_LOCK_DEADLOCK, MYF(0));
 
1580
  }
 
1581
 
 
1582
  /*
 
1583
    If there is no message in THD, we can't say if it's a temporary
 
1584
    error or not. This is currently the case for Incident_log_event,
 
1585
    which sets no message. Return FALSE.
 
1586
  */
 
1587
  if (!thd->is_error())
 
1588
    return(0);
 
1589
 
 
1590
  /*
 
1591
    Temporary error codes:
 
1592
    currently, InnoDB deadlock detected by InnoDB or lock
 
1593
    wait timeout (innodb_lock_wait_timeout exceeded
 
1594
  */
 
1595
  if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
 
1596
      thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
 
1597
    return(1);
 
1598
 
 
1599
  return(0);
 
1600
}
 
1601
 
 
1602
 
 
1603
/**
 
1604
  Applies the given event and advances the relay log position.
 
1605
 
 
1606
  In essence, this function does:
 
1607
 
 
1608
  @code
 
1609
    ev->apply_event(rli);
 
1610
    ev->update_pos(rli);
 
1611
  @endcode
 
1612
 
 
1613
  But it also does some maintainance, such as skipping events if
 
1614
  needed and reporting errors.
 
1615
 
 
1616
  If the @c skip flag is set, then it is tested whether the event
 
1617
  should be skipped, by looking at the slave_skip_counter and the
 
1618
  server id.  The skip flag should be set when calling this from a
 
1619
  replication thread but not set when executing an explicit BINLOG
 
1620
  statement.
 
1621
 
 
1622
  @retval 0 OK.
 
1623
 
 
1624
  @retval 1 Error calling ev->apply_event().
 
1625
 
 
1626
  @retval 2 No error calling ev->apply_event(), but error calling
 
1627
  ev->update_pos().
 
1628
*/
 
1629
int32_t apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
 
1630
                               bool skip)
 
1631
{
 
1632
  int32_t exec_res= 0;
 
1633
 
 
1634
  /*
 
1635
    Execute the event to change the database and update the binary
 
1636
    log coordinates, but first we set some data that is needed for
 
1637
    the thread.
 
1638
 
 
1639
    The event will be executed unless it is supposed to be skipped.
 
1640
 
 
1641
    Queries originating from this server must be skipped.  Low-level
 
1642
    events (Format_description_log_event, Rotate_log_event,
 
1643
    Stop_log_event) from this server must also be skipped. But for
 
1644
    those we don't want to modify 'group_master_log_pos', because
 
1645
    these events did not exist on the master.
 
1646
    Format_description_log_event is not completely skipped.
 
1647
 
 
1648
    Skip queries specified by the user in 'slave_skip_counter'.  We
 
1649
    can't however skip events that has something to do with the log
 
1650
    files themselves.
 
1651
 
 
1652
    Filtering on own server id is extremely important, to ignore
 
1653
    execution of events created by the creation/rotation of the relay
 
1654
    log (remember that now the relay log starts with its Format_desc,
 
1655
    has a Rotate etc).
 
1656
  */
 
1657
 
 
1658
  thd->server_id = ev->server_id; // use the original server id for logging
 
1659
  thd->set_time();                            // time the query
 
1660
  thd->lex->current_select= 0;
 
1661
  if (!ev->when)
 
1662
    ev->when= my_time(0);
 
1663
  ev->thd = thd; // because up to this point, ev->thd == 0
 
1664
 
 
1665
  if (skip)
 
1666
  {
 
1667
    int32_t reason= ev->shall_skip(rli);
 
1668
    if (reason == Log_event::EVENT_SKIP_COUNT)
 
1669
      --rli->slave_skip_counter;
 
1670
    pthread_mutex_unlock(&rli->data_lock);
 
1671
    if (reason == Log_event::EVENT_SKIP_NOT)
 
1672
      exec_res= ev->apply_event(rli);
 
1673
  }
 
1674
  else
 
1675
    exec_res= ev->apply_event(rli);
 
1676
 
 
1677
  if (exec_res == 0)
 
1678
  {
 
1679
    int32_t error= ev->update_pos(rli);
 
1680
    /*
 
1681
      The update should not fail, so print an error message and
 
1682
      return an error code.
 
1683
 
 
1684
      TODO: Replace this with a decent error message when merged
 
1685
      with BUG#24954 (which adds several new error message).
 
1686
    */
 
1687
    if (error)
 
1688
    {
 
1689
      char buf[22];
 
1690
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
 
1691
                  _("It was not possible to update the positions"
 
1692
                  " of the relay log information: the slave may"
 
1693
                  " be in an inconsistent state."
 
1694
                  " Stopped in %s position %s"),
 
1695
                  rli->group_relay_log_name,
 
1696
                  llstr(rli->group_relay_log_pos, buf));
 
1697
      return(2);
 
1698
    }
 
1699
  }
 
1700
 
 
1701
  return(exec_res ? 1 : 0);
 
1702
}
 
1703
 
 
1704
 
 
1705
/**
 
1706
  Top-level function for executing the next event from the relay log.
 
1707
 
 
1708
  This function reads the event from the relay log, executes it, and
 
1709
  advances the relay log position.  It also handles errors, etc.
 
1710
 
 
1711
  This function may fail to apply the event for the following reasons:
 
1712
 
 
1713
   - The position specfied by the UNTIL condition of the START SLAVE
 
1714
     command is reached.
 
1715
 
 
1716
   - It was not possible to read the event from the log.
 
1717
 
 
1718
   - The slave is killed.
 
1719
 
 
1720
   - An error occurred when applying the event, and the event has been
 
1721
     tried slave_trans_retries times.  If the event has been retried
 
1722
     fewer times, 0 is returned.
 
1723
 
 
1724
   - init_master_info or init_relay_log_pos failed. (These are called
 
1725
     if a failure occurs when applying the event.)</li>
 
1726
 
 
1727
   - An error occurred when updating the binlog position.
 
1728
 
 
1729
  @retval 0 The event was applied.
 
1730
 
 
1731
  @retval 1 The event was not applied.
 
1732
*/
 
1733
static int32_t exec_relay_log_event(THD* thd, Relay_log_info* rli)
 
1734
{
 
1735
  /*
 
1736
     We acquire this mutex since we need it for all operations except
 
1737
     event execution. But we will release it in places where we will
 
1738
     wait for something for example inside of next_event().
 
1739
   */
 
1740
  pthread_mutex_lock(&rli->data_lock);
 
1741
 
 
1742
  Log_event * ev = next_event(rli);
 
1743
 
 
1744
  assert(rli->sql_thd==thd);
 
1745
 
 
1746
  if (sql_slave_killed(thd,rli))
 
1747
  {
 
1748
    pthread_mutex_unlock(&rli->data_lock);
 
1749
    delete ev;
 
1750
    return(1);
 
1751
  }
 
1752
  if (ev)
 
1753
  {
 
1754
    int32_t exec_res;
 
1755
 
 
1756
    /*
 
1757
      This tests if the position of the beginning of the current event
 
1758
      hits the UNTIL barrier.
 
1759
    */
 
1760
    if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
 
1761
        rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
 
1762
                                rli->group_master_log_pos :
 
1763
                                ev->log_pos - ev->data_written))
 
1764
    {
 
1765
      char buf[22];
 
1766
      sql_print_information(_("Slave SQL thread stopped because it reached its"
 
1767
                              " UNTIL position %s"),
 
1768
                            llstr(rli->until_pos(), buf));
 
1769
      /*
 
1770
        Setting abort_slave flag because we do not want additional message about
 
1771
        error in query execution to be printed.
 
1772
      */
 
1773
      rli->abort_slave= 1;
 
1774
      pthread_mutex_unlock(&rli->data_lock);
 
1775
      delete ev;
 
1776
      return(1);
 
1777
    }
 
1778
    exec_res= apply_event_and_update_pos(ev, thd, rli, true);
 
1779
 
 
1780
    /*
 
1781
      Format_description_log_event should not be deleted because it will be
 
1782
      used to read info about the relay log's format; it will be deleted when
 
1783
      the SQL thread does not need it, i.e. when this thread terminates.
 
1784
    */
 
1785
    if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
 
1786
    {
 
1787
      delete ev;
 
1788
    }
 
1789
 
 
1790
    /*
 
1791
      update_log_pos failed: this should not happen, so we don't
 
1792
      retry.
 
1793
    */
 
1794
    if (exec_res == 2)
 
1795
      return(1);
 
1796
 
 
1797
    if (slave_trans_retries)
 
1798
    {
 
1799
      int32_t temp_err= 0;
 
1800
      if (exec_res && (temp_err= has_temporary_error(thd)))
 
1801
      {
 
1802
        const char *errmsg;
 
1803
        /*
 
1804
          We were in a transaction which has been rolled back because of a
 
1805
          temporary error;
 
1806
          let's seek back to BEGIN log event and retry it all again.
 
1807
          Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
 
1808
          there is no rollback since 5.0.13 (ref: manual).
 
1809
          We have to not only seek but also
 
1810
          a) init_master_info(), to seek back to hot relay log's start for later
 
1811
          (for when we will come back to this hot log after re-processing the
 
1812
          possibly existing old logs where BEGIN is: check_binlog_magic() will
 
1813
          then need the cache to be at position 0 (see comments at beginning of
 
1814
          init_master_info()).
 
1815
          b) init_relay_log_pos(), because the BEGIN may be an older relay log.
 
1816
        */
 
1817
        if (rli->trans_retries < slave_trans_retries)
 
1818
        {
 
1819
          if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
 
1820
            sql_print_error(_("Failed to initialize the master info structure"));
 
1821
          else if (init_relay_log_pos(rli,
 
1822
                                      rli->group_relay_log_name,
 
1823
                                      rli->group_relay_log_pos,
 
1824
                                      1, &errmsg, 1))
 
1825
            sql_print_error(_("Error initializing relay log position: %s"),
 
1826
                            errmsg);
 
1827
          else
 
1828
          {
 
1829
            exec_res= 0;
 
1830
            end_trans(thd, ROLLBACK);
 
1831
            /* chance for concurrent connection to get more locks */
 
1832
            safe_sleep(thd, min(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
 
1833
                       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
 
1834
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
 
1835
            rli->trans_retries++;
 
1836
            rli->retried_trans++;
 
1837
            pthread_mutex_unlock(&rli->data_lock);
 
1838
          }
 
1839
        }
 
1840
        else
 
1841
          sql_print_error(_("Slave SQL thread retried transaction %lu time(s) "
 
1842
                            "in vain, giving up. Consider raising the value of "
 
1843
                            "the slave_transaction_retries variable."),
 
1844
                          slave_trans_retries);
 
1845
      }
 
1846
      else if ((exec_res && !temp_err) ||
 
1847
               (opt_using_transactions &&
 
1848
                rli->group_relay_log_pos == rli->event_relay_log_pos))
 
1849
      {
 
1850
        /*
 
1851
          Only reset the retry counter if the entire group succeeded
 
1852
          or failed with a non-transient error.  On a successful
 
1853
          event, the execution will proceed as usual; in the case of a
 
1854
          non-transient error, the slave will stop with an error.
 
1855
         */
 
1856
        rli->trans_retries= 0; // restart from fresh
 
1857
      }
 
1858
    }
 
1859
    return(exec_res);
 
1860
  }
 
1861
  pthread_mutex_unlock(&rli->data_lock);
 
1862
  rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
 
1863
              ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
 
1864
              _("Could not parse relay log event entry. The possible reasons "
 
1865
                "are: the master's binary log is corrupted (you can check this "
 
1866
                "by running 'mysqlbinlog' on the binary log), the slave's "
 
1867
                "relay log is corrupted (you can check this by running "
 
1868
                "'mysqlbinlog' on the relay log), a network problem, or a bug "
 
1869
                "in the master's or slave's DRIZZLE code. If you want to check "
 
1870
                "the master's binary log or slave's relay log, you will be "
 
1871
                "able to know their names by issuing 'SHOW SLAVE STATUS' "
 
1872
                "on this slave."));
 
1873
  return(1);
 
1874
}
 
1875
 
 
1876
 
 
1877
/**
 
1878
  @brief Try to reconnect slave IO thread.
 
1879
 
 
1880
  @details Terminates current connection to master, sleeps for
 
1881
  @c mi->connect_retry msecs and initiates new connection with
 
1882
  @c safe_reconnect(). Variable pointed by @c retry_count is increased -
 
1883
  if it exceeds @c master_retry_count then connection is not re-established
 
1884
  and function signals error.
 
1885
  Unless @c suppres_warnings is TRUE, a warning is put in the server error log
 
1886
  when reconnecting. The warning message and messages used to report errors
 
1887
  are taken from @c messages array. In case @c master_retry_count is exceeded,
 
1888
  no messages are added to the log.
 
1889
 
 
1890
  @param[in]     thd                 Thread context.
 
1891
  @param[in]     DRIZZLE               DRIZZLE connection.
 
1892
  @param[in]     mi                  Master connection information.
 
1893
  @param[in,out] retry_count         Number of attempts to reconnect.
 
1894
  @param[in]     suppress_warnings   TRUE when a normal net read timeout 
 
1895
                                     has caused to reconnecting.
 
1896
  @param[in]     messages            Messages to print/log, see 
 
1897
                                     reconnect_messages[] array.
 
1898
 
 
1899
  @retval        0                   OK.
 
1900
  @retval        1                   There was an error.
 
1901
*/
 
1902
 
 
1903
static int32_t try_to_reconnect(THD *thd, DRIZZLE *drizzle, Master_info *mi,
 
1904
                            uint32_t *retry_count, bool suppress_warnings,
 
1905
                            const char *messages[SLAVE_RECON_MSG_MAX])
 
1906
{
 
1907
  mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
 
1908
  thd->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
 
1909
  end_server(drizzle);
 
1910
  if ((*retry_count)++)
 
1911
  {
 
1912
    if (*retry_count > master_retry_count)
 
1913
      return 1;                             // Don't retry forever
 
1914
    safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
 
1915
               (void *) mi);
 
1916
  }
 
1917
  if (check_io_slave_killed(thd, mi,
 
1918
                            _(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
 
1919
    return 1;
 
1920
  thd->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
 
1921
  if (!suppress_warnings)
 
1922
  {
 
1923
    char buf[256], llbuff[22];
 
1924
    snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
 
1925
             IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
 
1926
    /*
 
1927
      Raise a warining during registering on master/requesting dump.
 
1928
      Log a message reading event.
 
1929
    */
 
1930
    if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
 
1931
    {
 
1932
      mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
 
1933
                 ER(ER_SLAVE_MASTER_COM_FAILURE),
 
1934
                 _(messages[SLAVE_RECON_MSG_COMMAND]), buf);
 
1935
    }
 
1936
    else
 
1937
    {
 
1938
      sql_print_information(buf);
 
1939
    }
 
1940
  }
 
1941
  if (safe_reconnect(thd, drizzle, mi, 1) || io_slave_killed(thd, mi))
 
1942
  {
 
1943
    if (global_system_variables.log_warnings)
 
1944
      sql_print_information(_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
 
1945
    return 1;
 
1946
  }
 
1947
  return 0;
 
1948
}
 
1949
 
 
1950
 
 
1951
/* Slave I/O Thread entry point */
 
1952
 
 
1953
pthread_handler_t handle_slave_io(void *arg)
 
1954
{
 
1955
  THD *thd; // needs to be first for thread_stack
 
1956
  DRIZZLE *drizzle;
 
1957
  Master_info *mi = (Master_info*)arg;
 
1958
  Relay_log_info *rli= &mi->rli;
 
1959
  char llbuff[22];
 
1960
  uint32_t retry_count;
 
1961
  bool suppress_warnings;
 
1962
  uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
 
1963
  my_thread_init();
 
1964
 
 
1965
  assert(mi->inited);
 
1966
  drizzle= NULL ;
 
1967
  retry_count= 0;
 
1968
 
 
1969
  pthread_mutex_lock(&mi->run_lock);
 
1970
  /* Inform waiting threads that slave has started */
 
1971
  mi->slave_run_id++;
 
1972
 
 
1973
  mi->events_till_disconnect = disconnect_slave_event_count;
 
1974
 
 
1975
  thd= new THD;
 
1976
  THD_CHECK_SENTRY(thd);
 
1977
  mi->io_thd = thd;
 
1978
 
 
1979
  pthread_detach_this_thread();
 
1980
  thd->thread_stack= (char*) &thd; // remember where our stack is
 
1981
  if (init_slave_thread(thd, SLAVE_THD_IO))
 
1982
  {
 
1983
    pthread_cond_broadcast(&mi->start_cond);
 
1984
    pthread_mutex_unlock(&mi->run_lock);
 
1985
    sql_print_error(_("Failed during slave I/O thread initialization"));
 
1986
    goto err;
 
1987
  }
 
1988
  pthread_mutex_lock(&LOCK_thread_count);
 
1989
  threads.append(thd);
 
1990
  pthread_mutex_unlock(&LOCK_thread_count);
 
1991
  mi->slave_running = 1;
 
1992
  mi->abort_slave = 0;
 
1993
  pthread_mutex_unlock(&mi->run_lock);
 
1994
  pthread_cond_broadcast(&mi->start_cond);
 
1995
 
 
1996
  if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
 
1997
  {
 
1998
    mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
 
1999
               ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
 
2000
    goto err;
 
2001
  }
 
2002
 
 
2003
  thd_proc_info(thd, "Connecting to master");
 
2004
  // we can get killed during safe_connect
 
2005
  if (!safe_connect(thd, drizzle, mi))
 
2006
  {
 
2007
    sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
 
2008
                            "replication started in log '%s' at position %s"),
 
2009
                          mi->user, mi->host, mi->port,
 
2010
                          IO_RPL_LOG_NAME,
 
2011
                          llstr(mi->master_log_pos,llbuff));
 
2012
  /*
 
2013
    Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
 
2014
    thread, since a replication event can become this much larger than
 
2015
    the corresponding packet (query) sent from client to master.
 
2016
  */
 
2017
    drizzle->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
 
2018
  }
 
2019
  else
 
2020
  {
 
2021
    sql_print_information(_("Slave I/O thread killed while connecting to master"));
 
2022
    goto err;
 
2023
  }
 
2024
 
 
2025
connected:
 
2026
 
 
2027
  // TODO: the assignment below should be under mutex (5.0)
 
2028
  mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
 
2029
  thd->slave_net = &drizzle->net;
 
2030
  thd_proc_info(thd, "Checking master version");
 
2031
  if (get_master_version_and_clock(drizzle, mi))
 
2032
    goto err;
 
2033
  
 
2034
  if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
 
2035
  {
 
2036
    /*
 
2037
      Register ourselves with the master.
 
2038
    */
 
2039
    thd_proc_info(thd, "Registering slave on master");
 
2040
    if (register_slave_on_master(drizzle, mi, &suppress_warnings))
 
2041
    {
 
2042
      if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
 
2043
                                 "while registering slave on master"))
 
2044
      {
 
2045
        sql_print_error(_("Slave I/O thread couldn't register on master"));
 
2046
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2047
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
 
2048
          goto err;
 
2049
      }
 
2050
      else
 
2051
        goto err;
 
2052
      goto connected;
 
2053
    }
 
2054
    if (!retry_count_reg)
 
2055
    {
 
2056
      retry_count_reg++;
 
2057
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2058
      if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2059
                           reconnect_messages[SLAVE_RECON_ACT_REG]))
 
2060
        goto err;
 
2061
      goto connected;
 
2062
    }
 
2063
  }
 
2064
 
 
2065
  while (!io_slave_killed(thd,mi))
 
2066
  {
 
2067
    thd_proc_info(thd, "Requesting binlog dump");
 
2068
    if (request_dump(drizzle, mi, &suppress_warnings))
 
2069
    {
 
2070
      sql_print_error(_("Failed on request_dump()"));
 
2071
      if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while \
 
2072
requesting master dump")) ||
 
2073
          try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2074
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
 
2075
        goto err;
 
2076
      goto connected;
 
2077
    }
 
2078
    if (!retry_count_dump)
 
2079
    {
 
2080
      retry_count_dump++;
 
2081
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2082
      if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2083
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
 
2084
        goto err;
 
2085
      goto connected;
 
2086
    }
 
2087
 
 
2088
    while (!io_slave_killed(thd,mi))
 
2089
    {
 
2090
      uint32_t event_len;
 
2091
      /*
 
2092
        We say "waiting" because read_event() will wait if there's nothing to
 
2093
        read. But if there's something to read, it will not wait. The
 
2094
        important thing is to not confuse users by saying "reading" whereas
 
2095
        we're in fact receiving nothing.
 
2096
      */
 
2097
      thd_proc_info(thd, _("Waiting for master to send event"));
 
2098
      event_len= read_event(drizzle, mi, &suppress_warnings);
 
2099
      if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while "
 
2100
                                           "reading event")))
 
2101
        goto err;
 
2102
      if (!retry_count_event)
 
2103
      {
 
2104
        retry_count_event++;
 
2105
        sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2106
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2107
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
 
2108
          goto err;
 
2109
        goto connected;
 
2110
      }
 
2111
 
 
2112
      if (event_len == packet_error)
 
2113
      {
 
2114
        uint32_t drizzle_error_number= drizzle_errno(drizzle);
 
2115
        switch (drizzle_error_number) {
 
2116
        case CR_NET_PACKET_TOO_LARGE:
 
2117
          sql_print_error(_("Log entry on master is longer than "
 
2118
                            "max_allowed_packet (%ld) on "
 
2119
                            "slave. If the entry is correct, restart the "
 
2120
                            "server with a higher value of "
 
2121
                            "max_allowed_packet"),
 
2122
                          thd->variables.max_allowed_packet);
 
2123
          goto err;
 
2124
        case ER_MASTER_FATAL_ERROR_READING_BINLOG:
 
2125
          sql_print_error(ER(drizzle_error_number), drizzle_error_number,
 
2126
                          drizzle_error(drizzle));
 
2127
          goto err;
 
2128
        case EE_OUTOFMEMORY:
 
2129
        case ER_OUTOFMEMORY:
 
2130
          sql_print_error(
 
2131
       _("Stopping slave I/O thread due to out-of-memory error from master"));
 
2132
          goto err;
 
2133
        }
 
2134
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2135
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
 
2136
          goto err;
 
2137
        goto connected;
 
2138
      } // if (event_len == packet_error)
 
2139
 
 
2140
      retry_count=0;                    // ok event, reset retry counter
 
2141
      thd_proc_info(thd, _("Queueing master event to the relay log"));
 
2142
      if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
 
2143
      {
 
2144
        goto err;
 
2145
      }
 
2146
      if (flush_master_info(mi, 1))
 
2147
      {
 
2148
        sql_print_error(_("Failed to flush master info file"));
 
2149
        goto err;
 
2150
      }
 
2151
      /*
 
2152
        See if the relay logs take too much space.
 
2153
        We don't lock mi->rli.log_space_lock here; this dirty read saves time
 
2154
        and does not introduce any problem:
 
2155
        - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
 
2156
        the clean value is 0), then we are reading only one more event as we
 
2157
        should, and we'll block only at the next event. No big deal.
 
2158
        - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
 
2159
        the clean value is 1), then we are going into wait_for_relay_log_space()
 
2160
        for no reason, but this function will do a clean read, notice the clean
 
2161
        value and exit immediately.
 
2162
      */
 
2163
      if (rli->log_space_limit && rli->log_space_limit <
 
2164
          rli->log_space_total &&
 
2165
          !rli->ignore_log_space_limit)
 
2166
        if (wait_for_relay_log_space(rli))
 
2167
        {
 
2168
          sql_print_error(_("Slave I/O thread aborted while waiting for "
 
2169
                            "relay log space"));
 
2170
          goto err;
 
2171
        }
 
2172
    }
 
2173
  }
 
2174
 
 
2175
// error = 0;
 
2176
err:
 
2177
// print the current replication position
 
2178
  sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
 
2179
                          "position %s"),
 
2180
                        IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
 
2181
  VOID(pthread_mutex_lock(&LOCK_thread_count));
 
2182
  thd->query = thd->db = 0; // extra safety
 
2183
  thd->query_length= thd->db_length= 0;
 
2184
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
 
2185
  if (drizzle)
 
2186
  {
 
2187
    /*
 
2188
      Here we need to clear the active VIO before closing the
 
2189
      connection with the master.  The reason is that THD::awake()
 
2190
      might be called from terminate_slave_thread() because somebody
 
2191
      issued a STOP SLAVE.  If that happends, the close_active_vio()
 
2192
      can be called in the middle of closing the VIO associated with
 
2193
      the 'mysql' object, causing a crash.
 
2194
    */
 
2195
    drizzle_close(drizzle);
 
2196
    mi->drizzle=0;
 
2197
  }
 
2198
  write_ignored_events_info_to_relay_log(thd, mi);
 
2199
  thd_proc_info(thd, _("Waiting for slave mutex on exit"));
 
2200
  pthread_mutex_lock(&mi->run_lock);
 
2201
 
 
2202
  /* Forget the relay log's format */
 
2203
  delete mi->rli.relay_log.description_event_for_queue;
 
2204
  mi->rli.relay_log.description_event_for_queue= 0;
 
2205
  // TODO: make rpl_status part of Master_info
 
2206
  change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
 
2207
  assert(thd->net.buff != 0);
 
2208
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
 
2209
  close_thread_tables(thd);
 
2210
  pthread_mutex_lock(&LOCK_thread_count);
 
2211
  THD_CHECK_SENTRY(thd);
 
2212
  delete thd;
 
2213
  pthread_mutex_unlock(&LOCK_thread_count);
 
2214
  mi->abort_slave= 0;
 
2215
  mi->slave_running= 0;
 
2216
  mi->io_thd= 0;
 
2217
  /*
 
2218
    Note: the order of the two following calls (first broadcast, then unlock)
 
2219
    is important. Otherwise a killer_thread can execute between the calls and
 
2220
    delete the mi structure leading to a crash! (see BUG#25306 for details)
 
2221
   */ 
 
2222
  pthread_cond_broadcast(&mi->stop_cond);       // tell the world we are done
 
2223
  pthread_mutex_unlock(&mi->run_lock);
 
2224
  my_thread_end();
 
2225
  pthread_exit(0);
 
2226
  return(0);                               // Can't return anything here
 
2227
}
 
2228
 
 
2229
 
 
2230
/* Slave SQL Thread entry point */
 
2231
 
 
2232
pthread_handler_t handle_slave_sql(void *arg)
 
2233
{
 
2234
  THD *thd;                     /* needs to be first for thread_stack */
 
2235
  char llbuff[22],llbuff1[22];
 
2236
 
 
2237
  Relay_log_info* rli = &((Master_info*)arg)->rli;
 
2238
  const char *errmsg;
 
2239
 
 
2240
  my_thread_init();
 
2241
 
 
2242
  assert(rli->inited);
 
2243
  pthread_mutex_lock(&rli->run_lock);
 
2244
  assert(!rli->slave_running);
 
2245
  errmsg= 0;
 
2246
  rli->events_till_abort = abort_slave_event_count;
 
2247
 
 
2248
  thd = new THD;
 
2249
  thd->thread_stack = (char*)&thd; // remember where our stack is
 
2250
  rli->sql_thd= thd;
 
2251
  
 
2252
  /* Inform waiting threads that slave has started */
 
2253
  rli->slave_run_id++;
 
2254
  rli->slave_running = 1;
 
2255
 
 
2256
  pthread_detach_this_thread();
 
2257
  if (init_slave_thread(thd, SLAVE_THD_SQL))
 
2258
  {
 
2259
    /*
 
2260
      TODO: this is currently broken - slave start and change master
 
2261
      will be stuck if we fail here
 
2262
    */
 
2263
    pthread_cond_broadcast(&rli->start_cond);
 
2264
    pthread_mutex_unlock(&rli->run_lock);
 
2265
    sql_print_error(_("Failed during slave thread initialization"));
 
2266
    goto err;
 
2267
  }
 
2268
  thd->init_for_queries();
 
2269
  thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
 
2270
  pthread_mutex_lock(&LOCK_thread_count);
 
2271
  threads.append(thd);
 
2272
  pthread_mutex_unlock(&LOCK_thread_count);
 
2273
  /*
 
2274
    We are going to set slave_running to 1. Assuming slave I/O thread is
 
2275
    alive and connected, this is going to make Seconds_Behind_Master be 0
 
2276
    i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
 
2277
    the moment we start we can think we are caught up, and the next second we
 
2278
    start receiving data so we realize we are not caught up and
 
2279
    Seconds_Behind_Master grows. No big deal.
 
2280
  */
 
2281
  rli->abort_slave = 0;
 
2282
  pthread_mutex_unlock(&rli->run_lock);
 
2283
  pthread_cond_broadcast(&rli->start_cond);
 
2284
 
 
2285
  /*
 
2286
    Reset errors for a clean start (otherwise, if the master is idle, the SQL
 
2287
    thread may execute no Query_log_event, so the error will remain even
 
2288
    though there's no problem anymore). Do not reset the master timestamp
 
2289
    (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
 
2290
    as we are not sure that we are going to receive a query, we want to
 
2291
    remember the last master timestamp (to say how many seconds behind we are
 
2292
    now.
 
2293
    But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
 
2294
  */
 
2295
  rli->clear_error();
 
2296
 
 
2297
  //tell the I/O thread to take relay_log_space_limit into account from now on
 
2298
  pthread_mutex_lock(&rli->log_space_lock);
 
2299
  rli->ignore_log_space_limit= 0;
 
2300
  pthread_mutex_unlock(&rli->log_space_lock);
 
2301
  rli->trans_retries= 0; // start from "no error"
 
2302
 
 
2303
  if (init_relay_log_pos(rli,
 
2304
                         rli->group_relay_log_name,
 
2305
                         rli->group_relay_log_pos,
 
2306
                         1 /*need data lock*/, &errmsg,
 
2307
                         1 /*look for a description_event*/))
 
2308
  {
 
2309
    sql_print_error(_("Error initializing relay log position: %s"),
 
2310
                    errmsg);
 
2311
    goto err;
 
2312
  }
 
2313
  THD_CHECK_SENTRY(thd);
 
2314
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
 
2315
  /*
 
2316
    Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
 
2317
    correct position when it's called just after my_b_seek() (the questionable
 
2318
    stuff is those "seek is done on next read" comments in the my_b_seek()
 
2319
    source code).
 
2320
    The crude reality is that this assertion randomly fails whereas
 
2321
    replication seems to work fine. And there is no easy explanation why it
 
2322
    fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
 
2323
    init_relay_log_pos() called above). Maybe the assertion would be
 
2324
    meaningful if we held rli->data_lock between the my_b_seek() and the
 
2325
    assert().
 
2326
  */
 
2327
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
 
2328
  assert(rli->sql_thd == thd);
 
2329
 
 
2330
  if (global_system_variables.log_warnings)
 
2331
    sql_print_information(_("Slave SQL thread initialized, "
 
2332
                            "starting replication in log '%s' at "
 
2333
                            "position %s, relay log '%s' position: %s"),
 
2334
                            RPL_LOG_NAME,
 
2335
                          llstr(rli->group_master_log_pos,llbuff),
 
2336
                          rli->group_relay_log_name,
 
2337
                          llstr(rli->group_relay_log_pos,llbuff1));
 
2338
 
 
2339
  /* execute init_slave variable */
 
2340
  if (sys_init_slave.value_length)
 
2341
  {
 
2342
    execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
 
2343
    if (thd->is_slave_error)
 
2344
    {
 
2345
      sql_print_error(_("Slave SQL thread aborted. "
 
2346
                        "Can't execute init_slave query"));
 
2347
      goto err;
 
2348
    }
 
2349
  }
 
2350
 
 
2351
  /*
 
2352
    First check until condition - probably there is nothing to execute. We
 
2353
    do not want to wait for next event in this case.
 
2354
  */
 
2355
  pthread_mutex_lock(&rli->data_lock);
 
2356
  if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
 
2357
      rli->is_until_satisfied(rli->group_master_log_pos))
 
2358
  {
 
2359
    char buf[22];
 
2360
    sql_print_information(_("Slave SQL thread stopped because it reached its"
 
2361
                            " UNTIL position %s"), llstr(rli->until_pos(), buf));
 
2362
    pthread_mutex_unlock(&rli->data_lock);
 
2363
    goto err;
 
2364
  }
 
2365
  pthread_mutex_unlock(&rli->data_lock);
 
2366
 
 
2367
  /* Read queries from the IO/THREAD until this thread is killed */
 
2368
 
 
2369
  while (!sql_slave_killed(thd,rli))
 
2370
  {
 
2371
    thd_proc_info(thd, _("Reading event from the relay log"));
 
2372
    assert(rli->sql_thd == thd);
 
2373
    THD_CHECK_SENTRY(thd);
 
2374
    if (exec_relay_log_event(thd,rli))
 
2375
    {
 
2376
      // do not scare the user if SQL thread was simply killed or stopped
 
2377
      if (!sql_slave_killed(thd,rli))
 
2378
      {
 
2379
        /*
 
2380
          retrieve as much info as possible from the thd and, error
 
2381
          codes and warnings and print this to the error log as to
 
2382
          allow the user to locate the error
 
2383
        */
 
2384
        uint32_t const last_errno= rli->last_error().number;
 
2385
 
 
2386
        if (thd->is_error())
 
2387
        {
 
2388
          char const *const errmsg= thd->main_da.message();
 
2389
 
 
2390
          if (last_errno == 0)
 
2391
          {
 
2392
            rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), errmsg);
 
2393
          }
 
2394
          else if (last_errno != thd->main_da.sql_errno())
 
2395
          {
 
2396
            sql_print_error(_("Slave (additional info): %s Error_code: %d"),
 
2397
                            errmsg, thd->main_da.sql_errno());
 
2398
          }
 
2399
        }
 
2400
 
 
2401
        /* Print any warnings issued */
 
2402
        List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
 
2403
        DRIZZLE_ERROR *err;
 
2404
        /*
 
2405
          Added controlled slave thread cancel for replication
 
2406
          of user-defined variables.
 
2407
        */
 
2408
        bool udf_error = false;
 
2409
        while ((err= it++))
 
2410
        {
 
2411
          if (err->code == ER_CANT_OPEN_LIBRARY)
 
2412
            udf_error = true;
 
2413
          sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
 
2414
        }
 
2415
        if (udf_error)
 
2416
          sql_print_error(_("Error loading user-defined library, slave SQL "
 
2417
                            "thread aborted. Install the missing library, "
 
2418
                            "and restart the slave SQL thread with "
 
2419
                            "\"SLAVE START\". We stopped at log '%s' "
 
2420
                            "position %s"),
 
2421
                          RPL_LOG_NAME, llstr(rli->group_master_log_pos,
 
2422
            llbuff));
 
2423
        else
 
2424
          sql_print_error(_("Error running query, slave SQL thread aborted. "
 
2425
                            "Fix the problem, and restart "
 
2426
                            "the slave SQL thread with \"SLAVE START\". "
 
2427
                            "We stopped at log '%s' position %s"),
 
2428
                          RPL_LOG_NAME,
 
2429
                          llstr(rli->group_master_log_pos, llbuff));
 
2430
      }
 
2431
      goto err;
 
2432
    }
 
2433
  }
 
2434
 
 
2435
  /* Thread stopped. Print the current replication position to the log */
 
2436
  sql_print_information(_("Slave SQL thread exiting, replication stopped in "
 
2437
                          "log '%s' at position %s"),
 
2438
                        RPL_LOG_NAME,
 
2439
                        llstr(rli->group_master_log_pos,llbuff));
 
2440
 
 
2441
 err:
 
2442
 
 
2443
  /*
 
2444
    Some events set some playgrounds, which won't be cleared because thread
 
2445
    stops. Stopping of this thread may not be known to these events ("stop"
 
2446
    request is detected only by the present function, not by events), so we
 
2447
    must "proactively" clear playgrounds:
 
2448
  */
 
2449
  rli->cleanup_context(thd, 1);
 
2450
  VOID(pthread_mutex_lock(&LOCK_thread_count));
 
2451
  /*
 
2452
    Some extra safety, which should not been needed (normally, event deletion
 
2453
    should already have done these assignments (each event which sets these
 
2454
    variables is supposed to set them to 0 before terminating)).
 
2455
  */
 
2456
  thd->query= thd->db= thd->catalog= 0;
 
2457
  thd->query_length= thd->db_length= 0;
 
2458
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
 
2459
  thd_proc_info(thd, "Waiting for slave mutex on exit");
 
2460
  pthread_mutex_lock(&rli->run_lock);
 
2461
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
 
2462
  pthread_mutex_lock(&rli->data_lock);
 
2463
  assert(rli->slave_running == 1); // tracking buffer overrun
 
2464
  /* When master_pos_wait() wakes up it will check this and terminate */
 
2465
  rli->slave_running= 0;
 
2466
  /* Forget the relay log's format */
 
2467
  delete rli->relay_log.description_event_for_exec;
 
2468
  rli->relay_log.description_event_for_exec= 0;
 
2469
  /* Wake up master_pos_wait() */
 
2470
  pthread_mutex_unlock(&rli->data_lock);
 
2471
  pthread_cond_broadcast(&rli->data_cond);
 
2472
  rli->ignore_log_space_limit= 0; /* don't need any lock */
 
2473
  /* we die so won't remember charset - re-update them on next thread start */
 
2474
  rli->cached_charset_invalidate();
 
2475
  rli->save_temporary_tables = thd->temporary_tables;
 
2476
 
 
2477
  /*
 
2478
    TODO: see if we can do this conditionally in next_event() instead
 
2479
    to avoid unneeded position re-init
 
2480
  */
 
2481
  thd->temporary_tables = 0; // remove tempation from destructor to close them
 
2482
  assert(thd->net.buff != 0);
 
2483
  net_end(&thd->net); // destructor will not free it, because we are weird
 
2484
  assert(rli->sql_thd == thd);
 
2485
  THD_CHECK_SENTRY(thd);
 
2486
  rli->sql_thd= 0;
 
2487
  pthread_mutex_lock(&LOCK_thread_count);
 
2488
  THD_CHECK_SENTRY(thd);
 
2489
  delete thd;
 
2490
  pthread_mutex_unlock(&LOCK_thread_count);
 
2491
 /*
 
2492
  Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
 
2493
  is important. Otherwise a killer_thread can execute between the calls and
 
2494
  delete the mi structure leading to a crash! (see BUG#25306 for details)
 
2495
 */ 
 
2496
  pthread_cond_broadcast(&rli->stop_cond);
 
2497
  pthread_mutex_unlock(&rli->run_lock);  // tell the world we are done
 
2498
  
 
2499
  my_thread_end();
 
2500
  pthread_exit(0);
 
2501
  return(0);                               // Can't return anything here
 
2502
}
 
2503
 
 
2504
 
 
2505
/*
 
2506
  process_io_create_file()
 
2507
*/
 
2508
 
 
2509
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
 
2510
{
 
2511
  int32_t error = 1;
 
2512
  uint32_t num_bytes;
 
2513
  bool cev_not_written;
 
2514
  THD *thd = mi->io_thd;
 
2515
  NET *net = &mi->drizzle->net;
 
2516
 
 
2517
  if (unlikely(!cev->is_valid()))
 
2518
    return(1);
 
2519
 
 
2520
  if (!rpl_filter->db_ok(cev->db))
 
2521
  {
 
2522
    skip_load_data_infile(net);
 
2523
    return(0);
 
2524
  }
 
2525
  assert(cev->inited_from_old);
 
2526
  thd->file_id = cev->file_id = mi->file_id++;
 
2527
  thd->server_id = cev->server_id;
 
2528
  cev_not_written = 1;
 
2529
 
 
2530
  if (unlikely(net_request_file(net,cev->fname)))
 
2531
  {
 
2532
    sql_print_error(_("Slave I/O: failed requesting download of '%s'"),
 
2533
                    cev->fname);
 
2534
    goto err;
 
2535
  }
 
2536
 
 
2537
  /*
 
2538
    This dummy block is so we could instantiate Append_block_log_event
 
2539
    once and then modify it slightly instead of doing it multiple times
 
2540
    in the loop
 
2541
  */
 
2542
  {
 
2543
    Append_block_log_event aev(thd,0,0,0,0);
 
2544
 
 
2545
    for (;;)
 
2546
    {
 
2547
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
 
2548
      {
 
2549
        sql_print_error(_("Network read error downloading '%s' from master"),
 
2550
                        cev->fname);
 
2551
        goto err;
 
2552
      }
 
2553
      if (unlikely(!num_bytes)) /* eof */
 
2554
      {
 
2555
        /* 3.23 master wants it */
 
2556
        net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0);
 
2557
        /*
 
2558
          If we wrote Create_file_log_event, then we need to write
 
2559
          Execute_load_log_event. If we did not write Create_file_log_event,
 
2560
          then this is an empty file and we can just do as if the LOAD DATA
 
2561
          INFILE had not existed, i.e. write nothing.
 
2562
        */
 
2563
        if (unlikely(cev_not_written))
 
2564
          break;
 
2565
        Execute_load_log_event xev(thd,0,0);
 
2566
        xev.log_pos = cev->log_pos;
 
2567
        if (unlikely(mi->rli.relay_log.append(&xev)))
 
2568
        {
 
2569
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2570
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2571
                     _("error writing Exec_load event to relay log"));
 
2572
          goto err;
 
2573
        }
 
2574
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
 
2575
        break;
 
2576
      }
 
2577
      if (unlikely(cev_not_written))
 
2578
      {
 
2579
        cev->block = net->read_pos;
 
2580
        cev->block_len = num_bytes;
 
2581
        if (unlikely(mi->rli.relay_log.append(cev)))
 
2582
        {
 
2583
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2584
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2585
                     _("error writing Create_file event to relay log"));
 
2586
          goto err;
 
2587
        }
 
2588
        cev_not_written=0;
 
2589
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
 
2590
      }
 
2591
      else
 
2592
      {
 
2593
        aev.block = net->read_pos;
 
2594
        aev.block_len = num_bytes;
 
2595
        aev.log_pos = cev->log_pos;
 
2596
        if (unlikely(mi->rli.relay_log.append(&aev)))
 
2597
        {
 
2598
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2599
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2600
                     _("error writing Append_block event to relay log"));
 
2601
          goto err;
 
2602
        }
 
2603
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
 
2604
      }
 
2605
    }
 
2606
  }
 
2607
  error=0;
 
2608
err:
 
2609
  return(error);
 
2610
}
 
2611
 
 
2612
 
 
2613
/*
 
2614
  Start using a new binary log on the master
 
2615
 
 
2616
  SYNOPSIS
 
2617
    process_io_rotate()
 
2618
    mi                  master_info for the slave
 
2619
    rev                 The rotate log event read from the binary log
 
2620
 
 
2621
  DESCRIPTION
 
2622
    Updates the master info with the place in the next binary
 
2623
    log where we should start reading.
 
2624
    Rotate the relay log to avoid mixed-format relay logs.
 
2625
 
 
2626
  NOTES
 
2627
    We assume we already locked mi->data_lock
 
2628
 
 
2629
  RETURN VALUES
 
2630
    0           ok
 
2631
    1           Log event is illegal
 
2632
 
 
2633
*/
 
2634
 
 
2635
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
 
2636
{
 
2637
  safe_mutex_assert_owner(&mi->data_lock);
 
2638
 
 
2639
  if (unlikely(!rev->is_valid()))
 
2640
    return(1);
 
2641
 
 
2642
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
 
2643
  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
 
2644
  mi->master_log_pos= rev->pos;
 
2645
  /*
 
2646
    If we do not do this, we will be getting the first
 
2647
    rotate event forever, so we need to not disconnect after one.
 
2648
  */
 
2649
  if (disconnect_slave_event_count)
 
2650
    mi->events_till_disconnect++;
 
2651
 
 
2652
  /*
 
2653
    If description_event_for_queue is format <4, there is conversion in the
 
2654
    relay log to the slave's format (4). And Rotate can mean upgrade or
 
2655
    nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
 
2656
    no need to reset description_event_for_queue now. And if it's nothing (same
 
2657
    master version as before), no need (still using the slave's format).
 
2658
  */
 
2659
  if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
 
2660
  {
 
2661
    delete mi->rli.relay_log.description_event_for_queue;
 
2662
    /* start from format 3 (DRIZZLE 4.0) again */
 
2663
    mi->rli.relay_log.description_event_for_queue= new
 
2664
      Format_description_log_event(3);
 
2665
  }
 
2666
  /*
 
2667
    Rotate the relay log makes binlog format detection easier (at next slave
 
2668
    start or mysqlbinlog)
 
2669
  */
 
2670
  rotate_relay_log(mi); /* will take the right mutexes */
 
2671
  return(0);
 
2672
}
 
2673
 
 
2674
/*
 
2675
  Reads a 3.23 event and converts it to the slave's format. This code was
 
2676
  copied from DRIZZLE 4.0.
 
2677
*/
 
2678
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
 
2679
                           uint32_t event_len)
 
2680
{
 
2681
  const char *errmsg = 0;
 
2682
  uint32_t inc_pos;
 
2683
  bool ignore_event= 0;
 
2684
  char *tmp_buf = 0;
 
2685
  Relay_log_info *rli= &mi->rli;
 
2686
 
 
2687
  /*
 
2688
    If we get Load event, we need to pass a non-reusable buffer
 
2689
    to read_log_event, so we do a trick
 
2690
  */
 
2691
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
 
2692
  {
 
2693
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
 
2694
    {
 
2695
      mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
 
2696
                 ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
 
2697
      return(1);
 
2698
    }
 
2699
    memcpy(tmp_buf,buf,event_len);
 
2700
    /*
 
2701
      Create_file constructor wants a 0 as last char of buffer, this 0 will
 
2702
      serve as the string-termination char for the file's name (which is at the
 
2703
      end of the buffer)
 
2704
      We must increment event_len, otherwise the event constructor will not see
 
2705
      this end 0, which leads to segfault.
 
2706
    */
 
2707
    tmp_buf[event_len++]=0;
 
2708
    int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
 
2709
    buf = (const char*)tmp_buf;
 
2710
  }
 
2711
  /*
 
2712
    This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
 
2713
    send the loaded file, and write it to the relay log in the form of
 
2714
    Append_block/Exec_load (the SQL thread needs the data, as that thread is not
 
2715
    connected to the master).
 
2716
  */
 
2717
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
 
2718
                                            mi->rli.relay_log.description_event_for_queue);
 
2719
  if (unlikely(!ev))
 
2720
  {
 
2721
    sql_print_error(_("Read invalid event from master: '%s', "
 
2722
                      "master could be corrupt but a more likely cause "
 
2723
                      "of this is a bug"),
 
2724
                    errmsg);
 
2725
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
 
2726
    return(1);
 
2727
  }
 
2728
 
 
2729
  pthread_mutex_lock(&mi->data_lock);
 
2730
  ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
 
2731
  switch (ev->get_type_code()) {
 
2732
  case STOP_EVENT:
 
2733
    ignore_event= 1;
 
2734
    inc_pos= event_len;
 
2735
    break;
 
2736
  case ROTATE_EVENT:
 
2737
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
 
2738
    {
 
2739
      delete ev;
 
2740
      pthread_mutex_unlock(&mi->data_lock);
 
2741
      return(1);
 
2742
    }
 
2743
    inc_pos= 0;
 
2744
    break;
 
2745
  case CREATE_FILE_EVENT:
 
2746
    /*
 
2747
      Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
 
2748
      queue_old_event() which is for 3.23 events which don't comprise
 
2749
      CREATE_FILE_EVENT. This is because read_log_event() above has just
 
2750
      transformed LOAD_EVENT into CREATE_FILE_EVENT.
 
2751
    */
 
2752
  {
 
2753
    /* We come here when and only when tmp_buf != 0 */
 
2754
    assert(tmp_buf != 0);
 
2755
    inc_pos=event_len;
 
2756
    ev->log_pos+= inc_pos;
 
2757
    int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
 
2758
    delete ev;
 
2759
    mi->master_log_pos += inc_pos;
 
2760
    pthread_mutex_unlock(&mi->data_lock);
 
2761
    my_free((char*)tmp_buf, MYF(0));
 
2762
    return(error);
 
2763
  }
 
2764
  default:
 
2765
    inc_pos= event_len;
 
2766
    break;
 
2767
  }
 
2768
  if (likely(!ignore_event))
 
2769
  {
 
2770
    if (ev->log_pos)
 
2771
      /*
 
2772
         Don't do it for fake Rotate events (see comment in
 
2773
      Log_event::Log_event(const char* buf...) in log_event.cc).
 
2774
      */
 
2775
      ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
 
2776
    if (unlikely(rli->relay_log.append(ev)))
 
2777
    {
 
2778
      delete ev;
 
2779
      pthread_mutex_unlock(&mi->data_lock);
 
2780
      return(1);
 
2781
    }
 
2782
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
2783
  }
 
2784
  delete ev;
 
2785
  mi->master_log_pos+= inc_pos;
 
2786
  pthread_mutex_unlock(&mi->data_lock);
 
2787
  return(0);
 
2788
}
 
2789
 
 
2790
/*
 
2791
  Reads a 4.0 event and converts it to the slave's format. This code was copied
 
2792
  from queue_binlog_ver_1_event(), with some affordable simplifications.
 
2793
*/
 
2794
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
 
2795
                           uint32_t event_len)
 
2796
{
 
2797
  const char *errmsg = 0;
 
2798
  uint32_t inc_pos;
 
2799
  char *tmp_buf = 0;
 
2800
  Relay_log_info *rli= &mi->rli;
 
2801
 
 
2802
  /* read_log_event() will adjust log_pos to be end_log_pos */
 
2803
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
 
2804
                                            mi->rli.relay_log.description_event_for_queue);
 
2805
  if (unlikely(!ev))
 
2806
  {
 
2807
    sql_print_error(_("Read invalid event from master: '%s', "
 
2808
                      "master could be corrupt but a more likely cause of "
 
2809
                      "this is a bug"),
 
2810
                    errmsg);
 
2811
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
 
2812
    return(1);
 
2813
  }
 
2814
  pthread_mutex_lock(&mi->data_lock);
 
2815
  switch (ev->get_type_code()) {
 
2816
  case STOP_EVENT:
 
2817
    goto err;
 
2818
  case ROTATE_EVENT:
 
2819
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
 
2820
    {
 
2821
      delete ev;
 
2822
      pthread_mutex_unlock(&mi->data_lock);
 
2823
      return(1);
 
2824
    }
 
2825
    inc_pos= 0;
 
2826
    break;
 
2827
  default:
 
2828
    inc_pos= event_len;
 
2829
    break;
 
2830
  }
 
2831
  if (unlikely(rli->relay_log.append(ev)))
 
2832
  {
 
2833
    delete ev;
 
2834
    pthread_mutex_unlock(&mi->data_lock);
 
2835
    return(1);
 
2836
  }
 
2837
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
2838
  delete ev;
 
2839
  mi->master_log_pos+= inc_pos;
 
2840
err:
 
2841
  pthread_mutex_unlock(&mi->data_lock);
 
2842
  return(0);
 
2843
}
 
2844
 
 
2845
/*
 
2846
  queue_old_event()
 
2847
 
 
2848
  Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
 
2849
  (exactly, slave's) format. To do the conversion, we create a 5.0 event from
 
2850
  the 3.23/4.0 bytes, then write this event to the relay log.
 
2851
 
 
2852
  TODO:
 
2853
    Test this code before release - it has to be tested on a separate
 
2854
    setup with 3.23 master or 4.0 master
 
2855
*/
 
2856
 
 
2857
static int32_t queue_old_event(Master_info *mi, const char *buf,
 
2858
                           uint32_t event_len)
 
2859
{
 
2860
  switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
 
2861
  {
 
2862
  case 1:
 
2863
      return(queue_binlog_ver_1_event(mi,buf,event_len));
 
2864
  case 3:
 
2865
      return(queue_binlog_ver_3_event(mi,buf,event_len));
 
2866
  default: /* unsupported format; eg version 2 */
 
2867
    return(1);
 
2868
  }
 
2869
}
 
2870
 
 
2871
/*
 
2872
  queue_event()
 
2873
 
 
2874
  If the event is 3.23/4.0, passes it to queue_old_event() which will convert
 
2875
  it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
 
2876
  no format conversion, it's pure read/write of bytes.
 
2877
  So a 5.0.0 slave's relay log can contain events in the slave's format or in
 
2878
  any >=5.0.0 format.
 
2879
*/
 
2880
 
 
2881
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
 
2882
{
 
2883
  int32_t error= 0;
 
2884
  String error_msg;
 
2885
  uint32_t inc_pos= 0;
 
2886
  Relay_log_info *rli= &mi->rli;
 
2887
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
 
2888
 
 
2889
 
 
2890
  if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
 
2891
      buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
 
2892
    return(queue_old_event(mi,buf,event_len));
 
2893
 
 
2894
  pthread_mutex_lock(&mi->data_lock);
 
2895
 
 
2896
  switch (buf[EVENT_TYPE_OFFSET]) {
 
2897
  case STOP_EVENT:
 
2898
    /*
 
2899
      We needn't write this event to the relay log. Indeed, it just indicates a
 
2900
      master server shutdown. The only thing this does is cleaning. But
 
2901
      cleaning is already done on a per-master-thread basis (as the master
 
2902
      server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
 
2903
      prepared statements' deletion are TODO only when we binlog prep stmts).
 
2904
 
 
2905
      We don't even increment mi->master_log_pos, because we may be just after
 
2906
      a Rotate event. Btw, in a few milliseconds we are going to have a Start
 
2907
      event from the next binlog (unless the master is presently running
 
2908
      without --log-bin).
 
2909
    */
 
2910
    goto err;
 
2911
  case ROTATE_EVENT:
 
2912
  {
 
2913
    Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
 
2914
    if (unlikely(process_io_rotate(mi,&rev)))
 
2915
    {
 
2916
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
2917
      goto err;
 
2918
    }
 
2919
    /*
 
2920
      Now the I/O thread has just changed its mi->master_log_name, so
 
2921
      incrementing mi->master_log_pos is nonsense.
 
2922
    */
 
2923
    inc_pos= 0;
 
2924
    break;
 
2925
  }
 
2926
  case FORMAT_DESCRIPTION_EVENT:
 
2927
  {
 
2928
    /*
 
2929
      Create an event, and save it (when we rotate the relay log, we will have
 
2930
      to write this event again).
 
2931
    */
 
2932
    /*
 
2933
      We are the only thread which reads/writes description_event_for_queue.
 
2934
      The relay_log struct does not move (though some members of it can
 
2935
      change), so we needn't any lock (no rli->data_lock, no log lock).
 
2936
    */
 
2937
    Format_description_log_event* tmp;
 
2938
    const char* errmsg;
 
2939
    if (!(tmp= (Format_description_log_event*)
 
2940
          Log_event::read_log_event(buf, event_len, &errmsg,
 
2941
                                    mi->rli.relay_log.description_event_for_queue)))
 
2942
    {
 
2943
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
2944
      goto err;
 
2945
    }
 
2946
    delete mi->rli.relay_log.description_event_for_queue;
 
2947
    mi->rli.relay_log.description_event_for_queue= tmp;
 
2948
    /*
 
2949
       Though this does some conversion to the slave's format, this will
 
2950
       preserve the master's binlog format version, and number of event types.
 
2951
    */
 
2952
    /*
 
2953
       If the event was not requested by the slave (the slave did not ask for
 
2954
       it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
 
2955
    */
 
2956
    inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
 
2957
  }
 
2958
  break;
 
2959
 
 
2960
  case HEARTBEAT_LOG_EVENT:
 
2961
  {
 
2962
    /*
 
2963
      HB (heartbeat) cannot come before RL (Relay)
 
2964
    */
 
2965
    char  llbuf[22];
 
2966
    Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
 
2967
    if (!hb.is_valid())
 
2968
    {
 
2969
      error= ER_SLAVE_HEARTBEAT_FAILURE;
 
2970
      error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
 
2971
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
 
2972
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
 
2973
      error_msg.append(STRING_WITH_LEN(" log_pos "));
 
2974
      llstr(hb.log_pos, llbuf);
 
2975
      error_msg.append(llbuf, strlen(llbuf));
 
2976
      goto err;
 
2977
    }
 
2978
    mi->received_heartbeats++;
 
2979
    /* 
 
2980
       compare local and event's versions of log_file, log_pos.
 
2981
       
 
2982
       Heartbeat is sent only after an event corresponding to the corrdinates
 
2983
       the heartbeat carries.
 
2984
       Slave can not have a difference in coordinates except in the only
 
2985
       special case when mi->master_log_name, master_log_pos have never
 
2986
       been updated by Rotate event i.e when slave does not have any history
 
2987
       with the master (and thereafter mi->master_log_pos is NULL).
 
2988
 
 
2989
       TODO: handling `when' for SHOW SLAVE STATUS' snds behind
 
2990
    */
 
2991
    if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
 
2992
         && mi->master_log_name != NULL)
 
2993
        || mi->master_log_pos != hb.log_pos)
 
2994
    {
 
2995
      /* missed events of heartbeat from the past */
 
2996
      error= ER_SLAVE_HEARTBEAT_FAILURE;
 
2997
      error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
 
2998
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
 
2999
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
 
3000
      error_msg.append(STRING_WITH_LEN(" log_pos "));
 
3001
      llstr(hb.log_pos, llbuf);
 
3002
      error_msg.append(llbuf, strlen(llbuf));
 
3003
      goto err;
 
3004
    }
 
3005
    goto skip_relay_logging;
 
3006
  }
 
3007
  break;
 
3008
    
 
3009
  default:
 
3010
    inc_pos= event_len;
 
3011
    break;
 
3012
  }
 
3013
 
 
3014
  /*
 
3015
     If this event is originating from this server, don't queue it.
 
3016
     We don't check this for 3.23 events because it's simpler like this; 3.23
 
3017
     will be filtered anyway by the SQL slave thread which also tests the
 
3018
     server id (we must also keep this test in the SQL thread, in case somebody
 
3019
     upgrades a 4.0 slave which has a not-filtered relay log).
 
3020
 
 
3021
     ANY event coming from ourselves can be ignored: it is obvious for queries;
 
3022
     for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
 
3023
     (--log-slave-updates would not log that) unless this slave is also its
 
3024
     direct master (an unsupported, useless setup!).
 
3025
  */
 
3026
 
 
3027
  pthread_mutex_lock(log_lock);
 
3028
 
 
3029
  if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
 
3030
      !mi->rli.replicate_same_server_id)
 
3031
  {
 
3032
    /*
 
3033
      Do not write it to the relay log.
 
3034
      a) We still want to increment mi->master_log_pos, so that we won't
 
3035
      re-read this event from the master if the slave IO thread is now
 
3036
      stopped/restarted (more efficient if the events we are ignoring are big
 
3037
      LOAD DATA INFILE).
 
3038
      b) We want to record that we are skipping events, for the information of
 
3039
      the slave SQL thread, otherwise that thread may let
 
3040
      rli->group_relay_log_pos stay too small if the last binlog's event is
 
3041
      ignored.
 
3042
      But events which were generated by this slave and which do not exist in
 
3043
      the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
 
3044
      mi->master_log_pos.
 
3045
    */
 
3046
    if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
 
3047
        buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
 
3048
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
 
3049
    {
 
3050
      mi->master_log_pos+= inc_pos;
 
3051
      memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
 
3052
      assert(rli->ign_master_log_name_end[0]);
 
3053
      rli->ign_master_log_pos_end= mi->master_log_pos;
 
3054
    }
 
3055
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
 
3056
  }
 
3057
  else
 
3058
  {
 
3059
    /* write the event to the relay log */
 
3060
    if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
 
3061
    {
 
3062
      mi->master_log_pos+= inc_pos;
 
3063
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3064
    }
 
3065
    else
 
3066
    {
 
3067
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
3068
    }
 
3069
    rli->ign_master_log_name_end[0]= 0; // last event is not ignored
 
3070
  }
 
3071
  pthread_mutex_unlock(log_lock);
 
3072
 
 
3073
skip_relay_logging:
 
3074
  
 
3075
err:
 
3076
  pthread_mutex_unlock(&mi->data_lock);
 
3077
  if (error)
 
3078
    mi->report(ERROR_LEVEL, error, ER(error),
 
3079
               (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
 
3080
               _("could not queue event from master") :
 
3081
               error_msg.ptr());
 
3082
  return(error);
 
3083
}
 
3084
 
 
3085
 
 
3086
void end_relay_log_info(Relay_log_info* rli)
 
3087
{
 
3088
  if (!rli->inited)
 
3089
    return;
 
3090
  if (rli->info_fd >= 0)
 
3091
  {
 
3092
    end_io_cache(&rli->info_file);
 
3093
    (void) my_close(rli->info_fd, MYF(MY_WME));
 
3094
    rli->info_fd = -1;
 
3095
  }
 
3096
  if (rli->cur_log_fd >= 0)
 
3097
  {
 
3098
    end_io_cache(&rli->cache_buf);
 
3099
    (void)my_close(rli->cur_log_fd, MYF(MY_WME));
 
3100
    rli->cur_log_fd = -1;
 
3101
  }
 
3102
  rli->inited = 0;
 
3103
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
3104
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3105
  /*
 
3106
    Delete the slave's temporary tables from memory.
 
3107
    In the future there will be other actions than this, to ensure persistance
 
3108
    of slave's temp tables after shutdown.
 
3109
  */
 
3110
  rli->close_temporary_tables();
 
3111
  return;
 
3112
}
 
3113
 
 
3114
/*
 
3115
  Try to connect until successful or slave killed
 
3116
 
 
3117
  SYNPOSIS
 
3118
    safe_connect()
 
3119
    thd                 Thread handler for slave
 
3120
    DRIZZLE               DRIZZLE connection handle
 
3121
    mi                  Replication handle
 
3122
 
 
3123
  RETURN
 
3124
    0   ok
 
3125
    #   Error
 
3126
*/
 
3127
 
 
3128
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi)
 
3129
{
 
3130
  return(connect_to_master(thd, drizzle, mi, 0, 0));
 
3131
}
 
3132
 
 
3133
 
 
3134
/*
 
3135
  SYNPOSIS
 
3136
    connect_to_master()
 
3137
 
 
3138
  IMPLEMENTATION
 
3139
    Try to connect until successful or slave killed or we have retried
 
3140
    master_retry_count times
 
3141
*/
 
3142
 
 
3143
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
3144
                             bool reconnect, bool suppress_warnings)
 
3145
{
 
3146
  int32_t slave_was_killed;
 
3147
  int32_t last_errno= -2;                           // impossible error
 
3148
  uint32_t err_count=0;
 
3149
  char llbuff[22];
 
3150
 
 
3151
  mi->events_till_disconnect = disconnect_slave_event_count;
 
3152
  uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
 
3153
  if (opt_slave_compressed_protocol)
 
3154
    client_flag=CLIENT_COMPRESS;                /* We will use compression */
 
3155
 
 
3156
  drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
 
3157
  drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
 
3158
 
 
3159
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
 
3160
         (reconnect ? drizzle_reconnect(drizzle) != 0 :
 
3161
          drizzle_connect(drizzle, mi->host, mi->user, mi->password, 0,
 
3162
                             mi->port, 0, client_flag) == 0))
 
3163
  {
 
3164
    /* Don't repeat last error */
 
3165
    if ((int32_t)drizzle_errno(drizzle) != last_errno)
 
3166
    {
 
3167
      last_errno=drizzle_errno(drizzle);
 
3168
      suppress_warnings= 0;
 
3169
      mi->report(ERROR_LEVEL, last_errno,
 
3170
                 _("error %s to master '%s@%s:%d'"
 
3171
                   " - retry-time: %d  retries: %u"),
 
3172
                 (reconnect ? _("reconnecting") : _("connecting")),
 
3173
                 mi->user, mi->host, mi->port,
 
3174
                 mi->connect_retry, master_retry_count);
 
3175
    }
 
3176
    /*
 
3177
      By default we try forever. The reason is that failure will trigger
 
3178
      master election, so if the user did not set master_retry_count we
 
3179
      do not want to have election triggered on the first failure to
 
3180
      connect
 
3181
    */
 
3182
    if (++err_count == master_retry_count)
 
3183
    {
 
3184
      slave_was_killed=1;
 
3185
      if (reconnect)
 
3186
        change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
 
3187
      break;
 
3188
    }
 
3189
    safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
 
3190
               (void*)mi);
 
3191
  }
 
3192
 
 
3193
  if (!slave_was_killed)
 
3194
  {
 
3195
    if (reconnect)
 
3196
    {
 
3197
      if (!suppress_warnings && global_system_variables.log_warnings)
 
3198
        sql_print_information(_("Slave: connected to master '%s@%s:%d', "
 
3199
                                "replication resumed in log '%s' at "
 
3200
                                "position %s"), mi->user,
 
3201
                                mi->host, mi->port,
 
3202
                                IO_RPL_LOG_NAME,
 
3203
                                llstr(mi->master_log_pos,llbuff));
 
3204
    }
 
3205
    else
 
3206
    {
 
3207
      change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
 
3208
      general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d",
 
3209
                        mi->user, mi->host, mi->port);
 
3210
    }
 
3211
  }
 
3212
  drizzle->reconnect= 1;
 
3213
  return(slave_was_killed);
 
3214
}
 
3215
 
 
3216
 
 
3217
/*
 
3218
  safe_reconnect()
 
3219
 
 
3220
  IMPLEMENTATION
 
3221
    Try to connect until successful or slave killed or we have retried
 
3222
    master_retry_count times
 
3223
*/
 
3224
 
 
3225
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
3226
                          bool suppress_warnings)
 
3227
{
 
3228
  return(connect_to_master(thd, drizzle, mi, 1, suppress_warnings));
 
3229
}
 
3230
 
 
3231
 
 
3232
/*
 
3233
  Store the file and position where the execute-slave thread are in the
 
3234
  relay log.
 
3235
 
 
3236
  SYNOPSIS
 
3237
    flush_relay_log_info()
 
3238
    rli                 Relay log information
 
3239
 
 
3240
  NOTES
 
3241
    - As this is only called by the slave thread, we don't need to
 
3242
      have a lock on this.
 
3243
    - If there is an active transaction, then we don't update the position
 
3244
      in the relay log.  This is to ensure that we re-execute statements
 
3245
      if we die in the middle of an transaction that was rolled back.
 
3246
    - As a transaction never spans binary logs, we don't have to handle the
 
3247
      case where we do a relay-log-rotation in the middle of the transaction.
 
3248
      If this would not be the case, we would have to ensure that we
 
3249
      don't delete the relay log file where the transaction started when
 
3250
      we switch to a new relay log file.
 
3251
 
 
3252
  TODO
 
3253
    - Change the log file information to a binary format to avoid calling
 
3254
      int64_t2str.
 
3255
 
 
3256
  RETURN VALUES
 
3257
    0   ok
 
3258
    1   write error
 
3259
*/
 
3260
 
 
3261
bool flush_relay_log_info(Relay_log_info* rli)
 
3262
{
 
3263
  bool error=0;
 
3264
 
 
3265
  if (unlikely(rli->no_storage))
 
3266
    return(0);
 
3267
 
 
3268
  IO_CACHE *file = &rli->info_file;
 
3269
  char buff[FN_REFLEN*2+22*2+4], *pos;
 
3270
 
 
3271
  my_b_seek(file, 0L);
 
3272
  pos=stpcpy(buff, rli->group_relay_log_name);
 
3273
  *pos++='\n';
 
3274
  pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
 
3275
  *pos++='\n';
 
3276
  pos=stpcpy(pos, rli->group_master_log_name);
 
3277
  *pos++='\n';
 
3278
  pos=int64_t2str(rli->group_master_log_pos, pos, 10);
 
3279
  *pos='\n';
 
3280
  if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
 
3281
    error=1;
 
3282
  if (flush_io_cache(file))
 
3283
    error=1;
 
3284
 
 
3285
  /* Flushing the relay log is done by the slave I/O thread */
 
3286
  return(error);
 
3287
}
 
3288
 
 
3289
 
 
3290
/*
 
3291
  Called when we notice that the current "hot" log got rotated under our feet.
 
3292
*/
 
3293
 
 
3294
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
 
3295
{
 
3296
  assert(rli->cur_log != &rli->cache_buf);
 
3297
  assert(rli->cur_log_fd == -1);
 
3298
 
 
3299
  IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
 
3300
  if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
 
3301
                                   errmsg)) <0)
 
3302
    return(0);
 
3303
  /*
 
3304
    We want to start exactly where we was before:
 
3305
    relay_log_pos       Current log pos
 
3306
    pending             Number of bytes already processed from the event
 
3307
  */
 
3308
  rli->event_relay_log_pos= max(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
 
3309
  my_b_seek(cur_log,rli->event_relay_log_pos);
 
3310
  return(cur_log);
 
3311
}
 
3312
 
 
3313
 
 
3314
static Log_event* next_event(Relay_log_info* rli)
 
3315
{
 
3316
  Log_event* ev;
 
3317
  IO_CACHE* cur_log = rli->cur_log;
 
3318
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
 
3319
  const char* errmsg=0;
 
3320
  THD* thd = rli->sql_thd;
 
3321
 
 
3322
  assert(thd != 0);
 
3323
 
 
3324
  if (abort_slave_event_count && !rli->events_till_abort--)
 
3325
    return(0);
 
3326
 
 
3327
  /*
 
3328
    For most operations we need to protect rli members with data_lock,
 
3329
    so we assume calling function acquired this mutex for us and we will
 
3330
    hold it for the most of the loop below However, we will release it
 
3331
    whenever it is worth the hassle,  and in the cases when we go into a
 
3332
    pthread_cond_wait() with the non-data_lock mutex
 
3333
  */
 
3334
  safe_mutex_assert_owner(&rli->data_lock);
 
3335
 
 
3336
  while (!sql_slave_killed(thd,rli))
 
3337
  {
 
3338
    /*
 
3339
      We can have two kinds of log reading:
 
3340
      hot_log:
 
3341
        rli->cur_log points at the IO_CACHE of relay_log, which
 
3342
        is actively being updated by the I/O thread. We need to be careful
 
3343
        in this case and make sure that we are not looking at a stale log that
 
3344
        has already been rotated. If it has been, we reopen the log.
 
3345
 
 
3346
      The other case is much simpler:
 
3347
        We just have a read only log that nobody else will be updating.
 
3348
    */
 
3349
    bool hot_log;
 
3350
    if ((hot_log = (cur_log != &rli->cache_buf)))
 
3351
    {
 
3352
      assert(rli->cur_log_fd == -1); // foreign descriptor
 
3353
      pthread_mutex_lock(log_lock);
 
3354
 
 
3355
      /*
 
3356
        Reading xxx_file_id is safe because the log will only
 
3357
        be rotated when we hold relay_log.LOCK_log
 
3358
      */
 
3359
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
 
3360
      {
 
3361
        // The master has switched to a new log file; Reopen the old log file
 
3362
        cur_log=reopen_relay_log(rli, &errmsg);
 
3363
        pthread_mutex_unlock(log_lock);
 
3364
        if (!cur_log)                           // No more log files
 
3365
          goto err;
 
3366
        hot_log=0;                              // Using old binary log
 
3367
      }
 
3368
    }
 
3369
    /* 
 
3370
      As there is no guarantee that the relay is open (for example, an I/O
 
3371
      error during a write by the slave I/O thread may have closed it), we
 
3372
      have to test it.
 
3373
    */
 
3374
    if (!my_b_inited(cur_log))
 
3375
      goto err;
 
3376
    assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
 
3377
    assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
 
3378
 
 
3379
    /*
 
3380
      Relay log is always in new format - if the master is 3.23, the
 
3381
      I/O thread will convert the format for us.
 
3382
      A problem: the description event may be in a previous relay log. So if
 
3383
      the slave has been shutdown meanwhile, we would have to look in old relay
 
3384
      logs, which may even have been deleted. So we need to write this
 
3385
      description event at the beginning of the relay log.
 
3386
      When the relay log is created when the I/O thread starts, easy: the
 
3387
      master will send the description event and we will queue it.
 
3388
      But if the relay log is created by new_file(): then the solution is:
 
3389
      DRIZZLE_BIN_LOG::open() will write the buffered description event.
 
3390
    */
 
3391
    if ((ev=Log_event::read_log_event(cur_log,0,
 
3392
                                      rli->relay_log.description_event_for_exec)))
 
3393
 
 
3394
    {
 
3395
      assert(thd==rli->sql_thd);
 
3396
      /*
 
3397
        read it while we have a lock, to avoid a mutex lock in
 
3398
        inc_event_relay_log_pos()
 
3399
      */
 
3400
      rli->future_event_relay_log_pos= my_b_tell(cur_log);
 
3401
      if (hot_log)
 
3402
        pthread_mutex_unlock(log_lock);
 
3403
      return(ev);
 
3404
    }
 
3405
    assert(thd==rli->sql_thd);
 
3406
    if (opt_reckless_slave)                     // For mysql-test
 
3407
      cur_log->error = 0;
 
3408
    if (cur_log->error < 0)
 
3409
    {
 
3410
      errmsg = "slave SQL thread aborted because of I/O error";
 
3411
      if (hot_log)
 
3412
        pthread_mutex_unlock(log_lock);
 
3413
      goto err;
 
3414
    }
 
3415
    if (!cur_log->error) /* EOF */
 
3416
    {
 
3417
      /*
 
3418
        On a hot log, EOF means that there are no more updates to
 
3419
        process and we must block until I/O thread adds some and
 
3420
        signals us to continue
 
3421
      */
 
3422
      if (hot_log)
 
3423
      {
 
3424
        /*
 
3425
          We say in Seconds_Behind_Master that we have "caught up". Note that
 
3426
          for example if network link is broken but I/O slave thread hasn't
 
3427
          noticed it (slave_net_timeout not elapsed), then we'll say "caught
 
3428
          up" whereas we're not really caught up. Fixing that would require
 
3429
          internally cutting timeout in smaller pieces in network read, no
 
3430
          thanks. Another example: SQL has caught up on I/O, now I/O has read
 
3431
          a new event and is queuing it; the false "0" will exist until SQL
 
3432
          finishes executing the new event; it will be look abnormal only if
 
3433
          the events have old timestamps (then you get "many", 0, "many").
 
3434
 
 
3435
          Transient phases like this can be fixed with implemeting
 
3436
          Heartbeat event which provides the slave the status of the
 
3437
          master at time the master does not have any new update to send.
 
3438
          Seconds_Behind_Master would be zero only when master has no
 
3439
          more updates in binlog for slave. The heartbeat can be sent
 
3440
          in a (small) fraction of slave_net_timeout. Until it's done
 
3441
          rli->last_master_timestamp is temporarely (for time of
 
3442
          waiting for the following event) reset whenever EOF is
 
3443
          reached.
 
3444
        */
 
3445
        time_t save_timestamp= rli->last_master_timestamp;
 
3446
        rli->last_master_timestamp= 0;
 
3447
 
 
3448
        assert(rli->relay_log.get_open_count() ==
 
3449
                    rli->cur_log_old_open_count);
 
3450
 
 
3451
        if (rli->ign_master_log_name_end[0])
 
3452
        {
 
3453
          /* We generate and return a Rotate, to make our positions advance */
 
3454
          ev= new Rotate_log_event(rli->ign_master_log_name_end,
 
3455
                                   0, rli->ign_master_log_pos_end,
 
3456
                                   Rotate_log_event::DUP_NAME);
 
3457
          rli->ign_master_log_name_end[0]= 0;
 
3458
          pthread_mutex_unlock(log_lock);
 
3459
          if (unlikely(!ev))
 
3460
          {
 
3461
            errmsg= "Slave SQL thread failed to create a Rotate event "
 
3462
              "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
 
3463
            goto err;
 
3464
          }
 
3465
          ev->server_id= 0; // don't be ignored by slave SQL thread
 
3466
          return(ev);
 
3467
        }
 
3468
 
 
3469
        /*
 
3470
          We can, and should release data_lock while we are waiting for
 
3471
          update. If we do not, show slave status will block
 
3472
        */
 
3473
        pthread_mutex_unlock(&rli->data_lock);
 
3474
 
 
3475
        /*
 
3476
          Possible deadlock :
 
3477
          - the I/O thread has reached log_space_limit
 
3478
          - the SQL thread has read all relay logs, but cannot purge for some
 
3479
          reason:
 
3480
            * it has already purged all logs except the current one
 
3481
            * there are other logs than the current one but they're involved in
 
3482
            a transaction that finishes in the current one (or is not finished)
 
3483
          Solution :
 
3484
          Wake up the possibly waiting I/O thread, and set a boolean asking
 
3485
          the I/O thread to temporarily ignore the log_space_limit
 
3486
          constraint, because we do not want the I/O thread to block because of
 
3487
          space (it's ok if it blocks for any other reason (e.g. because the
 
3488
          master does not send anything). Then the I/O thread stops waiting
 
3489
          and reads more events.
 
3490
          The SQL thread decides when the I/O thread should take log_space_limit
 
3491
          into account again : ignore_log_space_limit is reset to 0
 
3492
          in purge_first_log (when the SQL thread purges the just-read relay
 
3493
          log), and also when the SQL thread starts. We should also reset
 
3494
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
 
3495
          fact, no need as RESET SLAVE requires that the slave
 
3496
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
 
3497
          it stops.
 
3498
        */
 
3499
        pthread_mutex_lock(&rli->log_space_lock);
 
3500
        // prevent the I/O thread from blocking next times
 
3501
        rli->ignore_log_space_limit= 1;
 
3502
        /*
 
3503
          If the I/O thread is blocked, unblock it.  Ok to broadcast
 
3504
          after unlock, because the mutex is only destroyed in
 
3505
          ~Relay_log_info(), i.e. when rli is destroyed, and rli will
 
3506
          not be destroyed before we exit the present function.
 
3507
        */
 
3508
        pthread_mutex_unlock(&rli->log_space_lock);
 
3509
        pthread_cond_broadcast(&rli->log_space_cond);
 
3510
        // Note that wait_for_update_relay_log unlocks lock_log !
 
3511
        rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
 
3512
        // re-acquire data lock since we released it earlier
 
3513
        pthread_mutex_lock(&rli->data_lock);
 
3514
        rli->last_master_timestamp= save_timestamp;
 
3515
        continue;
 
3516
      }
 
3517
      /*
 
3518
        If the log was not hot, we need to move to the next log in
 
3519
        sequence. The next log could be hot or cold, we deal with both
 
3520
        cases separately after doing some common initialization
 
3521
      */
 
3522
      end_io_cache(cur_log);
 
3523
      assert(rli->cur_log_fd >= 0);
 
3524
      my_close(rli->cur_log_fd, MYF(MY_WME));
 
3525
      rli->cur_log_fd = -1;
 
3526
 
 
3527
      if (relay_log_purge)
 
3528
      {
 
3529
        /*
 
3530
          purge_first_log will properly set up relay log coordinates in rli.
 
3531
          If the group's coordinates are equal to the event's coordinates
 
3532
          (i.e. the relay log was not rotated in the middle of a group),
 
3533
          we can purge this relay log too.
 
3534
          We do uint64_t and string comparisons, this may be slow but
 
3535
          - purging the last relay log is nice (it can save 1GB of disk), so we
 
3536
          like to detect the case where we can do it, and given this,
 
3537
          - I see no better detection method
 
3538
          - purge_first_log is not called that often
 
3539
        */
 
3540
        if (rli->relay_log.purge_first_log
 
3541
            (rli,
 
3542
             rli->group_relay_log_pos == rli->event_relay_log_pos
 
3543
             && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
 
3544
        {
 
3545
          errmsg = "Error purging processed logs";
 
3546
          goto err;
 
3547
        }
 
3548
      }
 
3549
      else
 
3550
      {
 
3551
        /*
 
3552
          If hot_log is set, then we already have a lock on
 
3553
          LOCK_log.  If not, we have to get the lock.
 
3554
 
 
3555
          According to Sasha, the only time this code will ever be executed
 
3556
          is if we are recovering from a bug.
 
3557
        */
 
3558
        if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
 
3559
        {
 
3560
          errmsg = "error switching to the next log";
 
3561
          goto err;
 
3562
        }
 
3563
        rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
 
3564
        strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
 
3565
                sizeof(rli->event_relay_log_name)-1);
 
3566
        flush_relay_log_info(rli);
 
3567
      }
 
3568
 
 
3569
      /*
 
3570
        Now we want to open this next log. To know if it's a hot log (the one
 
3571
        being written by the I/O thread now) or a cold log, we can use
 
3572
        is_active(); if it is hot, we use the I/O cache; if it's cold we open
 
3573
        the file normally. But if is_active() reports that the log is hot, this
 
3574
        may change between the test and the consequence of the test. So we may
 
3575
        open the I/O cache whereas the log is now cold, which is nonsense.
 
3576
        To guard against this, we need to have LOCK_log.
 
3577
      */
 
3578
 
 
3579
      if (!hot_log) /* if hot_log, we already have this mutex */
 
3580
        pthread_mutex_lock(log_lock);
 
3581
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
 
3582
      {
 
3583
#ifdef EXTRA_DEBUG
 
3584
        if (global_system_variables.log_warnings)
 
3585
          sql_print_information(_("next log '%s' is currently active"),
 
3586
                                rli->linfo.log_file_name);
 
3587
#endif
 
3588
        rli->cur_log= cur_log= rli->relay_log.get_log_file();
 
3589
        rli->cur_log_old_open_count= rli->relay_log.get_open_count();
 
3590
        assert(rli->cur_log_fd == -1);
 
3591
 
 
3592
        /*
 
3593
          Read pointer has to be at the start since we are the only
 
3594
          reader.
 
3595
          We must keep the LOCK_log to read the 4 first bytes, as this is a hot
 
3596
          log (same as when we call read_log_event() above: for a hot log we
 
3597
          take the mutex).
 
3598
        */
 
3599
        if (check_binlog_magic(cur_log,&errmsg))
 
3600
        {
 
3601
          if (!hot_log) pthread_mutex_unlock(log_lock);
 
3602
          goto err;
 
3603
        }
 
3604
        if (!hot_log) pthread_mutex_unlock(log_lock);
 
3605
        continue;
 
3606
      }
 
3607
      if (!hot_log) pthread_mutex_unlock(log_lock);
 
3608
      /*
 
3609
        if we get here, the log was not hot, so we will have to open it
 
3610
        ourselves. We are sure that the log is still not hot now (a log can get
 
3611
        from hot to cold, but not from cold to hot). No need for LOCK_log.
 
3612
      */
 
3613
#ifdef EXTRA_DEBUG
 
3614
      if (global_system_variables.log_warnings)
 
3615
        sql_print_information(_("next log '%s' is not active"),
 
3616
                              rli->linfo.log_file_name);
 
3617
#endif
 
3618
      // open_binlog() will check the magic header
 
3619
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
 
3620
                                       &errmsg)) <0)
 
3621
        goto err;
 
3622
    }
 
3623
    else
 
3624
    {
 
3625
      /*
 
3626
        Read failed with a non-EOF error.
 
3627
        TODO: come up with something better to handle this error
 
3628
      */
 
3629
      if (hot_log)
 
3630
        pthread_mutex_unlock(log_lock);
 
3631
      sql_print_error(_("Slave SQL thread: I/O error reading "
 
3632
                        "event(errno: %d  cur_log->error: %d)"),
 
3633
                      my_errno,cur_log->error);
 
3634
      // set read position to the beginning of the event
 
3635
      my_b_seek(cur_log,rli->event_relay_log_pos);
 
3636
      /* otherwise, we have had a partial read */
 
3637
      errmsg = _("Aborting slave SQL thread because of partial event read");
 
3638
      break;                                    // To end of function
 
3639
    }
 
3640
  }
 
3641
  if (!errmsg && global_system_variables.log_warnings)
 
3642
  {
 
3643
    sql_print_information(_("Error reading relay log event: %s"),
 
3644
                          _("slave SQL thread was killed"));
 
3645
    return(0);
 
3646
  }
 
3647
 
 
3648
err:
 
3649
  if (errmsg)
 
3650
    sql_print_error(_("Error reading relay log event: %s"), errmsg);
 
3651
  return(0);
 
3652
}
 
3653
 
 
3654
/*
 
3655
  Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
 
3656
  because of size is simpler because when we do it we already have all relevant
 
3657
  locks; here we don't, so this function is mainly taking locks).
 
3658
  Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
 
3659
  is void).
 
3660
*/
 
3661
 
 
3662
void rotate_relay_log(Master_info* mi)
 
3663
{
 
3664
  Relay_log_info* rli= &mi->rli;
 
3665
 
 
3666
  /* We don't lock rli->run_lock. This would lead to deadlocks. */
 
3667
  pthread_mutex_lock(&mi->run_lock);
 
3668
 
 
3669
  /*
 
3670
     We need to test inited because otherwise, new_file() will attempt to lock
 
3671
     LOCK_log, which may not be inited (if we're not a slave).
 
3672
  */
 
3673
  if (!rli->inited)
 
3674
  {
 
3675
    goto end;
 
3676
  }
 
3677
 
 
3678
  /* If the relay log is closed, new_file() will do nothing. */
 
3679
  rli->relay_log.new_file();
 
3680
 
 
3681
  /*
 
3682
    We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
 
3683
    be counted, so imagine a succession of FLUSH LOGS  and assume the slave
 
3684
    threads are started:
 
3685
    relay_log_space decreases by the size of the deleted relay log, but does
 
3686
    not increase, so flush-after-flush we may become negative, which is wrong.
 
3687
    Even if this will be corrected as soon as a query is replicated on the
 
3688
    slave (because the I/O thread will then call harvest_bytes_written() which
 
3689
    will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
 
3690
    output in SHOW SLAVE STATUS meanwhile. So we harvest now.
 
3691
    If the log is closed, then this will just harvest the last writes, probably
 
3692
    0 as they probably have been harvested.
 
3693
  */
 
3694
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3695
end:
 
3696
  pthread_mutex_unlock(&mi->run_lock);
 
3697
  return;
 
3698
}
 
3699
 
 
3700
 
 
3701
/**
 
3702
   Detects, based on master's version (as found in the relay log), if master
 
3703
   has a certain bug.
 
3704
   @param rli Relay_log_info which tells the master's version
 
3705
   @param bug_id Number of the bug as found in bugs.mysql.com
 
3706
   @param report bool report error message, default TRUE
 
3707
   @return true if master has the bug, FALSE if it does not.
 
3708
*/
 
3709
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
 
3710
{
 
3711
  struct st_version_range_for_one_bug {
 
3712
    uint32_t        bug_id;
 
3713
    const uchar introduced_in[3]; // first version with bug
 
3714
    const uchar fixed_in[3];      // first version with fix
 
3715
  };
 
3716
  static struct st_version_range_for_one_bug versions_for_all_bugs[]=
 
3717
  {
 
3718
    {24432, { 5, 0, 24 }, { 5, 0, 38 } },
 
3719
    {24432, { 5, 1, 12 }, { 5, 1, 17 } },
 
3720
    {33029, { 5, 0,  0 }, { 5, 0, 58 } },
 
3721
    {33029, { 5, 1,  0 }, { 5, 1, 12 } },
 
3722
  };
 
3723
  const uchar *master_ver=
 
3724
    rli->relay_log.description_event_for_exec->server_version_split;
 
3725
 
 
3726
  assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
 
3727
 
 
3728
  for (uint32_t i= 0;
 
3729
       i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
 
3730
  {
 
3731
    const uchar *introduced_in= versions_for_all_bugs[i].introduced_in,
 
3732
      *fixed_in= versions_for_all_bugs[i].fixed_in;
 
3733
    if ((versions_for_all_bugs[i].bug_id == bug_id) &&
 
3734
        (memcmp(introduced_in, master_ver, 3) <= 0) &&
 
3735
        (memcmp(fixed_in,      master_ver, 3) >  0))
 
3736
    {
 
3737
      if (!report)
 
3738
        return true;
 
3739
 
 
3740
      // a short message for SHOW SLAVE STATUS (message length constraints)
 
3741
      my_printf_error(ER_UNKNOWN_ERROR,
 
3742
                      _("master may suffer from"
 
3743
                        " http://bugs.mysql.com/bug.php?id=%u"
 
3744
                        " so slave stops; check error log on slave"
 
3745
                        " for more info"), MYF(0), bug_id);
 
3746
      // a verbose message for the error log
 
3747
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
 
3748
                  _("According to the master's version ('%s'),"
 
3749
                    " it is probable that master suffers from this bug:"
 
3750
                    " http://bugs.mysql.com/bug.php?id=%u"
 
3751
                    " and thus replicating the current binary log event"
 
3752
                    " may make the slave's data become different from the"
 
3753
                    " master's data."
 
3754
                    " To take no risk, slave refuses to replicate"
 
3755
                    " this event and stops."
 
3756
                    " We recommend that all updates be stopped on the"
 
3757
                    " master and slave, that the data of both be"
 
3758
                    " manually synchronized,"
 
3759
                    " that master's binary logs be deleted,"
 
3760
                    " that master be upgraded to a version at least"
 
3761
                    " equal to '%d.%d.%d'. Then replication can be"
 
3762
                    " restarted."),
 
3763
                  rli->relay_log.description_event_for_exec->server_version,
 
3764
                  bug_id,
 
3765
                  fixed_in[0], fixed_in[1], fixed_in[2]);
 
3766
      return true;
 
3767
    }
 
3768
  }
 
3769
  return false;
 
3770
}
 
3771
 
 
3772
/**
 
3773
   BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
 
3774
   exclusive, if one statement in a SP generated AUTO_INCREMENT value
 
3775
   by the top statement, all statements after it would be considered
 
3776
   generated AUTO_INCREMENT value by the top statement, and a
 
3777
   erroneous INSERT_ID value might be associated with these statement,
 
3778
   which could cause duplicate entry error and stop the slave.
 
3779
 
 
3780
   Detect buggy master to work around.
 
3781
 */
 
3782
bool rpl_master_erroneous_autoinc(THD *thd)
 
3783
{
 
3784
  if (active_mi && active_mi->rli.sql_thd == thd)
 
3785
  {
 
3786
    Relay_log_info *rli= &active_mi->rli;
 
3787
    return rpl_master_has_bug(rli, 33029, false);
 
3788
  }
 
3789
  return false;
 
3790
}
 
3791
 
 
3792
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
3793
template class I_List_iterator<i_string>;
 
3794
template class I_List_iterator<i_string_pair>;
 
3795
#endif
 
3796
 
 
3797
/**
 
3798
  @} (end of group Replication)
 
3799
*/
 
3800
 
 
3801
#endif /* HAVE_REPLICATION */