~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/slave.cc

  • Committer: Mats Kindahl
  • Date: 2008-08-26 07:32:59 UTC
  • mto: (489.1.2 codestyle)
  • mto: This revision was merged to the branch mainline in revision 491.
  • Revision ID: mats@mysql.com-20080826073259-9k4evtajgldgolli
Replaced use of thd_proc_info() macro with calls to
set_proc_info() and get_proc_info() internally.  Introduced
functions set_thd_proc_info() and get_thd_proc_info() for
external users, i.e., plug-ins.

The set_thd_proc_info() accepted callers info that can be used to
print debug output, but the information was not used. The return
value was changed to void and the old value is not fetched any
more. To be able to get the value of proc_info for external
users, the function get_thd_proc_info() was introduced.

The thd_proc_info() macro called set_thd_proc_info() but almost
never used the return value of set_thd_proc_info() so the macro
was replaced with a call of THD::set_proc_info().

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2003 DRIZZLE AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/**
 
18
  @addtogroup Replication
 
19
  @{
 
20
 
 
21
  @file
 
22
 
 
23
  @brief Code to run the io thread and the sql thread on the
 
24
  replication slave.
 
25
*/
 
26
#include <drizzled/server_includes.h>
 
27
 
 
28
#include <storage/myisam/myisam.h>
 
29
#include "rpl_mi.h"
 
30
#include "rpl_rli.h"
 
31
#include "sql_repl.h"
 
32
#include "rpl_filter.h"
 
33
#include "repl_failsafe.h"
 
34
#include <mysys/thr_alarm.h>
 
35
#include <libdrizzle/sql_common.h>
 
36
#include <libdrizzle/errmsg.h>
 
37
#include <mysys/mysys_err.h>
 
38
#include <drizzled/drizzled_error_messages.h>
 
39
 
 
40
#ifdef HAVE_REPLICATION
 
41
 
 
42
#include "rpl_tblmap.h"
 
43
 
 
44
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
 
45
 
 
46
#define MAX_SLAVE_RETRY_PAUSE 5
 
47
bool use_slave_mask = 0;
 
48
MY_BITMAP slave_error_mask;
 
49
 
 
50
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
 
51
 
 
52
char* slave_load_tmpdir = 0;
 
53
Master_info *active_mi= 0;
 
54
bool replicate_same_server_id;
 
55
uint64_t relay_log_space_limit = 0;
 
56
 
 
57
/*
 
58
  When slave thread exits, we need to remember the temporary tables so we
 
59
  can re-use them on slave start.
 
60
 
 
61
  TODO: move the vars below under Master_info
 
62
*/
 
63
 
 
64
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
 
65
int32_t events_till_abort = -1;
 
66
 
 
67
enum enum_slave_reconnect_actions
 
68
{
 
69
  SLAVE_RECON_ACT_REG= 0,
 
70
  SLAVE_RECON_ACT_DUMP= 1,
 
71
  SLAVE_RECON_ACT_EVENT= 2,
 
72
  SLAVE_RECON_ACT_MAX
 
73
};
 
74
 
 
75
enum enum_slave_reconnect_messages
 
76
{
 
77
  SLAVE_RECON_MSG_WAIT= 0,
 
78
  SLAVE_RECON_MSG_KILLED_WAITING= 1,
 
79
  SLAVE_RECON_MSG_AFTER= 2,
 
80
  SLAVE_RECON_MSG_FAILED= 3,
 
81
  SLAVE_RECON_MSG_COMMAND= 4,
 
82
  SLAVE_RECON_MSG_KILLED_AFTER= 5,
 
83
  SLAVE_RECON_MSG_MAX
 
84
};
 
85
 
 
86
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
 
87
{
 
88
  {
 
89
    N_("Waiting to reconnect after a failed registration on master"),
 
90
    N_("Slave I/O thread killed while waitnig to reconnect after a "
 
91
                 "failed registration on master"),
 
92
    N_("Reconnecting after a failed registration on master"),
 
93
    N_("failed registering on master, reconnecting to try again, "
 
94
                 "log '%s' at postion %s"),
 
95
    "COM_REGISTER_SLAVE",
 
96
    N_("Slave I/O thread killed during or after reconnect")
 
97
  },
 
98
  {
 
99
    N_("Waiting to reconnect after a failed binlog dump request"),
 
100
    N_("Slave I/O thread killed while retrying master dump"),
 
101
    N_("Reconnecting after a failed binlog dump request"),
 
102
    N_("failed dump request, reconnecting to try again, "
 
103
                 "log '%s' at postion %s"),
 
104
    "COM_BINLOG_DUMP",
 
105
    N_("Slave I/O thread killed during or after reconnect")
 
106
  },
 
107
  {
 
108
    N_("Waiting to reconnect after a failed master event read"),
 
109
    N_("Slave I/O thread killed while waiting to reconnect "
 
110
                 "after a failed read"),
 
111
    N_("Reconnecting after a failed master event read"),
 
112
    N_("Slave I/O thread: Failed reading log event, "
 
113
                 "reconnecting to retry, log '%s' at postion %s"),
 
114
    "",
 
115
    N_("Slave I/O thread killed during or after a "
 
116
                 "reconnect done to recover from failed read")
 
117
  }
 
118
};
 
119
 
 
120
 
 
121
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
 
122
 
 
123
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
 
124
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
 
125
static bool wait_for_relay_log_space(Relay_log_info* rli);
 
126
static inline bool io_slave_killed(THD* thd,Master_info* mi);
 
127
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
 
128
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
 
129
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi);
 
130
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
131
                          bool suppress_warnings);
 
132
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
133
                             bool reconnect, bool suppress_warnings);
 
134
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
135
                      void* thread_killed_arg);
 
136
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
 
137
static Log_event* next_event(Relay_log_info* rli);
 
138
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
 
139
static int32_t terminate_slave_thread(THD *thd,
 
140
                                  pthread_mutex_t* term_lock,
 
141
                                  pthread_cond_t* term_cond,
 
142
                                  volatile uint32_t *slave_running,
 
143
                                  bool skip_lock);
 
144
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
 
145
 
 
146
/*
 
147
  Find out which replications threads are running
 
148
 
 
149
  SYNOPSIS
 
150
    init_thread_mask()
 
151
    mask                Return value here
 
152
    mi                  master_info for slave
 
153
    inverse             If set, returns which threads are not running
 
154
 
 
155
  IMPLEMENTATION
 
156
    Get a bit mask for which threads are running so that we can later restart
 
157
    these threads.
 
158
 
 
159
  RETURN
 
160
    mask        If inverse == 0, running threads
 
161
                If inverse == 1, stopped threads
 
162
*/
 
163
 
 
164
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
 
165
{
 
166
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
 
167
  register int32_t tmp_mask=0;
 
168
 
 
169
  if (set_io)
 
170
    tmp_mask |= SLAVE_IO;
 
171
  if (set_sql)
 
172
    tmp_mask |= SLAVE_SQL;
 
173
  if (inverse)
 
174
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
 
175
  *mask = tmp_mask;
 
176
  return;
 
177
}
 
178
 
 
179
 
 
180
/*
 
181
  lock_slave_threads()
 
182
*/
 
183
 
 
184
void lock_slave_threads(Master_info* mi)
 
185
{
 
186
  //TODO: see if we can do this without dual mutex
 
187
  pthread_mutex_lock(&mi->run_lock);
 
188
  pthread_mutex_lock(&mi->rli.run_lock);
 
189
  return;
 
190
}
 
191
 
 
192
 
 
193
/*
 
194
  unlock_slave_threads()
 
195
*/
 
196
 
 
197
void unlock_slave_threads(Master_info* mi)
 
198
{
 
199
  //TODO: see if we can do this without dual mutex
 
200
  pthread_mutex_unlock(&mi->rli.run_lock);
 
201
  pthread_mutex_unlock(&mi->run_lock);
 
202
  return;
 
203
}
 
204
 
 
205
 
 
206
/* Initialize slave structures */
 
207
 
 
208
int32_t init_slave()
 
209
{
 
210
  /*
 
211
    This is called when mysqld starts. Before client connections are
 
212
    accepted. However bootstrap may conflict with us if it does START SLAVE.
 
213
    So it's safer to take the lock.
 
214
  */
 
215
  pthread_mutex_lock(&LOCK_active_mi);
 
216
  /*
 
217
    TODO: re-write this to interate through the list of files
 
218
    for multi-master
 
219
  */
 
220
  active_mi= new Master_info;
 
221
 
 
222
  /*
 
223
    If master_host is not specified, try to read it from the master_info file.
 
224
    If master_host is specified, create the master_info file if it doesn't
 
225
    exists.
 
226
  */
 
227
  if (!active_mi)
 
228
  {
 
229
    sql_print_error(_("Failed to allocate memory for the master info structure"));
 
230
    goto err;
 
231
  }
 
232
 
 
233
  if (init_master_info(active_mi,master_info_file,relay_log_info_file,
 
234
                       1, (SLAVE_IO | SLAVE_SQL)))
 
235
  {
 
236
    sql_print_error(_("Failed to initialize the master info structure"));
 
237
    goto err;
 
238
  }
 
239
 
 
240
  /* If server id is not set, start_slave_thread() will say it */
 
241
 
 
242
  if (active_mi->host[0] && !opt_skip_slave_start)
 
243
  {
 
244
    if (start_slave_threads(1 /* need mutex */,
 
245
                            0 /* no wait for start*/,
 
246
                            active_mi,
 
247
                            master_info_file,
 
248
                            relay_log_info_file,
 
249
                            SLAVE_IO | SLAVE_SQL))
 
250
    {
 
251
      sql_print_error(_("Failed to create slave threads"));
 
252
      goto err;
 
253
    }
 
254
  }
 
255
  pthread_mutex_unlock(&LOCK_active_mi);
 
256
  return(0);
 
257
 
 
258
err:
 
259
  pthread_mutex_unlock(&LOCK_active_mi);
 
260
  return(1);
 
261
}
 
262
 
 
263
 
 
264
/*
 
265
  Init function to set up array for errors that should be skipped for slave
 
266
 
 
267
  SYNOPSIS
 
268
    init_slave_skip_errors()
 
269
    arg         List of errors numbers to skip, separated with ','
 
270
 
 
271
  NOTES
 
272
    Called from get_options() in mysqld.cc on start-up
 
273
*/
 
274
 
 
275
void init_slave_skip_errors(const char* arg)
 
276
{
 
277
  const char *p;
 
278
 
 
279
  if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
 
280
  {
 
281
    fprintf(stderr, "Badly out of memory, please check your system status\n");
 
282
    exit(1);
 
283
  }
 
284
  use_slave_mask = 1;
 
285
  for (;my_isspace(system_charset_info,*arg);++arg)
 
286
    /* empty */;
 
287
  if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
 
288
  {
 
289
    bitmap_set_all(&slave_error_mask);
 
290
    return;
 
291
  }
 
292
  for (p= arg ; *p; )
 
293
  {
 
294
    long err_code;
 
295
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
 
296
      break;
 
297
    if (err_code < MAX_SLAVE_ERROR)
 
298
       bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
 
299
    while (!my_isdigit(system_charset_info,*p) && *p)
 
300
      p++;
 
301
  }
 
302
  return;
 
303
}
 
304
 
 
305
 
 
306
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
 
307
{
 
308
  if (!mi->inited)
 
309
    return(0); /* successfully do nothing */
 
310
  int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
 
311
  pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
 
312
 
 
313
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
 
314
  {
 
315
    mi->abort_slave=1;
 
316
    if ((error=terminate_slave_thread(mi->io_thd,io_lock,
 
317
                                      &mi->stop_cond,
 
318
                                      &mi->slave_running,
 
319
                                      skip_lock)) &&
 
320
        !force_all)
 
321
      return(error);
 
322
  }
 
323
  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
 
324
  {
 
325
    mi->rli.abort_slave=1;
 
326
    if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
 
327
                                      &mi->rli.stop_cond,
 
328
                                      &mi->rli.slave_running,
 
329
                                      skip_lock)) &&
 
330
        !force_all)
 
331
      return(error);
 
332
  }
 
333
  return(0);
 
334
}
 
335
 
 
336
 
 
337
/**
 
338
   Wait for a slave thread to terminate.
 
339
 
 
340
   This function is called after requesting the thread to terminate
 
341
   (by setting @c abort_slave member of @c Relay_log_info or @c
 
342
   Master_info structure to 1). Termination of the thread is
 
343
   controlled with the the predicate <code>*slave_running</code>.
 
344
 
 
345
   Function will acquire @c term_lock before waiting on the condition
 
346
   unless @c skip_lock is true in which case the mutex should be owned
 
347
   by the caller of this function and will remain acquired after
 
348
   return from the function.
 
349
 
 
350
   @param term_lock
 
351
          Associated lock to use when waiting for @c term_cond
 
352
 
 
353
   @param term_cond
 
354
          Condition that is signalled when the thread has terminated
 
355
 
 
356
   @param slave_running
 
357
          Pointer to predicate to check for slave thread termination
 
358
 
 
359
   @param skip_lock
 
360
          If @c true the lock will not be acquired before waiting on
 
361
          the condition. In this case, it is assumed that the calling
 
362
          function acquires the lock before calling this function.
 
363
 
 
364
   @retval 0 All OK
 
365
 */
 
366
static int32_t
 
367
terminate_slave_thread(THD *thd,
 
368
                       pthread_mutex_t* term_lock,
 
369
                       pthread_cond_t* term_cond,
 
370
                       volatile uint32_t *slave_running,
 
371
                       bool skip_lock)
 
372
{
 
373
  int32_t error;
 
374
 
 
375
  if (!skip_lock)
 
376
    pthread_mutex_lock(term_lock);
 
377
 
 
378
  safe_mutex_assert_owner(term_lock);
 
379
 
 
380
  if (!*slave_running)
 
381
  {
 
382
    if (!skip_lock)
 
383
      pthread_mutex_unlock(term_lock);
 
384
    return(ER_SLAVE_NOT_RUNNING);
 
385
  }
 
386
  assert(thd != 0);
 
387
  THD_CHECK_SENTRY(thd);
 
388
 
 
389
  /*
 
390
    Is is critical to test if the slave is running. Otherwise, we might
 
391
    be referening freed memory trying to kick it
 
392
  */
 
393
 
 
394
  while (*slave_running)                        // Should always be true
 
395
  {
 
396
    pthread_mutex_lock(&thd->LOCK_delete);
 
397
#ifndef DONT_USE_THR_ALARM
 
398
    /*
 
399
      Error codes from pthread_kill are:
 
400
      EINVAL: invalid signal number (can't happen)
 
401
      ESRCH: thread already killed (can happen, should be ignored)
 
402
    */
 
403
    int32_t err= pthread_kill(thd->real_id, thr_client_alarm);
 
404
    assert(err != EINVAL);
 
405
#endif
 
406
    thd->awake(THD::NOT_KILLED);
 
407
    pthread_mutex_unlock(&thd->LOCK_delete);
 
408
 
 
409
    /*
 
410
      There is a small chance that slave thread might miss the first
 
411
      alarm. To protect againts it, resend the signal until it reacts
 
412
    */
 
413
    struct timespec abstime;
 
414
    set_timespec(abstime,2);
 
415
    error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
 
416
    assert(error == ETIMEDOUT || error == 0);
 
417
  }
 
418
 
 
419
  assert(*slave_running == 0);
 
420
 
 
421
  if (!skip_lock)
 
422
    pthread_mutex_unlock(term_lock);
 
423
  return(0);
 
424
}
 
425
 
 
426
 
 
427
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
 
428
                           pthread_mutex_t *cond_lock,
 
429
                           pthread_cond_t *start_cond,
 
430
                           volatile uint32_t *slave_running,
 
431
                           volatile uint32_t *slave_run_id,
 
432
                           Master_info* mi,
 
433
                           bool high_priority)
 
434
{
 
435
  pthread_t th;
 
436
  uint32_t start_id;
 
437
 
 
438
  assert(mi->inited);
 
439
 
 
440
  if (start_lock)
 
441
    pthread_mutex_lock(start_lock);
 
442
  if (!server_id)
 
443
  {
 
444
    if (start_cond)
 
445
      pthread_cond_broadcast(start_cond);
 
446
    if (start_lock)
 
447
      pthread_mutex_unlock(start_lock);
 
448
    sql_print_error(_("Server id not set, will not start slave"));
 
449
    return(ER_BAD_SLAVE);
 
450
  }
 
451
 
 
452
  if (*slave_running)
 
453
  {
 
454
    if (start_cond)
 
455
      pthread_cond_broadcast(start_cond);
 
456
    if (start_lock)
 
457
      pthread_mutex_unlock(start_lock);
 
458
    return(ER_SLAVE_MUST_STOP);
 
459
  }
 
460
  start_id= *slave_run_id;
 
461
  if (high_priority)
 
462
  {
 
463
    struct sched_param tmp_sched_param;
 
464
 
 
465
    memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
 
466
    tmp_sched_param.sched_priority= CONNECT_PRIOR;
 
467
    (void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
 
468
  }
 
469
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
 
470
  {
 
471
    if (start_lock)
 
472
      pthread_mutex_unlock(start_lock);
 
473
    return(ER_SLAVE_THREAD);
 
474
  }
 
475
  if (start_cond && cond_lock) // caller has cond_lock
 
476
  {
 
477
    THD* thd = current_thd;
 
478
    while (start_id == *slave_run_id)
 
479
    {
 
480
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
 
481
                                            "Waiting for slave thread to start");
 
482
      pthread_cond_wait(start_cond,cond_lock);
 
483
      thd->exit_cond(old_msg);
 
484
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
 
485
      if (thd->killed)
 
486
        return(thd->killed_errno());
 
487
    }
 
488
  }
 
489
  if (start_lock)
 
490
    pthread_mutex_unlock(start_lock);
 
491
  return(0);
 
492
}
 
493
 
 
494
 
 
495
/*
 
496
  start_slave_threads()
 
497
 
 
498
  NOTES
 
499
    SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
 
500
    sense to do that for starting a slave--we always care if it actually
 
501
    started the threads that were not previously running
 
502
*/
 
503
 
 
504
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
 
505
                        Master_info* mi,
 
506
                        const char* master_info_fname __attribute__((unused)),
 
507
                        const char* slave_info_fname __attribute__((unused)),
 
508
                        int32_t thread_mask)
 
509
{
 
510
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
 
511
  pthread_cond_t* cond_io=0,*cond_sql=0;
 
512
  int32_t error=0;
 
513
 
 
514
  if (need_slave_mutex)
 
515
  {
 
516
    lock_io = &mi->run_lock;
 
517
    lock_sql = &mi->rli.run_lock;
 
518
  }
 
519
  if (wait_for_start)
 
520
  {
 
521
    cond_io = &mi->start_cond;
 
522
    cond_sql = &mi->rli.start_cond;
 
523
    lock_cond_io = &mi->run_lock;
 
524
    lock_cond_sql = &mi->rli.run_lock;
 
525
  }
 
526
 
 
527
  if (thread_mask & SLAVE_IO)
 
528
    error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
 
529
                              cond_io,
 
530
                              &mi->slave_running, &mi->slave_run_id,
 
531
                              mi, 1); //high priority, to read the most possible
 
532
  if (!error && (thread_mask & SLAVE_SQL))
 
533
  {
 
534
    error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
 
535
                              cond_sql,
 
536
                              &mi->rli.slave_running, &mi->rli.slave_run_id,
 
537
                              mi, 0);
 
538
    if (error)
 
539
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
 
540
  }
 
541
  return(error);
 
542
}
 
543
 
 
544
 
 
545
#ifdef NOT_USED_YET
 
546
static int32_t end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
 
547
{
 
548
  end_master_info(mi);
 
549
  return(0);
 
550
}
 
551
#endif
 
552
 
 
553
 
 
554
/*
 
555
  Free all resources used by slave
 
556
 
 
557
  SYNOPSIS
 
558
    end_slave()
 
559
*/
 
560
 
 
561
void end_slave()
 
562
{
 
563
  /*
 
564
    This is called when the server terminates, in close_connections().
 
565
    It terminates slave threads. However, some CHANGE MASTER etc may still be
 
566
    running presently. If a START SLAVE was in progress, the mutex lock below
 
567
    will make us wait until slave threads have started, and START SLAVE
 
568
    returns, then we terminate them here.
 
569
  */
 
570
  pthread_mutex_lock(&LOCK_active_mi);
 
571
  if (active_mi)
 
572
  {
 
573
    /*
 
574
      TODO: replace the line below with
 
575
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
 
576
      once multi-master code is ready.
 
577
    */
 
578
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
 
579
    end_master_info(active_mi);
 
580
    delete active_mi;
 
581
    active_mi= 0;
 
582
  }
 
583
  pthread_mutex_unlock(&LOCK_active_mi);
 
584
  return;
 
585
}
 
586
 
 
587
 
 
588
static bool io_slave_killed(THD* thd, Master_info* mi)
 
589
{
 
590
  assert(mi->io_thd == thd);
 
591
  assert(mi->slave_running); // tracking buffer overrun
 
592
  return(mi->abort_slave || abort_loop || thd->killed);
 
593
}
 
594
 
 
595
 
 
596
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
 
597
{
 
598
  assert(rli->sql_thd == thd);
 
599
  assert(rli->slave_running == 1);// tracking buffer overrun
 
600
  if (abort_loop || thd->killed || rli->abort_slave)
 
601
  {
 
602
    /*
 
603
      If we are in an unsafe situation (stopping could corrupt replication),
 
604
      we give one minute to the slave SQL thread of grace before really
 
605
      terminating, in the hope that it will be able to read more events and
 
606
      the unsafe situation will soon be left. Note that this one minute starts
 
607
      from the last time anything happened in the slave SQL thread. So it's
 
608
      really one minute of idleness, we don't timeout if the slave SQL thread
 
609
      is actively working.
 
610
    */
 
611
    if (rli->last_event_start_time == 0)
 
612
      return(1);
 
613
    if (difftime(time(0), rli->last_event_start_time) > 60)
 
614
    {
 
615
      rli->report(ERROR_LEVEL, 0,
 
616
                  _("SQL thread had to stop in an unsafe situation, in "
 
617
                  "the middle of applying updates to a "
 
618
                  "non-transactional table without any primary key. "
 
619
                  "There is a risk of duplicate updates when the slave "
 
620
                  "SQL thread is restarted. Please check your tables' "
 
621
                  "contents after restart."));
 
622
      return(1);
 
623
    }
 
624
  }
 
625
  return(0);
 
626
}
 
627
 
 
628
 
 
629
/*
 
630
  skip_load_data_infile()
 
631
 
 
632
  NOTES
 
633
    This is used to tell a 3.23 master to break send_file()
 
634
*/
 
635
 
 
636
void skip_load_data_infile(NET *net)
 
637
{
 
638
  (void)net_request_file(net, "/dev/null");
 
639
  (void)my_net_read(net);                               // discard response
 
640
  (void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok
 
641
  return;
 
642
}
 
643
 
 
644
 
 
645
bool net_request_file(NET* net, const char* fname)
 
646
{
 
647
  return(net_write_command(net, 251, (uchar*) fname, strlen(fname),
 
648
                                (uchar*) "", 0));
 
649
}
 
650
 
 
651
/*
 
652
  From other comments and tests in code, it looks like
 
653
  sometimes Query_log_event and Load_log_event can have db == 0
 
654
  (see rewrite_db() above for example)
 
655
  (cases where this happens are unclear; it may be when the master is 3.23).
 
656
*/
 
657
 
 
658
const char *print_slave_db_safe(const char* db)
 
659
{
 
660
  return((db ? db : ""));
 
661
}
 
662
 
 
663
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
 
664
                                 const char *default_val)
 
665
{
 
666
  uint32_t length;
 
667
 
 
668
  if ((length=my_b_gets(f,var, max_size)))
 
669
  {
 
670
    char* last_p = var + length -1;
 
671
    if (*last_p == '\n')
 
672
      *last_p = 0; // if we stopped on newline, kill it
 
673
    else
 
674
    {
 
675
      /*
 
676
        If we truncated a line or stopped on last char, remove all chars
 
677
        up to and including newline.
 
678
      */
 
679
      int32_t c;
 
680
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
 
681
    }
 
682
    return(0);
 
683
  }
 
684
  else if (default_val)
 
685
  {
 
686
    strmake(var,  default_val, max_size-1);
 
687
    return(0);
 
688
  }
 
689
  return(1);
 
690
}
 
691
 
 
692
 
 
693
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
 
694
{
 
695
  char buf[32];
 
696
 
 
697
 
 
698
  if (my_b_gets(f, buf, sizeof(buf)))
 
699
  {
 
700
    *var = atoi(buf);
 
701
    return(0);
 
702
  }
 
703
  else if (default_val)
 
704
  {
 
705
    *var = default_val;
 
706
    return(0);
 
707
  }
 
708
  return(1);
 
709
}
 
710
 
 
711
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
 
712
{
 
713
  char buf[16];
 
714
 
 
715
 
 
716
  if (my_b_gets(f, buf, sizeof(buf)))
 
717
  {
 
718
    if (sscanf(buf, "%f", var) != 1)
 
719
      return(1);
 
720
    else
 
721
      return(0);
 
722
  }
 
723
  else if (default_val != 0.0)
 
724
  {
 
725
    *var = default_val;
 
726
    return(0);
 
727
  }
 
728
  return(1);
 
729
}
 
730
 
 
731
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
 
732
{
 
733
  if (io_slave_killed(thd, mi))
 
734
  {
 
735
    if (info && global_system_variables.log_warnings)
 
736
      sql_print_information(info);
 
737
    return true;
 
738
  }
 
739
  return false;
 
740
}
 
741
 
 
742
 
 
743
/*
 
744
  Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
 
745
  relying on the binlog's version. This is not perfect: imagine an upgrade
 
746
  of the master without waiting that all slaves are in sync with the master;
 
747
  then a slave could be fooled about the binlog's format. This is what happens
 
748
  when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
 
749
  slaves are fooled. So we do this only to distinguish between 3.23 and more
 
750
  recent masters (it's too late to change things for 3.23).
 
751
 
 
752
  RETURNS
 
753
  0       ok
 
754
  1       error
 
755
*/
 
756
 
 
757
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
 
758
{
 
759
  char error_buf[512];
 
760
  String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
 
761
  char err_buff[MAX_SLAVE_ERRMSG];
 
762
  const char* errmsg= 0;
 
763
  int32_t err_code= 0;
 
764
  DRIZZLE_RES *master_res= 0;
 
765
  DRIZZLE_ROW master_row;
 
766
 
 
767
  err_msg.length(0);
 
768
  /*
 
769
    Free old description_event_for_queue (that is needed if we are in
 
770
    a reconnection).
 
771
  */
 
772
  delete mi->rli.relay_log.description_event_for_queue;
 
773
  mi->rli.relay_log.description_event_for_queue= 0;
 
774
 
 
775
  if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
 
776
  {
 
777
    errmsg = _("Master reported unrecognized DRIZZLE version");
 
778
    err_code= ER_SLAVE_FATAL_ERROR;
 
779
    sprintf(err_buff, ER(err_code), errmsg);
 
780
    err_msg.append(err_buff);
 
781
  }
 
782
  else
 
783
  {
 
784
    /*
 
785
      Note the following switch will bug when we have DRIZZLE branch 30 ;)
 
786
    */
 
787
    switch (*drizzle->server_version)
 
788
    {
 
789
    case '0':
 
790
    case '1':
 
791
    case '2':
 
792
      errmsg = _("Master reported unrecognized DRIZZLE version");
 
793
      err_code= ER_SLAVE_FATAL_ERROR;
 
794
      sprintf(err_buff, ER(err_code), errmsg);
 
795
      err_msg.append(err_buff);
 
796
      break;
 
797
    case '3':
 
798
      mi->rli.relay_log.description_event_for_queue= new
 
799
        Format_description_log_event(1, drizzle->server_version);
 
800
      break;
 
801
    case '4':
 
802
      mi->rli.relay_log.description_event_for_queue= new
 
803
        Format_description_log_event(3, drizzle->server_version);
 
804
      break;
 
805
    default:
 
806
      /*
 
807
        Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
 
808
        take the early steps (like tests for "is this a 3.23 master") which we
 
809
        have to take before we receive the real master's Format_desc which will
 
810
        override this one. Note that the Format_desc we create below is garbage
 
811
        (it has the format of the *slave*); it's only good to help know if the
 
812
        master is 3.23, 4.0, etc.
 
813
      */
 
814
      mi->rli.relay_log.description_event_for_queue= new
 
815
        Format_description_log_event(4, drizzle->server_version);
 
816
      break;
 
817
    }
 
818
  }
 
819
 
 
820
  /*
 
821
     This does not mean that a 5.0 slave will be able to read a 6.0 master; but
 
822
     as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
 
823
     can't read a 6.0 master, this will show up when the slave can't read some
 
824
     events sent by the master, and there will be error messages.
 
825
  */
 
826
 
 
827
  if (err_msg.length() != 0)
 
828
    goto err;
 
829
 
 
830
  /* as we are here, we tried to allocate the event */
 
831
  if (!mi->rli.relay_log.description_event_for_queue)
 
832
  {
 
833
    errmsg= _("default Format_description_log_event");
 
834
    err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
 
835
    sprintf(err_buff, ER(err_code), errmsg);
 
836
    err_msg.append(err_buff);
 
837
    goto err;
 
838
  }
 
839
 
 
840
  /*
 
841
    Compare the master and slave's clock. Do not die if master's clock is
 
842
    unavailable (very old master not supporting UNIX_TIMESTAMP()?).
 
843
  */
 
844
 
 
845
  if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
 
846
      (master_res= drizzle_store_result(drizzle)) &&
 
847
      (master_row= drizzle_fetch_row(master_res)))
 
848
  {
 
849
    mi->clock_diff_with_master=
 
850
      (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
 
851
  }
 
852
  else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
 
853
  {
 
854
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
 
855
    sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
 
856
                        "do not trust column Seconds_Behind_Master of SHOW "
 
857
                        "SLAVE STATUS. Error: %s (%d)"),
 
858
                      drizzle_error(drizzle), drizzle_errno(drizzle));
 
859
  }
 
860
  if (master_res)
 
861
    drizzle_free_result(master_res);
 
862
 
 
863
  /*
 
864
    Check that the master's server id and ours are different. Because if they
 
865
    are equal (which can result from a simple copy of master's datadir to slave,
 
866
    thus copying some my.cnf), replication will work but all events will be
 
867
    skipped.
 
868
    Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
 
869
    master?).
 
870
    Note: we could have put a @@SERVER_ID in the previous SELECT
 
871
    UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
 
872
  */
 
873
  if (!drizzle_real_query(drizzle,
 
874
                        STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
 
875
      (master_res= drizzle_store_result(drizzle)))
 
876
  {
 
877
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
878
        (::server_id == strtoul(master_row[1], 0, 10)) &&
 
879
        !mi->rli.replicate_same_server_id)
 
880
    {
 
881
      errmsg=
 
882
        _("The slave I/O thread stops because master and slave have equal "
 
883
          "DRIZZLE server ids; these ids must be different "
 
884
          "for replication to work (or "
 
885
          "the --replicate-same-server-id option must be used "
 
886
          "on slave but this does"
 
887
          "not always make sense; please check the manual before using it).");
 
888
      err_code= ER_SLAVE_FATAL_ERROR;
 
889
      sprintf(err_buff, ER(err_code), errmsg);
 
890
      err_msg.append(err_buff);
 
891
    }
 
892
    drizzle_free_result(master_res);
 
893
    if (errmsg)
 
894
      goto err;
 
895
  }
 
896
 
 
897
  /*
 
898
    Check that the master's global character_set_server and ours are the same.
 
899
    Not fatal if query fails (old master?).
 
900
    Note that we don't check for equality of global character_set_client and
 
901
    collation_connection (neither do we prevent their setting in
 
902
    set_var.cc). That's because from what I (Guilhem) have tested, the global
 
903
    values of these 2 are never used (new connections don't use them).
 
904
    We don't test equality of global collation_database either as it's is
 
905
    going to be deprecated (made read-only) in 4.1 very soon.
 
906
    The test is only relevant if master < 5.0.3 (we'll test only if it's older
 
907
    than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
 
908
    charset info in each binlog event.
 
909
    We don't do it for 3.23 because masters <3.23.50 hang on
 
910
    SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
 
911
    test only if master is 4.x.
 
912
  */
 
913
 
 
914
  /* redundant with rest of code but safer against later additions */
 
915
  if (*drizzle->server_version == '3')
 
916
    goto err;
 
917
 
 
918
  if ((*drizzle->server_version == '4') &&
 
919
      !drizzle_real_query(drizzle,
 
920
                        STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
 
921
      (master_res= drizzle_store_result(drizzle)))
 
922
  {
 
923
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
924
        strcmp(master_row[0], global_system_variables.collation_server->name))
 
925
    {
 
926
      errmsg=
 
927
        _("The slave I/O thread stops because master and slave have"
 
928
          " different values for the COLLATION_SERVER global variable."
 
929
          " The values must be equal for replication to work");
 
930
      err_code= ER_SLAVE_FATAL_ERROR;
 
931
      sprintf(err_buff, ER(err_code), errmsg);
 
932
      err_msg.append(err_buff);
 
933
    }
 
934
    drizzle_free_result(master_res);
 
935
    if (errmsg)
 
936
      goto err;
 
937
  }
 
938
 
 
939
  /*
 
940
    Perform analogous check for time zone. Theoretically we also should
 
941
    perform check here to verify that SYSTEM time zones are the same on
 
942
    slave and master, but we can't rely on value of @@system_time_zone
 
943
    variable (it is time zone abbreviation) since it determined at start
 
944
    time and so could differ for slave and master even if they are really
 
945
    in the same system time zone. So we are omiting this check and just
 
946
    relying on documentation. Also according to Monty there are many users
 
947
    who are using replication between servers in various time zones. Hence
 
948
    such check will broke everything for them. (And now everything will
 
949
    work for them because by default both their master and slave will have
 
950
    'SYSTEM' time zone).
 
951
    This check is only necessary for 4.x masters (and < 5.0.4 masters but
 
952
    those were alpha).
 
953
  */
 
954
  if ((*drizzle->server_version == '4') &&
 
955
      !drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
 
956
      (master_res= drizzle_store_result(drizzle)))
 
957
  {
 
958
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
959
        strcmp(master_row[0],
 
960
               global_system_variables.time_zone->get_name()->ptr()))
 
961
    {
 
962
      errmsg=
 
963
        _("The slave I/O thread stops because master and slave have"
 
964
          " different values for the TIME_ZONE global variable."
 
965
          " The values must be equal for replication to work");
 
966
      err_code= ER_SLAVE_FATAL_ERROR;
 
967
      sprintf(err_buff, ER(err_code), errmsg);
 
968
      err_msg.append(err_buff);
 
969
    }
 
970
    drizzle_free_result(master_res);
 
971
 
 
972
    if (errmsg)
 
973
      goto err;
 
974
  }
 
975
 
 
976
  if (mi->heartbeat_period != 0.0)
 
977
  {
 
978
    char llbuf[22];
 
979
    const char query_format[]= "SET @master_heartbeat_period= %s";
 
980
    char query[sizeof(query_format) - 2 + sizeof(llbuf)];
 
981
    /* 
 
982
       the period is an uint64_t of nano-secs. 
 
983
    */
 
984
    llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
 
985
    sprintf(query, query_format, llbuf);
 
986
 
 
987
    if (drizzle_real_query(drizzle, query, strlen(query))
 
988
        && !check_io_slave_killed(mi->io_thd, mi, NULL))
 
989
    {
 
990
      err_msg.append("The slave I/O thread stops because querying master with '");
 
991
      err_msg.append(query);
 
992
      err_msg.append("' failed;");
 
993
      err_msg.append(" error: ");
 
994
      err_code= drizzle_errno(drizzle);
 
995
      err_msg.qs_append(err_code);
 
996
      err_msg.append("  '");
 
997
      err_msg.append(drizzle_error(drizzle));
 
998
      err_msg.append("'");
 
999
      drizzle_free_result(drizzle_store_result(drizzle));
 
1000
      goto err;
 
1001
    }
 
1002
    drizzle_free_result(drizzle_store_result(drizzle));
 
1003
  }
 
1004
  
 
1005
err:
 
1006
  if (err_msg.length() != 0)
 
1007
  {
 
1008
    sql_print_error(err_msg.ptr());
 
1009
    assert(err_code != 0);
 
1010
    mi->report(ERROR_LEVEL, err_code, err_msg.ptr());
 
1011
    return(1);
 
1012
  }
 
1013
 
 
1014
  return(0);
 
1015
}
 
1016
 
 
1017
 
 
1018
static bool wait_for_relay_log_space(Relay_log_info* rli)
 
1019
{
 
1020
  bool slave_killed=0;
 
1021
  Master_info* mi = rli->mi;
 
1022
  const char *save_proc_info;
 
1023
  THD* thd = mi->io_thd;
 
1024
 
 
1025
  pthread_mutex_lock(&rli->log_space_lock);
 
1026
  save_proc_info= thd->enter_cond(&rli->log_space_cond,
 
1027
                                  &rli->log_space_lock,
 
1028
                                  _("Waiting for the slave SQL thread "
 
1029
                                    "to free enough relay log space"));
 
1030
  while (rli->log_space_limit < rli->log_space_total &&
 
1031
         !(slave_killed=io_slave_killed(thd,mi)) &&
 
1032
         !rli->ignore_log_space_limit)
 
1033
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
 
1034
  thd->exit_cond(save_proc_info);
 
1035
  return(slave_killed);
 
1036
}
 
1037
 
 
1038
 
 
1039
/*
 
1040
  Builds a Rotate from the ignored events' info and writes it to relay log.
 
1041
 
 
1042
  SYNOPSIS
 
1043
  write_ignored_events_info_to_relay_log()
 
1044
    thd             pointer to I/O thread's thd
 
1045
    mi
 
1046
 
 
1047
  DESCRIPTION
 
1048
    Slave I/O thread, going to die, must leave a durable trace of the
 
1049
    ignored events' end position for the use of the slave SQL thread, by
 
1050
    calling this function. Only that thread can call it (see assertion).
 
1051
 */
 
1052
static void write_ignored_events_info_to_relay_log(THD *thd __attribute__((unused)),
 
1053
                                                   Master_info *mi)
 
1054
{
 
1055
  Relay_log_info *rli= &mi->rli;
 
1056
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
 
1057
 
 
1058
  assert(thd == mi->io_thd);
 
1059
  pthread_mutex_lock(log_lock);
 
1060
  if (rli->ign_master_log_name_end[0])
 
1061
  {
 
1062
    Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
 
1063
                                               0, rli->ign_master_log_pos_end,
 
1064
                                               Rotate_log_event::DUP_NAME);
 
1065
    rli->ign_master_log_name_end[0]= 0;
 
1066
    /* can unlock before writing as slave SQL thd will soon see our Rotate */
 
1067
    pthread_mutex_unlock(log_lock);
 
1068
    if (likely((bool)ev))
 
1069
    {
 
1070
      ev->server_id= 0; // don't be ignored by slave SQL thread
 
1071
      if (unlikely(rli->relay_log.append(ev)))
 
1072
        mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
1073
                   ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
1074
                   _("failed to write a Rotate event"
 
1075
                     " to the relay log, SHOW SLAVE STATUS may be"
 
1076
                     " inaccurate"));
 
1077
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
1078
      if (flush_master_info(mi, 1))
 
1079
        sql_print_error(_("Failed to flush master info file"));
 
1080
      delete ev;
 
1081
    }
 
1082
    else
 
1083
      mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
 
1084
                 ER(ER_SLAVE_CREATE_EVENT_FAILURE),
 
1085
                 _("Rotate_event (out of memory?),"
 
1086
                   " SHOW SLAVE STATUS may be inaccurate"));
 
1087
  }
 
1088
  else
 
1089
    pthread_mutex_unlock(log_lock);
 
1090
  return;
 
1091
}
 
1092
 
 
1093
 
 
1094
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
 
1095
                             bool *suppress_warnings)
 
1096
{
 
1097
  uchar buf[1024], *pos= buf;
 
1098
  uint32_t report_host_len, report_user_len=0, report_password_len=0;
 
1099
 
 
1100
  *suppress_warnings= false;
 
1101
  if (!report_host)
 
1102
    return(0);
 
1103
  report_host_len= strlen(report_host);
 
1104
  if (report_user)
 
1105
    report_user_len= strlen(report_user);
 
1106
  if (report_password)
 
1107
    report_password_len= strlen(report_password);
 
1108
  /* 30 is a good safety margin */
 
1109
  if (report_host_len + report_user_len + report_password_len + 30 >
 
1110
      sizeof(buf))
 
1111
    return(0);                                     // safety
 
1112
 
 
1113
  int4store(pos, server_id); pos+= 4;
 
1114
  pos= net_store_data(pos, (uchar*) report_host, report_host_len);
 
1115
  pos= net_store_data(pos, (uchar*) report_user, report_user_len);
 
1116
  pos= net_store_data(pos, (uchar*) report_password, report_password_len);
 
1117
  int2store(pos, (uint16_t) report_port); pos+= 2;
 
1118
  int4store(pos, rpl_recovery_rank);    pos+= 4;
 
1119
  /* The master will fill in master_id */
 
1120
  int4store(pos, 0);                    pos+= 4;
 
1121
 
 
1122
  if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
 
1123
  {
 
1124
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1125
    {
 
1126
      *suppress_warnings= true;                 // Suppress reconnect warning
 
1127
    }
 
1128
    else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
 
1129
    {
 
1130
      char buf[256];
 
1131
      snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle), 
 
1132
               drizzle_errno(drizzle));
 
1133
      mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
 
1134
                 ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
 
1135
    }
 
1136
    return(1);
 
1137
  }
 
1138
  return(0);
 
1139
}
 
1140
 
 
1141
 
 
1142
bool show_master_info(THD* thd, Master_info* mi)
 
1143
{
 
1144
  // TODO: fix this for multi-master
 
1145
  List<Item> field_list;
 
1146
  Protocol *protocol= thd->protocol;
 
1147
 
 
1148
  field_list.push_back(new Item_empty_string("Slave_IO_State",
 
1149
                                                     14));
 
1150
  field_list.push_back(new Item_empty_string("Master_Host",
 
1151
                                                     sizeof(mi->host)));
 
1152
  field_list.push_back(new Item_empty_string("Master_User",
 
1153
                                                     sizeof(mi->user)));
 
1154
  field_list.push_back(new Item_return_int("Master_Port", 7,
 
1155
                                           DRIZZLE_TYPE_LONG));
 
1156
  field_list.push_back(new Item_return_int("Connect_Retry", 10,
 
1157
                                           DRIZZLE_TYPE_LONG));
 
1158
  field_list.push_back(new Item_empty_string("Master_Log_File",
 
1159
                                             FN_REFLEN));
 
1160
  field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
 
1161
                                           DRIZZLE_TYPE_LONGLONG));
 
1162
  field_list.push_back(new Item_empty_string("Relay_Log_File",
 
1163
                                             FN_REFLEN));
 
1164
  field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
 
1165
                                           DRIZZLE_TYPE_LONGLONG));
 
1166
  field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
 
1167
                                             FN_REFLEN));
 
1168
  field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
 
1169
  field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
 
1170
  field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
 
1171
  field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
 
1172
  field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
 
1173
  field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
 
1174
  field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
 
1175
  field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
 
1176
                                             28));
 
1177
  field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
 
1178
  field_list.push_back(new Item_empty_string("Last_Error", 20));
 
1179
  field_list.push_back(new Item_return_int("Skip_Counter", 10,
 
1180
                                           DRIZZLE_TYPE_LONG));
 
1181
  field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
 
1182
                                           DRIZZLE_TYPE_LONGLONG));
 
1183
  field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
 
1184
                                           DRIZZLE_TYPE_LONGLONG));
 
1185
  field_list.push_back(new Item_empty_string("Until_Condition", 6));
 
1186
  field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
 
1187
  field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
 
1188
                                           DRIZZLE_TYPE_LONGLONG));
 
1189
  field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
 
1190
  field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
 
1191
                                             sizeof(mi->ssl_ca)));
 
1192
  field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
 
1193
                                             sizeof(mi->ssl_capath)));
 
1194
  field_list.push_back(new Item_empty_string("Master_SSL_Cert",
 
1195
                                             sizeof(mi->ssl_cert)));
 
1196
  field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
 
1197
                                             sizeof(mi->ssl_cipher)));
 
1198
  field_list.push_back(new Item_empty_string("Master_SSL_Key",
 
1199
                                             sizeof(mi->ssl_key)));
 
1200
  field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
 
1201
                                           DRIZZLE_TYPE_LONGLONG));
 
1202
  field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
 
1203
                                             3));
 
1204
  field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
 
1205
  field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
 
1206
  field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
 
1207
  field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
 
1208
 
 
1209
  if (protocol->send_fields(&field_list,
 
1210
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1211
    return(true);
 
1212
 
 
1213
  if (mi->host[0])
 
1214
  {
 
1215
    String *packet= &thd->packet;
 
1216
    protocol->prepare_for_resend();
 
1217
 
 
1218
    /*
 
1219
      slave_running can be accessed without run_lock but not other
 
1220
      non-volotile members like mi->io_thd, which is guarded by the mutex.
 
1221
    */
 
1222
    pthread_mutex_lock(&mi->run_lock);
 
1223
    protocol->store(mi->io_thd ? mi->io_thd->get_proc_info() : "", &my_charset_bin);
 
1224
    pthread_mutex_unlock(&mi->run_lock);
 
1225
 
 
1226
    pthread_mutex_lock(&mi->data_lock);
 
1227
    pthread_mutex_lock(&mi->rli.data_lock);
 
1228
    protocol->store(mi->host, &my_charset_bin);
 
1229
    protocol->store(mi->user, &my_charset_bin);
 
1230
    protocol->store((uint32_t) mi->port);
 
1231
    protocol->store((uint32_t) mi->connect_retry);
 
1232
    protocol->store(mi->master_log_name, &my_charset_bin);
 
1233
    protocol->store((uint64_t) mi->master_log_pos);
 
1234
    protocol->store(mi->rli.group_relay_log_name +
 
1235
                    dirname_length(mi->rli.group_relay_log_name),
 
1236
                    &my_charset_bin);
 
1237
    protocol->store((uint64_t) mi->rli.group_relay_log_pos);
 
1238
    protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
 
1239
    protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
 
1240
                    "Yes" : "No", &my_charset_bin);
 
1241
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
 
1242
    protocol->store(rpl_filter->get_do_db());
 
1243
    protocol->store(rpl_filter->get_ignore_db());
 
1244
 
 
1245
    char buf[256];
 
1246
    String tmp(buf, sizeof(buf), &my_charset_bin);
 
1247
    rpl_filter->get_do_table(&tmp);
 
1248
    protocol->store(&tmp);
 
1249
    rpl_filter->get_ignore_table(&tmp);
 
1250
    protocol->store(&tmp);
 
1251
    rpl_filter->get_wild_do_table(&tmp);
 
1252
    protocol->store(&tmp);
 
1253
    rpl_filter->get_wild_ignore_table(&tmp);
 
1254
    protocol->store(&tmp);
 
1255
 
 
1256
    protocol->store(mi->rli.last_error().number);
 
1257
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
 
1258
    protocol->store((uint32_t) mi->rli.slave_skip_counter);
 
1259
    protocol->store((uint64_t) mi->rli.group_master_log_pos);
 
1260
    protocol->store((uint64_t) mi->rli.log_space_total);
 
1261
 
 
1262
    protocol->store(
 
1263
      mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
 
1264
        ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
 
1265
          "Relay"), &my_charset_bin);
 
1266
    protocol->store(mi->rli.until_log_name, &my_charset_bin);
 
1267
    protocol->store((uint64_t) mi->rli.until_log_pos);
 
1268
 
 
1269
    protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
 
1270
    protocol->store(mi->ssl_ca, &my_charset_bin);
 
1271
    protocol->store(mi->ssl_capath, &my_charset_bin);
 
1272
    protocol->store(mi->ssl_cert, &my_charset_bin);
 
1273
    protocol->store(mi->ssl_cipher, &my_charset_bin);
 
1274
    protocol->store(mi->ssl_key, &my_charset_bin);
 
1275
 
 
1276
    /*
 
1277
      Seconds_Behind_Master: if SQL thread is running and I/O thread is
 
1278
      connected, we can compute it otherwise show NULL (i.e. unknown).
 
1279
    */
 
1280
    if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
 
1281
        mi->rli.slave_running)
 
1282
    {
 
1283
      long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
 
1284
                       - mi->clock_diff_with_master);
 
1285
      /*
 
1286
        Apparently on some systems time_diff can be <0. Here are possible
 
1287
        reasons related to MySQL:
 
1288
        - the master is itself a slave of another master whose time is ahead.
 
1289
        - somebody used an explicit SET TIMESTAMP on the master.
 
1290
        Possible reason related to granularity-to-second of time functions
 
1291
        (nothing to do with MySQL), which can explain a value of -1:
 
1292
        assume the master's and slave's time are perfectly synchronized, and
 
1293
        that at slave's connection time, when the master's timestamp is read,
 
1294
        it is at the very end of second 1, and (a very short time later) when
 
1295
        the slave's timestamp is read it is at the very beginning of second
 
1296
        2. Then the recorded value for master is 1 and the recorded value for
 
1297
        slave is 2. At SHOW SLAVE STATUS time, assume that the difference
 
1298
        between timestamp of slave and rli->last_master_timestamp is 0
 
1299
        (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
 
1300
        This confuses users, so we don't go below 0: hence the max().
 
1301
 
 
1302
        last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
 
1303
        special marker to say "consider we have caught up".
 
1304
      */
 
1305
      protocol->store((int64_t)(mi->rli.last_master_timestamp ?
 
1306
                                 max((long)0, time_diff) : 0));
 
1307
    }
 
1308
    else
 
1309
    {
 
1310
      protocol->store_null();
 
1311
    }
 
1312
    protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
 
1313
 
 
1314
    // Last_IO_Errno
 
1315
    protocol->store(mi->last_error().number);
 
1316
    // Last_IO_Error
 
1317
    protocol->store(mi->last_error().message, &my_charset_bin);
 
1318
    // Last_SQL_Errno
 
1319
    protocol->store(mi->rli.last_error().number);
 
1320
    // Last_SQL_Error
 
1321
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
 
1322
 
 
1323
    pthread_mutex_unlock(&mi->rli.data_lock);
 
1324
    pthread_mutex_unlock(&mi->data_lock);
 
1325
 
 
1326
    if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
 
1327
      return(true);
 
1328
  }
 
1329
  my_eof(thd);
 
1330
  return(false);
 
1331
}
 
1332
 
 
1333
 
 
1334
void set_slave_thread_options(THD* thd)
 
1335
{
 
1336
  /*
 
1337
     It's nonsense to constrain the slave threads with max_join_size; if a
 
1338
     query succeeded on master, we HAVE to execute it. So set
 
1339
     OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
 
1340
     (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
 
1341
     SELECT examining more than 4 billion rows would still fail (yes, because
 
1342
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
 
1343
     only for client threads.
 
1344
  */
 
1345
  uint64_t options= thd->options | OPTION_BIG_SELECTS;
 
1346
  if (opt_log_slave_updates)
 
1347
    options|= OPTION_BIN_LOG;
 
1348
  else
 
1349
    options&= ~OPTION_BIN_LOG;
 
1350
  thd->options= options;
 
1351
  thd->variables.completion_type= 0;
 
1352
  return;
 
1353
}
 
1354
 
 
1355
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
 
1356
{
 
1357
  thd->variables.character_set_client=
 
1358
    global_system_variables.character_set_client;
 
1359
  thd->variables.collation_connection=
 
1360
    global_system_variables.collation_connection;
 
1361
  thd->variables.collation_server=
 
1362
    global_system_variables.collation_server;
 
1363
  thd->update_charset();
 
1364
 
 
1365
  /*
 
1366
    We use a const cast here since the conceptual (and externally
 
1367
    visible) behavior of the function is to set the default charset of
 
1368
    the thread.  That the cache has to be invalidated is a secondary
 
1369
    effect.
 
1370
   */
 
1371
  const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
 
1372
  return;
 
1373
}
 
1374
 
 
1375
/*
 
1376
  init_slave_thread()
 
1377
*/
 
1378
 
 
1379
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
 
1380
{
 
1381
  int32_t simulate_error= 0;
 
1382
  thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
 
1383
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
 
1384
  thd->security_ctx->skip_grants();
 
1385
  my_net_init(&thd->net, 0);
 
1386
/*
 
1387
  Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
 
1388
  slave threads, since a replication event can become this much larger
 
1389
  than the corresponding packet (query) sent from client to master.
 
1390
*/
 
1391
  thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
 
1392
    + MAX_LOG_EVENT_HEADER;  /* note, incr over the global not session var */
 
1393
  thd->slave_thread = 1;
 
1394
  thd->enable_slow_log= opt_log_slow_slave_statements;
 
1395
  set_slave_thread_options(thd);
 
1396
  thd->client_capabilities = CLIENT_LOCAL_FILES;
 
1397
  pthread_mutex_lock(&LOCK_thread_count);
 
1398
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
 
1399
  pthread_mutex_unlock(&LOCK_thread_count);
 
1400
 
 
1401
 simulate_error|= (1 << SLAVE_THD_IO);
 
1402
 simulate_error|= (1 << SLAVE_THD_SQL);
 
1403
  if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
 
1404
  {
 
1405
    thd->cleanup();
 
1406
    return(-1);
 
1407
  }
 
1408
  lex_start(thd);
 
1409
 
 
1410
  if (thd_type == SLAVE_THD_SQL)
 
1411
    thd->set_proc_info("Waiting for the next event in relay log");
 
1412
  else
 
1413
    thd->set_proc_info("Waiting for master update");
 
1414
  thd->version=refresh_version;
 
1415
  thd->set_time();
 
1416
  return(0);
 
1417
}
 
1418
 
 
1419
 
 
1420
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
1421
                      void* thread_killed_arg)
 
1422
{
 
1423
  int32_t nap_time;
 
1424
  thr_alarm_t alarmed;
 
1425
 
 
1426
  thr_alarm_init(&alarmed);
 
1427
  time_t start_time= my_time(0);
 
1428
  time_t end_time= start_time+sec;
 
1429
 
 
1430
  while ((nap_time= (int32_t) (end_time - start_time)) > 0)
 
1431
  {
 
1432
    ALARM alarm_buff;
 
1433
    /*
 
1434
      The only reason we are asking for alarm is so that
 
1435
      we will be woken up in case of murder, so if we do not get killed,
 
1436
      set the alarm so it goes off after we wake up naturally
 
1437
    */
 
1438
    thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
 
1439
    sleep(nap_time);
 
1440
    thr_end_alarm(&alarmed);
 
1441
 
 
1442
    if ((*thread_killed)(thd,thread_killed_arg))
 
1443
      return(1);
 
1444
    start_time= my_time(0);
 
1445
  }
 
1446
  return(0);
 
1447
}
 
1448
 
 
1449
 
 
1450
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
 
1451
                        bool *suppress_warnings)
 
1452
{
 
1453
  uchar buf[FN_REFLEN + 10];
 
1454
  int32_t len;
 
1455
  int32_t binlog_flags = 0; // for now
 
1456
  char* logname = mi->master_log_name;
 
1457
  
 
1458
  *suppress_warnings= false;
 
1459
 
 
1460
  // TODO if big log files: Change next to int8store()
 
1461
  int4store(buf, (uint32_t) mi->master_log_pos);
 
1462
  int2store(buf + 4, binlog_flags);
 
1463
  int4store(buf + 6, server_id);
 
1464
  len = (uint32_t) strlen(logname);
 
1465
  memcpy(buf + 10, logname,len);
 
1466
  if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
 
1467
  {
 
1468
    /*
 
1469
      Something went wrong, so we will just reconnect and retry later
 
1470
      in the future, we should do a better error analysis, but for
 
1471
      now we just fill up the error log :-)
 
1472
    */
 
1473
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1474
      *suppress_warnings= true;                 // Suppress reconnect warning
 
1475
    else
 
1476
      sql_print_error(_("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs"),
 
1477
                      drizzle_errno(drizzle), drizzle_error(drizzle),
 
1478
                      mi->connect_retry);
 
1479
    return(1);
 
1480
  }
 
1481
 
 
1482
  return(0);
 
1483
}
 
1484
 
 
1485
/*
 
1486
  Read one event from the master
 
1487
 
 
1488
  SYNOPSIS
 
1489
    read_event()
 
1490
    DRIZZLE               DRIZZLE connection
 
1491
    mi                  Master connection information
 
1492
    suppress_warnings   TRUE when a normal net read timeout has caused us to
 
1493
                        try a reconnect.  We do not want to print anything to
 
1494
                        the error log in this case because this a anormal
 
1495
                        event in an idle server.
 
1496
 
 
1497
    RETURN VALUES
 
1498
    'packet_error'      Error
 
1499
    number              Length of packet
 
1500
*/
 
1501
 
 
1502
static uint32_t read_event(DRIZZLE *drizzle,
 
1503
                        Master_info *mi __attribute__((unused)),
 
1504
                        bool* suppress_warnings)
 
1505
{
 
1506
  uint32_t len;
 
1507
 
 
1508
  *suppress_warnings= false;
 
1509
  /*
 
1510
    my_real_read() will time us out
 
1511
    We check if we were told to die, and if not, try reading again
 
1512
  */
 
1513
  if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
 
1514
    return(packet_error);
 
1515
 
 
1516
  len = cli_safe_read(drizzle);
 
1517
  if (len == packet_error || (int32_t) len < 1)
 
1518
  {
 
1519
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1520
    {
 
1521
      /*
 
1522
        We are trying a normal reconnect after a read timeout;
 
1523
        we suppress prints to .err file as long as the reconnect
 
1524
        happens without problems
 
1525
      */
 
1526
      *suppress_warnings= true;
 
1527
    }
 
1528
    else
 
1529
      sql_print_error(_("Error reading packet from server: %s ( server_errno=%d)"),
 
1530
                      drizzle_error(drizzle), drizzle_errno(drizzle));
 
1531
    return(packet_error);
 
1532
  }
 
1533
 
 
1534
  /* Check if eof packet */
 
1535
  if (len < 8 && drizzle->net.read_pos[0] == 254)
 
1536
  {
 
1537
    sql_print_information(_("Slave: received end packet from server, apparent "
 
1538
                            "master shutdown: %s"),
 
1539
                     drizzle_error(drizzle));
 
1540
     return(packet_error);
 
1541
  }
 
1542
 
 
1543
  return(len - 1);
 
1544
}
 
1545
 
 
1546
 
 
1547
int32_t check_expected_error(THD* thd __attribute__((unused)),
 
1548
                         Relay_log_info const *rli __attribute__((unused)),
 
1549
                         int32_t expected_error)
 
1550
{
 
1551
  switch (expected_error) {
 
1552
  case ER_NET_READ_ERROR:
 
1553
  case ER_NET_ERROR_ON_WRITE:
 
1554
  case ER_QUERY_INTERRUPTED:
 
1555
  case ER_SERVER_SHUTDOWN:
 
1556
  case ER_NEW_ABORTING_CONNECTION:
 
1557
    return(1);
 
1558
  default:
 
1559
    return(0);
 
1560
  }
 
1561
}
 
1562
 
 
1563
 
 
1564
/*
 
1565
  Check if the current error is of temporary nature of not.
 
1566
  Some errors are temporary in nature, such as
 
1567
  ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT.  Ndb also signals
 
1568
  that the error is temporary by pushing a warning with the error code
 
1569
  ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
 
1570
*/
 
1571
static int32_t has_temporary_error(THD *thd)
 
1572
{
 
1573
  if (thd->is_fatal_error)
 
1574
    return(0);
 
1575
 
 
1576
  if (thd->main_da.is_error())
 
1577
  {
 
1578
    thd->clear_error();
 
1579
    my_error(ER_LOCK_DEADLOCK, MYF(0));
 
1580
  }
 
1581
 
 
1582
  /*
 
1583
    If there is no message in THD, we can't say if it's a temporary
 
1584
    error or not. This is currently the case for Incident_log_event,
 
1585
    which sets no message. Return FALSE.
 
1586
  */
 
1587
  if (!thd->is_error())
 
1588
    return(0);
 
1589
 
 
1590
  /*
 
1591
    Temporary error codes:
 
1592
    currently, InnoDB deadlock detected by InnoDB or lock
 
1593
    wait timeout (innodb_lock_wait_timeout exceeded
 
1594
  */
 
1595
  if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
 
1596
      thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
 
1597
    return(1);
 
1598
 
 
1599
  return(0);
 
1600
}
 
1601
 
 
1602
 
 
1603
/**
 
1604
  Applies the given event and advances the relay log position.
 
1605
 
 
1606
  In essence, this function does:
 
1607
 
 
1608
  @code
 
1609
    ev->apply_event(rli);
 
1610
    ev->update_pos(rli);
 
1611
  @endcode
 
1612
 
 
1613
  But it also does some maintainance, such as skipping events if
 
1614
  needed and reporting errors.
 
1615
 
 
1616
  If the @c skip flag is set, then it is tested whether the event
 
1617
  should be skipped, by looking at the slave_skip_counter and the
 
1618
  server id.  The skip flag should be set when calling this from a
 
1619
  replication thread but not set when executing an explicit BINLOG
 
1620
  statement.
 
1621
 
 
1622
  @retval 0 OK.
 
1623
 
 
1624
  @retval 1 Error calling ev->apply_event().
 
1625
 
 
1626
  @retval 2 No error calling ev->apply_event(), but error calling
 
1627
  ev->update_pos().
 
1628
*/
 
1629
int32_t apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
 
1630
                               bool skip)
 
1631
{
 
1632
  int32_t exec_res= 0;
 
1633
 
 
1634
  /*
 
1635
    Execute the event to change the database and update the binary
 
1636
    log coordinates, but first we set some data that is needed for
 
1637
    the thread.
 
1638
 
 
1639
    The event will be executed unless it is supposed to be skipped.
 
1640
 
 
1641
    Queries originating from this server must be skipped.  Low-level
 
1642
    events (Format_description_log_event, Rotate_log_event,
 
1643
    Stop_log_event) from this server must also be skipped. But for
 
1644
    those we don't want to modify 'group_master_log_pos', because
 
1645
    these events did not exist on the master.
 
1646
    Format_description_log_event is not completely skipped.
 
1647
 
 
1648
    Skip queries specified by the user in 'slave_skip_counter'.  We
 
1649
    can't however skip events that has something to do with the log
 
1650
    files themselves.
 
1651
 
 
1652
    Filtering on own server id is extremely important, to ignore
 
1653
    execution of events created by the creation/rotation of the relay
 
1654
    log (remember that now the relay log starts with its Format_desc,
 
1655
    has a Rotate etc).
 
1656
  */
 
1657
 
 
1658
  thd->server_id = ev->server_id; // use the original server id for logging
 
1659
  thd->set_time();                            // time the query
 
1660
  thd->lex->current_select= 0;
 
1661
  if (!ev->when)
 
1662
    ev->when= my_time(0);
 
1663
  ev->thd = thd; // because up to this point, ev->thd == 0
 
1664
 
 
1665
  if (skip)
 
1666
  {
 
1667
    int32_t reason= ev->shall_skip(rli);
 
1668
    if (reason == Log_event::EVENT_SKIP_COUNT)
 
1669
      --rli->slave_skip_counter;
 
1670
    pthread_mutex_unlock(&rli->data_lock);
 
1671
    if (reason == Log_event::EVENT_SKIP_NOT)
 
1672
      exec_res= ev->apply_event(rli);
 
1673
  }
 
1674
  else
 
1675
    exec_res= ev->apply_event(rli);
 
1676
 
 
1677
  if (exec_res == 0)
 
1678
  {
 
1679
    int32_t error= ev->update_pos(rli);
 
1680
    /*
 
1681
      The update should not fail, so print an error message and
 
1682
      return an error code.
 
1683
 
 
1684
      TODO: Replace this with a decent error message when merged
 
1685
      with BUG#24954 (which adds several new error message).
 
1686
    */
 
1687
    if (error)
 
1688
    {
 
1689
      char buf[22];
 
1690
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
 
1691
                  _("It was not possible to update the positions"
 
1692
                  " of the relay log information: the slave may"
 
1693
                  " be in an inconsistent state."
 
1694
                  " Stopped in %s position %s"),
 
1695
                  rli->group_relay_log_name,
 
1696
                  llstr(rli->group_relay_log_pos, buf));
 
1697
      return(2);
 
1698
    }
 
1699
  }
 
1700
 
 
1701
  return(exec_res ? 1 : 0);
 
1702
}
 
1703
 
 
1704
 
 
1705
/**
 
1706
  Top-level function for executing the next event from the relay log.
 
1707
 
 
1708
  This function reads the event from the relay log, executes it, and
 
1709
  advances the relay log position.  It also handles errors, etc.
 
1710
 
 
1711
  This function may fail to apply the event for the following reasons:
 
1712
 
 
1713
   - The position specfied by the UNTIL condition of the START SLAVE
 
1714
     command is reached.
 
1715
 
 
1716
   - It was not possible to read the event from the log.
 
1717
 
 
1718
   - The slave is killed.
 
1719
 
 
1720
   - An error occurred when applying the event, and the event has been
 
1721
     tried slave_trans_retries times.  If the event has been retried
 
1722
     fewer times, 0 is returned.
 
1723
 
 
1724
   - init_master_info or init_relay_log_pos failed. (These are called
 
1725
     if a failure occurs when applying the event.)</li>
 
1726
 
 
1727
   - An error occurred when updating the binlog position.
 
1728
 
 
1729
  @retval 0 The event was applied.
 
1730
 
 
1731
  @retval 1 The event was not applied.
 
1732
*/
 
1733
static int32_t exec_relay_log_event(THD* thd, Relay_log_info* rli)
 
1734
{
 
1735
  /*
 
1736
     We acquire this mutex since we need it for all operations except
 
1737
     event execution. But we will release it in places where we will
 
1738
     wait for something for example inside of next_event().
 
1739
   */
 
1740
  pthread_mutex_lock(&rli->data_lock);
 
1741
 
 
1742
  Log_event * ev = next_event(rli);
 
1743
 
 
1744
  assert(rli->sql_thd==thd);
 
1745
 
 
1746
  if (sql_slave_killed(thd,rli))
 
1747
  {
 
1748
    pthread_mutex_unlock(&rli->data_lock);
 
1749
    delete ev;
 
1750
    return(1);
 
1751
  }
 
1752
  if (ev)
 
1753
  {
 
1754
    int32_t exec_res;
 
1755
 
 
1756
    /*
 
1757
      This tests if the position of the beginning of the current event
 
1758
      hits the UNTIL barrier.
 
1759
    */
 
1760
    if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
 
1761
        rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
 
1762
                                rli->group_master_log_pos :
 
1763
                                ev->log_pos - ev->data_written))
 
1764
    {
 
1765
      char buf[22];
 
1766
      sql_print_information(_("Slave SQL thread stopped because it reached its"
 
1767
                              " UNTIL position %s"),
 
1768
                            llstr(rli->until_pos(), buf));
 
1769
      /*
 
1770
        Setting abort_slave flag because we do not want additional message about
 
1771
        error in query execution to be printed.
 
1772
      */
 
1773
      rli->abort_slave= 1;
 
1774
      pthread_mutex_unlock(&rli->data_lock);
 
1775
      delete ev;
 
1776
      return(1);
 
1777
    }
 
1778
    exec_res= apply_event_and_update_pos(ev, thd, rli, true);
 
1779
 
 
1780
    /*
 
1781
      Format_description_log_event should not be deleted because it will be
 
1782
      used to read info about the relay log's format; it will be deleted when
 
1783
      the SQL thread does not need it, i.e. when this thread terminates.
 
1784
    */
 
1785
    if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
 
1786
    {
 
1787
      delete ev;
 
1788
    }
 
1789
 
 
1790
    /*
 
1791
      update_log_pos failed: this should not happen, so we don't
 
1792
      retry.
 
1793
    */
 
1794
    if (exec_res == 2)
 
1795
      return(1);
 
1796
 
 
1797
    if (slave_trans_retries)
 
1798
    {
 
1799
      int32_t temp_err= 0;
 
1800
      if (exec_res && (temp_err= has_temporary_error(thd)))
 
1801
      {
 
1802
        const char *errmsg;
 
1803
        /*
 
1804
          We were in a transaction which has been rolled back because of a
 
1805
          temporary error;
 
1806
          let's seek back to BEGIN log event and retry it all again.
 
1807
          Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
 
1808
          there is no rollback since 5.0.13 (ref: manual).
 
1809
          We have to not only seek but also
 
1810
          a) init_master_info(), to seek back to hot relay log's start for later
 
1811
          (for when we will come back to this hot log after re-processing the
 
1812
          possibly existing old logs where BEGIN is: check_binlog_magic() will
 
1813
          then need the cache to be at position 0 (see comments at beginning of
 
1814
          init_master_info()).
 
1815
          b) init_relay_log_pos(), because the BEGIN may be an older relay log.
 
1816
        */
 
1817
        if (rli->trans_retries < slave_trans_retries)
 
1818
        {
 
1819
          if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
 
1820
            sql_print_error(_("Failed to initialize the master info structure"));
 
1821
          else if (init_relay_log_pos(rli,
 
1822
                                      rli->group_relay_log_name,
 
1823
                                      rli->group_relay_log_pos,
 
1824
                                      1, &errmsg, 1))
 
1825
            sql_print_error(_("Error initializing relay log position: %s"),
 
1826
                            errmsg);
 
1827
          else
 
1828
          {
 
1829
            exec_res= 0;
 
1830
            end_trans(thd, ROLLBACK);
 
1831
            /* chance for concurrent connection to get more locks */
 
1832
            safe_sleep(thd, min(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
 
1833
                       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
 
1834
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
 
1835
            rli->trans_retries++;
 
1836
            rli->retried_trans++;
 
1837
            pthread_mutex_unlock(&rli->data_lock);
 
1838
          }
 
1839
        }
 
1840
        else
 
1841
          sql_print_error(_("Slave SQL thread retried transaction %lu time(s) "
 
1842
                            "in vain, giving up. Consider raising the value of "
 
1843
                            "the slave_transaction_retries variable."),
 
1844
                          slave_trans_retries);
 
1845
      }
 
1846
      else if ((exec_res && !temp_err) ||
 
1847
               (opt_using_transactions &&
 
1848
                rli->group_relay_log_pos == rli->event_relay_log_pos))
 
1849
      {
 
1850
        /*
 
1851
          Only reset the retry counter if the entire group succeeded
 
1852
          or failed with a non-transient error.  On a successful
 
1853
          event, the execution will proceed as usual; in the case of a
 
1854
          non-transient error, the slave will stop with an error.
 
1855
         */
 
1856
        rli->trans_retries= 0; // restart from fresh
 
1857
      }
 
1858
    }
 
1859
    return(exec_res);
 
1860
  }
 
1861
  pthread_mutex_unlock(&rli->data_lock);
 
1862
  rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
 
1863
              ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
 
1864
              _("Could not parse relay log event entry. The possible reasons "
 
1865
                "are: the master's binary log is corrupted (you can check this "
 
1866
                "by running 'mysqlbinlog' on the binary log), the slave's "
 
1867
                "relay log is corrupted (you can check this by running "
 
1868
                "'mysqlbinlog' on the relay log), a network problem, or a bug "
 
1869
                "in the master's or slave's DRIZZLE code. If you want to check "
 
1870
                "the master's binary log or slave's relay log, you will be "
 
1871
                "able to know their names by issuing 'SHOW SLAVE STATUS' "
 
1872
                "on this slave."));
 
1873
  return(1);
 
1874
}
 
1875
 
 
1876
 
 
1877
/**
 
1878
  @brief Try to reconnect slave IO thread.
 
1879
 
 
1880
  @details Terminates current connection to master, sleeps for
 
1881
  @c mi->connect_retry msecs and initiates new connection with
 
1882
  @c safe_reconnect(). Variable pointed by @c retry_count is increased -
 
1883
  if it exceeds @c master_retry_count then connection is not re-established
 
1884
  and function signals error.
 
1885
  Unless @c suppres_warnings is TRUE, a warning is put in the server error log
 
1886
  when reconnecting. The warning message and messages used to report errors
 
1887
  are taken from @c messages array. In case @c master_retry_count is exceeded,
 
1888
  no messages are added to the log.
 
1889
 
 
1890
  @param[in]     thd                 Thread context.
 
1891
  @param[in]     DRIZZLE               DRIZZLE connection.
 
1892
  @param[in]     mi                  Master connection information.
 
1893
  @param[in,out] retry_count         Number of attempts to reconnect.
 
1894
  @param[in]     suppress_warnings   TRUE when a normal net read timeout 
 
1895
                                     has caused to reconnecting.
 
1896
  @param[in]     messages            Messages to print/log, see 
 
1897
                                     reconnect_messages[] array.
 
1898
 
 
1899
  @retval        0                   OK.
 
1900
  @retval        1                   There was an error.
 
1901
*/
 
1902
 
 
1903
static int32_t try_to_reconnect(THD *thd, DRIZZLE *drizzle, Master_info *mi,
 
1904
                            uint32_t *retry_count, bool suppress_warnings,
 
1905
                            const char *messages[SLAVE_RECON_MSG_MAX])
 
1906
{
 
1907
  mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
 
1908
  thd->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
 
1909
#ifdef SIGNAL_WITH_VIO_CLOSE
 
1910
  thd->clear_active_vio();
 
1911
#endif
 
1912
  end_server(drizzle);
 
1913
  if ((*retry_count)++)
 
1914
  {
 
1915
    if (*retry_count > master_retry_count)
 
1916
      return 1;                             // Don't retry forever
 
1917
    safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
 
1918
               (void *) mi);
 
1919
  }
 
1920
  if (check_io_slave_killed(thd, mi,
 
1921
                            _(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
 
1922
    return 1;
 
1923
  thd->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
 
1924
  if (!suppress_warnings)
 
1925
  {
 
1926
    char buf[256], llbuff[22];
 
1927
    snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
 
1928
             IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
 
1929
    /*
 
1930
      Raise a warining during registering on master/requesting dump.
 
1931
      Log a message reading event.
 
1932
    */
 
1933
    if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
 
1934
    {
 
1935
      mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
 
1936
                 ER(ER_SLAVE_MASTER_COM_FAILURE),
 
1937
                 _(messages[SLAVE_RECON_MSG_COMMAND]), buf);
 
1938
    }
 
1939
    else
 
1940
    {
 
1941
      sql_print_information(buf);
 
1942
    }
 
1943
  }
 
1944
  if (safe_reconnect(thd, drizzle, mi, 1) || io_slave_killed(thd, mi))
 
1945
  {
 
1946
    if (global_system_variables.log_warnings)
 
1947
      sql_print_information(_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
 
1948
    return 1;
 
1949
  }
 
1950
  return 0;
 
1951
}
 
1952
 
 
1953
 
 
1954
/* Slave I/O Thread entry point */
 
1955
 
 
1956
pthread_handler_t handle_slave_io(void *arg)
 
1957
{
 
1958
  THD *thd; // needs to be first for thread_stack
 
1959
  DRIZZLE *drizzle;
 
1960
  Master_info *mi = (Master_info*)arg;
 
1961
  Relay_log_info *rli= &mi->rli;
 
1962
  char llbuff[22];
 
1963
  uint32_t retry_count;
 
1964
  bool suppress_warnings;
 
1965
  uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
 
1966
  my_thread_init();
 
1967
 
 
1968
  assert(mi->inited);
 
1969
  drizzle= NULL ;
 
1970
  retry_count= 0;
 
1971
 
 
1972
  pthread_mutex_lock(&mi->run_lock);
 
1973
  /* Inform waiting threads that slave has started */
 
1974
  mi->slave_run_id++;
 
1975
 
 
1976
  mi->events_till_disconnect = disconnect_slave_event_count;
 
1977
 
 
1978
  thd= new THD;
 
1979
  THD_CHECK_SENTRY(thd);
 
1980
  mi->io_thd = thd;
 
1981
 
 
1982
  pthread_detach_this_thread();
 
1983
  thd->thread_stack= (char*) &thd; // remember where our stack is
 
1984
  if (init_slave_thread(thd, SLAVE_THD_IO))
 
1985
  {
 
1986
    pthread_cond_broadcast(&mi->start_cond);
 
1987
    pthread_mutex_unlock(&mi->run_lock);
 
1988
    sql_print_error(_("Failed during slave I/O thread initialization"));
 
1989
    goto err;
 
1990
  }
 
1991
  pthread_mutex_lock(&LOCK_thread_count);
 
1992
  threads.append(thd);
 
1993
  pthread_mutex_unlock(&LOCK_thread_count);
 
1994
  mi->slave_running = 1;
 
1995
  mi->abort_slave = 0;
 
1996
  pthread_mutex_unlock(&mi->run_lock);
 
1997
  pthread_cond_broadcast(&mi->start_cond);
 
1998
 
 
1999
  if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
 
2000
  {
 
2001
    mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
 
2002
               ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
 
2003
    goto err;
 
2004
  }
 
2005
 
 
2006
  thd->set_proc_info("Connecting to master");
 
2007
  // we can get killed during safe_connect
 
2008
  if (!safe_connect(thd, drizzle, mi))
 
2009
  {
 
2010
    sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
 
2011
                            "replication started in log '%s' at position %s"),
 
2012
                          mi->user, mi->host, mi->port,
 
2013
                          IO_RPL_LOG_NAME,
 
2014
                          llstr(mi->master_log_pos,llbuff));
 
2015
  /*
 
2016
    Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
 
2017
    thread, since a replication event can become this much larger than
 
2018
    the corresponding packet (query) sent from client to master.
 
2019
  */
 
2020
    drizzle->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
 
2021
  }
 
2022
  else
 
2023
  {
 
2024
    sql_print_information(_("Slave I/O thread killed while connecting to master"));
 
2025
    goto err;
 
2026
  }
 
2027
 
 
2028
connected:
 
2029
 
 
2030
  // TODO: the assignment below should be under mutex (5.0)
 
2031
  mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
 
2032
  thd->slave_net = &drizzle->net;
 
2033
  thd->set_proc_info("Checking master version");
 
2034
  if (get_master_version_and_clock(drizzle, mi))
 
2035
    goto err;
 
2036
  
 
2037
  if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
 
2038
  {
 
2039
    /*
 
2040
      Register ourselves with the master.
 
2041
    */
 
2042
    thd->set_proc_info("Registering slave on master");
 
2043
    if (register_slave_on_master(drizzle, mi, &suppress_warnings))
 
2044
    {
 
2045
      if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
 
2046
                                 "while registering slave on master"))
 
2047
      {
 
2048
        sql_print_error(_("Slave I/O thread couldn't register on master"));
 
2049
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2050
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
 
2051
          goto err;
 
2052
      }
 
2053
      else
 
2054
        goto err;
 
2055
      goto connected;
 
2056
    }
 
2057
    if (!retry_count_reg)
 
2058
    {
 
2059
      retry_count_reg++;
 
2060
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2061
      if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2062
                           reconnect_messages[SLAVE_RECON_ACT_REG]))
 
2063
        goto err;
 
2064
      goto connected;
 
2065
    }
 
2066
  }
 
2067
 
 
2068
  while (!io_slave_killed(thd,mi))
 
2069
  {
 
2070
    thd->set_proc_info("Requesting binlog dump");
 
2071
    if (request_dump(drizzle, mi, &suppress_warnings))
 
2072
    {
 
2073
      sql_print_error(_("Failed on request_dump()"));
 
2074
      if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while \
 
2075
requesting master dump")) ||
 
2076
          try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2077
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
 
2078
        goto err;
 
2079
      goto connected;
 
2080
    }
 
2081
    if (!retry_count_dump)
 
2082
    {
 
2083
      retry_count_dump++;
 
2084
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2085
      if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2086
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
 
2087
        goto err;
 
2088
      goto connected;
 
2089
    }
 
2090
 
 
2091
    while (!io_slave_killed(thd,mi))
 
2092
    {
 
2093
      uint32_t event_len;
 
2094
      /*
 
2095
        We say "waiting" because read_event() will wait if there's nothing to
 
2096
        read. But if there's something to read, it will not wait. The
 
2097
        important thing is to not confuse users by saying "reading" whereas
 
2098
        we're in fact receiving nothing.
 
2099
      */
 
2100
      thd->set_proc_info(_("Waiting for master to send event"));
 
2101
      event_len= read_event(drizzle, mi, &suppress_warnings);
 
2102
      if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while "
 
2103
                                           "reading event")))
 
2104
        goto err;
 
2105
      if (!retry_count_event)
 
2106
      {
 
2107
        retry_count_event++;
 
2108
        sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2109
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2110
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
 
2111
          goto err;
 
2112
        goto connected;
 
2113
      }
 
2114
 
 
2115
      if (event_len == packet_error)
 
2116
      {
 
2117
        uint32_t drizzle_error_number= drizzle_errno(drizzle);
 
2118
        switch (drizzle_error_number) {
 
2119
        case CR_NET_PACKET_TOO_LARGE:
 
2120
          sql_print_error(_("Log entry on master is longer than "
 
2121
                            "max_allowed_packet (%ld) on "
 
2122
                            "slave. If the entry is correct, restart the "
 
2123
                            "server with a higher value of "
 
2124
                            "max_allowed_packet"),
 
2125
                          thd->variables.max_allowed_packet);
 
2126
          goto err;
 
2127
        case ER_MASTER_FATAL_ERROR_READING_BINLOG:
 
2128
          sql_print_error(ER(drizzle_error_number), drizzle_error_number,
 
2129
                          drizzle_error(drizzle));
 
2130
          goto err;
 
2131
        case EE_OUTOFMEMORY:
 
2132
        case ER_OUTOFMEMORY:
 
2133
          sql_print_error(
 
2134
       _("Stopping slave I/O thread due to out-of-memory error from master"));
 
2135
          goto err;
 
2136
        }
 
2137
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2138
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
 
2139
          goto err;
 
2140
        goto connected;
 
2141
      } // if (event_len == packet_error)
 
2142
 
 
2143
      retry_count=0;                    // ok event, reset retry counter
 
2144
      thd->set_proc_info(_("Queueing master event to the relay log"));
 
2145
      if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
 
2146
      {
 
2147
        goto err;
 
2148
      }
 
2149
      if (flush_master_info(mi, 1))
 
2150
      {
 
2151
        sql_print_error(_("Failed to flush master info file"));
 
2152
        goto err;
 
2153
      }
 
2154
      /*
 
2155
        See if the relay logs take too much space.
 
2156
        We don't lock mi->rli.log_space_lock here; this dirty read saves time
 
2157
        and does not introduce any problem:
 
2158
        - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
 
2159
        the clean value is 0), then we are reading only one more event as we
 
2160
        should, and we'll block only at the next event. No big deal.
 
2161
        - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
 
2162
        the clean value is 1), then we are going into wait_for_relay_log_space()
 
2163
        for no reason, but this function will do a clean read, notice the clean
 
2164
        value and exit immediately.
 
2165
      */
 
2166
      if (rli->log_space_limit && rli->log_space_limit <
 
2167
          rli->log_space_total &&
 
2168
          !rli->ignore_log_space_limit)
 
2169
        if (wait_for_relay_log_space(rli))
 
2170
        {
 
2171
          sql_print_error(_("Slave I/O thread aborted while waiting for "
 
2172
                            "relay log space"));
 
2173
          goto err;
 
2174
        }
 
2175
    }
 
2176
  }
 
2177
 
 
2178
// error = 0;
 
2179
err:
 
2180
// print the current replication position
 
2181
  sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
 
2182
                          "position %s"),
 
2183
                        IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
 
2184
  VOID(pthread_mutex_lock(&LOCK_thread_count));
 
2185
  thd->query = thd->db = 0; // extra safety
 
2186
  thd->query_length= thd->db_length= 0;
 
2187
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
 
2188
  if (drizzle)
 
2189
  {
 
2190
    /*
 
2191
      Here we need to clear the active VIO before closing the
 
2192
      connection with the master.  The reason is that THD::awake()
 
2193
      might be called from terminate_slave_thread() because somebody
 
2194
      issued a STOP SLAVE.  If that happends, the close_active_vio()
 
2195
      can be called in the middle of closing the VIO associated with
 
2196
      the 'mysql' object, causing a crash.
 
2197
    */
 
2198
#ifdef SIGNAL_WITH_VIO_CLOSE
 
2199
    thd->clear_active_vio();
 
2200
#endif
 
2201
    drizzle_close(drizzle);
 
2202
    mi->drizzle=0;
 
2203
  }
 
2204
  write_ignored_events_info_to_relay_log(thd, mi);
 
2205
  thd->set_proc_info(_("Waiting for slave mutex on exit"));
 
2206
  pthread_mutex_lock(&mi->run_lock);
 
2207
 
 
2208
  /* Forget the relay log's format */
 
2209
  delete mi->rli.relay_log.description_event_for_queue;
 
2210
  mi->rli.relay_log.description_event_for_queue= 0;
 
2211
  // TODO: make rpl_status part of Master_info
 
2212
  change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
 
2213
  assert(thd->net.buff != 0);
 
2214
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
 
2215
  close_thread_tables(thd);
 
2216
  pthread_mutex_lock(&LOCK_thread_count);
 
2217
  THD_CHECK_SENTRY(thd);
 
2218
  delete thd;
 
2219
  pthread_mutex_unlock(&LOCK_thread_count);
 
2220
  mi->abort_slave= 0;
 
2221
  mi->slave_running= 0;
 
2222
  mi->io_thd= 0;
 
2223
  /*
 
2224
    Note: the order of the two following calls (first broadcast, then unlock)
 
2225
    is important. Otherwise a killer_thread can execute between the calls and
 
2226
    delete the mi structure leading to a crash! (see BUG#25306 for details)
 
2227
   */ 
 
2228
  pthread_cond_broadcast(&mi->stop_cond);       // tell the world we are done
 
2229
  pthread_mutex_unlock(&mi->run_lock);
 
2230
  my_thread_end();
 
2231
  pthread_exit(0);
 
2232
  return(0);                               // Can't return anything here
 
2233
}
 
2234
 
 
2235
 
 
2236
/* Slave SQL Thread entry point */
 
2237
 
 
2238
pthread_handler_t handle_slave_sql(void *arg)
 
2239
{
 
2240
  THD *thd;                     /* needs to be first for thread_stack */
 
2241
  char llbuff[22],llbuff1[22];
 
2242
 
 
2243
  Relay_log_info* rli = &((Master_info*)arg)->rli;
 
2244
  const char *errmsg;
 
2245
 
 
2246
  my_thread_init();
 
2247
 
 
2248
  assert(rli->inited);
 
2249
  pthread_mutex_lock(&rli->run_lock);
 
2250
  assert(!rli->slave_running);
 
2251
  errmsg= 0;
 
2252
  rli->events_till_abort = abort_slave_event_count;
 
2253
 
 
2254
  thd = new THD;
 
2255
  thd->thread_stack = (char*)&thd; // remember where our stack is
 
2256
  rli->sql_thd= thd;
 
2257
  
 
2258
  /* Inform waiting threads that slave has started */
 
2259
  rli->slave_run_id++;
 
2260
  rli->slave_running = 1;
 
2261
 
 
2262
  pthread_detach_this_thread();
 
2263
  if (init_slave_thread(thd, SLAVE_THD_SQL))
 
2264
  {
 
2265
    /*
 
2266
      TODO: this is currently broken - slave start and change master
 
2267
      will be stuck if we fail here
 
2268
    */
 
2269
    pthread_cond_broadcast(&rli->start_cond);
 
2270
    pthread_mutex_unlock(&rli->run_lock);
 
2271
    sql_print_error(_("Failed during slave thread initialization"));
 
2272
    goto err;
 
2273
  }
 
2274
  thd->init_for_queries();
 
2275
  thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
 
2276
  pthread_mutex_lock(&LOCK_thread_count);
 
2277
  threads.append(thd);
 
2278
  pthread_mutex_unlock(&LOCK_thread_count);
 
2279
  /*
 
2280
    We are going to set slave_running to 1. Assuming slave I/O thread is
 
2281
    alive and connected, this is going to make Seconds_Behind_Master be 0
 
2282
    i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
 
2283
    the moment we start we can think we are caught up, and the next second we
 
2284
    start receiving data so we realize we are not caught up and
 
2285
    Seconds_Behind_Master grows. No big deal.
 
2286
  */
 
2287
  rli->abort_slave = 0;
 
2288
  pthread_mutex_unlock(&rli->run_lock);
 
2289
  pthread_cond_broadcast(&rli->start_cond);
 
2290
 
 
2291
  /*
 
2292
    Reset errors for a clean start (otherwise, if the master is idle, the SQL
 
2293
    thread may execute no Query_log_event, so the error will remain even
 
2294
    though there's no problem anymore). Do not reset the master timestamp
 
2295
    (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
 
2296
    as we are not sure that we are going to receive a query, we want to
 
2297
    remember the last master timestamp (to say how many seconds behind we are
 
2298
    now.
 
2299
    But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
 
2300
  */
 
2301
  rli->clear_error();
 
2302
 
 
2303
  //tell the I/O thread to take relay_log_space_limit into account from now on
 
2304
  pthread_mutex_lock(&rli->log_space_lock);
 
2305
  rli->ignore_log_space_limit= 0;
 
2306
  pthread_mutex_unlock(&rli->log_space_lock);
 
2307
  rli->trans_retries= 0; // start from "no error"
 
2308
 
 
2309
  if (init_relay_log_pos(rli,
 
2310
                         rli->group_relay_log_name,
 
2311
                         rli->group_relay_log_pos,
 
2312
                         1 /*need data lock*/, &errmsg,
 
2313
                         1 /*look for a description_event*/))
 
2314
  {
 
2315
    sql_print_error(_("Error initializing relay log position: %s"),
 
2316
                    errmsg);
 
2317
    goto err;
 
2318
  }
 
2319
  THD_CHECK_SENTRY(thd);
 
2320
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
 
2321
  /*
 
2322
    Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
 
2323
    correct position when it's called just after my_b_seek() (the questionable
 
2324
    stuff is those "seek is done on next read" comments in the my_b_seek()
 
2325
    source code).
 
2326
    The crude reality is that this assertion randomly fails whereas
 
2327
    replication seems to work fine. And there is no easy explanation why it
 
2328
    fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
 
2329
    init_relay_log_pos() called above). Maybe the assertion would be
 
2330
    meaningful if we held rli->data_lock between the my_b_seek() and the
 
2331
    assert().
 
2332
  */
 
2333
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
 
2334
  assert(rli->sql_thd == thd);
 
2335
 
 
2336
  if (global_system_variables.log_warnings)
 
2337
    sql_print_information(_("Slave SQL thread initialized, "
 
2338
                            "starting replication in log '%s' at "
 
2339
                            "position %s, relay log '%s' position: %s"),
 
2340
                            RPL_LOG_NAME,
 
2341
                          llstr(rli->group_master_log_pos,llbuff),
 
2342
                          rli->group_relay_log_name,
 
2343
                          llstr(rli->group_relay_log_pos,llbuff1));
 
2344
 
 
2345
  /* execute init_slave variable */
 
2346
  if (sys_init_slave.value_length)
 
2347
  {
 
2348
    execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
 
2349
    if (thd->is_slave_error)
 
2350
    {
 
2351
      sql_print_error(_("Slave SQL thread aborted. "
 
2352
                        "Can't execute init_slave query"));
 
2353
      goto err;
 
2354
    }
 
2355
  }
 
2356
 
 
2357
  /*
 
2358
    First check until condition - probably there is nothing to execute. We
 
2359
    do not want to wait for next event in this case.
 
2360
  */
 
2361
  pthread_mutex_lock(&rli->data_lock);
 
2362
  if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
 
2363
      rli->is_until_satisfied(rli->group_master_log_pos))
 
2364
  {
 
2365
    char buf[22];
 
2366
    sql_print_information(_("Slave SQL thread stopped because it reached its"
 
2367
                            " UNTIL position %s"), llstr(rli->until_pos(), buf));
 
2368
    pthread_mutex_unlock(&rli->data_lock);
 
2369
    goto err;
 
2370
  }
 
2371
  pthread_mutex_unlock(&rli->data_lock);
 
2372
 
 
2373
  /* Read queries from the IO/THREAD until this thread is killed */
 
2374
 
 
2375
  while (!sql_slave_killed(thd,rli))
 
2376
  {
 
2377
    thd->set_proc_info(_("Reading event from the relay log"));
 
2378
    assert(rli->sql_thd == thd);
 
2379
    THD_CHECK_SENTRY(thd);
 
2380
    if (exec_relay_log_event(thd,rli))
 
2381
    {
 
2382
      // do not scare the user if SQL thread was simply killed or stopped
 
2383
      if (!sql_slave_killed(thd,rli))
 
2384
      {
 
2385
        /*
 
2386
          retrieve as much info as possible from the thd and, error
 
2387
          codes and warnings and print this to the error log as to
 
2388
          allow the user to locate the error
 
2389
        */
 
2390
        uint32_t const last_errno= rli->last_error().number;
 
2391
 
 
2392
        if (thd->is_error())
 
2393
        {
 
2394
          char const *const errmsg= thd->main_da.message();
 
2395
 
 
2396
          if (last_errno == 0)
 
2397
          {
 
2398
            rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), errmsg);
 
2399
          }
 
2400
          else if (last_errno != thd->main_da.sql_errno())
 
2401
          {
 
2402
            sql_print_error(_("Slave (additional info): %s Error_code: %d"),
 
2403
                            errmsg, thd->main_da.sql_errno());
 
2404
          }
 
2405
        }
 
2406
 
 
2407
        /* Print any warnings issued */
 
2408
        List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
 
2409
        DRIZZLE_ERROR *err;
 
2410
        /*
 
2411
          Added controlled slave thread cancel for replication
 
2412
          of user-defined variables.
 
2413
        */
 
2414
        bool udf_error = false;
 
2415
        while ((err= it++))
 
2416
        {
 
2417
          if (err->code == ER_CANT_OPEN_LIBRARY)
 
2418
            udf_error = true;
 
2419
          sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
 
2420
        }
 
2421
        if (udf_error)
 
2422
          sql_print_error(_("Error loading user-defined library, slave SQL "
 
2423
                            "thread aborted. Install the missing library, "
 
2424
                            "and restart the slave SQL thread with "
 
2425
                            "\"SLAVE START\". We stopped at log '%s' "
 
2426
                            "position %s"),
 
2427
                          RPL_LOG_NAME, llstr(rli->group_master_log_pos,
 
2428
            llbuff));
 
2429
        else
 
2430
          sql_print_error(_("Error running query, slave SQL thread aborted. "
 
2431
                            "Fix the problem, and restart "
 
2432
                            "the slave SQL thread with \"SLAVE START\". "
 
2433
                            "We stopped at log '%s' position %s"),
 
2434
                          RPL_LOG_NAME,
 
2435
                          llstr(rli->group_master_log_pos, llbuff));
 
2436
      }
 
2437
      goto err;
 
2438
    }
 
2439
  }
 
2440
 
 
2441
  /* Thread stopped. Print the current replication position to the log */
 
2442
  sql_print_information(_("Slave SQL thread exiting, replication stopped in "
 
2443
                          "log '%s' at position %s"),
 
2444
                        RPL_LOG_NAME,
 
2445
                        llstr(rli->group_master_log_pos,llbuff));
 
2446
 
 
2447
 err:
 
2448
 
 
2449
  /*
 
2450
    Some events set some playgrounds, which won't be cleared because thread
 
2451
    stops. Stopping of this thread may not be known to these events ("stop"
 
2452
    request is detected only by the present function, not by events), so we
 
2453
    must "proactively" clear playgrounds:
 
2454
  */
 
2455
  rli->cleanup_context(thd, 1);
 
2456
  VOID(pthread_mutex_lock(&LOCK_thread_count));
 
2457
  /*
 
2458
    Some extra safety, which should not been needed (normally, event deletion
 
2459
    should already have done these assignments (each event which sets these
 
2460
    variables is supposed to set them to 0 before terminating)).
 
2461
  */
 
2462
  thd->query= thd->db= thd->catalog= 0;
 
2463
  thd->query_length= thd->db_length= 0;
 
2464
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
 
2465
  thd->set_proc_info("Waiting for slave mutex on exit");
 
2466
  pthread_mutex_lock(&rli->run_lock);
 
2467
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
 
2468
  pthread_mutex_lock(&rli->data_lock);
 
2469
  assert(rli->slave_running == 1); // tracking buffer overrun
 
2470
  /* When master_pos_wait() wakes up it will check this and terminate */
 
2471
  rli->slave_running= 0;
 
2472
  /* Forget the relay log's format */
 
2473
  delete rli->relay_log.description_event_for_exec;
 
2474
  rli->relay_log.description_event_for_exec= 0;
 
2475
  /* Wake up master_pos_wait() */
 
2476
  pthread_mutex_unlock(&rli->data_lock);
 
2477
  pthread_cond_broadcast(&rli->data_cond);
 
2478
  rli->ignore_log_space_limit= 0; /* don't need any lock */
 
2479
  /* we die so won't remember charset - re-update them on next thread start */
 
2480
  rli->cached_charset_invalidate();
 
2481
  rli->save_temporary_tables = thd->temporary_tables;
 
2482
 
 
2483
  /*
 
2484
    TODO: see if we can do this conditionally in next_event() instead
 
2485
    to avoid unneeded position re-init
 
2486
  */
 
2487
  thd->temporary_tables = 0; // remove tempation from destructor to close them
 
2488
  assert(thd->net.buff != 0);
 
2489
  net_end(&thd->net); // destructor will not free it, because we are weird
 
2490
  assert(rli->sql_thd == thd);
 
2491
  THD_CHECK_SENTRY(thd);
 
2492
  rli->sql_thd= 0;
 
2493
  pthread_mutex_lock(&LOCK_thread_count);
 
2494
  THD_CHECK_SENTRY(thd);
 
2495
  delete thd;
 
2496
  pthread_mutex_unlock(&LOCK_thread_count);
 
2497
 /*
 
2498
  Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
 
2499
  is important. Otherwise a killer_thread can execute between the calls and
 
2500
  delete the mi structure leading to a crash! (see BUG#25306 for details)
 
2501
 */ 
 
2502
  pthread_cond_broadcast(&rli->stop_cond);
 
2503
  pthread_mutex_unlock(&rli->run_lock);  // tell the world we are done
 
2504
  
 
2505
  my_thread_end();
 
2506
  pthread_exit(0);
 
2507
  return(0);                               // Can't return anything here
 
2508
}
 
2509
 
 
2510
 
 
2511
/*
 
2512
  process_io_create_file()
 
2513
*/
 
2514
 
 
2515
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
 
2516
{
 
2517
  int32_t error = 1;
 
2518
  uint32_t num_bytes;
 
2519
  bool cev_not_written;
 
2520
  THD *thd = mi->io_thd;
 
2521
  NET *net = &mi->drizzle->net;
 
2522
 
 
2523
  if (unlikely(!cev->is_valid()))
 
2524
    return(1);
 
2525
 
 
2526
  if (!rpl_filter->db_ok(cev->db))
 
2527
  {
 
2528
    skip_load_data_infile(net);
 
2529
    return(0);
 
2530
  }
 
2531
  assert(cev->inited_from_old);
 
2532
  thd->file_id = cev->file_id = mi->file_id++;
 
2533
  thd->server_id = cev->server_id;
 
2534
  cev_not_written = 1;
 
2535
 
 
2536
  if (unlikely(net_request_file(net,cev->fname)))
 
2537
  {
 
2538
    sql_print_error(_("Slave I/O: failed requesting download of '%s'"),
 
2539
                    cev->fname);
 
2540
    goto err;
 
2541
  }
 
2542
 
 
2543
  /*
 
2544
    This dummy block is so we could instantiate Append_block_log_event
 
2545
    once and then modify it slightly instead of doing it multiple times
 
2546
    in the loop
 
2547
  */
 
2548
  {
 
2549
    Append_block_log_event aev(thd,0,0,0,0);
 
2550
 
 
2551
    for (;;)
 
2552
    {
 
2553
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
 
2554
      {
 
2555
        sql_print_error(_("Network read error downloading '%s' from master"),
 
2556
                        cev->fname);
 
2557
        goto err;
 
2558
      }
 
2559
      if (unlikely(!num_bytes)) /* eof */
 
2560
      {
 
2561
        /* 3.23 master wants it */
 
2562
        net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0);
 
2563
        /*
 
2564
          If we wrote Create_file_log_event, then we need to write
 
2565
          Execute_load_log_event. If we did not write Create_file_log_event,
 
2566
          then this is an empty file and we can just do as if the LOAD DATA
 
2567
          INFILE had not existed, i.e. write nothing.
 
2568
        */
 
2569
        if (unlikely(cev_not_written))
 
2570
          break;
 
2571
        Execute_load_log_event xev(thd,0,0);
 
2572
        xev.log_pos = cev->log_pos;
 
2573
        if (unlikely(mi->rli.relay_log.append(&xev)))
 
2574
        {
 
2575
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2576
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2577
                     _("error writing Exec_load event to relay log"));
 
2578
          goto err;
 
2579
        }
 
2580
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
 
2581
        break;
 
2582
      }
 
2583
      if (unlikely(cev_not_written))
 
2584
      {
 
2585
        cev->block = net->read_pos;
 
2586
        cev->block_len = num_bytes;
 
2587
        if (unlikely(mi->rli.relay_log.append(cev)))
 
2588
        {
 
2589
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2590
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2591
                     _("error writing Create_file event to relay log"));
 
2592
          goto err;
 
2593
        }
 
2594
        cev_not_written=0;
 
2595
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
 
2596
      }
 
2597
      else
 
2598
      {
 
2599
        aev.block = net->read_pos;
 
2600
        aev.block_len = num_bytes;
 
2601
        aev.log_pos = cev->log_pos;
 
2602
        if (unlikely(mi->rli.relay_log.append(&aev)))
 
2603
        {
 
2604
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2605
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2606
                     _("error writing Append_block event to relay log"));
 
2607
          goto err;
 
2608
        }
 
2609
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
 
2610
      }
 
2611
    }
 
2612
  }
 
2613
  error=0;
 
2614
err:
 
2615
  return(error);
 
2616
}
 
2617
 
 
2618
 
 
2619
/*
 
2620
  Start using a new binary log on the master
 
2621
 
 
2622
  SYNOPSIS
 
2623
    process_io_rotate()
 
2624
    mi                  master_info for the slave
 
2625
    rev                 The rotate log event read from the binary log
 
2626
 
 
2627
  DESCRIPTION
 
2628
    Updates the master info with the place in the next binary
 
2629
    log where we should start reading.
 
2630
    Rotate the relay log to avoid mixed-format relay logs.
 
2631
 
 
2632
  NOTES
 
2633
    We assume we already locked mi->data_lock
 
2634
 
 
2635
  RETURN VALUES
 
2636
    0           ok
 
2637
    1           Log event is illegal
 
2638
 
 
2639
*/
 
2640
 
 
2641
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
 
2642
{
 
2643
  safe_mutex_assert_owner(&mi->data_lock);
 
2644
 
 
2645
  if (unlikely(!rev->is_valid()))
 
2646
    return(1);
 
2647
 
 
2648
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
 
2649
  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
 
2650
  mi->master_log_pos= rev->pos;
 
2651
  /*
 
2652
    If we do not do this, we will be getting the first
 
2653
    rotate event forever, so we need to not disconnect after one.
 
2654
  */
 
2655
  if (disconnect_slave_event_count)
 
2656
    mi->events_till_disconnect++;
 
2657
 
 
2658
  /*
 
2659
    If description_event_for_queue is format <4, there is conversion in the
 
2660
    relay log to the slave's format (4). And Rotate can mean upgrade or
 
2661
    nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
 
2662
    no need to reset description_event_for_queue now. And if it's nothing (same
 
2663
    master version as before), no need (still using the slave's format).
 
2664
  */
 
2665
  if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
 
2666
  {
 
2667
    delete mi->rli.relay_log.description_event_for_queue;
 
2668
    /* start from format 3 (DRIZZLE 4.0) again */
 
2669
    mi->rli.relay_log.description_event_for_queue= new
 
2670
      Format_description_log_event(3);
 
2671
  }
 
2672
  /*
 
2673
    Rotate the relay log makes binlog format detection easier (at next slave
 
2674
    start or mysqlbinlog)
 
2675
  */
 
2676
  rotate_relay_log(mi); /* will take the right mutexes */
 
2677
  return(0);
 
2678
}
 
2679
 
 
2680
/*
 
2681
  Reads a 3.23 event and converts it to the slave's format. This code was
 
2682
  copied from DRIZZLE 4.0.
 
2683
*/
 
2684
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
 
2685
                           uint32_t event_len)
 
2686
{
 
2687
  const char *errmsg = 0;
 
2688
  uint32_t inc_pos;
 
2689
  bool ignore_event= 0;
 
2690
  char *tmp_buf = 0;
 
2691
  Relay_log_info *rli= &mi->rli;
 
2692
 
 
2693
  /*
 
2694
    If we get Load event, we need to pass a non-reusable buffer
 
2695
    to read_log_event, so we do a trick
 
2696
  */
 
2697
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
 
2698
  {
 
2699
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
 
2700
    {
 
2701
      mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
 
2702
                 ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
 
2703
      return(1);
 
2704
    }
 
2705
    memcpy(tmp_buf,buf,event_len);
 
2706
    /*
 
2707
      Create_file constructor wants a 0 as last char of buffer, this 0 will
 
2708
      serve as the string-termination char for the file's name (which is at the
 
2709
      end of the buffer)
 
2710
      We must increment event_len, otherwise the event constructor will not see
 
2711
      this end 0, which leads to segfault.
 
2712
    */
 
2713
    tmp_buf[event_len++]=0;
 
2714
    int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
 
2715
    buf = (const char*)tmp_buf;
 
2716
  }
 
2717
  /*
 
2718
    This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
 
2719
    send the loaded file, and write it to the relay log in the form of
 
2720
    Append_block/Exec_load (the SQL thread needs the data, as that thread is not
 
2721
    connected to the master).
 
2722
  */
 
2723
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
 
2724
                                            mi->rli.relay_log.description_event_for_queue);
 
2725
  if (unlikely(!ev))
 
2726
  {
 
2727
    sql_print_error(_("Read invalid event from master: '%s', "
 
2728
                      "master could be corrupt but a more likely cause "
 
2729
                      "of this is a bug"),
 
2730
                    errmsg);
 
2731
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
 
2732
    return(1);
 
2733
  }
 
2734
 
 
2735
  pthread_mutex_lock(&mi->data_lock);
 
2736
  ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
 
2737
  switch (ev->get_type_code()) {
 
2738
  case STOP_EVENT:
 
2739
    ignore_event= 1;
 
2740
    inc_pos= event_len;
 
2741
    break;
 
2742
  case ROTATE_EVENT:
 
2743
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
 
2744
    {
 
2745
      delete ev;
 
2746
      pthread_mutex_unlock(&mi->data_lock);
 
2747
      return(1);
 
2748
    }
 
2749
    inc_pos= 0;
 
2750
    break;
 
2751
  case CREATE_FILE_EVENT:
 
2752
    /*
 
2753
      Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
 
2754
      queue_old_event() which is for 3.23 events which don't comprise
 
2755
      CREATE_FILE_EVENT. This is because read_log_event() above has just
 
2756
      transformed LOAD_EVENT into CREATE_FILE_EVENT.
 
2757
    */
 
2758
  {
 
2759
    /* We come here when and only when tmp_buf != 0 */
 
2760
    assert(tmp_buf != 0);
 
2761
    inc_pos=event_len;
 
2762
    ev->log_pos+= inc_pos;
 
2763
    int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
 
2764
    delete ev;
 
2765
    mi->master_log_pos += inc_pos;
 
2766
    pthread_mutex_unlock(&mi->data_lock);
 
2767
    my_free((char*)tmp_buf, MYF(0));
 
2768
    return(error);
 
2769
  }
 
2770
  default:
 
2771
    inc_pos= event_len;
 
2772
    break;
 
2773
  }
 
2774
  if (likely(!ignore_event))
 
2775
  {
 
2776
    if (ev->log_pos)
 
2777
      /*
 
2778
         Don't do it for fake Rotate events (see comment in
 
2779
      Log_event::Log_event(const char* buf...) in log_event.cc).
 
2780
      */
 
2781
      ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
 
2782
    if (unlikely(rli->relay_log.append(ev)))
 
2783
    {
 
2784
      delete ev;
 
2785
      pthread_mutex_unlock(&mi->data_lock);
 
2786
      return(1);
 
2787
    }
 
2788
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
2789
  }
 
2790
  delete ev;
 
2791
  mi->master_log_pos+= inc_pos;
 
2792
  pthread_mutex_unlock(&mi->data_lock);
 
2793
  return(0);
 
2794
}
 
2795
 
 
2796
/*
 
2797
  Reads a 4.0 event and converts it to the slave's format. This code was copied
 
2798
  from queue_binlog_ver_1_event(), with some affordable simplifications.
 
2799
*/
 
2800
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
 
2801
                           uint32_t event_len)
 
2802
{
 
2803
  const char *errmsg = 0;
 
2804
  uint32_t inc_pos;
 
2805
  char *tmp_buf = 0;
 
2806
  Relay_log_info *rli= &mi->rli;
 
2807
 
 
2808
  /* read_log_event() will adjust log_pos to be end_log_pos */
 
2809
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
 
2810
                                            mi->rli.relay_log.description_event_for_queue);
 
2811
  if (unlikely(!ev))
 
2812
  {
 
2813
    sql_print_error(_("Read invalid event from master: '%s', "
 
2814
                      "master could be corrupt but a more likely cause of "
 
2815
                      "this is a bug"),
 
2816
                    errmsg);
 
2817
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
 
2818
    return(1);
 
2819
  }
 
2820
  pthread_mutex_lock(&mi->data_lock);
 
2821
  switch (ev->get_type_code()) {
 
2822
  case STOP_EVENT:
 
2823
    goto err;
 
2824
  case ROTATE_EVENT:
 
2825
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
 
2826
    {
 
2827
      delete ev;
 
2828
      pthread_mutex_unlock(&mi->data_lock);
 
2829
      return(1);
 
2830
    }
 
2831
    inc_pos= 0;
 
2832
    break;
 
2833
  default:
 
2834
    inc_pos= event_len;
 
2835
    break;
 
2836
  }
 
2837
  if (unlikely(rli->relay_log.append(ev)))
 
2838
  {
 
2839
    delete ev;
 
2840
    pthread_mutex_unlock(&mi->data_lock);
 
2841
    return(1);
 
2842
  }
 
2843
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
2844
  delete ev;
 
2845
  mi->master_log_pos+= inc_pos;
 
2846
err:
 
2847
  pthread_mutex_unlock(&mi->data_lock);
 
2848
  return(0);
 
2849
}
 
2850
 
 
2851
/*
 
2852
  queue_old_event()
 
2853
 
 
2854
  Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
 
2855
  (exactly, slave's) format. To do the conversion, we create a 5.0 event from
 
2856
  the 3.23/4.0 bytes, then write this event to the relay log.
 
2857
 
 
2858
  TODO:
 
2859
    Test this code before release - it has to be tested on a separate
 
2860
    setup with 3.23 master or 4.0 master
 
2861
*/
 
2862
 
 
2863
static int32_t queue_old_event(Master_info *mi, const char *buf,
 
2864
                           uint32_t event_len)
 
2865
{
 
2866
  switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
 
2867
  {
 
2868
  case 1:
 
2869
      return(queue_binlog_ver_1_event(mi,buf,event_len));
 
2870
  case 3:
 
2871
      return(queue_binlog_ver_3_event(mi,buf,event_len));
 
2872
  default: /* unsupported format; eg version 2 */
 
2873
    return(1);
 
2874
  }
 
2875
}
 
2876
 
 
2877
/*
 
2878
  queue_event()
 
2879
 
 
2880
  If the event is 3.23/4.0, passes it to queue_old_event() which will convert
 
2881
  it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
 
2882
  no format conversion, it's pure read/write of bytes.
 
2883
  So a 5.0.0 slave's relay log can contain events in the slave's format or in
 
2884
  any >=5.0.0 format.
 
2885
*/
 
2886
 
 
2887
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
 
2888
{
 
2889
  int32_t error= 0;
 
2890
  String error_msg;
 
2891
  uint32_t inc_pos= 0;
 
2892
  Relay_log_info *rli= &mi->rli;
 
2893
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
 
2894
 
 
2895
 
 
2896
  if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
 
2897
      buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
 
2898
    return(queue_old_event(mi,buf,event_len));
 
2899
 
 
2900
  pthread_mutex_lock(&mi->data_lock);
 
2901
 
 
2902
  switch (buf[EVENT_TYPE_OFFSET]) {
 
2903
  case STOP_EVENT:
 
2904
    /*
 
2905
      We needn't write this event to the relay log. Indeed, it just indicates a
 
2906
      master server shutdown. The only thing this does is cleaning. But
 
2907
      cleaning is already done on a per-master-thread basis (as the master
 
2908
      server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
 
2909
      prepared statements' deletion are TODO only when we binlog prep stmts).
 
2910
 
 
2911
      We don't even increment mi->master_log_pos, because we may be just after
 
2912
      a Rotate event. Btw, in a few milliseconds we are going to have a Start
 
2913
      event from the next binlog (unless the master is presently running
 
2914
      without --log-bin).
 
2915
    */
 
2916
    goto err;
 
2917
  case ROTATE_EVENT:
 
2918
  {
 
2919
    Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
 
2920
    if (unlikely(process_io_rotate(mi,&rev)))
 
2921
    {
 
2922
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
2923
      goto err;
 
2924
    }
 
2925
    /*
 
2926
      Now the I/O thread has just changed its mi->master_log_name, so
 
2927
      incrementing mi->master_log_pos is nonsense.
 
2928
    */
 
2929
    inc_pos= 0;
 
2930
    break;
 
2931
  }
 
2932
  case FORMAT_DESCRIPTION_EVENT:
 
2933
  {
 
2934
    /*
 
2935
      Create an event, and save it (when we rotate the relay log, we will have
 
2936
      to write this event again).
 
2937
    */
 
2938
    /*
 
2939
      We are the only thread which reads/writes description_event_for_queue.
 
2940
      The relay_log struct does not move (though some members of it can
 
2941
      change), so we needn't any lock (no rli->data_lock, no log lock).
 
2942
    */
 
2943
    Format_description_log_event* tmp;
 
2944
    const char* errmsg;
 
2945
    if (!(tmp= (Format_description_log_event*)
 
2946
          Log_event::read_log_event(buf, event_len, &errmsg,
 
2947
                                    mi->rli.relay_log.description_event_for_queue)))
 
2948
    {
 
2949
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
2950
      goto err;
 
2951
    }
 
2952
    delete mi->rli.relay_log.description_event_for_queue;
 
2953
    mi->rli.relay_log.description_event_for_queue= tmp;
 
2954
    /*
 
2955
       Though this does some conversion to the slave's format, this will
 
2956
       preserve the master's binlog format version, and number of event types.
 
2957
    */
 
2958
    /*
 
2959
       If the event was not requested by the slave (the slave did not ask for
 
2960
       it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
 
2961
    */
 
2962
    inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
 
2963
  }
 
2964
  break;
 
2965
 
 
2966
  case HEARTBEAT_LOG_EVENT:
 
2967
  {
 
2968
    /*
 
2969
      HB (heartbeat) cannot come before RL (Relay)
 
2970
    */
 
2971
    char  llbuf[22];
 
2972
    Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
 
2973
    if (!hb.is_valid())
 
2974
    {
 
2975
      error= ER_SLAVE_HEARTBEAT_FAILURE;
 
2976
      error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
 
2977
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
 
2978
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
 
2979
      error_msg.append(STRING_WITH_LEN(" log_pos "));
 
2980
      llstr(hb.log_pos, llbuf);
 
2981
      error_msg.append(llbuf, strlen(llbuf));
 
2982
      goto err;
 
2983
    }
 
2984
    mi->received_heartbeats++;
 
2985
    /* 
 
2986
       compare local and event's versions of log_file, log_pos.
 
2987
       
 
2988
       Heartbeat is sent only after an event corresponding to the corrdinates
 
2989
       the heartbeat carries.
 
2990
       Slave can not have a difference in coordinates except in the only
 
2991
       special case when mi->master_log_name, master_log_pos have never
 
2992
       been updated by Rotate event i.e when slave does not have any history
 
2993
       with the master (and thereafter mi->master_log_pos is NULL).
 
2994
 
 
2995
       TODO: handling `when' for SHOW SLAVE STATUS' snds behind
 
2996
    */
 
2997
    if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
 
2998
         && mi->master_log_name != NULL)
 
2999
        || mi->master_log_pos != hb.log_pos)
 
3000
    {
 
3001
      /* missed events of heartbeat from the past */
 
3002
      error= ER_SLAVE_HEARTBEAT_FAILURE;
 
3003
      error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
 
3004
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
 
3005
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
 
3006
      error_msg.append(STRING_WITH_LEN(" log_pos "));
 
3007
      llstr(hb.log_pos, llbuf);
 
3008
      error_msg.append(llbuf, strlen(llbuf));
 
3009
      goto err;
 
3010
    }
 
3011
    goto skip_relay_logging;
 
3012
  }
 
3013
  break;
 
3014
    
 
3015
  default:
 
3016
    inc_pos= event_len;
 
3017
    break;
 
3018
  }
 
3019
 
 
3020
  /*
 
3021
     If this event is originating from this server, don't queue it.
 
3022
     We don't check this for 3.23 events because it's simpler like this; 3.23
 
3023
     will be filtered anyway by the SQL slave thread which also tests the
 
3024
     server id (we must also keep this test in the SQL thread, in case somebody
 
3025
     upgrades a 4.0 slave which has a not-filtered relay log).
 
3026
 
 
3027
     ANY event coming from ourselves can be ignored: it is obvious for queries;
 
3028
     for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
 
3029
     (--log-slave-updates would not log that) unless this slave is also its
 
3030
     direct master (an unsupported, useless setup!).
 
3031
  */
 
3032
 
 
3033
  pthread_mutex_lock(log_lock);
 
3034
 
 
3035
  if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
 
3036
      !mi->rli.replicate_same_server_id)
 
3037
  {
 
3038
    /*
 
3039
      Do not write it to the relay log.
 
3040
      a) We still want to increment mi->master_log_pos, so that we won't
 
3041
      re-read this event from the master if the slave IO thread is now
 
3042
      stopped/restarted (more efficient if the events we are ignoring are big
 
3043
      LOAD DATA INFILE).
 
3044
      b) We want to record that we are skipping events, for the information of
 
3045
      the slave SQL thread, otherwise that thread may let
 
3046
      rli->group_relay_log_pos stay too small if the last binlog's event is
 
3047
      ignored.
 
3048
      But events which were generated by this slave and which do not exist in
 
3049
      the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
 
3050
      mi->master_log_pos.
 
3051
    */
 
3052
    if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
 
3053
        buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
 
3054
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
 
3055
    {
 
3056
      mi->master_log_pos+= inc_pos;
 
3057
      memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
 
3058
      assert(rli->ign_master_log_name_end[0]);
 
3059
      rli->ign_master_log_pos_end= mi->master_log_pos;
 
3060
    }
 
3061
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
 
3062
  }
 
3063
  else
 
3064
  {
 
3065
    /* write the event to the relay log */
 
3066
    if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
 
3067
    {
 
3068
      mi->master_log_pos+= inc_pos;
 
3069
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3070
    }
 
3071
    else
 
3072
    {
 
3073
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
3074
    }
 
3075
    rli->ign_master_log_name_end[0]= 0; // last event is not ignored
 
3076
  }
 
3077
  pthread_mutex_unlock(log_lock);
 
3078
 
 
3079
skip_relay_logging:
 
3080
  
 
3081
err:
 
3082
  pthread_mutex_unlock(&mi->data_lock);
 
3083
  if (error)
 
3084
    mi->report(ERROR_LEVEL, error, ER(error),
 
3085
               (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
 
3086
               _("could not queue event from master") :
 
3087
               error_msg.ptr());
 
3088
  return(error);
 
3089
}
 
3090
 
 
3091
 
 
3092
void end_relay_log_info(Relay_log_info* rli)
 
3093
{
 
3094
  if (!rli->inited)
 
3095
    return;
 
3096
  if (rli->info_fd >= 0)
 
3097
  {
 
3098
    end_io_cache(&rli->info_file);
 
3099
    (void) my_close(rli->info_fd, MYF(MY_WME));
 
3100
    rli->info_fd = -1;
 
3101
  }
 
3102
  if (rli->cur_log_fd >= 0)
 
3103
  {
 
3104
    end_io_cache(&rli->cache_buf);
 
3105
    (void)my_close(rli->cur_log_fd, MYF(MY_WME));
 
3106
    rli->cur_log_fd = -1;
 
3107
  }
 
3108
  rli->inited = 0;
 
3109
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
3110
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3111
  /*
 
3112
    Delete the slave's temporary tables from memory.
 
3113
    In the future there will be other actions than this, to ensure persistance
 
3114
    of slave's temp tables after shutdown.
 
3115
  */
 
3116
  rli->close_temporary_tables();
 
3117
  return;
 
3118
}
 
3119
 
 
3120
/*
 
3121
  Try to connect until successful or slave killed
 
3122
 
 
3123
  SYNPOSIS
 
3124
    safe_connect()
 
3125
    thd                 Thread handler for slave
 
3126
    DRIZZLE               DRIZZLE connection handle
 
3127
    mi                  Replication handle
 
3128
 
 
3129
  RETURN
 
3130
    0   ok
 
3131
    #   Error
 
3132
*/
 
3133
 
 
3134
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi)
 
3135
{
 
3136
  return(connect_to_master(thd, drizzle, mi, 0, 0));
 
3137
}
 
3138
 
 
3139
 
 
3140
/*
 
3141
  SYNPOSIS
 
3142
    connect_to_master()
 
3143
 
 
3144
  IMPLEMENTATION
 
3145
    Try to connect until successful or slave killed or we have retried
 
3146
    master_retry_count times
 
3147
*/
 
3148
 
 
3149
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
3150
                             bool reconnect, bool suppress_warnings)
 
3151
{
 
3152
  int32_t slave_was_killed;
 
3153
  int32_t last_errno= -2;                           // impossible error
 
3154
  uint32_t err_count=0;
 
3155
  char llbuff[22];
 
3156
 
 
3157
  mi->events_till_disconnect = disconnect_slave_event_count;
 
3158
  uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
 
3159
  if (opt_slave_compressed_protocol)
 
3160
    client_flag=CLIENT_COMPRESS;                /* We will use compression */
 
3161
 
 
3162
  drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
 
3163
  drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
 
3164
 
 
3165
  drizzle_options(drizzle, DRIZZLE_SET_CHARSET_NAME, default_charset_info->csname);
 
3166
  /* This one is not strictly needed but we have it here for completeness */
 
3167
  drizzle_options(drizzle, DRIZZLE_SET_CHARSET_DIR, (char *) charsets_dir);
 
3168
 
 
3169
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
 
3170
         (reconnect ? drizzle_reconnect(drizzle) != 0 :
 
3171
          drizzle_connect(drizzle, mi->host, mi->user, mi->password, 0,
 
3172
                             mi->port, 0, client_flag) == 0))
 
3173
  {
 
3174
    /* Don't repeat last error */
 
3175
    if ((int32_t)drizzle_errno(drizzle) != last_errno)
 
3176
    {
 
3177
      last_errno=drizzle_errno(drizzle);
 
3178
      suppress_warnings= 0;
 
3179
      mi->report(ERROR_LEVEL, last_errno,
 
3180
                 _("error %s to master '%s@%s:%d'"
 
3181
                   " - retry-time: %d  retries: %u"),
 
3182
                 (reconnect ? _("reconnecting") : _("connecting")),
 
3183
                 mi->user, mi->host, mi->port,
 
3184
                 mi->connect_retry, master_retry_count);
 
3185
    }
 
3186
    /*
 
3187
      By default we try forever. The reason is that failure will trigger
 
3188
      master election, so if the user did not set master_retry_count we
 
3189
      do not want to have election triggered on the first failure to
 
3190
      connect
 
3191
    */
 
3192
    if (++err_count == master_retry_count)
 
3193
    {
 
3194
      slave_was_killed=1;
 
3195
      if (reconnect)
 
3196
        change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
 
3197
      break;
 
3198
    }
 
3199
    safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
 
3200
               (void*)mi);
 
3201
  }
 
3202
 
 
3203
  if (!slave_was_killed)
 
3204
  {
 
3205
    if (reconnect)
 
3206
    {
 
3207
      if (!suppress_warnings && global_system_variables.log_warnings)
 
3208
        sql_print_information(_("Slave: connected to master '%s@%s:%d', "
 
3209
                                "replication resumed in log '%s' at "
 
3210
                                "position %s"), mi->user,
 
3211
                                mi->host, mi->port,
 
3212
                                IO_RPL_LOG_NAME,
 
3213
                                llstr(mi->master_log_pos,llbuff));
 
3214
    }
 
3215
    else
 
3216
    {
 
3217
      change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
 
3218
      general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d",
 
3219
                        mi->user, mi->host, mi->port);
 
3220
    }
 
3221
#ifdef SIGNAL_WITH_VIO_CLOSE
 
3222
    thd->set_active_vio(drizzle->net.vio);
 
3223
#endif
 
3224
  }
 
3225
  drizzle->reconnect= 1;
 
3226
  return(slave_was_killed);
 
3227
}
 
3228
 
 
3229
 
 
3230
/*
 
3231
  safe_reconnect()
 
3232
 
 
3233
  IMPLEMENTATION
 
3234
    Try to connect until successful or slave killed or we have retried
 
3235
    master_retry_count times
 
3236
*/
 
3237
 
 
3238
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
3239
                          bool suppress_warnings)
 
3240
{
 
3241
  return(connect_to_master(thd, drizzle, mi, 1, suppress_warnings));
 
3242
}
 
3243
 
 
3244
 
 
3245
/*
 
3246
  Store the file and position where the execute-slave thread are in the
 
3247
  relay log.
 
3248
 
 
3249
  SYNOPSIS
 
3250
    flush_relay_log_info()
 
3251
    rli                 Relay log information
 
3252
 
 
3253
  NOTES
 
3254
    - As this is only called by the slave thread, we don't need to
 
3255
      have a lock on this.
 
3256
    - If there is an active transaction, then we don't update the position
 
3257
      in the relay log.  This is to ensure that we re-execute statements
 
3258
      if we die in the middle of an transaction that was rolled back.
 
3259
    - As a transaction never spans binary logs, we don't have to handle the
 
3260
      case where we do a relay-log-rotation in the middle of the transaction.
 
3261
      If this would not be the case, we would have to ensure that we
 
3262
      don't delete the relay log file where the transaction started when
 
3263
      we switch to a new relay log file.
 
3264
 
 
3265
  TODO
 
3266
    - Change the log file information to a binary format to avoid calling
 
3267
      int64_t2str.
 
3268
 
 
3269
  RETURN VALUES
 
3270
    0   ok
 
3271
    1   write error
 
3272
*/
 
3273
 
 
3274
bool flush_relay_log_info(Relay_log_info* rli)
 
3275
{
 
3276
  bool error=0;
 
3277
 
 
3278
  if (unlikely(rli->no_storage))
 
3279
    return(0);
 
3280
 
 
3281
  IO_CACHE *file = &rli->info_file;
 
3282
  char buff[FN_REFLEN*2+22*2+4], *pos;
 
3283
 
 
3284
  my_b_seek(file, 0L);
 
3285
  pos=stpcpy(buff, rli->group_relay_log_name);
 
3286
  *pos++='\n';
 
3287
  pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
 
3288
  *pos++='\n';
 
3289
  pos=stpcpy(pos, rli->group_master_log_name);
 
3290
  *pos++='\n';
 
3291
  pos=int64_t2str(rli->group_master_log_pos, pos, 10);
 
3292
  *pos='\n';
 
3293
  if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
 
3294
    error=1;
 
3295
  if (flush_io_cache(file))
 
3296
    error=1;
 
3297
 
 
3298
  /* Flushing the relay log is done by the slave I/O thread */
 
3299
  return(error);
 
3300
}
 
3301
 
 
3302
 
 
3303
/*
 
3304
  Called when we notice that the current "hot" log got rotated under our feet.
 
3305
*/
 
3306
 
 
3307
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
 
3308
{
 
3309
  assert(rli->cur_log != &rli->cache_buf);
 
3310
  assert(rli->cur_log_fd == -1);
 
3311
 
 
3312
  IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
 
3313
  if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
 
3314
                                   errmsg)) <0)
 
3315
    return(0);
 
3316
  /*
 
3317
    We want to start exactly where we was before:
 
3318
    relay_log_pos       Current log pos
 
3319
    pending             Number of bytes already processed from the event
 
3320
  */
 
3321
  rli->event_relay_log_pos= max(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
 
3322
  my_b_seek(cur_log,rli->event_relay_log_pos);
 
3323
  return(cur_log);
 
3324
}
 
3325
 
 
3326
 
 
3327
static Log_event* next_event(Relay_log_info* rli)
 
3328
{
 
3329
  Log_event* ev;
 
3330
  IO_CACHE* cur_log = rli->cur_log;
 
3331
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
 
3332
  const char* errmsg=0;
 
3333
  THD* thd = rli->sql_thd;
 
3334
 
 
3335
  assert(thd != 0);
 
3336
 
 
3337
  if (abort_slave_event_count && !rli->events_till_abort--)
 
3338
    return(0);
 
3339
 
 
3340
  /*
 
3341
    For most operations we need to protect rli members with data_lock,
 
3342
    so we assume calling function acquired this mutex for us and we will
 
3343
    hold it for the most of the loop below However, we will release it
 
3344
    whenever it is worth the hassle,  and in the cases when we go into a
 
3345
    pthread_cond_wait() with the non-data_lock mutex
 
3346
  */
 
3347
  safe_mutex_assert_owner(&rli->data_lock);
 
3348
 
 
3349
  while (!sql_slave_killed(thd,rli))
 
3350
  {
 
3351
    /*
 
3352
      We can have two kinds of log reading:
 
3353
      hot_log:
 
3354
        rli->cur_log points at the IO_CACHE of relay_log, which
 
3355
        is actively being updated by the I/O thread. We need to be careful
 
3356
        in this case and make sure that we are not looking at a stale log that
 
3357
        has already been rotated. If it has been, we reopen the log.
 
3358
 
 
3359
      The other case is much simpler:
 
3360
        We just have a read only log that nobody else will be updating.
 
3361
    */
 
3362
    bool hot_log;
 
3363
    if ((hot_log = (cur_log != &rli->cache_buf)))
 
3364
    {
 
3365
      assert(rli->cur_log_fd == -1); // foreign descriptor
 
3366
      pthread_mutex_lock(log_lock);
 
3367
 
 
3368
      /*
 
3369
        Reading xxx_file_id is safe because the log will only
 
3370
        be rotated when we hold relay_log.LOCK_log
 
3371
      */
 
3372
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
 
3373
      {
 
3374
        // The master has switched to a new log file; Reopen the old log file
 
3375
        cur_log=reopen_relay_log(rli, &errmsg);
 
3376
        pthread_mutex_unlock(log_lock);
 
3377
        if (!cur_log)                           // No more log files
 
3378
          goto err;
 
3379
        hot_log=0;                              // Using old binary log
 
3380
      }
 
3381
    }
 
3382
    /* 
 
3383
      As there is no guarantee that the relay is open (for example, an I/O
 
3384
      error during a write by the slave I/O thread may have closed it), we
 
3385
      have to test it.
 
3386
    */
 
3387
    if (!my_b_inited(cur_log))
 
3388
      goto err;
 
3389
    assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
 
3390
    assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
 
3391
 
 
3392
    /*
 
3393
      Relay log is always in new format - if the master is 3.23, the
 
3394
      I/O thread will convert the format for us.
 
3395
      A problem: the description event may be in a previous relay log. So if
 
3396
      the slave has been shutdown meanwhile, we would have to look in old relay
 
3397
      logs, which may even have been deleted. So we need to write this
 
3398
      description event at the beginning of the relay log.
 
3399
      When the relay log is created when the I/O thread starts, easy: the
 
3400
      master will send the description event and we will queue it.
 
3401
      But if the relay log is created by new_file(): then the solution is:
 
3402
      DRIZZLE_BIN_LOG::open() will write the buffered description event.
 
3403
    */
 
3404
    if ((ev=Log_event::read_log_event(cur_log,0,
 
3405
                                      rli->relay_log.description_event_for_exec)))
 
3406
 
 
3407
    {
 
3408
      assert(thd==rli->sql_thd);
 
3409
      /*
 
3410
        read it while we have a lock, to avoid a mutex lock in
 
3411
        inc_event_relay_log_pos()
 
3412
      */
 
3413
      rli->future_event_relay_log_pos= my_b_tell(cur_log);
 
3414
      if (hot_log)
 
3415
        pthread_mutex_unlock(log_lock);
 
3416
      return(ev);
 
3417
    }
 
3418
    assert(thd==rli->sql_thd);
 
3419
    if (opt_reckless_slave)                     // For mysql-test
 
3420
      cur_log->error = 0;
 
3421
    if (cur_log->error < 0)
 
3422
    {
 
3423
      errmsg = "slave SQL thread aborted because of I/O error";
 
3424
      if (hot_log)
 
3425
        pthread_mutex_unlock(log_lock);
 
3426
      goto err;
 
3427
    }
 
3428
    if (!cur_log->error) /* EOF */
 
3429
    {
 
3430
      /*
 
3431
        On a hot log, EOF means that there are no more updates to
 
3432
        process and we must block until I/O thread adds some and
 
3433
        signals us to continue
 
3434
      */
 
3435
      if (hot_log)
 
3436
      {
 
3437
        /*
 
3438
          We say in Seconds_Behind_Master that we have "caught up". Note that
 
3439
          for example if network link is broken but I/O slave thread hasn't
 
3440
          noticed it (slave_net_timeout not elapsed), then we'll say "caught
 
3441
          up" whereas we're not really caught up. Fixing that would require
 
3442
          internally cutting timeout in smaller pieces in network read, no
 
3443
          thanks. Another example: SQL has caught up on I/O, now I/O has read
 
3444
          a new event and is queuing it; the false "0" will exist until SQL
 
3445
          finishes executing the new event; it will be look abnormal only if
 
3446
          the events have old timestamps (then you get "many", 0, "many").
 
3447
 
 
3448
          Transient phases like this can be fixed with implemeting
 
3449
          Heartbeat event which provides the slave the status of the
 
3450
          master at time the master does not have any new update to send.
 
3451
          Seconds_Behind_Master would be zero only when master has no
 
3452
          more updates in binlog for slave. The heartbeat can be sent
 
3453
          in a (small) fraction of slave_net_timeout. Until it's done
 
3454
          rli->last_master_timestamp is temporarely (for time of
 
3455
          waiting for the following event) reset whenever EOF is
 
3456
          reached.
 
3457
        */
 
3458
        time_t save_timestamp= rli->last_master_timestamp;
 
3459
        rli->last_master_timestamp= 0;
 
3460
 
 
3461
        assert(rli->relay_log.get_open_count() ==
 
3462
                    rli->cur_log_old_open_count);
 
3463
 
 
3464
        if (rli->ign_master_log_name_end[0])
 
3465
        {
 
3466
          /* We generate and return a Rotate, to make our positions advance */
 
3467
          ev= new Rotate_log_event(rli->ign_master_log_name_end,
 
3468
                                   0, rli->ign_master_log_pos_end,
 
3469
                                   Rotate_log_event::DUP_NAME);
 
3470
          rli->ign_master_log_name_end[0]= 0;
 
3471
          pthread_mutex_unlock(log_lock);
 
3472
          if (unlikely(!ev))
 
3473
          {
 
3474
            errmsg= "Slave SQL thread failed to create a Rotate event "
 
3475
              "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
 
3476
            goto err;
 
3477
          }
 
3478
          ev->server_id= 0; // don't be ignored by slave SQL thread
 
3479
          return(ev);
 
3480
        }
 
3481
 
 
3482
        /*
 
3483
          We can, and should release data_lock while we are waiting for
 
3484
          update. If we do not, show slave status will block
 
3485
        */
 
3486
        pthread_mutex_unlock(&rli->data_lock);
 
3487
 
 
3488
        /*
 
3489
          Possible deadlock :
 
3490
          - the I/O thread has reached log_space_limit
 
3491
          - the SQL thread has read all relay logs, but cannot purge for some
 
3492
          reason:
 
3493
            * it has already purged all logs except the current one
 
3494
            * there are other logs than the current one but they're involved in
 
3495
            a transaction that finishes in the current one (or is not finished)
 
3496
          Solution :
 
3497
          Wake up the possibly waiting I/O thread, and set a boolean asking
 
3498
          the I/O thread to temporarily ignore the log_space_limit
 
3499
          constraint, because we do not want the I/O thread to block because of
 
3500
          space (it's ok if it blocks for any other reason (e.g. because the
 
3501
          master does not send anything). Then the I/O thread stops waiting
 
3502
          and reads more events.
 
3503
          The SQL thread decides when the I/O thread should take log_space_limit
 
3504
          into account again : ignore_log_space_limit is reset to 0
 
3505
          in purge_first_log (when the SQL thread purges the just-read relay
 
3506
          log), and also when the SQL thread starts. We should also reset
 
3507
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
 
3508
          fact, no need as RESET SLAVE requires that the slave
 
3509
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
 
3510
          it stops.
 
3511
        */
 
3512
        pthread_mutex_lock(&rli->log_space_lock);
 
3513
        // prevent the I/O thread from blocking next times
 
3514
        rli->ignore_log_space_limit= 1;
 
3515
        /*
 
3516
          If the I/O thread is blocked, unblock it.  Ok to broadcast
 
3517
          after unlock, because the mutex is only destroyed in
 
3518
          ~Relay_log_info(), i.e. when rli is destroyed, and rli will
 
3519
          not be destroyed before we exit the present function.
 
3520
        */
 
3521
        pthread_mutex_unlock(&rli->log_space_lock);
 
3522
        pthread_cond_broadcast(&rli->log_space_cond);
 
3523
        // Note that wait_for_update_relay_log unlocks lock_log !
 
3524
        rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
 
3525
        // re-acquire data lock since we released it earlier
 
3526
        pthread_mutex_lock(&rli->data_lock);
 
3527
        rli->last_master_timestamp= save_timestamp;
 
3528
        continue;
 
3529
      }
 
3530
      /*
 
3531
        If the log was not hot, we need to move to the next log in
 
3532
        sequence. The next log could be hot or cold, we deal with both
 
3533
        cases separately after doing some common initialization
 
3534
      */
 
3535
      end_io_cache(cur_log);
 
3536
      assert(rli->cur_log_fd >= 0);
 
3537
      my_close(rli->cur_log_fd, MYF(MY_WME));
 
3538
      rli->cur_log_fd = -1;
 
3539
 
 
3540
      if (relay_log_purge)
 
3541
      {
 
3542
        /*
 
3543
          purge_first_log will properly set up relay log coordinates in rli.
 
3544
          If the group's coordinates are equal to the event's coordinates
 
3545
          (i.e. the relay log was not rotated in the middle of a group),
 
3546
          we can purge this relay log too.
 
3547
          We do uint64_t and string comparisons, this may be slow but
 
3548
          - purging the last relay log is nice (it can save 1GB of disk), so we
 
3549
          like to detect the case where we can do it, and given this,
 
3550
          - I see no better detection method
 
3551
          - purge_first_log is not called that often
 
3552
        */
 
3553
        if (rli->relay_log.purge_first_log
 
3554
            (rli,
 
3555
             rli->group_relay_log_pos == rli->event_relay_log_pos
 
3556
             && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
 
3557
        {
 
3558
          errmsg = "Error purging processed logs";
 
3559
          goto err;
 
3560
        }
 
3561
      }
 
3562
      else
 
3563
      {
 
3564
        /*
 
3565
          If hot_log is set, then we already have a lock on
 
3566
          LOCK_log.  If not, we have to get the lock.
 
3567
 
 
3568
          According to Sasha, the only time this code will ever be executed
 
3569
          is if we are recovering from a bug.
 
3570
        */
 
3571
        if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
 
3572
        {
 
3573
          errmsg = "error switching to the next log";
 
3574
          goto err;
 
3575
        }
 
3576
        rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
 
3577
        strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
 
3578
                sizeof(rli->event_relay_log_name)-1);
 
3579
        flush_relay_log_info(rli);
 
3580
      }
 
3581
 
 
3582
      /*
 
3583
        Now we want to open this next log. To know if it's a hot log (the one
 
3584
        being written by the I/O thread now) or a cold log, we can use
 
3585
        is_active(); if it is hot, we use the I/O cache; if it's cold we open
 
3586
        the file normally. But if is_active() reports that the log is hot, this
 
3587
        may change between the test and the consequence of the test. So we may
 
3588
        open the I/O cache whereas the log is now cold, which is nonsense.
 
3589
        To guard against this, we need to have LOCK_log.
 
3590
      */
 
3591
 
 
3592
      if (!hot_log) /* if hot_log, we already have this mutex */
 
3593
        pthread_mutex_lock(log_lock);
 
3594
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
 
3595
      {
 
3596
#ifdef EXTRA_DEBUG
 
3597
        if (global_system_variables.log_warnings)
 
3598
          sql_print_information(_("next log '%s' is currently active"),
 
3599
                                rli->linfo.log_file_name);
 
3600
#endif
 
3601
        rli->cur_log= cur_log= rli->relay_log.get_log_file();
 
3602
        rli->cur_log_old_open_count= rli->relay_log.get_open_count();
 
3603
        assert(rli->cur_log_fd == -1);
 
3604
 
 
3605
        /*
 
3606
          Read pointer has to be at the start since we are the only
 
3607
          reader.
 
3608
          We must keep the LOCK_log to read the 4 first bytes, as this is a hot
 
3609
          log (same as when we call read_log_event() above: for a hot log we
 
3610
          take the mutex).
 
3611
        */
 
3612
        if (check_binlog_magic(cur_log,&errmsg))
 
3613
        {
 
3614
          if (!hot_log) pthread_mutex_unlock(log_lock);
 
3615
          goto err;
 
3616
        }
 
3617
        if (!hot_log) pthread_mutex_unlock(log_lock);
 
3618
        continue;
 
3619
      }
 
3620
      if (!hot_log) pthread_mutex_unlock(log_lock);
 
3621
      /*
 
3622
        if we get here, the log was not hot, so we will have to open it
 
3623
        ourselves. We are sure that the log is still not hot now (a log can get
 
3624
        from hot to cold, but not from cold to hot). No need for LOCK_log.
 
3625
      */
 
3626
#ifdef EXTRA_DEBUG
 
3627
      if (global_system_variables.log_warnings)
 
3628
        sql_print_information(_("next log '%s' is not active"),
 
3629
                              rli->linfo.log_file_name);
 
3630
#endif
 
3631
      // open_binlog() will check the magic header
 
3632
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
 
3633
                                       &errmsg)) <0)
 
3634
        goto err;
 
3635
    }
 
3636
    else
 
3637
    {
 
3638
      /*
 
3639
        Read failed with a non-EOF error.
 
3640
        TODO: come up with something better to handle this error
 
3641
      */
 
3642
      if (hot_log)
 
3643
        pthread_mutex_unlock(log_lock);
 
3644
      sql_print_error(_("Slave SQL thread: I/O error reading "
 
3645
                        "event(errno: %d  cur_log->error: %d)"),
 
3646
                      my_errno,cur_log->error);
 
3647
      // set read position to the beginning of the event
 
3648
      my_b_seek(cur_log,rli->event_relay_log_pos);
 
3649
      /* otherwise, we have had a partial read */
 
3650
      errmsg = _("Aborting slave SQL thread because of partial event read");
 
3651
      break;                                    // To end of function
 
3652
    }
 
3653
  }
 
3654
  if (!errmsg && global_system_variables.log_warnings)
 
3655
  {
 
3656
    sql_print_information(_("Error reading relay log event: %s"),
 
3657
                          _("slave SQL thread was killed"));
 
3658
    return(0);
 
3659
  }
 
3660
 
 
3661
err:
 
3662
  if (errmsg)
 
3663
    sql_print_error(_("Error reading relay log event: %s"), errmsg);
 
3664
  return(0);
 
3665
}
 
3666
 
 
3667
/*
 
3668
  Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
 
3669
  because of size is simpler because when we do it we already have all relevant
 
3670
  locks; here we don't, so this function is mainly taking locks).
 
3671
  Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
 
3672
  is void).
 
3673
*/
 
3674
 
 
3675
void rotate_relay_log(Master_info* mi)
 
3676
{
 
3677
  Relay_log_info* rli= &mi->rli;
 
3678
 
 
3679
  /* We don't lock rli->run_lock. This would lead to deadlocks. */
 
3680
  pthread_mutex_lock(&mi->run_lock);
 
3681
 
 
3682
  /*
 
3683
     We need to test inited because otherwise, new_file() will attempt to lock
 
3684
     LOCK_log, which may not be inited (if we're not a slave).
 
3685
  */
 
3686
  if (!rli->inited)
 
3687
  {
 
3688
    goto end;
 
3689
  }
 
3690
 
 
3691
  /* If the relay log is closed, new_file() will do nothing. */
 
3692
  rli->relay_log.new_file();
 
3693
 
 
3694
  /*
 
3695
    We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
 
3696
    be counted, so imagine a succession of FLUSH LOGS  and assume the slave
 
3697
    threads are started:
 
3698
    relay_log_space decreases by the size of the deleted relay log, but does
 
3699
    not increase, so flush-after-flush we may become negative, which is wrong.
 
3700
    Even if this will be corrected as soon as a query is replicated on the
 
3701
    slave (because the I/O thread will then call harvest_bytes_written() which
 
3702
    will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
 
3703
    output in SHOW SLAVE STATUS meanwhile. So we harvest now.
 
3704
    If the log is closed, then this will just harvest the last writes, probably
 
3705
    0 as they probably have been harvested.
 
3706
  */
 
3707
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3708
end:
 
3709
  pthread_mutex_unlock(&mi->run_lock);
 
3710
  return;
 
3711
}
 
3712
 
 
3713
 
 
3714
/**
 
3715
   Detects, based on master's version (as found in the relay log), if master
 
3716
   has a certain bug.
 
3717
   @param rli Relay_log_info which tells the master's version
 
3718
   @param bug_id Number of the bug as found in bugs.mysql.com
 
3719
   @param report bool report error message, default TRUE
 
3720
   @return true if master has the bug, FALSE if it does not.
 
3721
*/
 
3722
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
 
3723
{
 
3724
  struct st_version_range_for_one_bug {
 
3725
    uint32_t        bug_id;
 
3726
    const uchar introduced_in[3]; // first version with bug
 
3727
    const uchar fixed_in[3];      // first version with fix
 
3728
  };
 
3729
  static struct st_version_range_for_one_bug versions_for_all_bugs[]=
 
3730
  {
 
3731
    {24432, { 5, 0, 24 }, { 5, 0, 38 } },
 
3732
    {24432, { 5, 1, 12 }, { 5, 1, 17 } },
 
3733
    {33029, { 5, 0,  0 }, { 5, 0, 58 } },
 
3734
    {33029, { 5, 1,  0 }, { 5, 1, 12 } },
 
3735
  };
 
3736
  const uchar *master_ver=
 
3737
    rli->relay_log.description_event_for_exec->server_version_split;
 
3738
 
 
3739
  assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
 
3740
 
 
3741
  for (uint32_t i= 0;
 
3742
       i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
 
3743
  {
 
3744
    const uchar *introduced_in= versions_for_all_bugs[i].introduced_in,
 
3745
      *fixed_in= versions_for_all_bugs[i].fixed_in;
 
3746
    if ((versions_for_all_bugs[i].bug_id == bug_id) &&
 
3747
        (memcmp(introduced_in, master_ver, 3) <= 0) &&
 
3748
        (memcmp(fixed_in,      master_ver, 3) >  0))
 
3749
    {
 
3750
      if (!report)
 
3751
        return true;
 
3752
 
 
3753
      // a short message for SHOW SLAVE STATUS (message length constraints)
 
3754
      my_printf_error(ER_UNKNOWN_ERROR,
 
3755
                      _("master may suffer from"
 
3756
                        " http://bugs.mysql.com/bug.php?id=%u"
 
3757
                        " so slave stops; check error log on slave"
 
3758
                        " for more info"), MYF(0), bug_id);
 
3759
      // a verbose message for the error log
 
3760
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
 
3761
                  _("According to the master's version ('%s'),"
 
3762
                    " it is probable that master suffers from this bug:"
 
3763
                    " http://bugs.mysql.com/bug.php?id=%u"
 
3764
                    " and thus replicating the current binary log event"
 
3765
                    " may make the slave's data become different from the"
 
3766
                    " master's data."
 
3767
                    " To take no risk, slave refuses to replicate"
 
3768
                    " this event and stops."
 
3769
                    " We recommend that all updates be stopped on the"
 
3770
                    " master and slave, that the data of both be"
 
3771
                    " manually synchronized,"
 
3772
                    " that master's binary logs be deleted,"
 
3773
                    " that master be upgraded to a version at least"
 
3774
                    " equal to '%d.%d.%d'. Then replication can be"
 
3775
                    " restarted."),
 
3776
                  rli->relay_log.description_event_for_exec->server_version,
 
3777
                  bug_id,
 
3778
                  fixed_in[0], fixed_in[1], fixed_in[2]);
 
3779
      return true;
 
3780
    }
 
3781
  }
 
3782
  return false;
 
3783
}
 
3784
 
 
3785
/**
 
3786
   BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
 
3787
   exclusive, if one statement in a SP generated AUTO_INCREMENT value
 
3788
   by the top statement, all statements after it would be considered
 
3789
   generated AUTO_INCREMENT value by the top statement, and a
 
3790
   erroneous INSERT_ID value might be associated with these statement,
 
3791
   which could cause duplicate entry error and stop the slave.
 
3792
 
 
3793
   Detect buggy master to work around.
 
3794
 */
 
3795
bool rpl_master_erroneous_autoinc(THD *thd)
 
3796
{
 
3797
  if (active_mi && active_mi->rli.sql_thd == thd)
 
3798
  {
 
3799
    Relay_log_info *rli= &active_mi->rli;
 
3800
    return rpl_master_has_bug(rli, 33029, false);
 
3801
  }
 
3802
  return false;
 
3803
}
 
3804
 
 
3805
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
3806
template class I_List_iterator<i_string>;
 
3807
template class I_List_iterator<i_string_pair>;
 
3808
#endif
 
3809
 
 
3810
/**
 
3811
  @} (end of group Replication)
 
3812
*/
 
3813
 
 
3814
#endif /* HAVE_REPLICATION */