~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/slave.cc

  • Committer: Monty Taylor
  • Date: 2008-11-16 20:15:33 UTC
  • mto: (584.1.9 devel)
  • mto: This revision was merged to the branch mainline in revision 589.
  • Revision ID: monty@inaugust.com-20081116201533-d0f19s1bk1h95iyw
Removed a big bank of includes from item.h.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2003 DRIZZLE AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/**
 
18
  @addtogroup Replication
 
19
  @{
 
20
 
 
21
  @file
 
22
 
 
23
  @brief Code to run the io thread and the sql thread on the
 
24
  replication slave.
 
25
*/
 
26
#include <drizzled/server_includes.h>
 
27
 
 
28
#include <storage/myisam/myisam.h>
 
29
#include <drizzled/replication/mi.h>
 
30
#include <drizzled/replication/rli.h>
 
31
#include <drizzled/replication/replication.h>
 
32
#include <drizzled/replication/filter.h>
 
33
#include <mysys/thr_alarm.h>
 
34
#include <libdrizzle/errmsg.h>
 
35
#include <mysys/mysys_err.h>
 
36
#include <drizzled/error.h>
 
37
#include <drizzled/sql_parse.h>
 
38
#include <drizzled/gettext.h>
 
39
#include <signal.h>
 
40
 
 
41
#if TIME_WITH_SYS_TIME
 
42
# include <sys/time.h>
 
43
# include <time.h>
 
44
#else
 
45
# if HAVE_SYS_TIME_H
 
46
#  include <sys/time.h>
 
47
# else
 
48
#  include <time.h>
 
49
# endif
 
50
#endif
 
51
 
 
52
#include <drizzled/tztime.h>
 
53
 
 
54
#include <drizzled/replication/tblmap.h>
 
55
 
 
56
#define MAX_SLAVE_RETRY_PAUSE 5
 
57
bool use_slave_mask = 0;
 
58
MY_BITMAP slave_error_mask;
 
59
 
 
60
typedef bool (*CHECK_KILLED_FUNC)(Session*,void*);
 
61
 
 
62
char* slave_load_tmpdir = 0;
 
63
Master_info *active_mi= 0;
 
64
bool replicate_same_server_id;
 
65
uint64_t relay_log_space_limit = 0;
 
66
 
 
67
/*
 
68
  When slave thread exits, we need to remember the temporary tables so we
 
69
  can re-use them on slave start.
 
70
 
 
71
  TODO: move the vars below under Master_info
 
72
*/
 
73
 
 
74
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
 
75
int32_t events_till_abort = -1;
 
76
 
 
77
enum enum_slave_reconnect_actions
 
78
{
 
79
  SLAVE_RECON_ACT_REG= 0,
 
80
  SLAVE_RECON_ACT_DUMP= 1,
 
81
  SLAVE_RECON_ACT_EVENT= 2,
 
82
  SLAVE_RECON_ACT_MAX
 
83
};
 
84
 
 
85
enum enum_slave_reconnect_messages
 
86
{
 
87
  SLAVE_RECON_MSG_WAIT= 0,
 
88
  SLAVE_RECON_MSG_KILLED_WAITING= 1,
 
89
  SLAVE_RECON_MSG_AFTER= 2,
 
90
  SLAVE_RECON_MSG_FAILED= 3,
 
91
  SLAVE_RECON_MSG_COMMAND= 4,
 
92
  SLAVE_RECON_MSG_KILLED_AFTER= 5,
 
93
  SLAVE_RECON_MSG_MAX
 
94
};
 
95
 
 
96
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
 
97
{
 
98
  {
 
99
    N_("Waiting to reconnect after a failed registration on master"),
 
100
    N_("Slave I/O thread killed while waitnig to reconnect after a "
 
101
                 "failed registration on master"),
 
102
    N_("Reconnecting after a failed registration on master"),
 
103
    N_("failed registering on master, reconnecting to try again, "
 
104
                 "log '%s' at postion %s"),
 
105
    "COM_REGISTER_SLAVE",
 
106
    N_("Slave I/O thread killed during or after reconnect")
 
107
  },
 
108
  {
 
109
    N_("Waiting to reconnect after a failed binlog dump request"),
 
110
    N_("Slave I/O thread killed while retrying master dump"),
 
111
    N_("Reconnecting after a failed binlog dump request"),
 
112
    N_("failed dump request, reconnecting to try again, "
 
113
                 "log '%s' at postion %s"),
 
114
    "COM_BINLOG_DUMP",
 
115
    N_("Slave I/O thread killed during or after reconnect")
 
116
  },
 
117
  {
 
118
    N_("Waiting to reconnect after a failed master event read"),
 
119
    N_("Slave I/O thread killed while waiting to reconnect "
 
120
                 "after a failed read"),
 
121
    N_("Reconnecting after a failed master event read"),
 
122
    N_("Slave I/O thread: Failed reading log event, "
 
123
                 "reconnecting to retry, log '%s' at postion %s"),
 
124
    "",
 
125
    N_("Slave I/O thread killed during or after a "
 
126
                 "reconnect done to recover from failed read")
 
127
  }
 
128
};
 
129
 
 
130
 
 
131
typedef enum { SLAVE_Session_IO, SLAVE_Session_SQL} SLAVE_Session_TYPE;
 
132
 
 
133
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
 
134
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
 
135
static bool wait_for_relay_log_space(Relay_log_info* rli);
 
136
static inline bool io_slave_killed(Session* session,Master_info* mi);
 
137
static inline bool sql_slave_killed(Session* session,Relay_log_info* rli);
 
138
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type);
 
139
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi);
 
140
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
 
141
                          bool suppress_warnings);
 
142
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
 
143
                             bool reconnect, bool suppress_warnings);
 
144
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
145
                      void* thread_killed_arg);
 
146
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
 
147
static Log_event* next_event(Relay_log_info* rli);
 
148
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
 
149
static int32_t terminate_slave_thread(Session *session,
 
150
                                  pthread_mutex_t* term_lock,
 
151
                                  pthread_cond_t* term_cond,
 
152
                                  volatile uint32_t *slave_running,
 
153
                                  bool skip_lock);
 
154
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info);
 
155
 
 
156
/*
 
157
  Find out which replications threads are running
 
158
 
 
159
  SYNOPSIS
 
160
    init_thread_mask()
 
161
    mask                Return value here
 
162
    mi                  master_info for slave
 
163
    inverse             If set, returns which threads are not running
 
164
 
 
165
  IMPLEMENTATION
 
166
    Get a bit mask for which threads are running so that we can later restart
 
167
    these threads.
 
168
 
 
169
  RETURN
 
170
    mask        If inverse == 0, running threads
 
171
                If inverse == 1, stopped threads
 
172
*/
 
173
 
 
174
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
 
175
{
 
176
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
 
177
  register int32_t tmp_mask=0;
 
178
 
 
179
  if (set_io)
 
180
    tmp_mask |= SLAVE_IO;
 
181
  if (set_sql)
 
182
    tmp_mask |= SLAVE_SQL;
 
183
  if (inverse)
 
184
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
 
185
  *mask = tmp_mask;
 
186
  return;
 
187
}
 
188
 
 
189
 
 
190
/*
 
191
  lock_slave_threads()
 
192
*/
 
193
 
 
194
void lock_slave_threads(Master_info* mi)
 
195
{
 
196
  //TODO: see if we can do this without dual mutex
 
197
  pthread_mutex_lock(&mi->run_lock);
 
198
  pthread_mutex_lock(&mi->rli.run_lock);
 
199
  return;
 
200
}
 
201
 
 
202
 
 
203
/*
 
204
  unlock_slave_threads()
 
205
*/
 
206
 
 
207
void unlock_slave_threads(Master_info* mi)
 
208
{
 
209
  //TODO: see if we can do this without dual mutex
 
210
  pthread_mutex_unlock(&mi->rli.run_lock);
 
211
  pthread_mutex_unlock(&mi->run_lock);
 
212
  return;
 
213
}
 
214
 
 
215
 
 
216
/* Initialize slave structures */
 
217
 
 
218
int32_t init_slave()
 
219
{
 
220
  /*
 
221
    This is called when mysqld starts. Before client connections are
 
222
    accepted. However bootstrap may conflict with us if it does START SLAVE.
 
223
    So it's safer to take the lock.
 
224
  */
 
225
  pthread_mutex_lock(&LOCK_active_mi);
 
226
  /*
 
227
    TODO: re-write this to interate through the list of files
 
228
    for multi-master
 
229
  */
 
230
  active_mi= new Master_info;
 
231
 
 
232
  /*
 
233
    If master_host is not specified, try to read it from the master_info file.
 
234
    If master_host is specified, create the master_info file if it doesn't
 
235
    exists.
 
236
  */
 
237
  if (!active_mi)
 
238
  {
 
239
    sql_print_error(_("Failed to allocate memory for the master info structure"));
 
240
    goto err;
 
241
  }
 
242
 
 
243
  if (active_mi->init_master_info(master_info_file, relay_log_info_file, (SLAVE_IO | SLAVE_SQL)))
 
244
  {
 
245
    sql_print_error(_("Failed to initialize the master info structure"));
 
246
    goto err;
 
247
  }
 
248
 
 
249
  /* If server id is not set, start_slave_thread() will say it */
 
250
 
 
251
  if (active_mi->host[0] && !opt_skip_slave_start)
 
252
  {
 
253
    if (start_slave_threads(1 /* need mutex */,
 
254
                            0 /* no wait for start*/,
 
255
                            active_mi,
 
256
                            master_info_file,
 
257
                            relay_log_info_file,
 
258
                            SLAVE_IO | SLAVE_SQL))
 
259
    {
 
260
      sql_print_error(_("Failed to create slave threads"));
 
261
      goto err;
 
262
    }
 
263
  }
 
264
  pthread_mutex_unlock(&LOCK_active_mi);
 
265
  return(0);
 
266
 
 
267
err:
 
268
  pthread_mutex_unlock(&LOCK_active_mi);
 
269
  return(1);
 
270
}
 
271
 
 
272
 
 
273
/*
 
274
  Init function to set up array for errors that should be skipped for slave
 
275
 
 
276
  SYNOPSIS
 
277
    init_slave_skip_errors()
 
278
    arg         List of errors numbers to skip, separated with ','
 
279
 
 
280
  NOTES
 
281
    Called from get_options() in mysqld.cc on start-up
 
282
*/
 
283
 
 
284
void init_slave_skip_errors(const char* arg)
 
285
{
 
286
  const char *p;
 
287
 
 
288
  if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
 
289
  {
 
290
    fprintf(stderr, "Badly out of memory, please check your system status\n");
 
291
    exit(1);
 
292
  }
 
293
  use_slave_mask = 1;
 
294
  for (;my_isspace(system_charset_info,*arg);++arg)
 
295
    /* empty */;
 
296
  if (!my_strnncoll(system_charset_info,(unsigned char*)arg,4,(const unsigned char*)"all",4))
 
297
  {
 
298
    bitmap_set_all(&slave_error_mask);
 
299
    return;
 
300
  }
 
301
  for (p= arg ; *p; )
 
302
  {
 
303
    long err_code;
 
304
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
 
305
      break;
 
306
    if (err_code < MAX_SLAVE_ERROR)
 
307
       bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
 
308
    while (!my_isdigit(system_charset_info,*p) && *p)
 
309
      p++;
 
310
  }
 
311
  return;
 
312
}
 
313
 
 
314
 
 
315
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
 
316
{
 
317
  if (!mi->inited)
 
318
    return(0); /* successfully do nothing */
 
319
  int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
 
320
  pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
 
321
 
 
322
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
 
323
  {
 
324
    mi->abort_slave=1;
 
325
    if ((error=terminate_slave_thread(mi->io_session,io_lock,
 
326
                                      &mi->stop_cond,
 
327
                                      &mi->slave_running,
 
328
                                      skip_lock)) &&
 
329
        !force_all)
 
330
      return(error);
 
331
  }
 
332
  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
 
333
  {
 
334
    mi->rli.abort_slave=1;
 
335
    if ((error=terminate_slave_thread(mi->rli.sql_session,sql_lock,
 
336
                                      &mi->rli.stop_cond,
 
337
                                      &mi->rli.slave_running,
 
338
                                      skip_lock)) &&
 
339
        !force_all)
 
340
      return(error);
 
341
  }
 
342
  return(0);
 
343
}
 
344
 
 
345
 
 
346
/**
 
347
   Wait for a slave thread to terminate.
 
348
 
 
349
   This function is called after requesting the thread to terminate
 
350
   (by setting @c abort_slave member of @c Relay_log_info or @c
 
351
   Master_info structure to 1). Termination of the thread is
 
352
   controlled with the the predicate <code>*slave_running</code>.
 
353
 
 
354
   Function will acquire @c term_lock before waiting on the condition
 
355
   unless @c skip_lock is true in which case the mutex should be owned
 
356
   by the caller of this function and will remain acquired after
 
357
   return from the function.
 
358
 
 
359
   @param term_lock
 
360
          Associated lock to use when waiting for @c term_cond
 
361
 
 
362
   @param term_cond
 
363
          Condition that is signalled when the thread has terminated
 
364
 
 
365
   @param slave_running
 
366
          Pointer to predicate to check for slave thread termination
 
367
 
 
368
   @param skip_lock
 
369
          If @c true the lock will not be acquired before waiting on
 
370
          the condition. In this case, it is assumed that the calling
 
371
          function acquires the lock before calling this function.
 
372
 
 
373
   @retval 0 All OK
 
374
 */
 
375
static int32_t
 
376
terminate_slave_thread(Session *session,
 
377
                       pthread_mutex_t* term_lock,
 
378
                       pthread_cond_t* term_cond,
 
379
                       volatile uint32_t *slave_running,
 
380
                       bool skip_lock)
 
381
{
 
382
  int32_t error;
 
383
 
 
384
  if (!skip_lock)
 
385
    pthread_mutex_lock(term_lock);
 
386
 
 
387
  safe_mutex_assert_owner(term_lock);
 
388
 
 
389
  if (!*slave_running)
 
390
  {
 
391
    if (!skip_lock)
 
392
      pthread_mutex_unlock(term_lock);
 
393
    return(ER_SLAVE_NOT_RUNNING);
 
394
  }
 
395
  assert(session != 0);
 
396
  Session_CHECK_SENTRY(session);
 
397
 
 
398
  /*
 
399
    Is is critical to test if the slave is running. Otherwise, we might
 
400
    be referening freed memory trying to kick it
 
401
  */
 
402
 
 
403
  while (*slave_running)                        // Should always be true
 
404
  {
 
405
    pthread_mutex_lock(&session->LOCK_delete);
 
406
#ifndef DONT_USE_THR_ALARM
 
407
    /*
 
408
      Error codes from pthread_kill are:
 
409
      EINVAL: invalid signal number (can't happen)
 
410
      ESRCH: thread already killed (can happen, should be ignored)
 
411
    */
 
412
    int32_t err= pthread_kill(session->real_id, thr_client_alarm);
 
413
    assert(err != EINVAL);
 
414
#endif
 
415
    session->awake(Session::NOT_KILLED);
 
416
    pthread_mutex_unlock(&session->LOCK_delete);
 
417
 
 
418
    /*
 
419
      There is a small chance that slave thread might miss the first
 
420
      alarm. To protect againts it, resend the signal until it reacts
 
421
    */
 
422
    struct timespec abstime;
 
423
    set_timespec(abstime,2);
 
424
    error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
 
425
    assert(error == ETIMEDOUT || error == 0);
 
426
  }
 
427
 
 
428
  assert(*slave_running == 0);
 
429
 
 
430
  if (!skip_lock)
 
431
    pthread_mutex_unlock(term_lock);
 
432
  return(0);
 
433
}
 
434
 
 
435
 
 
436
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
 
437
                           pthread_mutex_t *cond_lock,
 
438
                           pthread_cond_t *start_cond,
 
439
                           volatile uint32_t *slave_running,
 
440
                           volatile uint32_t *slave_run_id,
 
441
                           Master_info* mi,
 
442
                           bool high_priority)
 
443
{
 
444
  pthread_t th;
 
445
  uint32_t start_id;
 
446
 
 
447
  assert(mi->inited);
 
448
 
 
449
  if (start_lock)
 
450
    pthread_mutex_lock(start_lock);
 
451
  if (!server_id)
 
452
  {
 
453
    if (start_cond)
 
454
      pthread_cond_broadcast(start_cond);
 
455
    if (start_lock)
 
456
      pthread_mutex_unlock(start_lock);
 
457
    sql_print_error(_("Server id not set, will not start slave"));
 
458
    return(ER_BAD_SLAVE);
 
459
  }
 
460
 
 
461
  if (*slave_running)
 
462
  {
 
463
    if (start_cond)
 
464
      pthread_cond_broadcast(start_cond);
 
465
    if (start_lock)
 
466
      pthread_mutex_unlock(start_lock);
 
467
    return(ER_SLAVE_MUST_STOP);
 
468
  }
 
469
  start_id= *slave_run_id;
 
470
  if (high_priority)
 
471
  {
 
472
    struct sched_param tmp_sched_param;
 
473
 
 
474
    memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
 
475
    tmp_sched_param.sched_priority= CONNECT_PRIOR;
 
476
    (void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
 
477
  }
 
478
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
 
479
  {
 
480
    if (start_lock)
 
481
      pthread_mutex_unlock(start_lock);
 
482
    return(ER_SLAVE_THREAD);
 
483
  }
 
484
  if (start_cond && cond_lock) // caller has cond_lock
 
485
  {
 
486
    Session* session = current_session;
 
487
    while (start_id == *slave_run_id)
 
488
    {
 
489
      const char* old_msg = session->enter_cond(start_cond,cond_lock,
 
490
                                            "Waiting for slave thread to start");
 
491
      pthread_cond_wait(start_cond,cond_lock);
 
492
      session->exit_cond(old_msg);
 
493
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
 
494
      if (session->killed)
 
495
        return(session->killed_errno());
 
496
    }
 
497
  }
 
498
  if (start_lock)
 
499
    pthread_mutex_unlock(start_lock);
 
500
  return(0);
 
501
}
 
502
 
 
503
 
 
504
/*
 
505
  start_slave_threads()
 
506
 
 
507
  NOTES
 
508
    SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
 
509
    sense to do that for starting a slave--we always care if it actually
 
510
    started the threads that were not previously running
 
511
*/
 
512
 
 
513
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
 
514
                        Master_info* mi, const char*, const char*,
 
515
                        int32_t thread_mask)
 
516
{
 
517
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
 
518
  pthread_cond_t* cond_io=0,*cond_sql=0;
 
519
  int32_t error=0;
 
520
 
 
521
  if (need_slave_mutex)
 
522
  {
 
523
    lock_io = &mi->run_lock;
 
524
    lock_sql = &mi->rli.run_lock;
 
525
  }
 
526
  if (wait_for_start)
 
527
  {
 
528
    cond_io = &mi->start_cond;
 
529
    cond_sql = &mi->rli.start_cond;
 
530
    lock_cond_io = &mi->run_lock;
 
531
    lock_cond_sql = &mi->rli.run_lock;
 
532
  }
 
533
 
 
534
  if (thread_mask & SLAVE_IO)
 
535
    error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
 
536
                              cond_io,
 
537
                              &mi->slave_running, &mi->slave_run_id,
 
538
                              mi, 1); //high priority, to read the most possible
 
539
  if (!error && (thread_mask & SLAVE_SQL))
 
540
  {
 
541
    error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
 
542
                              cond_sql,
 
543
                              &mi->rli.slave_running, &mi->rli.slave_run_id,
 
544
                              mi, 0);
 
545
    if (error)
 
546
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
 
547
  }
 
548
  return(error);
 
549
}
 
550
 
 
551
 
 
552
#ifdef NOT_USED_YET
 
553
static int32_t end_slave_on_walk(Master_info* mi, unsigned char* /*unused*/)
 
554
{
 
555
  end_master_info(mi);
 
556
  return(0);
 
557
}
 
558
#endif
 
559
 
 
560
 
 
561
/*
 
562
  Free all resources used by slave
 
563
 
 
564
  SYNOPSIS
 
565
    end_slave()
 
566
*/
 
567
 
 
568
void end_slave()
 
569
{
 
570
  /*
 
571
    This is called when the server terminates, in close_connections().
 
572
    It terminates slave threads. However, some CHANGE MASTER etc may still be
 
573
    running presently. If a START SLAVE was in progress, the mutex lock below
 
574
    will make us wait until slave threads have started, and START SLAVE
 
575
    returns, then we terminate them here.
 
576
  */
 
577
  pthread_mutex_lock(&LOCK_active_mi);
 
578
  if (active_mi)
 
579
  {
 
580
    /*
 
581
      TODO: replace the line below with
 
582
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
 
583
      once multi-master code is ready.
 
584
    */
 
585
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
 
586
    active_mi->end_master_info();
 
587
    delete active_mi;
 
588
    active_mi= 0;
 
589
  }
 
590
  pthread_mutex_unlock(&LOCK_active_mi);
 
591
  return;
 
592
}
 
593
 
 
594
 
 
595
static bool io_slave_killed(Session* session, Master_info* mi)
 
596
{
 
597
  assert(mi->io_session == session);
 
598
  assert(mi->slave_running); // tracking buffer overrun
 
599
  return(mi->abort_slave || abort_loop || session->killed);
 
600
}
 
601
 
 
602
 
 
603
static bool sql_slave_killed(Session* session, Relay_log_info* rli)
 
604
{
 
605
  assert(rli->sql_session == session);
 
606
  assert(rli->slave_running == 1);// tracking buffer overrun
 
607
  if (abort_loop || session->killed || rli->abort_slave)
 
608
  {
 
609
    /*
 
610
      If we are in an unsafe situation (stopping could corrupt replication),
 
611
      we give one minute to the slave SQL thread of grace before really
 
612
      terminating, in the hope that it will be able to read more events and
 
613
      the unsafe situation will soon be left. Note that this one minute starts
 
614
      from the last time anything happened in the slave SQL thread. So it's
 
615
      really one minute of idleness, we don't timeout if the slave SQL thread
 
616
      is actively working.
 
617
    */
 
618
    if (rli->last_event_start_time == 0)
 
619
      return(1);
 
620
    if (difftime(time(0), rli->last_event_start_time) > 60)
 
621
    {
 
622
      rli->report(ERROR_LEVEL, 0,
 
623
                  _("SQL thread had to stop in an unsafe situation, in "
 
624
                  "the middle of applying updates to a "
 
625
                  "non-transactional table without any primary key. "
 
626
                  "There is a risk of duplicate updates when the slave "
 
627
                  "SQL thread is restarted. Please check your tables' "
 
628
                  "contents after restart."));
 
629
      return(1);
 
630
    }
 
631
  }
 
632
  return(0);
 
633
}
 
634
 
 
635
 
 
636
/*
 
637
  skip_load_data_infile()
 
638
 
 
639
  NOTES
 
640
    This is used to tell a 3.23 master to break send_file()
 
641
*/
 
642
 
 
643
void skip_load_data_infile(NET *net)
 
644
{
 
645
  (void)net_request_file(net, "/dev/null");
 
646
  (void)my_net_read(net);                               // discard response
 
647
  (void)net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0); // ok
 
648
  return;
 
649
}
 
650
 
 
651
 
 
652
bool net_request_file(NET* net, const char* fname)
 
653
{
 
654
  return(net_write_command(net, 251, (unsigned char*) fname, strlen(fname),
 
655
                                (unsigned char*) "", 0));
 
656
}
 
657
 
 
658
/*
 
659
  From other comments and tests in code, it looks like
 
660
  sometimes Query_log_event and Load_log_event can have db == 0
 
661
  (see rewrite_db() above for example)
 
662
  (cases where this happens are unclear; it may be when the master is 3.23).
 
663
*/
 
664
 
 
665
const char *print_slave_db_safe(const char* db)
 
666
{
 
667
  return((db ? db : ""));
 
668
}
 
669
 
 
670
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
 
671
                                 const char *default_val)
 
672
{
 
673
  uint32_t length;
 
674
 
 
675
  if ((length=my_b_gets(f,var, max_size)))
 
676
  {
 
677
    char* last_p = var + length -1;
 
678
    if (*last_p == '\n')
 
679
      *last_p = 0; // if we stopped on newline, kill it
 
680
    else
 
681
    {
 
682
      /*
 
683
        If we truncated a line or stopped on last char, remove all chars
 
684
        up to and including newline.
 
685
      */
 
686
      int32_t c;
 
687
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
 
688
    }
 
689
    return(0);
 
690
  }
 
691
  else if (default_val)
 
692
  {
 
693
    strmake(var,  default_val, max_size-1);
 
694
    return(0);
 
695
  }
 
696
  return(1);
 
697
}
 
698
 
 
699
 
 
700
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
 
701
{
 
702
  char buf[32];
 
703
 
 
704
 
 
705
  if (my_b_gets(f, buf, sizeof(buf)))
 
706
  {
 
707
    *var = atoi(buf);
 
708
    return(0);
 
709
  }
 
710
  else if (default_val)
 
711
  {
 
712
    *var = default_val;
 
713
    return(0);
 
714
  }
 
715
  return(1);
 
716
}
 
717
 
 
718
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
 
719
{
 
720
  char buf[16];
 
721
 
 
722
 
 
723
  if (my_b_gets(f, buf, sizeof(buf)))
 
724
  {
 
725
    if (sscanf(buf, "%f", var) != 1)
 
726
      return(1);
 
727
    else
 
728
      return(0);
 
729
  }
 
730
  else if (default_val != 0.0)
 
731
  {
 
732
    *var = default_val;
 
733
    return(0);
 
734
  }
 
735
  return(1);
 
736
}
 
737
 
 
738
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info)
 
739
{
 
740
  if (io_slave_killed(session, mi))
 
741
  {
 
742
    if (info && global_system_variables.log_warnings)
 
743
      sql_print_information("%s",info);
 
744
    return true;
 
745
  }
 
746
  return false;
 
747
}
 
748
 
 
749
 
 
750
/*
 
751
  Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
 
752
  relying on the binlog's version. This is not perfect: imagine an upgrade
 
753
  of the master without waiting that all slaves are in sync with the master;
 
754
  then a slave could be fooled about the binlog's format. This is what happens
 
755
  when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
 
756
  slaves are fooled. So we do this only to distinguish between 3.23 and more
 
757
  recent masters (it's too late to change things for 3.23).
 
758
 
 
759
  RETURNS
 
760
  0       ok
 
761
  1       error
 
762
*/
 
763
 
 
764
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
 
765
{
 
766
  char error_buf[512];
 
767
  String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
 
768
  char err_buff[MAX_SLAVE_ERRMSG];
 
769
  const char* errmsg= 0;
 
770
  int32_t err_code= 0;
 
771
  DRIZZLE_RES *master_res= 0;
 
772
  DRIZZLE_ROW master_row;
 
773
 
 
774
  err_msg.length(0);
 
775
  /*
 
776
    Free old description_event_for_queue (that is needed if we are in
 
777
    a reconnection).
 
778
  */
 
779
  delete mi->rli.relay_log.description_event_for_queue;
 
780
  mi->rli.relay_log.description_event_for_queue= 0;
 
781
 
 
782
  if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
 
783
  {
 
784
    errmsg = _("Master reported unrecognized DRIZZLE version");
 
785
    err_code= ER_SLAVE_FATAL_ERROR;
 
786
    sprintf(err_buff, ER(err_code), errmsg);
 
787
    err_msg.append(err_buff);
 
788
  }
 
789
  else
 
790
  {
 
791
    /*
 
792
      Note the following switch will bug when we have DRIZZLE branch 30 ;)
 
793
    */
 
794
    switch (*drizzle->server_version)
 
795
    {
 
796
    case '0':
 
797
    case '1':
 
798
    case '2':
 
799
      errmsg = _("Master reported unrecognized DRIZZLE version");
 
800
      err_code= ER_SLAVE_FATAL_ERROR;
 
801
      sprintf(err_buff, ER(err_code), errmsg);
 
802
      err_msg.append(err_buff);
 
803
      break;
 
804
    case '3':
 
805
      mi->rli.relay_log.description_event_for_queue= new
 
806
        Format_description_log_event(1, drizzle->server_version);
 
807
      break;
 
808
    case '4':
 
809
      mi->rli.relay_log.description_event_for_queue= new
 
810
        Format_description_log_event(3, drizzle->server_version);
 
811
      break;
 
812
    default:
 
813
      /*
 
814
        Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
 
815
        take the early steps (like tests for "is this a 3.23 master") which we
 
816
        have to take before we receive the real master's Format_desc which will
 
817
        override this one. Note that the Format_desc we create below is garbage
 
818
        (it has the format of the *slave*); it's only good to help know if the
 
819
        master is 3.23, 4.0, etc.
 
820
      */
 
821
      mi->rli.relay_log.description_event_for_queue= new
 
822
        Format_description_log_event(4, drizzle->server_version);
 
823
      break;
 
824
    }
 
825
  }
 
826
 
 
827
  /*
 
828
     This does not mean that a 5.0 slave will be able to read a 6.0 master; but
 
829
     as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
 
830
     can't read a 6.0 master, this will show up when the slave can't read some
 
831
     events sent by the master, and there will be error messages.
 
832
  */
 
833
 
 
834
  if (err_msg.length() != 0)
 
835
    goto err;
 
836
 
 
837
  /* as we are here, we tried to allocate the event */
 
838
  if (!mi->rli.relay_log.description_event_for_queue)
 
839
  {
 
840
    errmsg= _("default Format_description_log_event");
 
841
    err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
 
842
    sprintf(err_buff, ER(err_code), errmsg);
 
843
    err_msg.append(err_buff);
 
844
    goto err;
 
845
  }
 
846
 
 
847
  /*
 
848
    Compare the master and slave's clock. Do not die if master's clock is
 
849
    unavailable (very old master not supporting UNIX_TIMESTAMP()?).
 
850
  */
 
851
 
 
852
  if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
 
853
      (master_res= drizzle_store_result(drizzle)) &&
 
854
      (master_row= drizzle_fetch_row(master_res)))
 
855
  {
 
856
    mi->clock_diff_with_master=
 
857
      (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
 
858
  }
 
859
  else if (!check_io_slave_killed(mi->io_session, mi, NULL))
 
860
  {
 
861
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
 
862
    sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
 
863
                        "do not trust column Seconds_Behind_Master of SHOW "
 
864
                        "SLAVE STATUS. Error: %s (%d)"),
 
865
                      drizzle_error(drizzle), drizzle_errno(drizzle));
 
866
  }
 
867
  if (master_res)
 
868
    drizzle_free_result(master_res);
 
869
 
 
870
  /*
 
871
    Check that the master's server id and ours are different. Because if they
 
872
    are equal (which can result from a simple copy of master's datadir to slave,
 
873
    thus copying some drizzle.cnf), replication will work but all events will be
 
874
    skipped.
 
875
    Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
 
876
    master?).
 
877
    Note: we could have put a @@SERVER_ID in the previous SELECT
 
878
    UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
 
879
  */
 
880
  if (!drizzle_real_query(drizzle,
 
881
                        STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
 
882
      (master_res= drizzle_store_result(drizzle)))
 
883
  {
 
884
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
885
        (::server_id == strtoul(master_row[1], 0, 10)) &&
 
886
        !mi->rli.replicate_same_server_id)
 
887
    {
 
888
      errmsg=
 
889
        _("The slave I/O thread stops because master and slave have equal "
 
890
          "DRIZZLE server ids; these ids must be different "
 
891
          "for replication to work (or "
 
892
          "the --replicate-same-server-id option must be used "
 
893
          "on slave but this does"
 
894
          "not always make sense; please check the manual before using it).");
 
895
      err_code= ER_SLAVE_FATAL_ERROR;
 
896
      sprintf(err_buff, ER(err_code), errmsg);
 
897
      err_msg.append(err_buff);
 
898
    }
 
899
    drizzle_free_result(master_res);
 
900
    if (errmsg)
 
901
      goto err;
 
902
  }
 
903
 
 
904
  /*
 
905
    Check that the master's global character_set_server and ours are the same.
 
906
    Not fatal if query fails (old master?).
 
907
    Note that we don't check for equality of global character_set_client and
 
908
    collation_connection (neither do we prevent their setting in
 
909
    set_var.cc). That's because from what I (Guilhem) have tested, the global
 
910
    values of these 2 are never used (new connections don't use them).
 
911
    We don't test equality of global collation_database either as it's is
 
912
    going to be deprecated (made read-only) in 4.1 very soon.
 
913
    The test is only relevant if master < 5.0.3 (we'll test only if it's older
 
914
    than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
 
915
    charset info in each binlog event.
 
916
    We don't do it for 3.23 because masters <3.23.50 hang on
 
917
    SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
 
918
    test only if master is 4.x.
 
919
  */
 
920
 
 
921
  /* redundant with rest of code but safer against later additions */
 
922
  if (*drizzle->server_version == '3')
 
923
    goto err;
 
924
 
 
925
  if ((*drizzle->server_version == '4') &&
 
926
      !drizzle_real_query(drizzle,
 
927
                        STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
 
928
      (master_res= drizzle_store_result(drizzle)))
 
929
  {
 
930
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
931
        strcmp(master_row[0], global_system_variables.collation_server->name))
 
932
    {
 
933
      errmsg=
 
934
        _("The slave I/O thread stops because master and slave have"
 
935
          " different values for the COLLATION_SERVER global variable."
 
936
          " The values must be equal for replication to work");
 
937
      err_code= ER_SLAVE_FATAL_ERROR;
 
938
      sprintf(err_buff, ER(err_code), errmsg);
 
939
      err_msg.append(err_buff);
 
940
    }
 
941
    drizzle_free_result(master_res);
 
942
    if (errmsg)
 
943
      goto err;
 
944
  }
 
945
 
 
946
  /*
 
947
    Perform analogous check for time zone. Theoretically we also should
 
948
    perform check here to verify that SYSTEM time zones are the same on
 
949
    slave and master, but we can't rely on value of @@system_time_zone
 
950
    variable (it is time zone abbreviation) since it determined at start
 
951
    time and so could differ for slave and master even if they are really
 
952
    in the same system time zone. So we are omiting this check and just
 
953
    relying on documentation. Also according to Monty there are many users
 
954
    who are using replication between servers in various time zones. Hence
 
955
    such check will broke everything for them. (And now everything will
 
956
    work for them because by default both their master and slave will have
 
957
    'SYSTEM' time zone).
 
958
    This check is only necessary for 4.x masters (and < 5.0.4 masters but
 
959
    those were alpha).
 
960
  */
 
961
  if ((*drizzle->server_version == '4') &&
 
962
      !drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
 
963
      (master_res= drizzle_store_result(drizzle)))
 
964
  {
 
965
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
966
        strcmp(master_row[0],
 
967
               global_system_variables.time_zone->get_name()->ptr()))
 
968
    {
 
969
      errmsg=
 
970
        _("The slave I/O thread stops because master and slave have"
 
971
          " different values for the TIME_ZONE global variable."
 
972
          " The values must be equal for replication to work");
 
973
      err_code= ER_SLAVE_FATAL_ERROR;
 
974
      sprintf(err_buff, ER(err_code), errmsg);
 
975
      err_msg.append(err_buff);
 
976
    }
 
977
    drizzle_free_result(master_res);
 
978
 
 
979
    if (errmsg)
 
980
      goto err;
 
981
  }
 
982
 
 
983
  if (mi->heartbeat_period != 0.0)
 
984
  {
 
985
    char llbuf[22];
 
986
    const char query_format[]= "SET @master_heartbeat_period= %s";
 
987
    char query[sizeof(query_format) - 2 + sizeof(llbuf)];
 
988
    /* 
 
989
       the period is an uint64_t of nano-secs. 
 
990
    */
 
991
    llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
 
992
    sprintf(query, query_format, llbuf);
 
993
 
 
994
    if (drizzle_real_query(drizzle, query, strlen(query))
 
995
        && !check_io_slave_killed(mi->io_session, mi, NULL))
 
996
    {
 
997
      err_msg.append("The slave I/O thread stops because querying master with '");
 
998
      err_msg.append(query);
 
999
      err_msg.append("' failed;");
 
1000
      err_msg.append(" error: ");
 
1001
      err_code= drizzle_errno(drizzle);
 
1002
      err_msg.qs_append(err_code);
 
1003
      err_msg.append("  '");
 
1004
      err_msg.append(drizzle_error(drizzle));
 
1005
      err_msg.append("'");
 
1006
      drizzle_free_result(drizzle_store_result(drizzle));
 
1007
      goto err;
 
1008
    }
 
1009
    drizzle_free_result(drizzle_store_result(drizzle));
 
1010
  }
 
1011
  
 
1012
err:
 
1013
  if (err_msg.length() != 0)
 
1014
  {
 
1015
    sql_print_error("%s",err_msg.ptr());
 
1016
    assert(err_code != 0);
 
1017
    mi->report(ERROR_LEVEL, err_code, "%s",err_msg.ptr());
 
1018
    return(1);
 
1019
  }
 
1020
 
 
1021
  return(0);
 
1022
}
 
1023
 
 
1024
 
 
1025
static bool wait_for_relay_log_space(Relay_log_info* rli)
 
1026
{
 
1027
  bool slave_killed=0;
 
1028
  Master_info* mi = rli->mi;
 
1029
  const char *save_proc_info;
 
1030
  Session* session = mi->io_session;
 
1031
 
 
1032
  pthread_mutex_lock(&rli->log_space_lock);
 
1033
  save_proc_info= session->enter_cond(&rli->log_space_cond,
 
1034
                                  &rli->log_space_lock,
 
1035
                                  _("Waiting for the slave SQL thread "
 
1036
                                    "to free enough relay log space"));
 
1037
  while (rli->log_space_limit < rli->log_space_total &&
 
1038
         !(slave_killed=io_slave_killed(session,mi)) &&
 
1039
         !rli->ignore_log_space_limit)
 
1040
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
 
1041
  session->exit_cond(save_proc_info);
 
1042
  return(slave_killed);
 
1043
}
 
1044
 
 
1045
 
 
1046
/*
 
1047
  Builds a Rotate from the ignored events' info and writes it to relay log.
 
1048
 
 
1049
  SYNOPSIS
 
1050
  write_ignored_events_info_to_relay_log()
 
1051
    session             pointer to I/O thread's session
 
1052
    mi
 
1053
 
 
1054
  DESCRIPTION
 
1055
    Slave I/O thread, going to die, must leave a durable trace of the
 
1056
    ignored events' end position for the use of the slave SQL thread, by
 
1057
    calling this function. Only that thread can call it (see assertion).
 
1058
 */
 
1059
static void write_ignored_events_info_to_relay_log(Session *session,
 
1060
                                                   Master_info *mi)
 
1061
{
 
1062
  Relay_log_info *rli= &mi->rli;
 
1063
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
 
1064
 
 
1065
  assert(session == mi->io_session);
 
1066
  pthread_mutex_lock(log_lock);
 
1067
  if (rli->ign_master_log_name_end[0])
 
1068
  {
 
1069
    Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
 
1070
                                               0, rli->ign_master_log_pos_end,
 
1071
                                               Rotate_log_event::DUP_NAME);
 
1072
    rli->ign_master_log_name_end[0]= 0;
 
1073
    /* can unlock before writing as slave SQL session will soon see our Rotate */
 
1074
    pthread_mutex_unlock(log_lock);
 
1075
    if (likely((bool)ev))
 
1076
    {
 
1077
      ev->server_id= 0; // don't be ignored by slave SQL thread
 
1078
      if (unlikely(rli->relay_log.append(ev)))
 
1079
        mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
1080
                   ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
1081
                   _("failed to write a Rotate event"
 
1082
                     " to the relay log, SHOW SLAVE STATUS may be"
 
1083
                     " inaccurate"));
 
1084
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
1085
      if (mi->flush())
 
1086
        sql_print_error(_("Failed to flush master info file"));
 
1087
      delete ev;
 
1088
    }
 
1089
    else
 
1090
      mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
 
1091
                 ER(ER_SLAVE_CREATE_EVENT_FAILURE),
 
1092
                 _("Rotate_event (out of memory?),"
 
1093
                   " SHOW SLAVE STATUS may be inaccurate"));
 
1094
  }
 
1095
  else
 
1096
    pthread_mutex_unlock(log_lock);
 
1097
  return;
 
1098
}
 
1099
 
 
1100
 
 
1101
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
 
1102
                             bool *suppress_warnings)
 
1103
{
 
1104
  unsigned char buf[1024], *pos= buf;
 
1105
  uint32_t report_host_len, report_user_len=0, report_password_len=0;
 
1106
 
 
1107
  *suppress_warnings= false;
 
1108
  if (!report_host)
 
1109
    return(0);
 
1110
  report_host_len= strlen(report_host);
 
1111
  /* 30 is a good safety margin */
 
1112
  if (report_host_len + report_user_len + report_password_len + 30 >
 
1113
      sizeof(buf))
 
1114
    return(0);                                     // safety
 
1115
 
 
1116
  int4store(pos, server_id); pos+= 4;
 
1117
  pos= net_store_data(pos, (unsigned char*) report_host, report_host_len);
 
1118
  pos= net_store_data(pos, NULL, report_user_len);
 
1119
  pos= net_store_data(pos, NULL, report_password_len);
 
1120
  int2store(pos, (uint16_t) report_port); pos+= 2;
 
1121
  int4store(pos, 0);    pos+= 4;
 
1122
  /* The master will fill in master_id */
 
1123
  int4store(pos, 0);                    pos+= 4;
 
1124
 
 
1125
  if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
 
1126
  {
 
1127
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1128
    {
 
1129
      *suppress_warnings= true;                 // Suppress reconnect warning
 
1130
    }
 
1131
    else if (!check_io_slave_killed(mi->io_session, mi, NULL))
 
1132
    {
 
1133
      char buf[256];
 
1134
      snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle), 
 
1135
               drizzle_errno(drizzle));
 
1136
      mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
 
1137
                 ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
 
1138
    }
 
1139
    return(1);
 
1140
  }
 
1141
  return(0);
 
1142
}
 
1143
 
 
1144
 
 
1145
bool show_master_info(Session* session, Master_info* mi)
 
1146
{
 
1147
  // TODO: fix this for multi-master
 
1148
  List<Item> field_list;
 
1149
  Protocol *protocol= session->protocol;
 
1150
 
 
1151
  field_list.push_back(new Item_empty_string("Slave_IO_State",
 
1152
                                                     14));
 
1153
  field_list.push_back(new Item_empty_string("Master_Host",
 
1154
                                                     sizeof(mi->host)));
 
1155
  field_list.push_back(new Item_empty_string("Master_User",
 
1156
                                                     sizeof(mi->user)));
 
1157
  field_list.push_back(new Item_return_int("Master_Port", 7,
 
1158
                                           DRIZZLE_TYPE_LONG));
 
1159
  field_list.push_back(new Item_return_int("Connect_Retry", 10,
 
1160
                                           DRIZZLE_TYPE_LONG));
 
1161
  field_list.push_back(new Item_empty_string("Master_Log_File",
 
1162
                                             FN_REFLEN));
 
1163
  field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
 
1164
                                           DRIZZLE_TYPE_LONGLONG));
 
1165
  field_list.push_back(new Item_empty_string("Relay_Log_File",
 
1166
                                             FN_REFLEN));
 
1167
  field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
 
1168
                                           DRIZZLE_TYPE_LONGLONG));
 
1169
  field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
 
1170
                                             FN_REFLEN));
 
1171
  field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
 
1172
  field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
 
1173
  field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
 
1174
  field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
 
1175
  field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
 
1176
  field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
 
1177
  field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
 
1178
  field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
 
1179
                                             28));
 
1180
  field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
 
1181
  field_list.push_back(new Item_empty_string("Last_Error", 20));
 
1182
  field_list.push_back(new Item_return_int("Skip_Counter", 10,
 
1183
                                           DRIZZLE_TYPE_LONG));
 
1184
  field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
 
1185
                                           DRIZZLE_TYPE_LONGLONG));
 
1186
  field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
 
1187
                                           DRIZZLE_TYPE_LONGLONG));
 
1188
  field_list.push_back(new Item_empty_string("Until_Condition", 6));
 
1189
  field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
 
1190
  field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
 
1191
                                           DRIZZLE_TYPE_LONGLONG));
 
1192
  field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
 
1193
                                           DRIZZLE_TYPE_LONGLONG));
 
1194
  field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
 
1195
  field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
 
1196
  field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
 
1197
  field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
 
1198
 
 
1199
  if (protocol->send_fields(&field_list,
 
1200
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1201
    return(true);
 
1202
 
 
1203
  if (mi->host[0])
 
1204
  {
 
1205
    String *packet= &session->packet;
 
1206
    protocol->prepare_for_resend();
 
1207
 
 
1208
    /*
 
1209
      slave_running can be accessed without run_lock but not other
 
1210
      non-volotile members like mi->io_session, which is guarded by the mutex.
 
1211
    */
 
1212
    pthread_mutex_lock(&mi->run_lock);
 
1213
    protocol->store(mi->io_session ? mi->io_session->get_proc_info() : "", &my_charset_bin);
 
1214
    pthread_mutex_unlock(&mi->run_lock);
 
1215
 
 
1216
    pthread_mutex_lock(&mi->data_lock);
 
1217
    pthread_mutex_lock(&mi->rli.data_lock);
 
1218
    protocol->store(mi->getHostname(), &my_charset_bin);
 
1219
    protocol->store(mi->getUsername(), &my_charset_bin);
 
1220
    protocol->store((uint32_t) mi->getPort());
 
1221
    protocol->store(mi->getConnectionRetry());
 
1222
    protocol->store(mi->getLogName(), &my_charset_bin);
 
1223
    protocol->store((uint64_t) mi->getLogPosition());
 
1224
    protocol->store(mi->rli.group_relay_log_name.c_str() +
 
1225
                    dirname_length(mi->rli.group_relay_log_name.c_str()),
 
1226
                    &my_charset_bin);
 
1227
    protocol->store((uint64_t) mi->rli.group_relay_log_pos);
 
1228
    protocol->store(mi->rli.group_master_log_name.c_str(), &my_charset_bin);
 
1229
    protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
 
1230
                    "Yes" : "No", &my_charset_bin);
 
1231
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
 
1232
    protocol->store(rpl_filter->get_do_db());
 
1233
    protocol->store(rpl_filter->get_ignore_db());
 
1234
 
 
1235
    char buf[256];
 
1236
    String tmp(buf, sizeof(buf), &my_charset_bin);
 
1237
    rpl_filter->get_do_table(&tmp);
 
1238
    protocol->store(&tmp);
 
1239
    rpl_filter->get_ignore_table(&tmp);
 
1240
    protocol->store(&tmp);
 
1241
    rpl_filter->get_wild_do_table(&tmp);
 
1242
    protocol->store(&tmp);
 
1243
    rpl_filter->get_wild_ignore_table(&tmp);
 
1244
    protocol->store(&tmp);
 
1245
 
 
1246
    protocol->store(mi->rli.last_error().number);
 
1247
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
 
1248
    protocol->store((uint32_t) mi->rli.slave_skip_counter);
 
1249
    protocol->store((uint64_t) mi->rli.group_master_log_pos);
 
1250
    protocol->store((uint64_t) mi->rli.log_space_total);
 
1251
 
 
1252
    protocol->store(
 
1253
      mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
 
1254
        ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
 
1255
          "Relay"), &my_charset_bin);
 
1256
    protocol->store(mi->rli.until_log_name, &my_charset_bin);
 
1257
    protocol->store((uint64_t) mi->rli.until_log_pos);
 
1258
 
 
1259
    /*
 
1260
      Seconds_Behind_Master: if SQL thread is running and I/O thread is
 
1261
      connected, we can compute it otherwise show NULL (i.e. unknown).
 
1262
    */
 
1263
    if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
 
1264
        mi->rli.slave_running)
 
1265
    {
 
1266
      long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
 
1267
                       - mi->clock_diff_with_master);
 
1268
      /*
 
1269
        Apparently on some systems time_diff can be <0. Here are possible
 
1270
        reasons related to MySQL:
 
1271
        - the master is itself a slave of another master whose time is ahead.
 
1272
        - somebody used an explicit SET TIMESTAMP on the master.
 
1273
        Possible reason related to granularity-to-second of time functions
 
1274
        (nothing to do with MySQL), which can explain a value of -1:
 
1275
        assume the master's and slave's time are perfectly synchronized, and
 
1276
        that at slave's connection time, when the master's timestamp is read,
 
1277
        it is at the very end of second 1, and (a very short time later) when
 
1278
        the slave's timestamp is read it is at the very beginning of second
 
1279
        2. Then the recorded value for master is 1 and the recorded value for
 
1280
        slave is 2. At SHOW SLAVE STATUS time, assume that the difference
 
1281
        between timestamp of slave and rli->last_master_timestamp is 0
 
1282
        (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
 
1283
        This confuses users, so we don't go below 0: hence the cmax().
 
1284
 
 
1285
        last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
 
1286
        special marker to say "consider we have caught up".
 
1287
      */
 
1288
      protocol->store((int64_t)(mi->rli.last_master_timestamp ?
 
1289
                                 cmax((long)0, time_diff) : 0));
 
1290
    }
 
1291
    else
 
1292
    {
 
1293
      protocol->store_null();
 
1294
    }
 
1295
 
 
1296
    // Last_IO_Errno
 
1297
    protocol->store(mi->last_error().number);
 
1298
    // Last_IO_Error
 
1299
    protocol->store(mi->last_error().message, &my_charset_bin);
 
1300
    // Last_SQL_Errno
 
1301
    protocol->store(mi->rli.last_error().number);
 
1302
    // Last_SQL_Error
 
1303
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
 
1304
 
 
1305
    pthread_mutex_unlock(&mi->rli.data_lock);
 
1306
    pthread_mutex_unlock(&mi->data_lock);
 
1307
 
 
1308
    if (my_net_write(&session->net, (unsigned char*) session->packet.ptr(), packet->length()))
 
1309
      return(true);
 
1310
  }
 
1311
  my_eof(session);
 
1312
  return(false);
 
1313
}
 
1314
 
 
1315
 
 
1316
void set_slave_thread_options(Session* session)
 
1317
{
 
1318
  /*
 
1319
     It's nonsense to constrain the slave threads with max_join_size; if a
 
1320
     query succeeded on master, we HAVE to execute it. So set
 
1321
     OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
 
1322
     (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
 
1323
     SELECT examining more than 4 billion rows would still fail (yes, because
 
1324
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
 
1325
     only for client threads.
 
1326
  */
 
1327
  uint64_t options= session->options | OPTION_BIG_SELECTS;
 
1328
  if (opt_log_slave_updates)
 
1329
    options|= OPTION_BIN_LOG;
 
1330
  else
 
1331
    options&= ~OPTION_BIN_LOG;
 
1332
  session->options= options;
 
1333
  session->variables.completion_type= 0;
 
1334
  return;
 
1335
}
 
1336
 
 
1337
/*
 
1338
  init_slave_thread()
 
1339
*/
 
1340
 
 
1341
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type)
 
1342
{
 
1343
  int32_t simulate_error= 0;
 
1344
  session->system_thread = (session_type == SLAVE_Session_SQL) ?
 
1345
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
 
1346
  session->security_ctx->skip_grants();
 
1347
  my_net_init(&session->net, 0);
 
1348
/*
 
1349
  Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
 
1350
  slave threads, since a replication event can become this much larger
 
1351
  than the corresponding packet (query) sent from client to master.
 
1352
*/
 
1353
  session->variables.max_allowed_packet= global_system_variables.max_allowed_packet
 
1354
    + MAX_LOG_EVENT_HEADER;  /* note, incr over the global not session var */
 
1355
  session->slave_thread = 1;
 
1356
  set_slave_thread_options(session);
 
1357
  session->client_capabilities = CLIENT_LOCAL_FILES;
 
1358
  pthread_mutex_lock(&LOCK_thread_count);
 
1359
  session->thread_id= session->variables.pseudo_thread_id= thread_id++;
 
1360
  pthread_mutex_unlock(&LOCK_thread_count);
 
1361
 
 
1362
 simulate_error|= (1 << SLAVE_Session_IO);
 
1363
 simulate_error|= (1 << SLAVE_Session_SQL);
 
1364
  if (init_thr_lock() || session->store_globals() || simulate_error & (1<< session_type))
 
1365
  {
 
1366
    session->cleanup();
 
1367
    return(-1);
 
1368
  }
 
1369
  lex_start(session);
 
1370
 
 
1371
  if (session_type == SLAVE_Session_SQL)
 
1372
    session->set_proc_info("Waiting for the next event in relay log");
 
1373
  else
 
1374
    session->set_proc_info("Waiting for master update");
 
1375
  session->version=refresh_version;
 
1376
  session->set_time();
 
1377
  return(0);
 
1378
}
 
1379
 
 
1380
 
 
1381
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
1382
                      void* thread_killed_arg)
 
1383
{
 
1384
  int32_t nap_time;
 
1385
  thr_alarm_t alarmed;
 
1386
 
 
1387
  thr_alarm_init(&alarmed);
 
1388
  time_t start_time= my_time(0);
 
1389
  time_t end_time= start_time+sec;
 
1390
 
 
1391
  while ((nap_time= (int32_t) (end_time - start_time)) > 0)
 
1392
  {
 
1393
    ALARM alarm_buff;
 
1394
    /*
 
1395
      The only reason we are asking for alarm is so that
 
1396
      we will be woken up in case of murder, so if we do not get killed,
 
1397
      set the alarm so it goes off after we wake up naturally
 
1398
    */
 
1399
    thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
 
1400
    sleep(nap_time);
 
1401
    thr_end_alarm(&alarmed);
 
1402
 
 
1403
    if ((*thread_killed)(session,thread_killed_arg))
 
1404
      return(1);
 
1405
    start_time= my_time(0);
 
1406
  }
 
1407
  return(0);
 
1408
}
 
1409
 
 
1410
 
 
1411
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
 
1412
                        bool *suppress_warnings)
 
1413
{
 
1414
  unsigned char buf[FN_REFLEN + 10];
 
1415
  int32_t len;
 
1416
  int32_t binlog_flags = 0; // for now
 
1417
  const char* logname = mi->getLogName();
 
1418
  
 
1419
  *suppress_warnings= false;
 
1420
 
 
1421
  // TODO if big log files: Change next to int8store()
 
1422
  int4store(buf, (uint32_t) mi->getLogPosition());
 
1423
  int2store(buf + 4, binlog_flags);
 
1424
  int4store(buf + 6, server_id);
 
1425
  len = (uint32_t) strlen(logname);
 
1426
  memcpy(buf + 10, logname,len);
 
1427
  if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
 
1428
  {
 
1429
    /*
 
1430
      Something went wrong, so we will just reconnect and retry later
 
1431
      in the future, we should do a better error analysis, but for
 
1432
      now we just fill up the error log :-)
 
1433
    */
 
1434
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1435
      *suppress_warnings= true;                 // Suppress reconnect warning
 
1436
    else
 
1437
      sql_print_error(_("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs"),
 
1438
                      drizzle_errno(drizzle), drizzle_error(drizzle),
 
1439
                      mi->connect_retry);
 
1440
    return(1);
 
1441
  }
 
1442
 
 
1443
  return(0);
 
1444
}
 
1445
 
 
1446
/*
 
1447
  Read one event from the master
 
1448
 
 
1449
  SYNOPSIS
 
1450
    read_event()
 
1451
    DRIZZLE               DRIZZLE connection
 
1452
    mi                  Master connection information
 
1453
    suppress_warnings   TRUE when a normal net read timeout has caused us to
 
1454
                        try a reconnect.  We do not want to print anything to
 
1455
                        the error log in this case because this a anormal
 
1456
                        event in an idle server.
 
1457
 
 
1458
    RETURN VALUES
 
1459
    'packet_error'      Error
 
1460
    number              Length of packet
 
1461
*/
 
1462
 
 
1463
static uint32_t read_event(DRIZZLE *drizzle,
 
1464
                           Master_info *mi,
 
1465
                           bool* suppress_warnings)
 
1466
{
 
1467
  uint32_t len;
 
1468
 
 
1469
  *suppress_warnings= false;
 
1470
  /*
 
1471
    my_real_read() will time us out
 
1472
    We check if we were told to die, and if not, try reading again
 
1473
  */
 
1474
  if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
 
1475
    return(packet_error);
 
1476
 
 
1477
  len = cli_safe_read(drizzle);
 
1478
  if (len == packet_error || (int32_t) len < 1)
 
1479
  {
 
1480
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1481
    {
 
1482
      /*
 
1483
        We are trying a normal reconnect after a read timeout;
 
1484
        we suppress prints to .err file as long as the reconnect
 
1485
        happens without problems
 
1486
      */
 
1487
      *suppress_warnings= true;
 
1488
    }
 
1489
    else
 
1490
      sql_print_error(_("Error reading packet from server: %s ( server_errno=%d)"),
 
1491
                      drizzle_error(drizzle), drizzle_errno(drizzle));
 
1492
    return(packet_error);
 
1493
  }
 
1494
 
 
1495
  /* Check if eof packet */
 
1496
  if (len < 8 && drizzle->net.read_pos[0] == 254)
 
1497
  {
 
1498
    sql_print_information(_("Slave: received end packet from server, apparent "
 
1499
                            "master shutdown: %s"),
 
1500
                     drizzle_error(drizzle));
 
1501
     return(packet_error);
 
1502
  }
 
1503
 
 
1504
  return(len - 1);
 
1505
}
 
1506
 
 
1507
 
 
1508
int32_t check_expected_error(Session*, Relay_log_info const *,
 
1509
                             int32_t expected_error)
 
1510
{
 
1511
  switch (expected_error) {
 
1512
  case ER_NET_READ_ERROR:
 
1513
  case ER_NET_ERROR_ON_WRITE:
 
1514
  case ER_QUERY_INTERRUPTED:
 
1515
  case ER_SERVER_SHUTDOWN:
 
1516
  case ER_NEW_ABORTING_CONNECTION:
 
1517
    return(1);
 
1518
  default:
 
1519
    return(0);
 
1520
  }
 
1521
}
 
1522
 
 
1523
 
 
1524
/*
 
1525
  Check if the current error is of temporary nature of not.
 
1526
  Some errors are temporary in nature, such as
 
1527
  ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT.  Ndb also signals
 
1528
  that the error is temporary by pushing a warning with the error code
 
1529
  ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
 
1530
*/
 
1531
static int32_t has_temporary_error(Session *session)
 
1532
{
 
1533
  if (session->is_fatal_error)
 
1534
    return(0);
 
1535
 
 
1536
  if (session->main_da.is_error())
 
1537
  {
 
1538
    session->clear_error();
 
1539
    my_error(ER_LOCK_DEADLOCK, MYF(0));
 
1540
  }
 
1541
 
 
1542
  /*
 
1543
    If there is no message in Session, we can't say if it's a temporary
 
1544
    error or not. This is currently the case for Incident_log_event,
 
1545
    which sets no message. Return FALSE.
 
1546
  */
 
1547
  if (!session->is_error())
 
1548
    return(0);
 
1549
 
 
1550
  /*
 
1551
    Temporary error codes:
 
1552
    currently, InnoDB deadlock detected by InnoDB or lock
 
1553
    wait timeout (innodb_lock_wait_timeout exceeded
 
1554
  */
 
1555
  if (session->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
 
1556
      session->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
 
1557
    return(1);
 
1558
 
 
1559
  return(0);
 
1560
}
 
1561
 
 
1562
 
 
1563
/**
 
1564
  Applies the given event and advances the relay log position.
 
1565
 
 
1566
  In essence, this function does:
 
1567
 
 
1568
  @code
 
1569
    ev->apply_event(rli);
 
1570
    ev->update_pos(rli);
 
1571
  @endcode
 
1572
 
 
1573
  But it also does some maintainance, such as skipping events if
 
1574
  needed and reporting errors.
 
1575
 
 
1576
  If the @c skip flag is set, then it is tested whether the event
 
1577
  should be skipped, by looking at the slave_skip_counter and the
 
1578
  server id.  The skip flag should be set when calling this from a
 
1579
  replication thread but not set when executing an explicit BINLOG
 
1580
  statement.
 
1581
 
 
1582
  @retval 0 OK.
 
1583
 
 
1584
  @retval 1 Error calling ev->apply_event().
 
1585
 
 
1586
  @retval 2 No error calling ev->apply_event(), but error calling
 
1587
  ev->update_pos().
 
1588
*/
 
1589
int32_t apply_event_and_update_pos(Log_event* ev, Session* session, Relay_log_info* rli,
 
1590
                               bool skip)
 
1591
{
 
1592
  int32_t exec_res= 0;
 
1593
 
 
1594
  /*
 
1595
    Execute the event to change the database and update the binary
 
1596
    log coordinates, but first we set some data that is needed for
 
1597
    the thread.
 
1598
 
 
1599
    The event will be executed unless it is supposed to be skipped.
 
1600
 
 
1601
    Queries originating from this server must be skipped.  Low-level
 
1602
    events (Format_description_log_event, Rotate_log_event,
 
1603
    Stop_log_event) from this server must also be skipped. But for
 
1604
    those we don't want to modify 'group_master_log_pos', because
 
1605
    these events did not exist on the master.
 
1606
    Format_description_log_event is not completely skipped.
 
1607
 
 
1608
    Skip queries specified by the user in 'slave_skip_counter'.  We
 
1609
    can't however skip events that has something to do with the log
 
1610
    files themselves.
 
1611
 
 
1612
    Filtering on own server id is extremely important, to ignore
 
1613
    execution of events created by the creation/rotation of the relay
 
1614
    log (remember that now the relay log starts with its Format_desc,
 
1615
    has a Rotate etc).
 
1616
  */
 
1617
 
 
1618
  session->server_id = ev->server_id; // use the original server id for logging
 
1619
  session->set_time();                            // time the query
 
1620
  session->lex->current_select= 0;
 
1621
  if (!ev->when)
 
1622
    ev->when= my_time(0);
 
1623
  ev->session = session; // because up to this point, ev->session == 0
 
1624
 
 
1625
  if (skip)
 
1626
  {
 
1627
    int32_t reason= ev->shall_skip(rli);
 
1628
    if (reason == Log_event::EVENT_SKIP_COUNT)
 
1629
      --rli->slave_skip_counter;
 
1630
    pthread_mutex_unlock(&rli->data_lock);
 
1631
    if (reason == Log_event::EVENT_SKIP_NOT)
 
1632
      exec_res= ev->apply_event(rli);
 
1633
  }
 
1634
  else
 
1635
    exec_res= ev->apply_event(rli);
 
1636
 
 
1637
  if (exec_res == 0)
 
1638
  {
 
1639
    int32_t error= ev->update_pos(rli);
 
1640
    /*
 
1641
      The update should not fail, so print an error message and
 
1642
      return an error code.
 
1643
 
 
1644
      TODO: Replace this with a decent error message when merged
 
1645
      with BUG#24954 (which adds several new error message).
 
1646
    */
 
1647
    if (error)
 
1648
    {
 
1649
      char buf[22];
 
1650
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
 
1651
                  _("It was not possible to update the positions"
 
1652
                  " of the relay log information: the slave may"
 
1653
                  " be in an inconsistent state."
 
1654
                  " Stopped in %s position %s"),
 
1655
                  rli->group_relay_log_name.c_str(),
 
1656
                  llstr(rli->group_relay_log_pos, buf));
 
1657
      return(2);
 
1658
    }
 
1659
  }
 
1660
 
 
1661
  return(exec_res ? 1 : 0);
 
1662
}
 
1663
 
 
1664
 
 
1665
/**
 
1666
  Top-level function for executing the next event from the relay log.
 
1667
 
 
1668
  This function reads the event from the relay log, executes it, and
 
1669
  advances the relay log position.  It also handles errors, etc.
 
1670
 
 
1671
  This function may fail to apply the event for the following reasons:
 
1672
 
 
1673
   - The position specfied by the UNTIL condition of the START SLAVE
 
1674
     command is reached.
 
1675
 
 
1676
   - It was not possible to read the event from the log.
 
1677
 
 
1678
   - The slave is killed.
 
1679
 
 
1680
   - An error occurred when applying the event, and the event has been
 
1681
     tried slave_trans_retries times.  If the event has been retried
 
1682
     fewer times, 0 is returned.
 
1683
 
 
1684
   - init_master_info or init_relay_log_pos failed. (These are called
 
1685
     if a failure occurs when applying the event.)</li>
 
1686
 
 
1687
   - An error occurred when updating the binlog position.
 
1688
 
 
1689
  @retval 0 The event was applied.
 
1690
 
 
1691
  @retval 1 The event was not applied.
 
1692
*/
 
1693
static int32_t exec_relay_log_event(Session* session, Relay_log_info* rli)
 
1694
{
 
1695
  /*
 
1696
     We acquire this mutex since we need it for all operations except
 
1697
     event execution. But we will release it in places where we will
 
1698
     wait for something for example inside of next_event().
 
1699
   */
 
1700
  pthread_mutex_lock(&rli->data_lock);
 
1701
 
 
1702
  Log_event * ev = next_event(rli);
 
1703
 
 
1704
  assert(rli->sql_session==session);
 
1705
 
 
1706
  if (sql_slave_killed(session,rli))
 
1707
  {
 
1708
    pthread_mutex_unlock(&rli->data_lock);
 
1709
    delete ev;
 
1710
    return(1);
 
1711
  }
 
1712
  if (ev)
 
1713
  {
 
1714
    int32_t exec_res;
 
1715
 
 
1716
    /*
 
1717
      This tests if the position of the beginning of the current event
 
1718
      hits the UNTIL barrier.
 
1719
    */
 
1720
    if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
 
1721
        rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
 
1722
                                rli->group_master_log_pos :
 
1723
                                ev->log_pos - ev->data_written))
 
1724
    {
 
1725
      char buf[22];
 
1726
      sql_print_information(_("Slave SQL thread stopped because it reached its"
 
1727
                              " UNTIL position %s"),
 
1728
                            llstr(rli->until_pos(), buf));
 
1729
      /*
 
1730
        Setting abort_slave flag because we do not want additional message about
 
1731
        error in query execution to be printed.
 
1732
      */
 
1733
      rli->abort_slave= 1;
 
1734
      pthread_mutex_unlock(&rli->data_lock);
 
1735
      delete ev;
 
1736
      return(1);
 
1737
    }
 
1738
    exec_res= apply_event_and_update_pos(ev, session, rli, true);
 
1739
 
 
1740
    /*
 
1741
      Format_description_log_event should not be deleted because it will be
 
1742
      used to read info about the relay log's format; it will be deleted when
 
1743
      the SQL thread does not need it, i.e. when this thread terminates.
 
1744
    */
 
1745
    if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
 
1746
    {
 
1747
      delete ev;
 
1748
    }
 
1749
 
 
1750
    /*
 
1751
      update_log_pos failed: this should not happen, so we don't
 
1752
      retry.
 
1753
    */
 
1754
    if (exec_res == 2)
 
1755
      return(1);
 
1756
 
 
1757
    if (slave_trans_retries)
 
1758
    {
 
1759
      int32_t temp_err= 0;
 
1760
      if (exec_res && (temp_err= has_temporary_error(session)))
 
1761
      {
 
1762
        const char *errmsg;
 
1763
        /*
 
1764
          We were in a transaction which has been rolled back because of a
 
1765
          temporary error;
 
1766
          let's seek back to BEGIN log event and retry it all again.
 
1767
          Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
 
1768
          there is no rollback since 5.0.13 (ref: manual).
 
1769
          We have to not only seek but also
 
1770
          a) init_master_info(), to seek back to hot relay log's start for later
 
1771
          (for when we will come back to this hot log after re-processing the
 
1772
          possibly existing old logs where BEGIN is: check_binlog_magic() will
 
1773
          then need the cache to be at position 0 (see comments at beginning of
 
1774
          init_master_info()).
 
1775
          b) init_relay_log_pos(), because the BEGIN may be an older relay log.
 
1776
        */
 
1777
        if (rli->trans_retries < slave_trans_retries)
 
1778
        {
 
1779
          if (rli->mi->init_master_info(0, 0, SLAVE_SQL))
 
1780
            sql_print_error(_("Failed to initialize the master info structure"));
 
1781
          else if (init_relay_log_pos(rli,
 
1782
                                      rli->group_relay_log_name.c_str(),
 
1783
                                      rli->group_relay_log_pos,
 
1784
                                      1, &errmsg, 1))
 
1785
            sql_print_error(_("Error initializing relay log position: %s"),
 
1786
                            errmsg);
 
1787
          else
 
1788
          {
 
1789
            exec_res= 0;
 
1790
            end_trans(session, ROLLBACK);
 
1791
            /* chance for concurrent connection to get more locks */
 
1792
            safe_sleep(session, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
 
1793
                       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
 
1794
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
 
1795
            rli->trans_retries++;
 
1796
            rli->retried_trans++;
 
1797
            pthread_mutex_unlock(&rli->data_lock);
 
1798
          }
 
1799
        }
 
1800
        else
 
1801
          sql_print_error(_("Slave SQL thread retried transaction %lu time(s) "
 
1802
                            "in vain, giving up. Consider raising the value of "
 
1803
                            "the slave_transaction_retries variable."),
 
1804
                          slave_trans_retries);
 
1805
      }
 
1806
      else if ((exec_res && !temp_err) ||
 
1807
               (opt_using_transactions &&
 
1808
                rli->group_relay_log_pos == rli->event_relay_log_pos))
 
1809
      {
 
1810
        /*
 
1811
          Only reset the retry counter if the entire group succeeded
 
1812
          or failed with a non-transient error.  On a successful
 
1813
          event, the execution will proceed as usual; in the case of a
 
1814
          non-transient error, the slave will stop with an error.
 
1815
         */
 
1816
        rli->trans_retries= 0; // restart from fresh
 
1817
      }
 
1818
    }
 
1819
    return(exec_res);
 
1820
  }
 
1821
  pthread_mutex_unlock(&rli->data_lock);
 
1822
  rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
 
1823
              ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
 
1824
              _("Could not parse relay log event entry. The possible reasons "
 
1825
                "are: the master's binary log is corrupted (you can check this "
 
1826
                "by running 'mysqlbinlog' on the binary log), the slave's "
 
1827
                "relay log is corrupted (you can check this by running "
 
1828
                "'mysqlbinlog' on the relay log), a network problem, or a bug "
 
1829
                "in the master's or slave's DRIZZLE code. If you want to check "
 
1830
                "the master's binary log or slave's relay log, you will be "
 
1831
                "able to know their names by issuing 'SHOW SLAVE STATUS' "
 
1832
                "on this slave."));
 
1833
  return(1);
 
1834
}
 
1835
 
 
1836
 
 
1837
/**
 
1838
  @brief Try to reconnect slave IO thread.
 
1839
 
 
1840
  @details Terminates current connection to master, sleeps for
 
1841
  @c mi->connect_retry msecs and initiates new connection with
 
1842
  @c safe_reconnect(). Variable pointed by @c retry_count is increased -
 
1843
  if it exceeds @c master_retry_count then connection is not re-established
 
1844
  and function signals error.
 
1845
  Unless @c suppres_warnings is TRUE, a warning is put in the server error log
 
1846
  when reconnecting. The warning message and messages used to report errors
 
1847
  are taken from @c messages array. In case @c master_retry_count is exceeded,
 
1848
  no messages are added to the log.
 
1849
 
 
1850
  @param[in]     session                 Thread context.
 
1851
  @param[in]     DRIZZLE               DRIZZLE connection.
 
1852
  @param[in]     mi                  Master connection information.
 
1853
  @param[in,out] retry_count         Number of attempts to reconnect.
 
1854
  @param[in]     suppress_warnings   TRUE when a normal net read timeout 
 
1855
                                     has caused to reconnecting.
 
1856
  @param[in]     messages            Messages to print/log, see 
 
1857
                                     reconnect_messages[] array.
 
1858
 
 
1859
  @retval        0                   OK.
 
1860
  @retval        1                   There was an error.
 
1861
*/
 
1862
 
 
1863
static int32_t try_to_reconnect(Session *session, DRIZZLE *drizzle, Master_info *mi,
 
1864
                                uint32_t *retry_count, bool suppress_warnings,
 
1865
                                const char *messages[SLAVE_RECON_MSG_MAX])
 
1866
{
 
1867
  mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
 
1868
  session->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
 
1869
  drizzle_disconnect(drizzle);
 
1870
  if ((*retry_count)++)
 
1871
  {
 
1872
    if (*retry_count > master_retry_count)
 
1873
      return 1;                             // Don't retry forever
 
1874
    safe_sleep(session, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
 
1875
               (void *) mi);
 
1876
  }
 
1877
  if (check_io_slave_killed(session, mi,
 
1878
                            _(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
 
1879
    return 1;
 
1880
  session->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
 
1881
  if (!suppress_warnings)
 
1882
  {
 
1883
    char buf[256], llbuff[22];
 
1884
    snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
 
1885
             IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
 
1886
    /*
 
1887
      Raise a warining during registering on master/requesting dump.
 
1888
      Log a message reading event.
 
1889
    */
 
1890
    if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
 
1891
    {
 
1892
      mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
 
1893
                 ER(ER_SLAVE_MASTER_COM_FAILURE),
 
1894
                 _(messages[SLAVE_RECON_MSG_COMMAND]), buf);
 
1895
    }
 
1896
    else
 
1897
    {
 
1898
      sql_print_information("%s",buf);
 
1899
    }
 
1900
  }
 
1901
  if (safe_reconnect(session, drizzle, mi, 1) || io_slave_killed(session, mi))
 
1902
  {
 
1903
    if (global_system_variables.log_warnings)
 
1904
      sql_print_information("%s",_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
 
1905
    return 1;
 
1906
  }
 
1907
  return 0;
 
1908
}
 
1909
 
 
1910
 
 
1911
/* Slave I/O Thread entry point */
 
1912
 
 
1913
pthread_handler_t handle_slave_io(void *arg)
 
1914
{
 
1915
  Session *session; // needs to be first for thread_stack
 
1916
  DRIZZLE *drizzle;
 
1917
  Master_info *mi = (Master_info*)arg;
 
1918
  Relay_log_info *rli= &mi->rli;
 
1919
  char llbuff[22];
 
1920
  uint32_t retry_count;
 
1921
  bool suppress_warnings;
 
1922
  uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
 
1923
  my_thread_init();
 
1924
 
 
1925
  assert(mi->inited);
 
1926
  drizzle= NULL ;
 
1927
  retry_count= 0;
 
1928
 
 
1929
  pthread_mutex_lock(&mi->run_lock);
 
1930
  /* Inform waiting threads that slave has started */
 
1931
  mi->slave_run_id++;
 
1932
 
 
1933
  mi->events_till_disconnect = disconnect_slave_event_count;
 
1934
 
 
1935
  session= new Session;
 
1936
  Session_CHECK_SENTRY(session);
 
1937
  mi->io_session = session;
 
1938
 
 
1939
  pthread_detach_this_thread();
 
1940
  session->thread_stack= (char*) &session; // remember where our stack is
 
1941
  if (init_slave_thread(session, SLAVE_Session_IO))
 
1942
  {
 
1943
    pthread_cond_broadcast(&mi->start_cond);
 
1944
    pthread_mutex_unlock(&mi->run_lock);
 
1945
    sql_print_error(_("Failed during slave I/O thread initialization"));
 
1946
    goto err;
 
1947
  }
 
1948
  pthread_mutex_lock(&LOCK_thread_count);
 
1949
  threads.append(session);
 
1950
  pthread_mutex_unlock(&LOCK_thread_count);
 
1951
  mi->slave_running = 1;
 
1952
  mi->abort_slave = 0;
 
1953
  pthread_mutex_unlock(&mi->run_lock);
 
1954
  pthread_cond_broadcast(&mi->start_cond);
 
1955
 
 
1956
  if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
 
1957
  {
 
1958
    mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
 
1959
               ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
 
1960
    goto err;
 
1961
  }
 
1962
 
 
1963
  session->set_proc_info("Connecting to master");
 
1964
  // we can get killed during safe_connect
 
1965
  if (!safe_connect(session, drizzle, mi))
 
1966
  {
 
1967
    sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
 
1968
                            "replication started in log '%s' at position %s"),
 
1969
                          mi->getUsername(), mi->getHostname(), mi->getPort(),
 
1970
                          IO_RPL_LOG_NAME,
 
1971
                          llstr(mi->getLogPosition(), llbuff));
 
1972
  /*
 
1973
    Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
 
1974
    thread, since a replication event can become this much larger than
 
1975
    the corresponding packet (query) sent from client to master.
 
1976
  */
 
1977
    drizzle->net.max_packet_size= session->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
 
1978
  }
 
1979
  else
 
1980
  {
 
1981
    sql_print_information(_("Slave I/O thread killed while connecting to master"));
 
1982
    goto err;
 
1983
  }
 
1984
 
 
1985
connected:
 
1986
 
 
1987
  // TODO: the assignment below should be under mutex (5.0)
 
1988
  mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
 
1989
  session->slave_net = &drizzle->net;
 
1990
  session->set_proc_info("Checking master version");
 
1991
  if (get_master_version_and_clock(drizzle, mi))
 
1992
    goto err;
 
1993
  
 
1994
  if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
 
1995
  {
 
1996
    /*
 
1997
      Register ourselves with the master.
 
1998
    */
 
1999
    session->set_proc_info("Registering slave on master");
 
2000
    if (register_slave_on_master(drizzle, mi, &suppress_warnings))
 
2001
    {
 
2002
      if (!check_io_slave_killed(session, mi, "Slave I/O thread killed "
 
2003
                                 "while registering slave on master"))
 
2004
      {
 
2005
        sql_print_error(_("Slave I/O thread couldn't register on master"));
 
2006
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2007
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
 
2008
          goto err;
 
2009
      }
 
2010
      else
 
2011
        goto err;
 
2012
      goto connected;
 
2013
    }
 
2014
    if (!retry_count_reg)
 
2015
    {
 
2016
      retry_count_reg++;
 
2017
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2018
      if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2019
                           reconnect_messages[SLAVE_RECON_ACT_REG]))
 
2020
        goto err;
 
2021
      goto connected;
 
2022
    }
 
2023
  }
 
2024
 
 
2025
  while (!io_slave_killed(session,mi))
 
2026
  {
 
2027
    session->set_proc_info("Requesting binlog dump");
 
2028
    if (request_dump(drizzle, mi, &suppress_warnings))
 
2029
    {
 
2030
      sql_print_error(_("Failed on request_dump()"));
 
2031
      if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while \
 
2032
requesting master dump")) ||
 
2033
          try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2034
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
 
2035
        goto err;
 
2036
      goto connected;
 
2037
    }
 
2038
    if (!retry_count_dump)
 
2039
    {
 
2040
      retry_count_dump++;
 
2041
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2042
      if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2043
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
 
2044
        goto err;
 
2045
      goto connected;
 
2046
    }
 
2047
 
 
2048
    while (!io_slave_killed(session,mi))
 
2049
    {
 
2050
      uint32_t event_len;
 
2051
      /*
 
2052
        We say "waiting" because read_event() will wait if there's nothing to
 
2053
        read. But if there's something to read, it will not wait. The
 
2054
        important thing is to not confuse users by saying "reading" whereas
 
2055
        we're in fact receiving nothing.
 
2056
      */
 
2057
      session->set_proc_info(_("Waiting for master to send event"));
 
2058
      event_len= read_event(drizzle, mi, &suppress_warnings);
 
2059
      if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while "
 
2060
                                           "reading event")))
 
2061
        goto err;
 
2062
      if (!retry_count_event)
 
2063
      {
 
2064
        retry_count_event++;
 
2065
        sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2066
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2067
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
 
2068
          goto err;
 
2069
        goto connected;
 
2070
      }
 
2071
 
 
2072
      if (event_len == packet_error)
 
2073
      {
 
2074
        uint32_t drizzle_error_number= drizzle_errno(drizzle);
 
2075
        switch (drizzle_error_number) {
 
2076
        case CR_NET_PACKET_TOO_LARGE:
 
2077
          sql_print_error(_("Log entry on master is longer than "
 
2078
                            "max_allowed_packet (%ld) on "
 
2079
                            "slave. If the entry is correct, restart the "
 
2080
                            "server with a higher value of "
 
2081
                            "max_allowed_packet"),
 
2082
                          session->variables.max_allowed_packet);
 
2083
          goto err;
 
2084
        case ER_MASTER_FATAL_ERROR_READING_BINLOG:
 
2085
          sql_print_error(ER(drizzle_error_number), drizzle_error_number,
 
2086
                          drizzle_error(drizzle));
 
2087
          goto err;
 
2088
        case EE_OUTOFMEMORY:
 
2089
        case ER_OUTOFMEMORY:
 
2090
          sql_print_error(
 
2091
       _("Stopping slave I/O thread due to out-of-memory error from master"));
 
2092
          goto err;
 
2093
        }
 
2094
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2095
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
 
2096
          goto err;
 
2097
        goto connected;
 
2098
      } // if (event_len == packet_error)
 
2099
 
 
2100
      retry_count=0;                    // ok event, reset retry counter
 
2101
      session->set_proc_info(_("Queueing master event to the relay log"));
 
2102
      if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
 
2103
      {
 
2104
        goto err;
 
2105
      }
 
2106
      if (mi->flush())
 
2107
      {
 
2108
        sql_print_error(_("Failed to flush master info file"));
 
2109
        goto err;
 
2110
      }
 
2111
      /*
 
2112
        See if the relay logs take too much space.
 
2113
        We don't lock mi->rli.log_space_lock here; this dirty read saves time
 
2114
        and does not introduce any problem:
 
2115
        - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
 
2116
        the clean value is 0), then we are reading only one more event as we
 
2117
        should, and we'll block only at the next event. No big deal.
 
2118
        - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
 
2119
        the clean value is 1), then we are going into wait_for_relay_log_space()
 
2120
        for no reason, but this function will do a clean read, notice the clean
 
2121
        value and exit immediately.
 
2122
      */
 
2123
      if (rli->log_space_limit && rli->log_space_limit <
 
2124
          rli->log_space_total &&
 
2125
          !rli->ignore_log_space_limit)
 
2126
        if (wait_for_relay_log_space(rli))
 
2127
        {
 
2128
          sql_print_error(_("Slave I/O thread aborted while waiting for "
 
2129
                            "relay log space"));
 
2130
          goto err;
 
2131
        }
 
2132
    }
 
2133
  }
 
2134
 
 
2135
// error = 0;
 
2136
err:
 
2137
// print the current replication position
 
2138
  sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
 
2139
                          "position %s"),
 
2140
                        IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
 
2141
  pthread_mutex_lock(&LOCK_thread_count);
 
2142
  session->query = session->db = 0; // extra safety
 
2143
  session->query_length= session->db_length= 0;
 
2144
  pthread_mutex_unlock(&LOCK_thread_count);
 
2145
  if (drizzle)
 
2146
  {
 
2147
    /*
 
2148
      Here we need to clear the active VIO before closing the
 
2149
      connection with the master.  The reason is that Session::awake()
 
2150
      might be called from terminate_slave_thread() because somebody
 
2151
      issued a STOP SLAVE.  If that happends, the close_active_vio()
 
2152
      can be called in the middle of closing the VIO associated with
 
2153
      the 'mysql' object, causing a crash.
 
2154
    */
 
2155
    drizzle_close(drizzle);
 
2156
    mi->drizzle=0;
 
2157
  }
 
2158
  write_ignored_events_info_to_relay_log(session, mi);
 
2159
  session->set_proc_info(_("Waiting for slave mutex on exit"));
 
2160
  pthread_mutex_lock(&mi->run_lock);
 
2161
 
 
2162
  /* Forget the relay log's format */
 
2163
  delete mi->rli.relay_log.description_event_for_queue;
 
2164
  mi->rli.relay_log.description_event_for_queue= 0;
 
2165
  assert(session->net.buff != 0);
 
2166
  net_end(&session->net); // destructor will not free it, because net.vio is 0
 
2167
  close_thread_tables(session);
 
2168
  pthread_mutex_lock(&LOCK_thread_count);
 
2169
  Session_CHECK_SENTRY(session);
 
2170
  delete session;
 
2171
  pthread_mutex_unlock(&LOCK_thread_count);
 
2172
  mi->abort_slave= 0;
 
2173
  mi->slave_running= 0;
 
2174
  mi->io_session= 0;
 
2175
  /*
 
2176
    Note: the order of the two following calls (first broadcast, then unlock)
 
2177
    is important. Otherwise a killer_thread can execute between the calls and
 
2178
    delete the mi structure leading to a crash! (see BUG#25306 for details)
 
2179
   */ 
 
2180
  pthread_cond_broadcast(&mi->stop_cond);       // tell the world we are done
 
2181
  pthread_mutex_unlock(&mi->run_lock);
 
2182
  my_thread_end();
 
2183
  pthread_exit(0);
 
2184
  return(0);                               // Can't return anything here
 
2185
}
 
2186
 
 
2187
 
 
2188
/* Slave SQL Thread entry point */
 
2189
 
 
2190
pthread_handler_t handle_slave_sql(void *arg)
 
2191
{
 
2192
  Session *session;                     /* needs to be first for thread_stack */
 
2193
  char llbuff[22],llbuff1[22];
 
2194
 
 
2195
  Relay_log_info* rli = &((Master_info*)arg)->rli;
 
2196
  const char *errmsg;
 
2197
 
 
2198
  my_thread_init();
 
2199
 
 
2200
  assert(rli->inited);
 
2201
  pthread_mutex_lock(&rli->run_lock);
 
2202
  assert(!rli->slave_running);
 
2203
  errmsg= 0;
 
2204
  rli->events_till_abort = abort_slave_event_count;
 
2205
 
 
2206
  session = new Session;
 
2207
  session->thread_stack = (char*)&session; // remember where our stack is
 
2208
  rli->sql_session= session;
 
2209
  
 
2210
  /* Inform waiting threads that slave has started */
 
2211
  rli->slave_run_id++;
 
2212
  rli->slave_running = 1;
 
2213
 
 
2214
  pthread_detach_this_thread();
 
2215
  if (init_slave_thread(session, SLAVE_Session_SQL))
 
2216
  {
 
2217
    /*
 
2218
      TODO: this is currently broken - slave start and change master
 
2219
      will be stuck if we fail here
 
2220
    */
 
2221
    pthread_cond_broadcast(&rli->start_cond);
 
2222
    pthread_mutex_unlock(&rli->run_lock);
 
2223
    sql_print_error(_("Failed during slave thread initialization"));
 
2224
    goto err;
 
2225
  }
 
2226
  session->init_for_queries();
 
2227
  session->temporary_tables = rli->save_temporary_tables; // restore temp tables
 
2228
  pthread_mutex_lock(&LOCK_thread_count);
 
2229
  threads.append(session);
 
2230
  pthread_mutex_unlock(&LOCK_thread_count);
 
2231
  /*
 
2232
    We are going to set slave_running to 1. Assuming slave I/O thread is
 
2233
    alive and connected, this is going to make Seconds_Behind_Master be 0
 
2234
    i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
 
2235
    the moment we start we can think we are caught up, and the next second we
 
2236
    start receiving data so we realize we are not caught up and
 
2237
    Seconds_Behind_Master grows. No big deal.
 
2238
  */
 
2239
  rli->abort_slave = 0;
 
2240
  pthread_mutex_unlock(&rli->run_lock);
 
2241
  pthread_cond_broadcast(&rli->start_cond);
 
2242
 
 
2243
  /*
 
2244
    Reset errors for a clean start (otherwise, if the master is idle, the SQL
 
2245
    thread may execute no Query_log_event, so the error will remain even
 
2246
    though there's no problem anymore). Do not reset the master timestamp
 
2247
    (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
 
2248
    as we are not sure that we are going to receive a query, we want to
 
2249
    remember the last master timestamp (to say how many seconds behind we are
 
2250
    now.
 
2251
    But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
 
2252
  */
 
2253
  rli->clear_error();
 
2254
 
 
2255
  //tell the I/O thread to take relay_log_space_limit into account from now on
 
2256
  pthread_mutex_lock(&rli->log_space_lock);
 
2257
  rli->ignore_log_space_limit= 0;
 
2258
  pthread_mutex_unlock(&rli->log_space_lock);
 
2259
  rli->trans_retries= 0; // start from "no error"
 
2260
 
 
2261
  if (init_relay_log_pos(rli,
 
2262
                         rli->group_relay_log_name.c_str(),
 
2263
                         rli->group_relay_log_pos,
 
2264
                         1 /*need data lock*/, &errmsg,
 
2265
                         1 /*look for a description_event*/))
 
2266
  {
 
2267
    sql_print_error(_("Error initializing relay log position: %s"),
 
2268
                    errmsg);
 
2269
    goto err;
 
2270
  }
 
2271
  Session_CHECK_SENTRY(session);
 
2272
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
 
2273
  /*
 
2274
    Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
 
2275
    correct position when it's called just after my_b_seek() (the questionable
 
2276
    stuff is those "seek is done on next read" comments in the my_b_seek()
 
2277
    source code).
 
2278
    The crude reality is that this assertion randomly fails whereas
 
2279
    replication seems to work fine. And there is no easy explanation why it
 
2280
    fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
 
2281
    init_relay_log_pos() called above). Maybe the assertion would be
 
2282
    meaningful if we held rli->data_lock between the my_b_seek() and the
 
2283
    assert().
 
2284
  */
 
2285
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
 
2286
  assert(rli->sql_session == session);
 
2287
 
 
2288
  if (global_system_variables.log_warnings)
 
2289
    sql_print_information(_("Slave SQL thread initialized, "
 
2290
                            "starting replication in log '%s' at "
 
2291
                            "position %s, relay log '%s' position: %s"),
 
2292
                            RPL_LOG_NAME,
 
2293
                          llstr(rli->group_master_log_pos,llbuff),
 
2294
                          rli->group_relay_log_name.c_str(),
 
2295
                          llstr(rli->group_relay_log_pos,llbuff1));
 
2296
 
 
2297
  /* execute init_slave variable */
 
2298
  if (sys_init_slave.value_length)
 
2299
  {
 
2300
    execute_init_command(session, &sys_init_slave, &LOCK_sys_init_slave);
 
2301
    if (session->is_slave_error)
 
2302
    {
 
2303
      sql_print_error(_("Slave SQL thread aborted. "
 
2304
                        "Can't execute init_slave query"));
 
2305
      goto err;
 
2306
    }
 
2307
  }
 
2308
 
 
2309
  /*
 
2310
    First check until condition - probably there is nothing to execute. We
 
2311
    do not want to wait for next event in this case.
 
2312
  */
 
2313
  pthread_mutex_lock(&rli->data_lock);
 
2314
  if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
 
2315
      rli->is_until_satisfied(rli->group_master_log_pos))
 
2316
  {
 
2317
    char buf[22];
 
2318
    sql_print_information(_("Slave SQL thread stopped because it reached its"
 
2319
                            " UNTIL position %s"), llstr(rli->until_pos(), buf));
 
2320
    pthread_mutex_unlock(&rli->data_lock);
 
2321
    goto err;
 
2322
  }
 
2323
  pthread_mutex_unlock(&rli->data_lock);
 
2324
 
 
2325
  /* Read queries from the IO/THREAD until this thread is killed */
 
2326
 
 
2327
  while (!sql_slave_killed(session,rli))
 
2328
  {
 
2329
    session->set_proc_info(_("Reading event from the relay log"));
 
2330
    assert(rli->sql_session == session);
 
2331
    Session_CHECK_SENTRY(session);
 
2332
    if (exec_relay_log_event(session,rli))
 
2333
    {
 
2334
      // do not scare the user if SQL thread was simply killed or stopped
 
2335
      if (!sql_slave_killed(session,rli))
 
2336
      {
 
2337
        /*
 
2338
          retrieve as much info as possible from the session and, error
 
2339
          codes and warnings and print this to the error log as to
 
2340
          allow the user to locate the error
 
2341
        */
 
2342
        uint32_t const last_errno= rli->last_error().number;
 
2343
 
 
2344
        if (session->is_error())
 
2345
        {
 
2346
          char const *const errmsg= session->main_da.message();
 
2347
 
 
2348
          if (last_errno == 0)
 
2349
          {
 
2350
            rli->report(ERROR_LEVEL, session->main_da.sql_errno(), "%s", errmsg);
 
2351
          }
 
2352
          else if (last_errno != session->main_da.sql_errno())
 
2353
          {
 
2354
            sql_print_error(_("Slave (additional info): %s Error_code: %d"),
 
2355
                            errmsg, session->main_da.sql_errno());
 
2356
          }
 
2357
        }
 
2358
 
 
2359
        /* Print any warnings issued */
 
2360
        List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
 
2361
        DRIZZLE_ERROR *err;
 
2362
        /*
 
2363
          Added controlled slave thread cancel for replication
 
2364
          of user-defined variables.
 
2365
        */
 
2366
        bool udf_error = false;
 
2367
        while ((err= it++))
 
2368
        {
 
2369
          if (err->code == ER_CANT_OPEN_LIBRARY)
 
2370
            udf_error = true;
 
2371
          sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
 
2372
        }
 
2373
        if (udf_error)
 
2374
          sql_print_error(_("Error loading user-defined library, slave SQL "
 
2375
                            "thread aborted. Install the missing library, "
 
2376
                            "and restart the slave SQL thread with "
 
2377
                            "\"SLAVE START\". We stopped at log '%s' "
 
2378
                            "position %s"),
 
2379
                          RPL_LOG_NAME, llstr(rli->group_master_log_pos,
 
2380
            llbuff));
 
2381
        else
 
2382
          sql_print_error(_("Error running query, slave SQL thread aborted. "
 
2383
                            "Fix the problem, and restart "
 
2384
                            "the slave SQL thread with \"SLAVE START\". "
 
2385
                            "We stopped at log '%s' position %s"),
 
2386
                          RPL_LOG_NAME,
 
2387
                          llstr(rli->group_master_log_pos, llbuff));
 
2388
      }
 
2389
      goto err;
 
2390
    }
 
2391
  }
 
2392
 
 
2393
  /* Thread stopped. Print the current replication position to the log */
 
2394
  sql_print_information(_("Slave SQL thread exiting, replication stopped in "
 
2395
                          "log '%s' at position %s"),
 
2396
                        RPL_LOG_NAME,
 
2397
                        llstr(rli->group_master_log_pos,llbuff));
 
2398
 
 
2399
 err:
 
2400
 
 
2401
  /*
 
2402
    Some events set some playgrounds, which won't be cleared because thread
 
2403
    stops. Stopping of this thread may not be known to these events ("stop"
 
2404
    request is detected only by the present function, not by events), so we
 
2405
    must "proactively" clear playgrounds:
 
2406
  */
 
2407
  rli->cleanup_context(session, 1);
 
2408
  pthread_mutex_lock(&LOCK_thread_count);
 
2409
  /*
 
2410
    Some extra safety, which should not been needed (normally, event deletion
 
2411
    should already have done these assignments (each event which sets these
 
2412
    variables is supposed to set them to 0 before terminating)).
 
2413
  */
 
2414
  session->query= session->db= session->catalog= 0;
 
2415
  session->query_length= session->db_length= 0;
 
2416
  pthread_mutex_unlock(&LOCK_thread_count);
 
2417
  session->set_proc_info("Waiting for slave mutex on exit");
 
2418
  pthread_mutex_lock(&rli->run_lock);
 
2419
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
 
2420
  pthread_mutex_lock(&rli->data_lock);
 
2421
  assert(rli->slave_running == 1); // tracking buffer overrun
 
2422
  /* When master_pos_wait() wakes up it will check this and terminate */
 
2423
  rli->slave_running= 0;
 
2424
  /* Forget the relay log's format */
 
2425
  delete rli->relay_log.description_event_for_exec;
 
2426
  rli->relay_log.description_event_for_exec= 0;
 
2427
  /* Wake up master_pos_wait() */
 
2428
  pthread_mutex_unlock(&rli->data_lock);
 
2429
  pthread_cond_broadcast(&rli->data_cond);
 
2430
  rli->ignore_log_space_limit= 0; /* don't need any lock */
 
2431
  rli->save_temporary_tables = session->temporary_tables;
 
2432
 
 
2433
  /*
 
2434
    TODO: see if we can do this conditionally in next_event() instead
 
2435
    to avoid unneeded position re-init
 
2436
  */
 
2437
  session->temporary_tables = 0; // remove tempation from destructor to close them
 
2438
  assert(session->net.buff != 0);
 
2439
  net_end(&session->net); // destructor will not free it, because we are weird
 
2440
  assert(rli->sql_session == session);
 
2441
  Session_CHECK_SENTRY(session);
 
2442
  rli->sql_session= 0;
 
2443
  pthread_mutex_lock(&LOCK_thread_count);
 
2444
  Session_CHECK_SENTRY(session);
 
2445
  delete session;
 
2446
  pthread_mutex_unlock(&LOCK_thread_count);
 
2447
 /*
 
2448
  Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
 
2449
  is important. Otherwise a killer_thread can execute between the calls and
 
2450
  delete the mi structure leading to a crash! (see BUG#25306 for details)
 
2451
 */ 
 
2452
  pthread_cond_broadcast(&rli->stop_cond);
 
2453
  pthread_mutex_unlock(&rli->run_lock);  // tell the world we are done
 
2454
  
 
2455
  my_thread_end();
 
2456
  pthread_exit(0);
 
2457
  return(0);                               // Can't return anything here
 
2458
}
 
2459
 
 
2460
 
 
2461
/*
 
2462
  process_io_create_file()
 
2463
*/
 
2464
 
 
2465
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
 
2466
{
 
2467
  int32_t error = 1;
 
2468
  uint32_t num_bytes;
 
2469
  bool cev_not_written;
 
2470
  Session *session = mi->io_session;
 
2471
  NET *net = &mi->drizzle->net;
 
2472
 
 
2473
  if (unlikely(!cev->is_valid()))
 
2474
    return(1);
 
2475
 
 
2476
  if (!rpl_filter->db_ok(cev->db))
 
2477
  {
 
2478
    skip_load_data_infile(net);
 
2479
    return(0);
 
2480
  }
 
2481
  assert(cev->inited_from_old);
 
2482
  session->file_id = cev->file_id = mi->file_id++;
 
2483
  session->server_id = cev->server_id;
 
2484
  cev_not_written = 1;
 
2485
 
 
2486
  if (unlikely(net_request_file(net,cev->fname)))
 
2487
  {
 
2488
    sql_print_error(_("Slave I/O: failed requesting download of '%s'"),
 
2489
                    cev->fname);
 
2490
    goto err;
 
2491
  }
 
2492
 
 
2493
  /*
 
2494
    This dummy block is so we could instantiate Append_block_log_event
 
2495
    once and then modify it slightly instead of doing it multiple times
 
2496
    in the loop
 
2497
  */
 
2498
  {
 
2499
    Append_block_log_event aev(session,0,0,0,0);
 
2500
 
 
2501
    for (;;)
 
2502
    {
 
2503
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
 
2504
      {
 
2505
        sql_print_error(_("Network read error downloading '%s' from master"),
 
2506
                        cev->fname);
 
2507
        goto err;
 
2508
      }
 
2509
      if (unlikely(!num_bytes)) /* eof */
 
2510
      {
 
2511
        /* 3.23 master wants it */
 
2512
        net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0);
 
2513
        /*
 
2514
          If we wrote Create_file_log_event, then we need to write
 
2515
          Execute_load_log_event. If we did not write Create_file_log_event,
 
2516
          then this is an empty file and we can just do as if the LOAD DATA
 
2517
          INFILE had not existed, i.e. write nothing.
 
2518
        */
 
2519
        if (unlikely(cev_not_written))
 
2520
          break;
 
2521
        Execute_load_log_event xev(session,0,0);
 
2522
        xev.log_pos = cev->log_pos;
 
2523
        if (unlikely(mi->rli.relay_log.append(&xev)))
 
2524
        {
 
2525
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2526
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2527
                     _("error writing Exec_load event to relay log"));
 
2528
          goto err;
 
2529
        }
 
2530
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
 
2531
        break;
 
2532
      }
 
2533
      if (unlikely(cev_not_written))
 
2534
      {
 
2535
        cev->block = net->read_pos;
 
2536
        cev->block_len = num_bytes;
 
2537
        if (unlikely(mi->rli.relay_log.append(cev)))
 
2538
        {
 
2539
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2540
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2541
                     _("error writing Create_file event to relay log"));
 
2542
          goto err;
 
2543
        }
 
2544
        cev_not_written=0;
 
2545
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
 
2546
      }
 
2547
      else
 
2548
      {
 
2549
        aev.block = net->read_pos;
 
2550
        aev.block_len = num_bytes;
 
2551
        aev.log_pos = cev->log_pos;
 
2552
        if (unlikely(mi->rli.relay_log.append(&aev)))
 
2553
        {
 
2554
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2555
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2556
                     _("error writing Append_block event to relay log"));
 
2557
          goto err;
 
2558
        }
 
2559
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
 
2560
      }
 
2561
    }
 
2562
  }
 
2563
  error=0;
 
2564
err:
 
2565
  return(error);
 
2566
}
 
2567
 
 
2568
 
 
2569
/*
 
2570
  Start using a new binary log on the master
 
2571
 
 
2572
  SYNOPSIS
 
2573
    process_io_rotate()
 
2574
    mi                  master_info for the slave
 
2575
    rev                 The rotate log event read from the binary log
 
2576
 
 
2577
  DESCRIPTION
 
2578
    Updates the master info with the place in the next binary
 
2579
    log where we should start reading.
 
2580
    Rotate the relay log to avoid mixed-format relay logs.
 
2581
 
 
2582
  NOTES
 
2583
    We assume we already locked mi->data_lock
 
2584
 
 
2585
  RETURN VALUES
 
2586
    0           ok
 
2587
    1           Log event is illegal
 
2588
 
 
2589
*/
 
2590
 
 
2591
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
 
2592
{
 
2593
  safe_mutex_assert_owner(&mi->data_lock);
 
2594
 
 
2595
  if (unlikely(!rev->is_valid()))
 
2596
    return(1);
 
2597
 
 
2598
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
 
2599
  mi->setLogName(rev->new_log_ident);
 
2600
  mi->setLogPosition(rev->pos);
 
2601
  /*
 
2602
    If we do not do this, we will be getting the first
 
2603
    rotate event forever, so we need to not disconnect after one.
 
2604
  */
 
2605
  if (disconnect_slave_event_count)
 
2606
    mi->events_till_disconnect++;
 
2607
 
 
2608
  /*
 
2609
    If description_event_for_queue is format <4, there is conversion in the
 
2610
    relay log to the slave's format (4). And Rotate can mean upgrade or
 
2611
    nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
 
2612
    no need to reset description_event_for_queue now. And if it's nothing (same
 
2613
    master version as before), no need (still using the slave's format).
 
2614
  */
 
2615
  if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
 
2616
  {
 
2617
    delete mi->rli.relay_log.description_event_for_queue;
 
2618
    /* start from format 3 (DRIZZLE 4.0) again */
 
2619
    mi->rli.relay_log.description_event_for_queue= new
 
2620
      Format_description_log_event(3);
 
2621
  }
 
2622
  /*
 
2623
    Rotate the relay log makes binlog format detection easier (at next slave
 
2624
    start or mysqlbinlog)
 
2625
  */
 
2626
  rotate_relay_log(mi); /* will take the right mutexes */
 
2627
  return(0);
 
2628
}
 
2629
 
 
2630
/*
 
2631
  Reads a 3.23 event and converts it to the slave's format. This code was
 
2632
  copied from DRIZZLE 4.0.
 
2633
*/
 
2634
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
 
2635
                           uint32_t event_len)
 
2636
{
 
2637
  const char *errmsg = 0;
 
2638
  uint32_t inc_pos;
 
2639
  bool ignore_event= 0;
 
2640
  char *tmp_buf = 0;
 
2641
  Relay_log_info *rli= &mi->rli;
 
2642
 
 
2643
  /*
 
2644
    If we get Load event, we need to pass a non-reusable buffer
 
2645
    to read_log_event, so we do a trick
 
2646
  */
 
2647
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
 
2648
  {
 
2649
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
 
2650
    {
 
2651
      mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
 
2652
                 ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
 
2653
      return(1);
 
2654
    }
 
2655
    memcpy(tmp_buf,buf,event_len);
 
2656
    /*
 
2657
      Create_file constructor wants a 0 as last char of buffer, this 0 will
 
2658
      serve as the string-termination char for the file's name (which is at the
 
2659
      end of the buffer)
 
2660
      We must increment event_len, otherwise the event constructor will not see
 
2661
      this end 0, which leads to segfault.
 
2662
    */
 
2663
    tmp_buf[event_len++]=0;
 
2664
    int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
 
2665
    buf = (const char*)tmp_buf;
 
2666
  }
 
2667
  /*
 
2668
    This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
 
2669
    send the loaded file, and write it to the relay log in the form of
 
2670
    Append_block/Exec_load (the SQL thread needs the data, as that thread is not
 
2671
    connected to the master).
 
2672
  */
 
2673
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
 
2674
                                            mi->rli.relay_log.description_event_for_queue);
 
2675
  if (unlikely(!ev))
 
2676
  {
 
2677
    sql_print_error(_("Read invalid event from master: '%s', "
 
2678
                      "master could be corrupt but a more likely cause "
 
2679
                      "of this is a bug"),
 
2680
                    errmsg);
 
2681
    free((char*) tmp_buf);
 
2682
    return(1);
 
2683
  }
 
2684
 
 
2685
  pthread_mutex_lock(&mi->data_lock);
 
2686
  ev->log_pos= mi->getLogPosition(); /* 3.23 events don't contain log_pos */
 
2687
  switch (ev->get_type_code()) {
 
2688
  case STOP_EVENT:
 
2689
    ignore_event= 1;
 
2690
    inc_pos= event_len;
 
2691
    break;
 
2692
  case ROTATE_EVENT:
 
2693
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
 
2694
    {
 
2695
      delete ev;
 
2696
      pthread_mutex_unlock(&mi->data_lock);
 
2697
      return(1);
 
2698
    }
 
2699
    inc_pos= 0;
 
2700
    break;
 
2701
  case CREATE_FILE_EVENT:
 
2702
    /*
 
2703
      Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
 
2704
      queue_old_event() which is for 3.23 events which don't comprise
 
2705
      CREATE_FILE_EVENT. This is because read_log_event() above has just
 
2706
      transformed LOAD_EVENT into CREATE_FILE_EVENT.
 
2707
    */
 
2708
  {
 
2709
    /* We come here when and only when tmp_buf != 0 */
 
2710
    assert(tmp_buf != 0);
 
2711
    inc_pos=event_len;
 
2712
    ev->log_pos+= inc_pos;
 
2713
    int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
 
2714
    delete ev;
 
2715
    mi->incrementLogPosition(inc_pos);
 
2716
    pthread_mutex_unlock(&mi->data_lock);
 
2717
    free((char*)tmp_buf);
 
2718
    return(error);
 
2719
  }
 
2720
  default:
 
2721
    inc_pos= event_len;
 
2722
    break;
 
2723
  }
 
2724
  if (likely(!ignore_event))
 
2725
  {
 
2726
    if (ev->log_pos)
 
2727
      /*
 
2728
         Don't do it for fake Rotate events (see comment in
 
2729
      Log_event::Log_event(const char* buf...) in log_event.cc).
 
2730
      */
 
2731
      ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
 
2732
    if (unlikely(rli->relay_log.append(ev)))
 
2733
    {
 
2734
      delete ev;
 
2735
      pthread_mutex_unlock(&mi->data_lock);
 
2736
      return(1);
 
2737
    }
 
2738
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
2739
  }
 
2740
  delete ev;
 
2741
  mi->incrementLogPosition(inc_pos);
 
2742
  pthread_mutex_unlock(&mi->data_lock);
 
2743
  return(0);
 
2744
}
 
2745
 
 
2746
/*
 
2747
  Reads a 4.0 event and converts it to the slave's format. This code was copied
 
2748
  from queue_binlog_ver_1_event(), with some affordable simplifications.
 
2749
*/
 
2750
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
 
2751
                           uint32_t event_len)
 
2752
{
 
2753
  const char *errmsg = 0;
 
2754
  uint32_t inc_pos;
 
2755
  char *tmp_buf = 0;
 
2756
  Relay_log_info *rli= &mi->rli;
 
2757
 
 
2758
  /* read_log_event() will adjust log_pos to be end_log_pos */
 
2759
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
 
2760
                                            mi->rli.relay_log.description_event_for_queue);
 
2761
  if (unlikely(!ev))
 
2762
  {
 
2763
    sql_print_error(_("Read invalid event from master: '%s', "
 
2764
                      "master could be corrupt but a more likely cause of "
 
2765
                      "this is a bug"),
 
2766
                    errmsg);
 
2767
    free((char*) tmp_buf);
 
2768
    return(1);
 
2769
  }
 
2770
  pthread_mutex_lock(&mi->data_lock);
 
2771
  switch (ev->get_type_code()) {
 
2772
  case STOP_EVENT:
 
2773
    goto err;
 
2774
  case ROTATE_EVENT:
 
2775
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
 
2776
    {
 
2777
      delete ev;
 
2778
      pthread_mutex_unlock(&mi->data_lock);
 
2779
      return(1);
 
2780
    }
 
2781
    inc_pos= 0;
 
2782
    break;
 
2783
  default:
 
2784
    inc_pos= event_len;
 
2785
    break;
 
2786
  }
 
2787
  if (unlikely(rli->relay_log.append(ev)))
 
2788
  {
 
2789
    delete ev;
 
2790
    pthread_mutex_unlock(&mi->data_lock);
 
2791
    return(1);
 
2792
  }
 
2793
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
2794
  delete ev;
 
2795
  mi->incrementLogPosition(inc_pos);
 
2796
err:
 
2797
  pthread_mutex_unlock(&mi->data_lock);
 
2798
  return(0);
 
2799
}
 
2800
 
 
2801
/*
 
2802
  queue_old_event()
 
2803
 
 
2804
  Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
 
2805
  (exactly, slave's) format. To do the conversion, we create a 5.0 event from
 
2806
  the 3.23/4.0 bytes, then write this event to the relay log.
 
2807
 
 
2808
  TODO:
 
2809
    Test this code before release - it has to be tested on a separate
 
2810
    setup with 3.23 master or 4.0 master
 
2811
*/
 
2812
 
 
2813
static int32_t queue_old_event(Master_info *mi, const char *buf,
 
2814
                           uint32_t event_len)
 
2815
{
 
2816
  switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
 
2817
  {
 
2818
  case 1:
 
2819
      return(queue_binlog_ver_1_event(mi,buf,event_len));
 
2820
  case 3:
 
2821
      return(queue_binlog_ver_3_event(mi,buf,event_len));
 
2822
  default: /* unsupported format; eg version 2 */
 
2823
    return(1);
 
2824
  }
 
2825
}
 
2826
 
 
2827
/*
 
2828
  queue_event()
 
2829
 
 
2830
  If the event is 3.23/4.0, passes it to queue_old_event() which will convert
 
2831
  it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
 
2832
  no format conversion, it's pure read/write of bytes.
 
2833
  So a 5.0.0 slave's relay log can contain events in the slave's format or in
 
2834
  any >=5.0.0 format.
 
2835
*/
 
2836
 
 
2837
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
 
2838
{
 
2839
  int32_t error= 0;
 
2840
  String error_msg;
 
2841
  uint32_t inc_pos= 0;
 
2842
  Relay_log_info *rli= &mi->rli;
 
2843
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
 
2844
 
 
2845
 
 
2846
  if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
 
2847
      buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
 
2848
    return(queue_old_event(mi,buf,event_len));
 
2849
 
 
2850
  pthread_mutex_lock(&mi->data_lock);
 
2851
 
 
2852
  switch (buf[EVENT_TYPE_OFFSET]) {
 
2853
  case STOP_EVENT:
 
2854
    /*
 
2855
      We needn't write this event to the relay log. Indeed, it just indicates a
 
2856
      master server shutdown. The only thing this does is cleaning. But
 
2857
      cleaning is already done on a per-master-thread basis (as the master
 
2858
      server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
 
2859
      prepared statements' deletion are TODO only when we binlog prep stmts).
 
2860
 
 
2861
      We don't even increment mi->master_log_pos, because we may be just after
 
2862
      a Rotate event. Btw, in a few milliseconds we are going to have a Start
 
2863
      event from the next binlog (unless the master is presently running
 
2864
      without --log-bin).
 
2865
    */
 
2866
    goto err;
 
2867
  case ROTATE_EVENT:
 
2868
  {
 
2869
    Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
 
2870
    if (unlikely(process_io_rotate(mi,&rev)))
 
2871
    {
 
2872
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
2873
      goto err;
 
2874
    }
 
2875
    /*
 
2876
      Now the I/O thread has just changed its mi->master_log_name, so
 
2877
      incrementing mi->master_log_pos is nonsense.
 
2878
    */
 
2879
    inc_pos= 0;
 
2880
    break;
 
2881
  }
 
2882
  case FORMAT_DESCRIPTION_EVENT:
 
2883
  {
 
2884
    /*
 
2885
      Create an event, and save it (when we rotate the relay log, we will have
 
2886
      to write this event again).
 
2887
    */
 
2888
    /*
 
2889
      We are the only thread which reads/writes description_event_for_queue.
 
2890
      The relay_log struct does not move (though some members of it can
 
2891
      change), so we needn't any lock (no rli->data_lock, no log lock).
 
2892
    */
 
2893
    Format_description_log_event* tmp;
 
2894
    const char* errmsg;
 
2895
    if (!(tmp= (Format_description_log_event*)
 
2896
          Log_event::read_log_event(buf, event_len, &errmsg,
 
2897
                                    mi->rli.relay_log.description_event_for_queue)))
 
2898
    {
 
2899
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
2900
      goto err;
 
2901
    }
 
2902
    delete mi->rli.relay_log.description_event_for_queue;
 
2903
    mi->rli.relay_log.description_event_for_queue= tmp;
 
2904
    /*
 
2905
       Though this does some conversion to the slave's format, this will
 
2906
       preserve the master's binlog format version, and number of event types.
 
2907
    */
 
2908
    /*
 
2909
       If the event was not requested by the slave (the slave did not ask for
 
2910
       it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
 
2911
    */
 
2912
    inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
 
2913
  }
 
2914
  break;
 
2915
 
 
2916
  case HEARTBEAT_LOG_EVENT:
 
2917
  {
 
2918
    /*
 
2919
      HB (heartbeat) cannot come before RL (Relay)
 
2920
    */
 
2921
    char  llbuf[22];
 
2922
    Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
 
2923
    if (!hb.is_valid())
 
2924
    {
 
2925
      error= ER_SLAVE_HEARTBEAT_FAILURE;
 
2926
      error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
 
2927
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
 
2928
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
 
2929
      error_msg.append(STRING_WITH_LEN(" log_pos "));
 
2930
      llstr(hb.log_pos, llbuf);
 
2931
      error_msg.append(llbuf, strlen(llbuf));
 
2932
      goto err;
 
2933
    }
 
2934
    mi->received_heartbeats++;
 
2935
    /* 
 
2936
       compare local and event's versions of log_file, log_pos.
 
2937
       
 
2938
       Heartbeat is sent only after an event corresponding to the corrdinates
 
2939
       the heartbeat carries.
 
2940
       Slave can not have a difference in coordinates except in the only
 
2941
       special case when mi->master_log_name, master_log_pos have never
 
2942
       been updated by Rotate event i.e when slave does not have any history
 
2943
       with the master (and thereafter mi->master_log_pos is NULL).
 
2944
 
 
2945
       TODO: handling `when' for SHOW SLAVE STATUS' snds behind
 
2946
    */
 
2947
    if ((mi->setLogName(hb.get_log_ident()) && mi->getLogName() != NULL)
 
2948
        || mi->getLogPosition() != hb.log_pos)
 
2949
    {
 
2950
      /* missed events of heartbeat from the past */
 
2951
      error= ER_SLAVE_HEARTBEAT_FAILURE;
 
2952
      error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
 
2953
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
 
2954
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
 
2955
      error_msg.append(STRING_WITH_LEN(" log_pos "));
 
2956
      llstr(hb.log_pos, llbuf);
 
2957
      error_msg.append(llbuf, strlen(llbuf));
 
2958
      goto err;
 
2959
    }
 
2960
    goto skip_relay_logging;
 
2961
  }
 
2962
  break;
 
2963
    
 
2964
  default:
 
2965
    inc_pos= event_len;
 
2966
    break;
 
2967
  }
 
2968
 
 
2969
  /*
 
2970
     If this event is originating from this server, don't queue it.
 
2971
     We don't check this for 3.23 events because it's simpler like this; 3.23
 
2972
     will be filtered anyway by the SQL slave thread which also tests the
 
2973
     server id (we must also keep this test in the SQL thread, in case somebody
 
2974
     upgrades a 4.0 slave which has a not-filtered relay log).
 
2975
 
 
2976
     ANY event coming from ourselves can be ignored: it is obvious for queries;
 
2977
     for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
 
2978
     (--log-slave-updates would not log that) unless this slave is also its
 
2979
     direct master (an unsupported, useless setup!).
 
2980
  */
 
2981
 
 
2982
  pthread_mutex_lock(log_lock);
 
2983
 
 
2984
  if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
 
2985
      !mi->rli.replicate_same_server_id)
 
2986
  {
 
2987
    /*
 
2988
      Do not write it to the relay log.
 
2989
      a) We still want to increment mi->master_log_pos, so that we won't
 
2990
      re-read this event from the master if the slave IO thread is now
 
2991
      stopped/restarted (more efficient if the events we are ignoring are big
 
2992
      LOAD DATA INFILE).
 
2993
      b) We want to record that we are skipping events, for the information of
 
2994
      the slave SQL thread, otherwise that thread may let
 
2995
      rli->group_relay_log_pos stay too small if the last binlog's event is
 
2996
      ignored.
 
2997
      But events which were generated by this slave and which do not exist in
 
2998
      the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
 
2999
      mi->master_log_pos.
 
3000
    */
 
3001
    if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
 
3002
        buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
 
3003
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
 
3004
    {
 
3005
      mi->incrementLogPosition(inc_pos);
 
3006
      memcpy(rli->ign_master_log_name_end, mi->getLogName(), FN_REFLEN);
 
3007
      assert(rli->ign_master_log_name_end[0]);
 
3008
      rli->ign_master_log_pos_end= mi->getLogPosition();
 
3009
    }
 
3010
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
 
3011
  }
 
3012
  else
 
3013
  {
 
3014
    /* write the event to the relay log */
 
3015
    if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
 
3016
    {
 
3017
      mi->incrementLogPosition(inc_pos);
 
3018
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3019
    }
 
3020
    else
 
3021
    {
 
3022
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
3023
    }
 
3024
    rli->ign_master_log_name_end[0]= 0; // last event is not ignored
 
3025
  }
 
3026
  pthread_mutex_unlock(log_lock);
 
3027
 
 
3028
skip_relay_logging:
 
3029
  
 
3030
err:
 
3031
  pthread_mutex_unlock(&mi->data_lock);
 
3032
  if (error)
 
3033
    mi->report(ERROR_LEVEL, error, ER(error),
 
3034
               (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
 
3035
               _("could not queue event from master") :
 
3036
               error_msg.ptr());
 
3037
  return(error);
 
3038
}
 
3039
 
 
3040
 
 
3041
void end_relay_log_info(Relay_log_info* rli)
 
3042
{
 
3043
  if (!rli->inited)
 
3044
    return;
 
3045
  if (rli->info_fd >= 0)
 
3046
  {
 
3047
    end_io_cache(&rli->info_file);
 
3048
    (void) my_close(rli->info_fd, MYF(MY_WME));
 
3049
    rli->info_fd = -1;
 
3050
  }
 
3051
  if (rli->cur_log_fd >= 0)
 
3052
  {
 
3053
    end_io_cache(&rli->cache_buf);
 
3054
    (void)my_close(rli->cur_log_fd, MYF(MY_WME));
 
3055
    rli->cur_log_fd = -1;
 
3056
  }
 
3057
  rli->inited = 0;
 
3058
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
3059
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3060
  /*
 
3061
    Delete the slave's temporary tables from memory.
 
3062
    In the future there will be other actions than this, to ensure persistance
 
3063
    of slave's temp tables after shutdown.
 
3064
  */
 
3065
  rli->close_temporary_tables();
 
3066
  return;
 
3067
}
 
3068
 
 
3069
/*
 
3070
  Try to connect until successful or slave killed
 
3071
 
 
3072
  SYNPOSIS
 
3073
    safe_connect()
 
3074
    session                 Thread handler for slave
 
3075
    DRIZZLE               DRIZZLE connection handle
 
3076
    mi                  Replication handle
 
3077
 
 
3078
  RETURN
 
3079
    0   ok
 
3080
    #   Error
 
3081
*/
 
3082
 
 
3083
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi)
 
3084
{
 
3085
  return(connect_to_master(session, drizzle, mi, 0, 0));
 
3086
}
 
3087
 
 
3088
 
 
3089
/*
 
3090
  SYNPOSIS
 
3091
    connect_to_master()
 
3092
 
 
3093
  IMPLEMENTATION
 
3094
    Try to connect until successful or slave killed or we have retried
 
3095
    master_retry_count times
 
3096
*/
 
3097
 
 
3098
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
 
3099
                                 bool reconnect, bool suppress_warnings)
 
3100
{
 
3101
  int32_t slave_was_killed;
 
3102
  int32_t last_errno= -2;                           // impossible error
 
3103
  uint32_t err_count=0;
 
3104
  char llbuff[22];
 
3105
 
 
3106
  mi->events_till_disconnect = disconnect_slave_event_count;
 
3107
  uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
 
3108
  if (opt_slave_compressed_protocol)
 
3109
    client_flag=CLIENT_COMPRESS;                /* We will use compression */
 
3110
 
 
3111
  drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
 
3112
  drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
 
3113
 
 
3114
  while (!(slave_was_killed = io_slave_killed(session,mi)) &&
 
3115
         (reconnect ? drizzle_reconnect(drizzle) != 0 :
 
3116
          drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
 
3117
                             mi->getPort(), 0, client_flag) == 0))
 
3118
  {
 
3119
    /* Don't repeat last error */
 
3120
    if ((int32_t)drizzle_errno(drizzle) != last_errno)
 
3121
    {
 
3122
      last_errno=drizzle_errno(drizzle);
 
3123
      suppress_warnings= 0;
 
3124
      mi->report(ERROR_LEVEL, last_errno,
 
3125
                 _("error %s to master '%s@%s:%d'"
 
3126
                   " - retry-time: %d  retries: %u"),
 
3127
                 (reconnect ? _("reconnecting") : _("connecting")),
 
3128
                 mi->getUsername(), mi->getHostname(), mi->getPort(),
 
3129
                 mi->getConnectionRetry(), master_retry_count);
 
3130
    }
 
3131
    /*
 
3132
      By default we try forever. The reason is that failure will trigger
 
3133
      master election, so if the user did not set master_retry_count we
 
3134
      do not want to have election triggered on the first failure to
 
3135
      connect
 
3136
    */
 
3137
    if (++err_count == master_retry_count)
 
3138
    {
 
3139
      slave_was_killed=1;
 
3140
      break;
 
3141
    }
 
3142
    safe_sleep(session,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
 
3143
               (void*)mi);
 
3144
  }
 
3145
 
 
3146
  if (!slave_was_killed)
 
3147
  {
 
3148
    if (reconnect)
 
3149
    {
 
3150
      if (!suppress_warnings && global_system_variables.log_warnings)
 
3151
        sql_print_information(_("Slave: connected to master '%s@%s:%d', "
 
3152
                                "replication resumed in log '%s' at "
 
3153
                                "position %s"), mi->getUsername(),
 
3154
                                mi->getHostname(), mi->getPort(),
 
3155
                                IO_RPL_LOG_NAME,
 
3156
                                llstr(mi->getLogPosition(),llbuff));
 
3157
    }
 
3158
  }
 
3159
  drizzle->reconnect= 1;
 
3160
  return(slave_was_killed);
 
3161
}
 
3162
 
 
3163
 
 
3164
/*
 
3165
  safe_reconnect()
 
3166
 
 
3167
  IMPLEMENTATION
 
3168
    Try to connect until successful or slave killed or we have retried
 
3169
    master_retry_count times
 
3170
*/
 
3171
 
 
3172
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
 
3173
                          bool suppress_warnings)
 
3174
{
 
3175
  return(connect_to_master(session, drizzle, mi, 1, suppress_warnings));
 
3176
}
 
3177
 
 
3178
 
 
3179
/*
 
3180
  Store the file and position where the execute-slave thread are in the
 
3181
  relay log.
 
3182
 
 
3183
  SYNOPSIS
 
3184
    flush_relay_log_info()
 
3185
    rli                 Relay log information
 
3186
 
 
3187
  NOTES
 
3188
    - As this is only called by the slave thread, we don't need to
 
3189
      have a lock on this.
 
3190
    - If there is an active transaction, then we don't update the position
 
3191
      in the relay log.  This is to ensure that we re-execute statements
 
3192
      if we die in the middle of an transaction that was rolled back.
 
3193
    - As a transaction never spans binary logs, we don't have to handle the
 
3194
      case where we do a relay-log-rotation in the middle of the transaction.
 
3195
      If this would not be the case, we would have to ensure that we
 
3196
      don't delete the relay log file where the transaction started when
 
3197
      we switch to a new relay log file.
 
3198
 
 
3199
  TODO
 
3200
    - Change the log file information to a binary format to avoid calling
 
3201
      int64_t2str.
 
3202
 
 
3203
  RETURN VALUES
 
3204
    0   ok
 
3205
    1   write error
 
3206
*/
 
3207
 
 
3208
bool flush_relay_log_info(Relay_log_info* rli)
 
3209
{
 
3210
  bool error= 0;
 
3211
 
 
3212
  if (unlikely(rli->no_storage))
 
3213
    return(0);
 
3214
 
 
3215
  return(error);
 
3216
}
 
3217
 
 
3218
 
 
3219
/*
 
3220
  Called when we notice that the current "hot" log got rotated under our feet.
 
3221
*/
 
3222
 
 
3223
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
 
3224
{
 
3225
  assert(rli->cur_log != &rli->cache_buf);
 
3226
  assert(rli->cur_log_fd == -1);
 
3227
 
 
3228
  IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
 
3229
  if ((rli->cur_log_fd=open_binlog(cur_log, rli->event_relay_log_name.c_str(), errmsg)) < 0)
 
3230
    return(0);
 
3231
  /*
 
3232
    We want to start exactly where we was before:
 
3233
    relay_log_pos       Current log pos
 
3234
    pending             Number of bytes already processed from the event
 
3235
  */
 
3236
  rli->event_relay_log_pos= cmax(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
 
3237
  my_b_seek(cur_log,rli->event_relay_log_pos);
 
3238
  return(cur_log);
 
3239
}
 
3240
 
 
3241
 
 
3242
static Log_event* next_event(Relay_log_info* rli)
 
3243
{
 
3244
  Log_event* ev;
 
3245
  IO_CACHE* cur_log = rli->cur_log;
 
3246
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
 
3247
  const char* errmsg=0;
 
3248
  Session* session = rli->sql_session;
 
3249
 
 
3250
  assert(session != 0);
 
3251
 
 
3252
  if (abort_slave_event_count && !rli->events_till_abort--)
 
3253
    return(0);
 
3254
 
 
3255
  /*
 
3256
    For most operations we need to protect rli members with data_lock,
 
3257
    so we assume calling function acquired this mutex for us and we will
 
3258
    hold it for the most of the loop below However, we will release it
 
3259
    whenever it is worth the hassle,  and in the cases when we go into a
 
3260
    pthread_cond_wait() with the non-data_lock mutex
 
3261
  */
 
3262
  safe_mutex_assert_owner(&rli->data_lock);
 
3263
 
 
3264
  while (!sql_slave_killed(session,rli))
 
3265
  {
 
3266
    /*
 
3267
      We can have two kinds of log reading:
 
3268
      hot_log:
 
3269
        rli->cur_log points at the IO_CACHE of relay_log, which
 
3270
        is actively being updated by the I/O thread. We need to be careful
 
3271
        in this case and make sure that we are not looking at a stale log that
 
3272
        has already been rotated. If it has been, we reopen the log.
 
3273
 
 
3274
      The other case is much simpler:
 
3275
        We just have a read only log that nobody else will be updating.
 
3276
    */
 
3277
    bool hot_log;
 
3278
    if ((hot_log = (cur_log != &rli->cache_buf)))
 
3279
    {
 
3280
      assert(rli->cur_log_fd == -1); // foreign descriptor
 
3281
      pthread_mutex_lock(log_lock);
 
3282
 
 
3283
      /*
 
3284
        Reading xxx_file_id is safe because the log will only
 
3285
        be rotated when we hold relay_log.LOCK_log
 
3286
      */
 
3287
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
 
3288
      {
 
3289
        // The master has switched to a new log file; Reopen the old log file
 
3290
        cur_log=reopen_relay_log(rli, &errmsg);
 
3291
        pthread_mutex_unlock(log_lock);
 
3292
        if (!cur_log)                           // No more log files
 
3293
          goto err;
 
3294
        hot_log=0;                              // Using old binary log
 
3295
      }
 
3296
    }
 
3297
    /* 
 
3298
      As there is no guarantee that the relay is open (for example, an I/O
 
3299
      error during a write by the slave I/O thread may have closed it), we
 
3300
      have to test it.
 
3301
    */
 
3302
    if (!my_b_inited(cur_log))
 
3303
      goto err;
 
3304
    assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
 
3305
    assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
 
3306
 
 
3307
    /*
 
3308
      Relay log is always in new format - if the master is 3.23, the
 
3309
      I/O thread will convert the format for us.
 
3310
      A problem: the description event may be in a previous relay log. So if
 
3311
      the slave has been shutdown meanwhile, we would have to look in old relay
 
3312
      logs, which may even have been deleted. So we need to write this
 
3313
      description event at the beginning of the relay log.
 
3314
      When the relay log is created when the I/O thread starts, easy: the
 
3315
      master will send the description event and we will queue it.
 
3316
      But if the relay log is created by new_file(): then the solution is:
 
3317
      DRIZZLE_BIN_LOG::open() will write the buffered description event.
 
3318
    */
 
3319
    if ((ev=Log_event::read_log_event(cur_log,0,
 
3320
                                      rli->relay_log.description_event_for_exec)))
 
3321
 
 
3322
    {
 
3323
      assert(session==rli->sql_session);
 
3324
      /*
 
3325
        read it while we have a lock, to avoid a mutex lock in
 
3326
        inc_event_relay_log_pos()
 
3327
      */
 
3328
      rli->future_event_relay_log_pos= my_b_tell(cur_log);
 
3329
      if (hot_log)
 
3330
        pthread_mutex_unlock(log_lock);
 
3331
      return(ev);
 
3332
    }
 
3333
    assert(session==rli->sql_session);
 
3334
    if (opt_reckless_slave)                     // For mysql-test
 
3335
      cur_log->error = 0;
 
3336
    if (cur_log->error < 0)
 
3337
    {
 
3338
      errmsg = "slave SQL thread aborted because of I/O error";
 
3339
      if (hot_log)
 
3340
        pthread_mutex_unlock(log_lock);
 
3341
      goto err;
 
3342
    }
 
3343
    if (!cur_log->error) /* EOF */
 
3344
    {
 
3345
      /*
 
3346
        On a hot log, EOF means that there are no more updates to
 
3347
        process and we must block until I/O thread adds some and
 
3348
        signals us to continue
 
3349
      */
 
3350
      if (hot_log)
 
3351
      {
 
3352
        /*
 
3353
          We say in Seconds_Behind_Master that we have "caught up". Note that
 
3354
          for example if network link is broken but I/O slave thread hasn't
 
3355
          noticed it (slave_net_timeout not elapsed), then we'll say "caught
 
3356
          up" whereas we're not really caught up. Fixing that would require
 
3357
          internally cutting timeout in smaller pieces in network read, no
 
3358
          thanks. Another example: SQL has caught up on I/O, now I/O has read
 
3359
          a new event and is queuing it; the false "0" will exist until SQL
 
3360
          finishes executing the new event; it will be look abnormal only if
 
3361
          the events have old timestamps (then you get "many", 0, "many").
 
3362
 
 
3363
          Transient phases like this can be fixed with implemeting
 
3364
          Heartbeat event which provides the slave the status of the
 
3365
          master at time the master does not have any new update to send.
 
3366
          Seconds_Behind_Master would be zero only when master has no
 
3367
          more updates in binlog for slave. The heartbeat can be sent
 
3368
          in a (small) fraction of slave_net_timeout. Until it's done
 
3369
          rli->last_master_timestamp is temporarely (for time of
 
3370
          waiting for the following event) reset whenever EOF is
 
3371
          reached.
 
3372
        */
 
3373
        time_t save_timestamp= rli->last_master_timestamp;
 
3374
        rli->last_master_timestamp= 0;
 
3375
 
 
3376
        assert(rli->relay_log.get_open_count() ==
 
3377
                    rli->cur_log_old_open_count);
 
3378
 
 
3379
        if (rli->ign_master_log_name_end[0])
 
3380
        {
 
3381
          /* We generate and return a Rotate, to make our positions advance */
 
3382
          ev= new Rotate_log_event(rli->ign_master_log_name_end,
 
3383
                                   0, rli->ign_master_log_pos_end,
 
3384
                                   Rotate_log_event::DUP_NAME);
 
3385
          rli->ign_master_log_name_end[0]= 0;
 
3386
          pthread_mutex_unlock(log_lock);
 
3387
          if (unlikely(!ev))
 
3388
          {
 
3389
            errmsg= "Slave SQL thread failed to create a Rotate event "
 
3390
              "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
 
3391
            goto err;
 
3392
          }
 
3393
          ev->server_id= 0; // don't be ignored by slave SQL thread
 
3394
          return(ev);
 
3395
        }
 
3396
 
 
3397
        /*
 
3398
          We can, and should release data_lock while we are waiting for
 
3399
          update. If we do not, show slave status will block
 
3400
        */
 
3401
        pthread_mutex_unlock(&rli->data_lock);
 
3402
 
 
3403
        /*
 
3404
          Possible deadlock :
 
3405
          - the I/O thread has reached log_space_limit
 
3406
          - the SQL thread has read all relay logs, but cannot purge for some
 
3407
          reason:
 
3408
            * it has already purged all logs except the current one
 
3409
            * there are other logs than the current one but they're involved in
 
3410
            a transaction that finishes in the current one (or is not finished)
 
3411
          Solution :
 
3412
          Wake up the possibly waiting I/O thread, and set a boolean asking
 
3413
          the I/O thread to temporarily ignore the log_space_limit
 
3414
          constraint, because we do not want the I/O thread to block because of
 
3415
          space (it's ok if it blocks for any other reason (e.g. because the
 
3416
          master does not send anything). Then the I/O thread stops waiting
 
3417
          and reads more events.
 
3418
          The SQL thread decides when the I/O thread should take log_space_limit
 
3419
          into account again : ignore_log_space_limit is reset to 0
 
3420
          in purge_first_log (when the SQL thread purges the just-read relay
 
3421
          log), and also when the SQL thread starts. We should also reset
 
3422
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
 
3423
          fact, no need as RESET SLAVE requires that the slave
 
3424
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
 
3425
          it stops.
 
3426
        */
 
3427
        pthread_mutex_lock(&rli->log_space_lock);
 
3428
        // prevent the I/O thread from blocking next times
 
3429
        rli->ignore_log_space_limit= 1;
 
3430
        /*
 
3431
          If the I/O thread is blocked, unblock it.  Ok to broadcast
 
3432
          after unlock, because the mutex is only destroyed in
 
3433
          ~Relay_log_info(), i.e. when rli is destroyed, and rli will
 
3434
          not be destroyed before we exit the present function.
 
3435
        */
 
3436
        pthread_mutex_unlock(&rli->log_space_lock);
 
3437
        pthread_cond_broadcast(&rli->log_space_cond);
 
3438
        // Note that wait_for_update_relay_log unlocks lock_log !
 
3439
        rli->relay_log.wait_for_update_relay_log(rli->sql_session);
 
3440
        // re-acquire data lock since we released it earlier
 
3441
        pthread_mutex_lock(&rli->data_lock);
 
3442
        rli->last_master_timestamp= save_timestamp;
 
3443
        continue;
 
3444
      }
 
3445
      /*
 
3446
        If the log was not hot, we need to move to the next log in
 
3447
        sequence. The next log could be hot or cold, we deal with both
 
3448
        cases separately after doing some common initialization
 
3449
      */
 
3450
      end_io_cache(cur_log);
 
3451
      assert(rli->cur_log_fd >= 0);
 
3452
      my_close(rli->cur_log_fd, MYF(MY_WME));
 
3453
      rli->cur_log_fd = -1;
 
3454
 
 
3455
      if (relay_log_purge)
 
3456
      {
 
3457
        /*
 
3458
          purge_first_log will properly set up relay log coordinates in rli.
 
3459
          If the group's coordinates are equal to the event's coordinates
 
3460
          (i.e. the relay log was not rotated in the middle of a group),
 
3461
          we can purge this relay log too.
 
3462
          We do uint64_t and string comparisons, this may be slow but
 
3463
          - purging the last relay log is nice (it can save 1GB of disk), so we
 
3464
          like to detect the case where we can do it, and given this,
 
3465
          - I see no better detection method
 
3466
          - purge_first_log is not called that often
 
3467
        */
 
3468
        if (rli->relay_log.purge_first_log
 
3469
            (rli,
 
3470
             rli->group_relay_log_pos == rli->event_relay_log_pos
 
3471
             && !strcmp(rli->group_relay_log_name.c_str(), rli->event_relay_log_name.c_str())))
 
3472
        {
 
3473
          errmsg = "Error purging processed logs";
 
3474
          goto err;
 
3475
        }
 
3476
      }
 
3477
      else
 
3478
      {
 
3479
        /*
 
3480
          If hot_log is set, then we already have a lock on
 
3481
          LOCK_log.  If not, we have to get the lock.
 
3482
 
 
3483
          According to Sasha, the only time this code will ever be executed
 
3484
          is if we are recovering from a bug.
 
3485
        */
 
3486
        if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
 
3487
        {
 
3488
          errmsg = "error switching to the next log";
 
3489
          goto err;
 
3490
        }
 
3491
        rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
 
3492
        rli->event_relay_log_name.assign(rli->linfo.log_file_name);
 
3493
        flush_relay_log_info(rli);
 
3494
      }
 
3495
 
 
3496
      /*
 
3497
        Now we want to open this next log. To know if it's a hot log (the one
 
3498
        being written by the I/O thread now) or a cold log, we can use
 
3499
        is_active(); if it is hot, we use the I/O cache; if it's cold we open
 
3500
        the file normally. But if is_active() reports that the log is hot, this
 
3501
        may change between the test and the consequence of the test. So we may
 
3502
        open the I/O cache whereas the log is now cold, which is nonsense.
 
3503
        To guard against this, we need to have LOCK_log.
 
3504
      */
 
3505
 
 
3506
      if (!hot_log) /* if hot_log, we already have this mutex */
 
3507
        pthread_mutex_lock(log_lock);
 
3508
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
 
3509
      {
 
3510
#ifdef EXTRA_DEBUG
 
3511
        if (global_system_variables.log_warnings)
 
3512
          sql_print_information(_("next log '%s' is currently active"),
 
3513
                                rli->linfo.log_file_name);
 
3514
#endif
 
3515
        rli->cur_log= cur_log= rli->relay_log.get_log_file();
 
3516
        rli->cur_log_old_open_count= rli->relay_log.get_open_count();
 
3517
        assert(rli->cur_log_fd == -1);
 
3518
 
 
3519
        /*
 
3520
          Read pointer has to be at the start since we are the only
 
3521
          reader.
 
3522
          We must keep the LOCK_log to read the 4 first bytes, as this is a hot
 
3523
          log (same as when we call read_log_event() above: for a hot log we
 
3524
          take the mutex).
 
3525
        */
 
3526
        if (check_binlog_magic(cur_log,&errmsg))
 
3527
        {
 
3528
          if (!hot_log) pthread_mutex_unlock(log_lock);
 
3529
          goto err;
 
3530
        }
 
3531
        if (!hot_log) pthread_mutex_unlock(log_lock);
 
3532
        continue;
 
3533
      }
 
3534
      if (!hot_log) pthread_mutex_unlock(log_lock);
 
3535
      /*
 
3536
        if we get here, the log was not hot, so we will have to open it
 
3537
        ourselves. We are sure that the log is still not hot now (a log can get
 
3538
        from hot to cold, but not from cold to hot). No need for LOCK_log.
 
3539
      */
 
3540
#ifdef EXTRA_DEBUG
 
3541
      if (global_system_variables.log_warnings)
 
3542
        sql_print_information(_("next log '%s' is not active"),
 
3543
                              rli->linfo.log_file_name);
 
3544
#endif
 
3545
      // open_binlog() will check the magic header
 
3546
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
 
3547
                                       &errmsg)) <0)
 
3548
        goto err;
 
3549
    }
 
3550
    else
 
3551
    {
 
3552
      /*
 
3553
        Read failed with a non-EOF error.
 
3554
        TODO: come up with something better to handle this error
 
3555
      */
 
3556
      if (hot_log)
 
3557
        pthread_mutex_unlock(log_lock);
 
3558
      sql_print_error(_("Slave SQL thread: I/O error reading "
 
3559
                        "event(errno: %d  cur_log->error: %d)"),
 
3560
                      my_errno,cur_log->error);
 
3561
      // set read position to the beginning of the event
 
3562
      my_b_seek(cur_log,rli->event_relay_log_pos);
 
3563
      /* otherwise, we have had a partial read */
 
3564
      errmsg = _("Aborting slave SQL thread because of partial event read");
 
3565
      break;                                    // To end of function
 
3566
    }
 
3567
  }
 
3568
  if (!errmsg && global_system_variables.log_warnings)
 
3569
  {
 
3570
    sql_print_information(_("Error reading relay log event: %s"),
 
3571
                          _("slave SQL thread was killed"));
 
3572
    return(0);
 
3573
  }
 
3574
 
 
3575
err:
 
3576
  if (errmsg)
 
3577
    sql_print_error(_("Error reading relay log event: %s"), errmsg);
 
3578
  return(0);
 
3579
}
 
3580
 
 
3581
/*
 
3582
  Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
 
3583
  because of size is simpler because when we do it we already have all relevant
 
3584
  locks; here we don't, so this function is mainly taking locks).
 
3585
  Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
 
3586
  is void).
 
3587
*/
 
3588
 
 
3589
void rotate_relay_log(Master_info* mi)
 
3590
{
 
3591
  Relay_log_info* rli= &mi->rli;
 
3592
 
 
3593
  /* We don't lock rli->run_lock. This would lead to deadlocks. */
 
3594
  pthread_mutex_lock(&mi->run_lock);
 
3595
 
 
3596
  /*
 
3597
     We need to test inited because otherwise, new_file() will attempt to lock
 
3598
     LOCK_log, which may not be inited (if we're not a slave).
 
3599
  */
 
3600
  if (!rli->inited)
 
3601
  {
 
3602
    goto end;
 
3603
  }
 
3604
 
 
3605
  /* If the relay log is closed, new_file() will do nothing. */
 
3606
  rli->relay_log.new_file();
 
3607
 
 
3608
  /*
 
3609
    We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
 
3610
    be counted, so imagine a succession of FLUSH LOGS  and assume the slave
 
3611
    threads are started:
 
3612
    relay_log_space decreases by the size of the deleted relay log, but does
 
3613
    not increase, so flush-after-flush we may become negative, which is wrong.
 
3614
    Even if this will be corrected as soon as a query is replicated on the
 
3615
    slave (because the I/O thread will then call harvest_bytes_written() which
 
3616
    will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
 
3617
    output in SHOW SLAVE STATUS meanwhile. So we harvest now.
 
3618
    If the log is closed, then this will just harvest the last writes, probably
 
3619
    0 as they probably have been harvested.
 
3620
  */
 
3621
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3622
end:
 
3623
  pthread_mutex_unlock(&mi->run_lock);
 
3624
  return;
 
3625
}
 
3626
 
 
3627
 
 
3628
/**
 
3629
   Detects, based on master's version (as found in the relay log), if master
 
3630
   has a certain bug.
 
3631
   @param rli Relay_log_info which tells the master's version
 
3632
   @param bug_id Number of the bug as found in bugs.mysql.com
 
3633
   @param report bool report error message, default TRUE
 
3634
   @return true if master has the bug, FALSE if it does not.
 
3635
*/
 
3636
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
 
3637
{
 
3638
  struct st_version_range_for_one_bug {
 
3639
    uint32_t        bug_id;
 
3640
    const unsigned char introduced_in[3]; // first version with bug
 
3641
    const unsigned char fixed_in[3];      // first version with fix
 
3642
  };
 
3643
  static struct st_version_range_for_one_bug versions_for_all_bugs[]=
 
3644
  {
 
3645
    {24432, { 5, 0, 24 }, { 5, 0, 38 } },
 
3646
    {24432, { 5, 1, 12 }, { 5, 1, 17 } },
 
3647
    {33029, { 5, 0,  0 }, { 5, 0, 58 } },
 
3648
    {33029, { 5, 1,  0 }, { 5, 1, 12 } },
 
3649
  };
 
3650
  const unsigned char *master_ver=
 
3651
    rli->relay_log.description_event_for_exec->server_version_split;
 
3652
 
 
3653
  assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
 
3654
 
 
3655
  for (uint32_t i= 0;
 
3656
       i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
 
3657
  {
 
3658
    const unsigned char *introduced_in= versions_for_all_bugs[i].introduced_in,
 
3659
      *fixed_in= versions_for_all_bugs[i].fixed_in;
 
3660
    if ((versions_for_all_bugs[i].bug_id == bug_id) &&
 
3661
        (memcmp(introduced_in, master_ver, 3) <= 0) &&
 
3662
        (memcmp(fixed_in,      master_ver, 3) >  0))
 
3663
    {
 
3664
      if (!report)
 
3665
        return true;
 
3666
 
 
3667
      // a short message for SHOW SLAVE STATUS (message length constraints)
 
3668
      my_printf_error(ER_UNKNOWN_ERROR,
 
3669
                      _("master may suffer from"
 
3670
                        " http://bugs.mysql.com/bug.php?id=%u"
 
3671
                        " so slave stops; check error log on slave"
 
3672
                        " for more info"), MYF(0), bug_id);
 
3673
      // a verbose message for the error log
 
3674
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
 
3675
                  _("According to the master's version ('%s'),"
 
3676
                    " it is probable that master suffers from this bug:"
 
3677
                    " http://bugs.mysql.com/bug.php?id=%u"
 
3678
                    " and thus replicating the current binary log event"
 
3679
                    " may make the slave's data become different from the"
 
3680
                    " master's data."
 
3681
                    " To take no risk, slave refuses to replicate"
 
3682
                    " this event and stops."
 
3683
                    " We recommend that all updates be stopped on the"
 
3684
                    " master and slave, that the data of both be"
 
3685
                    " manually synchronized,"
 
3686
                    " that master's binary logs be deleted,"
 
3687
                    " that master be upgraded to a version at least"
 
3688
                    " equal to '%d.%d.%d'. Then replication can be"
 
3689
                    " restarted."),
 
3690
                  rli->relay_log.description_event_for_exec->server_version,
 
3691
                  bug_id,
 
3692
                  fixed_in[0], fixed_in[1], fixed_in[2]);
 
3693
      return true;
 
3694
    }
 
3695
  }
 
3696
  return false;
 
3697
}
 
3698
 
 
3699
/**
 
3700
   BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
 
3701
   exclusive, if one statement in a SP generated AUTO_INCREMENT value
 
3702
   by the top statement, all statements after it would be considered
 
3703
   generated AUTO_INCREMENT value by the top statement, and a
 
3704
   erroneous INSERT_ID value might be associated with these statement,
 
3705
   which could cause duplicate entry error and stop the slave.
 
3706
 
 
3707
   Detect buggy master to work around.
 
3708
 */
 
3709
bool rpl_master_erroneous_autoinc(Session *session)
 
3710
{
 
3711
  if (active_mi && active_mi->rli.sql_session == session)
 
3712
  {
 
3713
    Relay_log_info *rli= &active_mi->rli;
 
3714
    return rpl_master_has_bug(rli, 33029, false);
 
3715
  }
 
3716
  return false;
 
3717
}
 
3718
 
 
3719
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
3720
template class I_List_iterator<i_string>;
 
3721
template class I_List_iterator<i_string_pair>;
 
3722
#endif
 
3723
 
 
3724
/**
 
3725
  @} (end of group Replication)
 
3726
*/