~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/slave.cc

  • Committer: Lee
  • Date: 2009-01-01 03:07:33 UTC
  • mto: (758.1.3 devel)
  • mto: This revision was merged to the branch mainline in revision 759.
  • Revision ID: lbieber@lbieber-desktop-20090101030733-fb411b55f07vij8q
more header file cleanup

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2003 DRIZZLE AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/**
 
18
  @addtogroup Replication
 
19
  @{
 
20
 
 
21
  @file
 
22
 
 
23
  @brief Code to run the io thread and the sql thread on the
 
24
  replication slave.
 
25
*/
 
26
#include <drizzled/server_includes.h>
 
27
 
 
28
#include <storage/myisam/myisam.h>
 
29
#include <drizzled/replication/mi.h>
 
30
#include <drizzled/replication/rli.h>
 
31
#include <drizzled/replication/replication.h>
 
32
#include <libdrizzle/libdrizzle.h>
 
33
#include <mysys/hash.h>
 
34
#include <mysys/thr_alarm.h>
 
35
#include <libdrizzle/errmsg.h>
 
36
#include <mysys/mysys_err.h>
 
37
#include <drizzled/error.h>
 
38
#include <drizzled/sql_parse.h>
 
39
#include <drizzled/gettext.h>
 
40
#include <signal.h>
 
41
#include <drizzled/session.h>
 
42
#include <drizzled/log_event.h>
 
43
#include <drizzled/item/empty_string.h>
 
44
#include <drizzled/item/return_int.h>
 
45
 
 
46
#if TIME_WITH_SYS_TIME
 
47
# include <sys/time.h>
 
48
# include <time.h>
 
49
#else
 
50
# if HAVE_SYS_TIME_H
 
51
#  include <sys/time.h>
 
52
# else
 
53
#  include <time.h>
 
54
# endif
 
55
#endif
 
56
 
 
57
#include <drizzled/tztime.h>
 
58
 
 
59
#include <drizzled/replication/tblmap.h>
 
60
 
 
61
#define MAX_SLAVE_RETRY_PAUSE 5
 
62
bool use_slave_mask = 0;
 
63
MY_BITMAP slave_error_mask;
 
64
 
 
65
typedef bool (*CHECK_KILLED_FUNC)(Session*,void*);
 
66
 
 
67
char* slave_load_tmpdir = 0;
 
68
Master_info *active_mi= 0;
 
69
bool replicate_same_server_id;
 
70
uint64_t relay_log_space_limit = 0;
 
71
 
 
72
/*
 
73
  When slave thread exits, we need to remember the temporary tables so we
 
74
  can re-use them on slave start.
 
75
 
 
76
  TODO: move the vars below under Master_info
 
77
*/
 
78
 
 
79
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
 
80
int32_t events_till_abort = -1;
 
81
 
 
82
enum enum_slave_reconnect_actions
 
83
{
 
84
  SLAVE_RECON_ACT_REG= 0,
 
85
  SLAVE_RECON_ACT_DUMP= 1,
 
86
  SLAVE_RECON_ACT_EVENT= 2,
 
87
  SLAVE_RECON_ACT_MAX
 
88
};
 
89
 
 
90
enum enum_slave_reconnect_messages
 
91
{
 
92
  SLAVE_RECON_MSG_WAIT= 0,
 
93
  SLAVE_RECON_MSG_KILLED_WAITING= 1,
 
94
  SLAVE_RECON_MSG_AFTER= 2,
 
95
  SLAVE_RECON_MSG_FAILED= 3,
 
96
  SLAVE_RECON_MSG_COMMAND= 4,
 
97
  SLAVE_RECON_MSG_KILLED_AFTER= 5,
 
98
  SLAVE_RECON_MSG_MAX
 
99
};
 
100
 
 
101
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
 
102
{
 
103
  {
 
104
    N_("Waiting to reconnect after a failed registration on master"),
 
105
    N_("Slave I/O thread killed while waiting to reconnect after a "
 
106
                 "failed registration on master"),
 
107
    N_("Reconnecting after a failed registration on master"),
 
108
    N_("failed registering on master, reconnecting to try again, "
 
109
                 "log '%s' at position %s"),
 
110
    "COM_REGISTER_SLAVE",
 
111
    N_("Slave I/O thread killed during or after reconnect")
 
112
  },
 
113
  {
 
114
    N_("Waiting to reconnect after a failed binlog dump request"),
 
115
    N_("Slave I/O thread killed while retrying master dump"),
 
116
    N_("Reconnecting after a failed binlog dump request"),
 
117
    N_("failed dump request, reconnecting to try again, "
 
118
                 "log '%s' at position %s"),
 
119
    "COM_BINLOG_DUMP",
 
120
    N_("Slave I/O thread killed during or after reconnect")
 
121
  },
 
122
  {
 
123
    N_("Waiting to reconnect after a failed master event read"),
 
124
    N_("Slave I/O thread killed while waiting to reconnect "
 
125
                 "after a failed read"),
 
126
    N_("Reconnecting after a failed master event read"),
 
127
    N_("Slave I/O thread: Failed reading log event, "
 
128
                 "reconnecting to retry, log '%s' at position %s"),
 
129
    "",
 
130
    N_("Slave I/O thread killed during or after a "
 
131
                 "reconnect done to recover from failed read")
 
132
  }
 
133
};
 
134
 
 
135
 
 
136
typedef enum { SLAVE_Session_IO, SLAVE_Session_SQL} SLAVE_Session_TYPE;
 
137
 
 
138
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
 
139
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
 
140
static bool wait_for_relay_log_space(Relay_log_info* rli);
 
141
static inline bool io_slave_killed(Session* session,Master_info* mi);
 
142
static inline bool sql_slave_killed(Session* session,Relay_log_info* rli);
 
143
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type);
 
144
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi);
 
145
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
 
146
                          bool suppress_warnings);
 
147
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
 
148
                             bool reconnect, bool suppress_warnings);
 
149
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
150
                      void* thread_killed_arg);
 
151
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
 
152
static Log_event* next_event(Relay_log_info* rli);
 
153
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
 
154
static int32_t terminate_slave_thread(Session *session,
 
155
                                  pthread_mutex_t* term_lock,
 
156
                                  pthread_cond_t* term_cond,
 
157
                                  volatile uint32_t *slave_running,
 
158
                                  bool skip_lock);
 
159
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info);
 
160
 
 
161
/*
 
162
  Find out which replications threads are running
 
163
 
 
164
  SYNOPSIS
 
165
    init_thread_mask()
 
166
    mask                Return value here
 
167
    mi                  master_info for slave
 
168
    inverse             If set, returns which threads are not running
 
169
 
 
170
  IMPLEMENTATION
 
171
    Get a bit mask for which threads are running so that we can later restart
 
172
    these threads.
 
173
 
 
174
  RETURN
 
175
    mask        If inverse == 0, running threads
 
176
                If inverse == 1, stopped threads
 
177
*/
 
178
 
 
179
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
 
180
{
 
181
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
 
182
  register int32_t tmp_mask=0;
 
183
 
 
184
  if (set_io)
 
185
    tmp_mask |= SLAVE_IO;
 
186
  if (set_sql)
 
187
    tmp_mask |= SLAVE_SQL;
 
188
  if (inverse)
 
189
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
 
190
  *mask = tmp_mask;
 
191
  return;
 
192
}
 
193
 
 
194
 
 
195
/*
 
196
  lock_slave_threads()
 
197
*/
 
198
 
 
199
void lock_slave_threads(Master_info* mi)
 
200
{
 
201
  //TODO: see if we can do this without dual mutex
 
202
  pthread_mutex_lock(&mi->run_lock);
 
203
  pthread_mutex_lock(&mi->rli.run_lock);
 
204
  return;
 
205
}
 
206
 
 
207
 
 
208
/*
 
209
  unlock_slave_threads()
 
210
*/
 
211
 
 
212
void unlock_slave_threads(Master_info* mi)
 
213
{
 
214
  //TODO: see if we can do this without dual mutex
 
215
  pthread_mutex_unlock(&mi->rli.run_lock);
 
216
  pthread_mutex_unlock(&mi->run_lock);
 
217
  return;
 
218
}
 
219
 
 
220
 
 
221
/* Initialize slave structures */
 
222
 
 
223
int32_t init_slave()
 
224
{
 
225
  /*
 
226
    This is called when mysqld starts. Before client connections are
 
227
    accepted. However bootstrap may conflict with us if it does START SLAVE.
 
228
    So it's safer to take the lock.
 
229
  */
 
230
  pthread_mutex_lock(&LOCK_active_mi);
 
231
  /*
 
232
    TODO: re-write this to interate through the list of files
 
233
    for multi-master
 
234
  */
 
235
  active_mi= new Master_info;
 
236
 
 
237
  /*
 
238
    If master_host is not specified, try to read it from the master_info file.
 
239
    If master_host is specified, create the master_info file if it doesn't
 
240
    exists.
 
241
  */
 
242
  if (!active_mi)
 
243
  {
 
244
    sql_print_error(_("Failed to allocate memory for the master info structure"));
 
245
    goto err;
 
246
  }
 
247
 
 
248
  if (active_mi->init_master_info(master_info_file, relay_log_info_file, (SLAVE_IO | SLAVE_SQL)))
 
249
  {
 
250
    sql_print_error(_("Failed to initialize the master info structure"));
 
251
    goto err;
 
252
  }
 
253
 
 
254
  /* If server id is not set, start_slave_thread() will say it */
 
255
 
 
256
  if (active_mi->host[0] && !opt_skip_slave_start)
 
257
  {
 
258
    if (start_slave_threads(1 /* need mutex */,
 
259
                            0 /* no wait for start*/,
 
260
                            active_mi,
 
261
                            master_info_file,
 
262
                            relay_log_info_file,
 
263
                            SLAVE_IO | SLAVE_SQL))
 
264
    {
 
265
      sql_print_error(_("Failed to create slave threads"));
 
266
      goto err;
 
267
    }
 
268
  }
 
269
  pthread_mutex_unlock(&LOCK_active_mi);
 
270
  return(0);
 
271
 
 
272
err:
 
273
  pthread_mutex_unlock(&LOCK_active_mi);
 
274
  return(1);
 
275
}
 
276
 
 
277
 
 
278
/*
 
279
  Init function to set up array for errors that should be skipped for slave
 
280
 
 
281
  SYNOPSIS
 
282
    init_slave_skip_errors()
 
283
    arg         List of errors numbers to skip, separated with ','
 
284
 
 
285
  NOTES
 
286
    Called from get_options() in mysqld.cc on start-up
 
287
*/
 
288
 
 
289
void init_slave_skip_errors(const char* arg)
 
290
{
 
291
  const char *p;
 
292
 
 
293
  if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
 
294
  {
 
295
    fprintf(stderr, "Badly out of memory, please check your system status\n");
 
296
    exit(1);
 
297
  }
 
298
  use_slave_mask = 1;
 
299
  for (;my_isspace(system_charset_info,*arg);++arg)
 
300
    /* empty */;
 
301
  if (!my_strnncoll(system_charset_info,(unsigned char*)arg,4,(const unsigned char*)"all",4))
 
302
  {
 
303
    bitmap_set_all(&slave_error_mask);
 
304
    return;
 
305
  }
 
306
  for (p= arg ; *p; )
 
307
  {
 
308
    long err_code;
 
309
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
 
310
      break;
 
311
    if (err_code < MAX_SLAVE_ERROR)
 
312
       bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
 
313
    while (!my_isdigit(system_charset_info,*p) && *p)
 
314
      p++;
 
315
  }
 
316
  return;
 
317
}
 
318
 
 
319
 
 
320
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
 
321
{
 
322
  if (!mi->inited)
 
323
    return(0); /* successfully do nothing */
 
324
  int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
 
325
  pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
 
326
 
 
327
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
 
328
  {
 
329
    mi->abort_slave=1;
 
330
    if ((error=terminate_slave_thread(mi->io_session,io_lock,
 
331
                                      &mi->stop_cond,
 
332
                                      &mi->slave_running,
 
333
                                      skip_lock)) &&
 
334
        !force_all)
 
335
      return(error);
 
336
  }
 
337
  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
 
338
  {
 
339
    mi->rli.abort_slave=1;
 
340
    if ((error=terminate_slave_thread(mi->rli.sql_session,sql_lock,
 
341
                                      &mi->rli.stop_cond,
 
342
                                      &mi->rli.slave_running,
 
343
                                      skip_lock)) &&
 
344
        !force_all)
 
345
      return(error);
 
346
  }
 
347
  return(0);
 
348
}
 
349
 
 
350
 
 
351
/**
 
352
   Wait for a slave thread to terminate.
 
353
 
 
354
   This function is called after requesting the thread to terminate
 
355
   (by setting @c abort_slave member of @c Relay_log_info or @c
 
356
   Master_info structure to 1). Termination of the thread is
 
357
   controlled with the the predicate <code>*slave_running</code>.
 
358
 
 
359
   Function will acquire @c term_lock before waiting on the condition
 
360
   unless @c skip_lock is true in which case the mutex should be owned
 
361
   by the caller of this function and will remain acquired after
 
362
   return from the function.
 
363
 
 
364
   @param term_lock
 
365
          Associated lock to use when waiting for @c term_cond
 
366
 
 
367
   @param term_cond
 
368
          Condition that is signalled when the thread has terminated
 
369
 
 
370
   @param slave_running
 
371
          Pointer to predicate to check for slave thread termination
 
372
 
 
373
   @param skip_lock
 
374
          If @c true the lock will not be acquired before waiting on
 
375
          the condition. In this case, it is assumed that the calling
 
376
          function acquires the lock before calling this function.
 
377
 
 
378
   @retval 0 All OK
 
379
 */
 
380
static int32_t
 
381
terminate_slave_thread(Session *session,
 
382
                       pthread_mutex_t* term_lock,
 
383
                       pthread_cond_t* term_cond,
 
384
                       volatile uint32_t *slave_running,
 
385
                       bool skip_lock)
 
386
{
 
387
  int32_t error;
 
388
 
 
389
  if (!skip_lock)
 
390
    pthread_mutex_lock(term_lock);
 
391
 
 
392
  safe_mutex_assert_owner(term_lock);
 
393
 
 
394
  if (!*slave_running)
 
395
  {
 
396
    if (!skip_lock)
 
397
      pthread_mutex_unlock(term_lock);
 
398
    return(ER_SLAVE_NOT_RUNNING);
 
399
  }
 
400
  assert(session != 0);
 
401
  Session_CHECK_SENTRY(session);
 
402
 
 
403
  /*
 
404
    Is is critical to test if the slave is running. Otherwise, we might
 
405
    be referening freed memory trying to kick it
 
406
  */
 
407
 
 
408
  while (*slave_running)                        // Should always be true
 
409
  {
 
410
    pthread_mutex_lock(&session->LOCK_delete);
 
411
#ifndef DONT_USE_THR_ALARM
 
412
    /*
 
413
      Error codes from pthread_kill are:
 
414
      EINVAL: invalid signal number (can't happen)
 
415
      ESRCH: thread already killed (can happen, should be ignored)
 
416
    */
 
417
    int32_t err= pthread_kill(session->real_id, thr_client_alarm);
 
418
    assert(err != EINVAL);
 
419
#endif
 
420
    session->awake(Session::NOT_KILLED);
 
421
    pthread_mutex_unlock(&session->LOCK_delete);
 
422
 
 
423
    /*
 
424
      There is a small chance that slave thread might miss the first
 
425
      alarm. To protect againts it, resend the signal until it reacts
 
426
    */
 
427
    struct timespec abstime;
 
428
    set_timespec(abstime,2);
 
429
    error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
 
430
    assert(error == ETIMEDOUT || error == 0);
 
431
  }
 
432
 
 
433
  assert(*slave_running == 0);
 
434
 
 
435
  if (!skip_lock)
 
436
    pthread_mutex_unlock(term_lock);
 
437
  return(0);
 
438
}
 
439
 
 
440
 
 
441
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
 
442
                           pthread_mutex_t *cond_lock,
 
443
                           pthread_cond_t *start_cond,
 
444
                           volatile uint32_t *slave_running,
 
445
                           volatile uint32_t *slave_run_id,
 
446
                           Master_info* mi,
 
447
                           bool high_priority)
 
448
{
 
449
  pthread_t th;
 
450
  uint32_t start_id;
 
451
 
 
452
  assert(mi->inited);
 
453
 
 
454
  if (start_lock)
 
455
    pthread_mutex_lock(start_lock);
 
456
  if (!server_id)
 
457
  {
 
458
    if (start_cond)
 
459
      pthread_cond_broadcast(start_cond);
 
460
    if (start_lock)
 
461
      pthread_mutex_unlock(start_lock);
 
462
    sql_print_error(_("Server id not set, will not start slave"));
 
463
    return(ER_BAD_SLAVE);
 
464
  }
 
465
 
 
466
  if (*slave_running)
 
467
  {
 
468
    if (start_cond)
 
469
      pthread_cond_broadcast(start_cond);
 
470
    if (start_lock)
 
471
      pthread_mutex_unlock(start_lock);
 
472
    return(ER_SLAVE_MUST_STOP);
 
473
  }
 
474
  start_id= *slave_run_id;
 
475
  if (high_priority)
 
476
  {
 
477
    struct sched_param tmp_sched_param;
 
478
 
 
479
    memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
 
480
    tmp_sched_param.sched_priority= CONNECT_PRIOR;
 
481
    (void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
 
482
  }
 
483
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
 
484
  {
 
485
    if (start_lock)
 
486
      pthread_mutex_unlock(start_lock);
 
487
    return(ER_SLAVE_THREAD);
 
488
  }
 
489
  if (start_cond && cond_lock) // caller has cond_lock
 
490
  {
 
491
    Session* session = current_session;
 
492
    while (start_id == *slave_run_id)
 
493
    {
 
494
      const char* old_msg = session->enter_cond(start_cond,cond_lock,
 
495
                                            "Waiting for slave thread to start");
 
496
      pthread_cond_wait(start_cond,cond_lock);
 
497
      session->exit_cond(old_msg);
 
498
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
 
499
      if (session->killed)
 
500
        return(session->killed_errno());
 
501
    }
 
502
  }
 
503
  if (start_lock)
 
504
    pthread_mutex_unlock(start_lock);
 
505
  return(0);
 
506
}
 
507
 
 
508
 
 
509
/*
 
510
  start_slave_threads()
 
511
 
 
512
  NOTES
 
513
    SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
 
514
    sense to do that for starting a slave--we always care if it actually
 
515
    started the threads that were not previously running
 
516
*/
 
517
 
 
518
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
 
519
                        Master_info* mi, const char*, const char*,
 
520
                        int32_t thread_mask)
 
521
{
 
522
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
 
523
  pthread_cond_t* cond_io=0,*cond_sql=0;
 
524
  int32_t error=0;
 
525
 
 
526
  if (need_slave_mutex)
 
527
  {
 
528
    lock_io = &mi->run_lock;
 
529
    lock_sql = &mi->rli.run_lock;
 
530
  }
 
531
  if (wait_for_start)
 
532
  {
 
533
    cond_io = &mi->start_cond;
 
534
    cond_sql = &mi->rli.start_cond;
 
535
    lock_cond_io = &mi->run_lock;
 
536
    lock_cond_sql = &mi->rli.run_lock;
 
537
  }
 
538
 
 
539
  if (thread_mask & SLAVE_IO)
 
540
    error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
 
541
                              cond_io,
 
542
                              &mi->slave_running, &mi->slave_run_id,
 
543
                              mi, 1); //high priority, to read the most possible
 
544
  if (!error && (thread_mask & SLAVE_SQL))
 
545
  {
 
546
    error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
 
547
                              cond_sql,
 
548
                              &mi->rli.slave_running, &mi->rli.slave_run_id,
 
549
                              mi, 0);
 
550
    if (error)
 
551
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
 
552
  }
 
553
  return(error);
 
554
}
 
555
 
 
556
 
 
557
#ifdef NOT_USED_YET
 
558
static int32_t end_slave_on_walk(Master_info* mi, unsigned char* /*unused*/)
 
559
{
 
560
  end_master_info(mi);
 
561
  return(0);
 
562
}
 
563
#endif
 
564
 
 
565
 
 
566
/*
 
567
  Free all resources used by slave
 
568
 
 
569
  SYNOPSIS
 
570
    end_slave()
 
571
*/
 
572
 
 
573
void end_slave()
 
574
{
 
575
  /*
 
576
    This is called when the server terminates, in close_connections().
 
577
    It terminates slave threads. However, some CHANGE MASTER etc may still be
 
578
    running presently. If a START SLAVE was in progress, the mutex lock below
 
579
    will make us wait until slave threads have started, and START SLAVE
 
580
    returns, then we terminate them here.
 
581
  */
 
582
  pthread_mutex_lock(&LOCK_active_mi);
 
583
  if (active_mi)
 
584
  {
 
585
    /*
 
586
      TODO: replace the line below with
 
587
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
 
588
      once multi-master code is ready.
 
589
    */
 
590
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
 
591
    active_mi->end_master_info();
 
592
    delete active_mi;
 
593
    active_mi= 0;
 
594
  }
 
595
  pthread_mutex_unlock(&LOCK_active_mi);
 
596
  return;
 
597
}
 
598
 
 
599
 
 
600
static bool io_slave_killed(Session* session, Master_info* mi)
 
601
{
 
602
  assert(mi->io_session == session);
 
603
  assert(mi->slave_running); // tracking buffer overrun
 
604
  return(mi->abort_slave || abort_loop || session->killed);
 
605
}
 
606
 
 
607
 
 
608
static bool sql_slave_killed(Session* session, Relay_log_info* rli)
 
609
{
 
610
  assert(rli->sql_session == session);
 
611
  assert(rli->slave_running == 1);// tracking buffer overrun
 
612
  if (abort_loop || session->killed || rli->abort_slave)
 
613
  {
 
614
    /*
 
615
      If we are in an unsafe situation (stopping could corrupt replication),
 
616
      we give one minute to the slave SQL thread of grace before really
 
617
      terminating, in the hope that it will be able to read more events and
 
618
      the unsafe situation will soon be left. Note that this one minute starts
 
619
      from the last time anything happened in the slave SQL thread. So it's
 
620
      really one minute of idleness, we don't timeout if the slave SQL thread
 
621
      is actively working.
 
622
    */
 
623
    if (rli->last_event_start_time == 0)
 
624
      return(1);
 
625
    if (difftime(time(NULL), rli->last_event_start_time) > 60)
 
626
    {
 
627
      rli->report(ERROR_LEVEL, 0,
 
628
                  _("SQL thread had to stop in an unsafe situation, in "
 
629
                  "the middle of applying updates to a "
 
630
                  "non-transactional table without any primary key. "
 
631
                  "There is a risk of duplicate updates when the slave "
 
632
                  "SQL thread is restarted. Please check your tables' "
 
633
                  "contents after restart."));
 
634
      return(1);
 
635
    }
 
636
  }
 
637
  return(0);
 
638
}
 
639
 
 
640
 
 
641
/*
 
642
  skip_load_data_infile()
 
643
 
 
644
  NOTES
 
645
    This is used to tell a 3.23 master to break send_file()
 
646
*/
 
647
 
 
648
void skip_load_data_infile(NET *net)
 
649
{
 
650
  (void)net_request_file(net, "/dev/null");
 
651
  (void)my_net_read(net);                               // discard response
 
652
  (void)net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0); // ok
 
653
  return;
 
654
}
 
655
 
 
656
 
 
657
bool net_request_file(NET* net, const char* fname)
 
658
{
 
659
  return(net_write_command(net, 251, (unsigned char*) fname, strlen(fname),
 
660
                                (unsigned char*) "", 0));
 
661
}
 
662
 
 
663
/*
 
664
  From other comments and tests in code, it looks like
 
665
  sometimes Query_log_event and Load_log_event can have db == 0
 
666
  (see rewrite_db() above for example)
 
667
  (cases where this happens are unclear; it may be when the master is 3.23).
 
668
*/
 
669
 
 
670
const char *print_slave_db_safe(const char* db)
 
671
{
 
672
  return((db ? db : ""));
 
673
}
 
674
 
 
675
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
 
676
                                 const char *default_val)
 
677
{
 
678
  uint32_t length;
 
679
 
 
680
  if ((length=my_b_gets(f,var, max_size)))
 
681
  {
 
682
    char* last_p = var + length -1;
 
683
    if (*last_p == '\n')
 
684
      *last_p = 0; // if we stopped on newline, kill it
 
685
    else
 
686
    {
 
687
      /*
 
688
        If we truncated a line or stopped on last char, remove all chars
 
689
        up to and including newline.
 
690
      */
 
691
      int32_t c;
 
692
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
 
693
    }
 
694
    return(0);
 
695
  }
 
696
  else if (default_val)
 
697
  {
 
698
    strncpy(var,  default_val, max_size-1);
 
699
    return(0);
 
700
  }
 
701
  return(1);
 
702
}
 
703
 
 
704
 
 
705
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
 
706
{
 
707
  char buf[32];
 
708
 
 
709
 
 
710
  if (my_b_gets(f, buf, sizeof(buf)))
 
711
  {
 
712
    *var = atoi(buf);
 
713
    return(0);
 
714
  }
 
715
  else if (default_val)
 
716
  {
 
717
    *var = default_val;
 
718
    return(0);
 
719
  }
 
720
  return(1);
 
721
}
 
722
 
 
723
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
 
724
{
 
725
  char buf[16];
 
726
 
 
727
 
 
728
  if (my_b_gets(f, buf, sizeof(buf)))
 
729
  {
 
730
    if (sscanf(buf, "%f", var) != 1)
 
731
      return(1);
 
732
    else
 
733
      return(0);
 
734
  }
 
735
  else if (default_val != 0.0)
 
736
  {
 
737
    *var = default_val;
 
738
    return(0);
 
739
  }
 
740
  return(1);
 
741
}
 
742
 
 
743
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info)
 
744
{
 
745
  if (io_slave_killed(session, mi))
 
746
  {
 
747
    if (info && global_system_variables.log_warnings)
 
748
      sql_print_information("%s",info);
 
749
    return true;
 
750
  }
 
751
  return false;
 
752
}
 
753
 
 
754
 
 
755
/*
 
756
  Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
 
757
  relying on the binlog's version. This is not perfect: imagine an upgrade
 
758
  of the master without waiting that all slaves are in sync with the master;
 
759
  then a slave could be fooled about the binlog's format. This is what happens
 
760
  when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
 
761
  slaves are fooled. So we do this only to distinguish between 3.23 and more
 
762
  recent masters (it's too late to change things for 3.23).
 
763
 
 
764
  RETURNS
 
765
  0       ok
 
766
  1       error
 
767
*/
 
768
 
 
769
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
 
770
{
 
771
  char error_buf[512];
 
772
  String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
 
773
  char err_buff[MAX_SLAVE_ERRMSG];
 
774
  const char* errmsg= 0;
 
775
  int32_t err_code= 0;
 
776
  DRIZZLE_RES *master_res= 0;
 
777
  DRIZZLE_ROW master_row;
 
778
 
 
779
  err_msg.length(0);
 
780
  /*
 
781
    Free old description_event_for_queue (that is needed if we are in
 
782
    a reconnection).
 
783
  */
 
784
  delete mi->rli.relay_log.description_event_for_queue;
 
785
  mi->rli.relay_log.description_event_for_queue= 0;
 
786
 
 
787
  if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
 
788
  {
 
789
    errmsg = _("Master reported unrecognized DRIZZLE version");
 
790
    err_code= ER_SLAVE_FATAL_ERROR;
 
791
    sprintf(err_buff, ER(err_code), errmsg);
 
792
    err_msg.append(err_buff);
 
793
  }
 
794
  else
 
795
  {
 
796
    /*
 
797
      Note the following switch will bug when we have DRIZZLE branch 30 ;)
 
798
    */
 
799
    switch (*drizzle->server_version)
 
800
    {
 
801
    case '0':
 
802
    case '1':
 
803
    case '2':
 
804
      errmsg = _("Master reported unrecognized DRIZZLE version");
 
805
      err_code= ER_SLAVE_FATAL_ERROR;
 
806
      sprintf(err_buff, ER(err_code), errmsg);
 
807
      err_msg.append(err_buff);
 
808
      break;
 
809
    case '3':
 
810
      mi->rli.relay_log.description_event_for_queue= new
 
811
        Format_description_log_event(1, drizzle->server_version);
 
812
      break;
 
813
    case '4':
 
814
      mi->rli.relay_log.description_event_for_queue= new
 
815
        Format_description_log_event(3, drizzle->server_version);
 
816
      break;
 
817
    default:
 
818
      /*
 
819
        Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
 
820
        take the early steps (like tests for "is this a 3.23 master") which we
 
821
        have to take before we receive the real master's Format_desc which will
 
822
        override this one. Note that the Format_desc we create below is garbage
 
823
        (it has the format of the *slave*); it's only good to help know if the
 
824
        master is 3.23, 4.0, etc.
 
825
      */
 
826
      mi->rli.relay_log.description_event_for_queue= new
 
827
        Format_description_log_event(4, drizzle->server_version);
 
828
      break;
 
829
    }
 
830
  }
 
831
 
 
832
  /*
 
833
     This does not mean that a 5.0 slave will be able to read a 6.0 master; but
 
834
     as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
 
835
     can't read a 6.0 master, this will show up when the slave can't read some
 
836
     events sent by the master, and there will be error messages.
 
837
  */
 
838
 
 
839
  if (err_msg.length() != 0)
 
840
    goto err;
 
841
 
 
842
  /* as we are here, we tried to allocate the event */
 
843
  if (!mi->rli.relay_log.description_event_for_queue)
 
844
  {
 
845
    errmsg= _("default Format_description_log_event");
 
846
    err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
 
847
    sprintf(err_buff, ER(err_code), errmsg);
 
848
    err_msg.append(err_buff);
 
849
    goto err;
 
850
  }
 
851
 
 
852
  /*
 
853
    Compare the master and slave's clock. Do not die if master's clock is
 
854
    unavailable (very old master not supporting UNIX_TIMESTAMP()?).
 
855
  */
 
856
 
 
857
  if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
 
858
      (master_res= drizzle_store_result(drizzle)) &&
 
859
      (master_row= drizzle_fetch_row(master_res)))
 
860
  {
 
861
    mi->clock_diff_with_master=
 
862
      (long) (time(NULL) - strtoul(master_row[0], 0, 10));
 
863
  }
 
864
  else if (!check_io_slave_killed(mi->io_session, mi, NULL))
 
865
  {
 
866
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
 
867
    sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
 
868
                        "do not trust column Seconds_Behind_Master of SHOW "
 
869
                        "SLAVE STATUS. Error: %s (%d)"),
 
870
                      drizzle_error(drizzle), drizzle_errno(drizzle));
 
871
  }
 
872
  if (master_res)
 
873
    drizzle_free_result(master_res);
 
874
 
 
875
  /*
 
876
    Check that the master's server id and ours are different. Because if they
 
877
    are equal (which can result from a simple copy of master's datadir to slave,
 
878
    thus copying some drizzle.cnf), replication will work but all events will be
 
879
    skipped.
 
880
    Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
 
881
    master?).
 
882
    Note: we could have put a @@SERVER_ID in the previous SELECT
 
883
    UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
 
884
  */
 
885
  if (!drizzle_real_query(drizzle,
 
886
                        STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
 
887
      (master_res= drizzle_store_result(drizzle)))
 
888
  {
 
889
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
890
        (::server_id == strtoul(master_row[1], 0, 10)) &&
 
891
        !mi->rli.replicate_same_server_id)
 
892
    {
 
893
      errmsg=
 
894
        _("The slave I/O thread stops because master and slave have equal "
 
895
          "DRIZZLE server ids; these ids must be different "
 
896
          "for replication to work (or "
 
897
          "the --replicate-same-server-id option must be used "
 
898
          "on slave but this does "
 
899
          "not always make sense; please check the manual before using it).");
 
900
      err_code= ER_SLAVE_FATAL_ERROR;
 
901
      sprintf(err_buff, ER(err_code), errmsg);
 
902
      err_msg.append(err_buff);
 
903
    }
 
904
    drizzle_free_result(master_res);
 
905
    if (errmsg)
 
906
      goto err;
 
907
  }
 
908
 
 
909
  /*
 
910
    Check that the master's global character_set_server and ours are the same.
 
911
    Not fatal if query fails (old master?).
 
912
    Note that we don't check for equality of global character_set_client and
 
913
    collation_connection (neither do we prevent their setting in
 
914
    set_var.cc). That's because from what I (Guilhem) have tested, the global
 
915
    values of these 2 are never used (new connections don't use them).
 
916
    We don't test equality of global collation_database either as it's is
 
917
    going to be deprecated (made read-only) in 4.1 very soon.
 
918
    The test is only relevant if master < 5.0.3 (we'll test only if it's older
 
919
    than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
 
920
    charset info in each binlog event.
 
921
    We don't do it for 3.23 because masters <3.23.50 hang on
 
922
    SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
 
923
    test only if master is 4.x.
 
924
  */
 
925
 
 
926
  /* redundant with rest of code but safer against later additions */
 
927
  if (*drizzle->server_version == '3')
 
928
    goto err;
 
929
 
 
930
  if ((*drizzle->server_version == '4') &&
 
931
      !drizzle_real_query(drizzle,
 
932
                        STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
 
933
      (master_res= drizzle_store_result(drizzle)))
 
934
  {
 
935
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
936
        strcmp(master_row[0], global_system_variables.collation_server->name))
 
937
    {
 
938
      errmsg=
 
939
        _("The slave I/O thread stops because master and slave have"
 
940
          " different values for the COLLATION_SERVER global variable."
 
941
          " The values must be equal for replication to work");
 
942
      err_code= ER_SLAVE_FATAL_ERROR;
 
943
      sprintf(err_buff, ER(err_code), errmsg);
 
944
      err_msg.append(err_buff);
 
945
    }
 
946
    drizzle_free_result(master_res);
 
947
    if (errmsg)
 
948
      goto err;
 
949
  }
 
950
 
 
951
  /*
 
952
    Perform analogous check for time zone. Theoretically we also should
 
953
    perform check here to verify that SYSTEM time zones are the same on
 
954
    slave and master, but we can't rely on value of @@system_time_zone
 
955
    variable (it is time zone abbreviation) since it determined at start
 
956
    time and so could differ for slave and master even if they are really
 
957
    in the same system time zone. So we are omiting this check and just
 
958
    relying on documentation. Also according to Monty there are many users
 
959
    who are using replication between servers in various time zones. Hence
 
960
    such check will broke everything for them. (And now everything will
 
961
    work for them because by default both their master and slave will have
 
962
    'SYSTEM' time zone).
 
963
    This check is only necessary for 4.x masters (and < 5.0.4 masters but
 
964
    those were alpha).
 
965
  */
 
966
  if ((*drizzle->server_version == '4') &&
 
967
      !drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
 
968
      (master_res= drizzle_store_result(drizzle)))
 
969
  {
 
970
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
971
        strcmp(master_row[0],
 
972
               global_system_variables.time_zone->get_name()->ptr()))
 
973
    {
 
974
      errmsg=
 
975
        _("The slave I/O thread stops because master and slave have"
 
976
          " different values for the TIME_ZONE global variable."
 
977
          " The values must be equal for replication to work");
 
978
      err_code= ER_SLAVE_FATAL_ERROR;
 
979
      sprintf(err_buff, ER(err_code), errmsg);
 
980
      err_msg.append(err_buff);
 
981
    }
 
982
    drizzle_free_result(master_res);
 
983
 
 
984
    if (errmsg)
 
985
      goto err;
 
986
  }
 
987
 
 
988
  if (mi->heartbeat_period != 0.0)
 
989
  {
 
990
    char llbuf[22];
 
991
    const char query_format[]= "SET @master_heartbeat_period= %s";
 
992
    char query[sizeof(query_format) - 2 + sizeof(llbuf)];
 
993
    /*
 
994
       the period is an uint64_t of nano-secs.
 
995
    */
 
996
    llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
 
997
    sprintf(query, query_format, llbuf);
 
998
 
 
999
    if (drizzle_real_query(drizzle, query, strlen(query))
 
1000
        && !check_io_slave_killed(mi->io_session, mi, NULL))
 
1001
    {
 
1002
      err_msg.append("The slave I/O thread stops because querying master with '");
 
1003
      err_msg.append(query);
 
1004
      err_msg.append("' failed;");
 
1005
      err_msg.append(" error: ");
 
1006
      err_code= drizzle_errno(drizzle);
 
1007
      err_msg.qs_append(err_code);
 
1008
      err_msg.append("  '");
 
1009
      err_msg.append(drizzle_error(drizzle));
 
1010
      err_msg.append("'");
 
1011
      drizzle_free_result(drizzle_store_result(drizzle));
 
1012
      goto err;
 
1013
    }
 
1014
    drizzle_free_result(drizzle_store_result(drizzle));
 
1015
  }
 
1016
 
 
1017
err:
 
1018
  if (err_msg.length() != 0)
 
1019
  {
 
1020
    sql_print_error("%s",err_msg.ptr());
 
1021
    assert(err_code != 0);
 
1022
    mi->report(ERROR_LEVEL, err_code, "%s",err_msg.ptr());
 
1023
    return(1);
 
1024
  }
 
1025
 
 
1026
  return(0);
 
1027
}
 
1028
 
 
1029
 
 
1030
static bool wait_for_relay_log_space(Relay_log_info* rli)
 
1031
{
 
1032
  bool slave_killed=0;
 
1033
  Master_info* mi = rli->mi;
 
1034
  const char *save_proc_info;
 
1035
  Session* session = mi->io_session;
 
1036
 
 
1037
  pthread_mutex_lock(&rli->log_space_lock);
 
1038
  save_proc_info= session->enter_cond(&rli->log_space_cond,
 
1039
                                  &rli->log_space_lock,
 
1040
                                  _("Waiting for the slave SQL thread "
 
1041
                                    "to free enough relay log space"));
 
1042
  while (rli->log_space_limit < rli->log_space_total &&
 
1043
         !(slave_killed=io_slave_killed(session,mi)) &&
 
1044
         !rli->ignore_log_space_limit)
 
1045
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
 
1046
  session->exit_cond(save_proc_info);
 
1047
  return(slave_killed);
 
1048
}
 
1049
 
 
1050
 
 
1051
/*
 
1052
  Builds a Rotate from the ignored events' info and writes it to relay log.
 
1053
 
 
1054
  SYNOPSIS
 
1055
  write_ignored_events_info_to_relay_log()
 
1056
    session             pointer to I/O thread's session
 
1057
    mi
 
1058
 
 
1059
  DESCRIPTION
 
1060
    Slave I/O thread, going to die, must leave a durable trace of the
 
1061
    ignored events' end position for the use of the slave SQL thread, by
 
1062
    calling this function. Only that thread can call it (see assertion).
 
1063
 */
 
1064
static void write_ignored_events_info_to_relay_log(Session *session,
 
1065
                                                   Master_info *mi)
 
1066
{
 
1067
  Relay_log_info *rli= &mi->rli;
 
1068
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
 
1069
 
 
1070
  assert(session == mi->io_session);
 
1071
  pthread_mutex_lock(log_lock);
 
1072
  if (rli->ign_master_log_name_end[0])
 
1073
  {
 
1074
    Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
 
1075
                                               0, rli->ign_master_log_pos_end,
 
1076
                                               Rotate_log_event::DUP_NAME);
 
1077
    rli->ign_master_log_name_end[0]= 0;
 
1078
    /* can unlock before writing as slave SQL session will soon see our Rotate */
 
1079
    pthread_mutex_unlock(log_lock);
 
1080
    if (likely((bool)ev))
 
1081
    {
 
1082
      ev->server_id= 0; // don't be ignored by slave SQL thread
 
1083
      if (unlikely(rli->relay_log.append(ev)))
 
1084
        mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
1085
                   ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
1086
                   _("failed to write a Rotate event"
 
1087
                     " to the relay log, SHOW SLAVE STATUS may be"
 
1088
                     " inaccurate"));
 
1089
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
1090
      if (mi->flush())
 
1091
        sql_print_error(_("Failed to flush master info file"));
 
1092
      delete ev;
 
1093
    }
 
1094
    else
 
1095
      mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
 
1096
                 ER(ER_SLAVE_CREATE_EVENT_FAILURE),
 
1097
                 _("Rotate_event (out of memory?),"
 
1098
                   " SHOW SLAVE STATUS may be inaccurate"));
 
1099
  }
 
1100
  else
 
1101
    pthread_mutex_unlock(log_lock);
 
1102
  return;
 
1103
}
 
1104
 
 
1105
 
 
1106
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
 
1107
                             bool *suppress_warnings)
 
1108
{
 
1109
  unsigned char buf[1024], *pos= buf;
 
1110
  uint32_t report_host_len, report_user_len=0, report_password_len=0;
 
1111
 
 
1112
  *suppress_warnings= false;
 
1113
  if (!report_host)
 
1114
    return(0);
 
1115
  report_host_len= strlen(report_host);
 
1116
  /* 30 is a good safety margin */
 
1117
  if (report_host_len + report_user_len + report_password_len + 30 >
 
1118
      sizeof(buf))
 
1119
    return(0);                                     // safety
 
1120
 
 
1121
  int4store(pos, server_id); pos+= 4;
 
1122
  pos= net_store_data(pos, (unsigned char*) report_host, report_host_len);
 
1123
  pos= net_store_data(pos, NULL, report_user_len);
 
1124
  pos= net_store_data(pos, NULL, report_password_len);
 
1125
  int2store(pos, (uint16_t) report_port); pos+= 2;
 
1126
  int4store(pos, 0);    pos+= 4;
 
1127
  /* The master will fill in master_id */
 
1128
  int4store(pos, 0);                    pos+= 4;
 
1129
 
 
1130
  if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
 
1131
  {
 
1132
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1133
    {
 
1134
      *suppress_warnings= true;                 // Suppress reconnect warning
 
1135
    }
 
1136
    else if (!check_io_slave_killed(mi->io_session, mi, NULL))
 
1137
    {
 
1138
      char buf[256];
 
1139
      snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle),
 
1140
               drizzle_errno(drizzle));
 
1141
      mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
 
1142
                 ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
 
1143
    }
 
1144
    return(1);
 
1145
  }
 
1146
  return(0);
 
1147
}
 
1148
 
 
1149
 
 
1150
bool show_master_info(Session* session, Master_info* mi)
 
1151
{
 
1152
  // TODO: fix this for multi-master
 
1153
  List<Item> field_list;
 
1154
  Protocol *protocol= session->protocol;
 
1155
 
 
1156
  field_list.push_back(new Item_empty_string("Slave_IO_State",
 
1157
                                                     14));
 
1158
  field_list.push_back(new Item_empty_string("Master_Host",
 
1159
                                                     sizeof(mi->host)));
 
1160
  field_list.push_back(new Item_empty_string("Master_User",
 
1161
                                                     sizeof(mi->user)));
 
1162
  field_list.push_back(new Item_return_int("Master_Port", 7,
 
1163
                                           DRIZZLE_TYPE_LONG));
 
1164
  field_list.push_back(new Item_return_int("Connect_Retry", 10,
 
1165
                                           DRIZZLE_TYPE_LONG));
 
1166
  field_list.push_back(new Item_empty_string("Master_Log_File",
 
1167
                                             FN_REFLEN));
 
1168
  field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
 
1169
                                           DRIZZLE_TYPE_LONGLONG));
 
1170
  field_list.push_back(new Item_empty_string("Relay_Log_File",
 
1171
                                             FN_REFLEN));
 
1172
  field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
 
1173
                                           DRIZZLE_TYPE_LONGLONG));
 
1174
  field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
 
1175
                                             FN_REFLEN));
 
1176
  field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
 
1177
  field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
 
1178
  field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
 
1179
  field_list.push_back(new Item_empty_string("Last_Error", 20));
 
1180
  field_list.push_back(new Item_return_int("Skip_Counter", 10,
 
1181
                                           DRIZZLE_TYPE_LONG));
 
1182
  field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
 
1183
                                           DRIZZLE_TYPE_LONGLONG));
 
1184
  field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
 
1185
                                           DRIZZLE_TYPE_LONGLONG));
 
1186
  field_list.push_back(new Item_empty_string("Until_Condition", 6));
 
1187
  field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
 
1188
  field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
 
1189
                                           DRIZZLE_TYPE_LONGLONG));
 
1190
  field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
 
1191
                                           DRIZZLE_TYPE_LONGLONG));
 
1192
  field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
 
1193
  field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
 
1194
  field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
 
1195
  field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
 
1196
 
 
1197
  if (protocol->send_fields(&field_list,
 
1198
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1199
    return(true);
 
1200
 
 
1201
  if (mi->host[0])
 
1202
  {
 
1203
    String *packet= &session->packet;
 
1204
    protocol->prepare_for_resend();
 
1205
 
 
1206
    /*
 
1207
      slave_running can be accessed without run_lock but not other
 
1208
      non-volotile members like mi->io_session, which is guarded by the mutex.
 
1209
    */
 
1210
    pthread_mutex_lock(&mi->run_lock);
 
1211
    protocol->store(mi->io_session ? mi->io_session->get_proc_info() : "", &my_charset_bin);
 
1212
    pthread_mutex_unlock(&mi->run_lock);
 
1213
 
 
1214
    pthread_mutex_lock(&mi->data_lock);
 
1215
    pthread_mutex_lock(&mi->rli.data_lock);
 
1216
    protocol->store(mi->getHostname(), &my_charset_bin);
 
1217
    protocol->store(mi->getUsername(), &my_charset_bin);
 
1218
    protocol->store((uint32_t) mi->getPort());
 
1219
    protocol->store(mi->getConnectionRetry());
 
1220
    protocol->store(mi->getLogName(), &my_charset_bin);
 
1221
    protocol->store((uint64_t) mi->getLogPosition());
 
1222
    protocol->store(mi->rli.group_relay_log_name.c_str() +
 
1223
                    dirname_length(mi->rli.group_relay_log_name.c_str()),
 
1224
                    &my_charset_bin);
 
1225
    protocol->store((uint64_t) mi->rli.group_relay_log_pos);
 
1226
    protocol->store(mi->rli.group_master_log_name.c_str(), &my_charset_bin);
 
1227
    protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
 
1228
                    "Yes" : "No", &my_charset_bin);
 
1229
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
 
1230
 
 
1231
    protocol->store(mi->rli.last_error().number);
 
1232
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
 
1233
    protocol->store((uint32_t) mi->rli.slave_skip_counter);
 
1234
    protocol->store((uint64_t) mi->rli.group_master_log_pos);
 
1235
    protocol->store((uint64_t) mi->rli.log_space_total);
 
1236
 
 
1237
    protocol->store(
 
1238
      mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
 
1239
        ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
 
1240
          "Relay"), &my_charset_bin);
 
1241
    protocol->store(mi->rli.until_log_name, &my_charset_bin);
 
1242
    protocol->store((uint64_t) mi->rli.until_log_pos);
 
1243
 
 
1244
    /*
 
1245
      Seconds_Behind_Master: if SQL thread is running and I/O thread is
 
1246
      connected, we can compute it otherwise show NULL (i.e. unknown).
 
1247
    */
 
1248
    if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
 
1249
        mi->rli.slave_running)
 
1250
    {
 
1251
      long time_diff= ((long)(time(NULL) - mi->rli.last_master_timestamp)
 
1252
                       - mi->clock_diff_with_master);
 
1253
      /*
 
1254
        Apparently on some systems time_diff can be <0. Here are possible
 
1255
        reasons related to MySQL:
 
1256
        - the master is itself a slave of another master whose time is ahead.
 
1257
        - somebody used an explicit SET TIMESTAMP on the master.
 
1258
        Possible reason related to granularity-to-second of time functions
 
1259
        (nothing to do with MySQL), which can explain a value of -1:
 
1260
        assume the master's and slave's time are perfectly synchronized, and
 
1261
        that at slave's connection time, when the master's timestamp is read,
 
1262
        it is at the very end of second 1, and (a very short time later) when
 
1263
        the slave's timestamp is read it is at the very beginning of second
 
1264
        2. Then the recorded value for master is 1 and the recorded value for
 
1265
        slave is 2. At SHOW SLAVE STATUS time, assume that the difference
 
1266
        between timestamp of slave and rli->last_master_timestamp is 0
 
1267
        (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
 
1268
        This confuses users, so we don't go below 0: hence the cmax().
 
1269
 
 
1270
        last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
 
1271
        special marker to say "consider we have caught up".
 
1272
      */
 
1273
      protocol->store((int64_t)(mi->rli.last_master_timestamp ?
 
1274
                                 cmax((long)0, time_diff) : 0));
 
1275
    }
 
1276
    else
 
1277
    {
 
1278
      protocol->store_null();
 
1279
    }
 
1280
 
 
1281
    // Last_IO_Errno
 
1282
    protocol->store(mi->last_error().number);
 
1283
    // Last_IO_Error
 
1284
    protocol->store(mi->last_error().message, &my_charset_bin);
 
1285
    // Last_SQL_Errno
 
1286
    protocol->store(mi->rli.last_error().number);
 
1287
    // Last_SQL_Error
 
1288
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
 
1289
 
 
1290
    pthread_mutex_unlock(&mi->rli.data_lock);
 
1291
    pthread_mutex_unlock(&mi->data_lock);
 
1292
 
 
1293
    if (my_net_write(&session->net, (unsigned char*) session->packet.ptr(), packet->length()))
 
1294
      return(true);
 
1295
  }
 
1296
  my_eof(session);
 
1297
  return(false);
 
1298
}
 
1299
 
 
1300
 
 
1301
void set_slave_thread_options(Session* session)
 
1302
{
 
1303
  /*
 
1304
     It's nonsense to constrain the slave threads with max_join_size; if a
 
1305
     query succeeded on master, we HAVE to execute it. So set
 
1306
     OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
 
1307
     (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
 
1308
     SELECT examining more than 4 billion rows would still fail (yes, because
 
1309
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
 
1310
     only for client threads.
 
1311
  */
 
1312
  uint64_t options= session->options | OPTION_BIG_SELECTS;
 
1313
  if (opt_log_slave_updates)
 
1314
    options|= OPTION_BIN_LOG;
 
1315
  else
 
1316
    options&= ~OPTION_BIN_LOG;
 
1317
  session->options= options;
 
1318
  session->variables.completion_type= 0;
 
1319
  return;
 
1320
}
 
1321
 
 
1322
/*
 
1323
  init_slave_thread()
 
1324
*/
 
1325
 
 
1326
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type)
 
1327
{
 
1328
  int32_t simulate_error= 0;
 
1329
  session->system_thread = (session_type == SLAVE_Session_SQL) ?
 
1330
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
 
1331
  session->security_ctx.skip_grants();
 
1332
  my_net_init(&session->net, 0);
 
1333
/*
 
1334
  Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
 
1335
  slave threads, since a replication event can become this much larger
 
1336
  than the corresponding packet (query) sent from client to master.
 
1337
*/
 
1338
  session->variables.max_allowed_packet= global_system_variables.max_allowed_packet
 
1339
    + MAX_LOG_EVENT_HEADER;  /* note, incr over the global not session var */
 
1340
  session->slave_thread = 1;
 
1341
  set_slave_thread_options(session);
 
1342
  session->client_capabilities = CLIENT_LOCAL_FILES;
 
1343
  pthread_mutex_lock(&LOCK_thread_count);
 
1344
  session->thread_id= session->variables.pseudo_thread_id= thread_id++;
 
1345
  pthread_mutex_unlock(&LOCK_thread_count);
 
1346
 
 
1347
 simulate_error|= (1 << SLAVE_Session_IO);
 
1348
 simulate_error|= (1 << SLAVE_Session_SQL);
 
1349
  if (init_thr_lock() || session->store_globals() || simulate_error & (1<< session_type))
 
1350
  {
 
1351
    session->cleanup();
 
1352
    return(-1);
 
1353
  }
 
1354
  lex_start(session);
 
1355
 
 
1356
  if (session_type == SLAVE_Session_SQL)
 
1357
    session->set_proc_info("Waiting for the next event in relay log");
 
1358
  else
 
1359
    session->set_proc_info("Waiting for master update");
 
1360
  session->version=refresh_version;
 
1361
  session->set_time();
 
1362
  return(0);
 
1363
}
 
1364
 
 
1365
/* Returns non zero on error */
 
1366
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
1367
                          void* thread_killed_arg)
 
1368
{
 
1369
  int32_t nap_time;
 
1370
  thr_alarm_t alarmed;
 
1371
 
 
1372
  thr_alarm_init(&alarmed);
 
1373
  time_t start_time, end_time;
 
1374
 
 
1375
  start_time= time(NULL);
 
1376
  if (start_time == (time_t)-1)
 
1377
    return -1;
 
1378
  end_time= start_time+sec;
 
1379
 
 
1380
  while ((nap_time= (int32_t) (end_time - start_time)) > 0)
 
1381
  {
 
1382
    ALARM alarm_buff;
 
1383
    /*
 
1384
      The only reason we are asking for alarm is so that
 
1385
      we will be woken up in case of murder, so if we do not get killed,
 
1386
      set the alarm so it goes off after we wake up naturally
 
1387
    */
 
1388
    thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
 
1389
    sleep(nap_time);
 
1390
    thr_end_alarm(&alarmed);
 
1391
 
 
1392
    if ((*thread_killed)(session,thread_killed_arg))
 
1393
      return(1);
 
1394
 
 
1395
    start_time= time(NULL);
 
1396
    if (start_time == (time_t)-1)
 
1397
      return -1;
 
1398
  }
 
1399
  return(0);
 
1400
}
 
1401
 
 
1402
 
 
1403
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
 
1404
                        bool *suppress_warnings)
 
1405
{
 
1406
  unsigned char buf[FN_REFLEN + 10];
 
1407
  int32_t len;
 
1408
  int32_t binlog_flags = 0; // for now
 
1409
  const char* logname = mi->getLogName();
 
1410
 
 
1411
  *suppress_warnings= false;
 
1412
 
 
1413
  // TODO if big log files: Change next to int8store()
 
1414
  int4store(buf, (uint32_t) mi->getLogPosition());
 
1415
  int2store(buf + 4, binlog_flags);
 
1416
  int4store(buf + 6, server_id);
 
1417
  len = (uint32_t) strlen(logname);
 
1418
  memcpy(buf + 10, logname,len);
 
1419
  if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
 
1420
  {
 
1421
    /*
 
1422
      Something went wrong, so we will just reconnect and retry later
 
1423
      in the future, we should do a better error analysis, but for
 
1424
      now we just fill up the error log :-)
 
1425
    */
 
1426
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1427
      *suppress_warnings= true;                 // Suppress reconnect warning
 
1428
    else
 
1429
      sql_print_error(_("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs"),
 
1430
                      drizzle_errno(drizzle), drizzle_error(drizzle),
 
1431
                      mi->connect_retry);
 
1432
    return(1);
 
1433
  }
 
1434
 
 
1435
  return(0);
 
1436
}
 
1437
 
 
1438
/*
 
1439
  Read one event from the master
 
1440
 
 
1441
  SYNOPSIS
 
1442
    read_event()
 
1443
    DRIZZLE               DRIZZLE connection
 
1444
    mi                  Master connection information
 
1445
    suppress_warnings   TRUE when a normal net read timeout has caused us to
 
1446
                        try a reconnect.  We do not want to print anything to
 
1447
                        the error log in this case because this a anormal
 
1448
                        event in an idle server.
 
1449
 
 
1450
    RETURN VALUES
 
1451
    'packet_error'      Error
 
1452
    number              Length of packet
 
1453
*/
 
1454
 
 
1455
static uint32_t read_event(DRIZZLE *drizzle,
 
1456
                           Master_info *mi,
 
1457
                           bool* suppress_warnings)
 
1458
{
 
1459
  uint32_t len;
 
1460
 
 
1461
  *suppress_warnings= false;
 
1462
  /*
 
1463
    my_real_read() will time us out
 
1464
    We check if we were told to die, and if not, try reading again
 
1465
  */
 
1466
  if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
 
1467
    return(packet_error);
 
1468
 
 
1469
  len = cli_safe_read(drizzle);
 
1470
  if (len == packet_error || (int32_t) len < 1)
 
1471
  {
 
1472
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1473
    {
 
1474
      /*
 
1475
        We are trying a normal reconnect after a read timeout;
 
1476
        we suppress prints to .err file as long as the reconnect
 
1477
        happens without problems
 
1478
      */
 
1479
      *suppress_warnings= true;
 
1480
    }
 
1481
    else
 
1482
      sql_print_error(_("Error reading packet from server: %s ( server_errno=%d)"),
 
1483
                      drizzle_error(drizzle), drizzle_errno(drizzle));
 
1484
    return(packet_error);
 
1485
  }
 
1486
 
 
1487
  /* Check if eof packet */
 
1488
  if (len < 8 && drizzle->net.read_pos[0] == 254)
 
1489
  {
 
1490
    sql_print_information(_("Slave: received end packet from server, apparent "
 
1491
                            "master shutdown: %s"),
 
1492
                     drizzle_error(drizzle));
 
1493
     return(packet_error);
 
1494
  }
 
1495
 
 
1496
  return(len - 1);
 
1497
}
 
1498
 
 
1499
 
 
1500
int32_t check_expected_error(Session*, Relay_log_info const *,
 
1501
                             int32_t expected_error)
 
1502
{
 
1503
  switch (expected_error) {
 
1504
  case ER_NET_READ_ERROR:
 
1505
  case ER_NET_ERROR_ON_WRITE:
 
1506
  case ER_QUERY_INTERRUPTED:
 
1507
  case ER_SERVER_SHUTDOWN:
 
1508
  case ER_NEW_ABORTING_CONNECTION:
 
1509
    return(1);
 
1510
  default:
 
1511
    return(0);
 
1512
  }
 
1513
}
 
1514
 
 
1515
 
 
1516
/*
 
1517
  Check if the current error is of temporary nature of not.
 
1518
  Some errors are temporary in nature, such as
 
1519
  ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT.  Ndb also signals
 
1520
  that the error is temporary by pushing a warning with the error code
 
1521
  ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
 
1522
*/
 
1523
static int32_t has_temporary_error(Session *session)
 
1524
{
 
1525
  if (session->is_fatal_error)
 
1526
    return(0);
 
1527
 
 
1528
  if (session->main_da.is_error())
 
1529
  {
 
1530
    session->clear_error();
 
1531
    my_error(ER_LOCK_DEADLOCK, MYF(0));
 
1532
  }
 
1533
 
 
1534
  /*
 
1535
    If there is no message in Session, we can't say if it's a temporary
 
1536
    error or not. This is currently the case for Incident_log_event,
 
1537
    which sets no message. Return FALSE.
 
1538
  */
 
1539
  if (!session->is_error())
 
1540
    return(0);
 
1541
 
 
1542
  /*
 
1543
    Temporary error codes:
 
1544
    currently, InnoDB deadlock detected by InnoDB or lock
 
1545
    wait timeout (innodb_lock_wait_timeout exceeded
 
1546
  */
 
1547
  if (session->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
 
1548
      session->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
 
1549
    return(1);
 
1550
 
 
1551
  return(0);
 
1552
}
 
1553
 
 
1554
 
 
1555
/**
 
1556
  Applies the given event and advances the relay log position.
 
1557
 
 
1558
  In essence, this function does:
 
1559
 
 
1560
  @code
 
1561
    ev->apply_event(rli);
 
1562
    ev->update_pos(rli);
 
1563
  @endcode
 
1564
 
 
1565
  But it also does some maintainance, such as skipping events if
 
1566
  needed and reporting errors.
 
1567
 
 
1568
  If the @c skip flag is set, then it is tested whether the event
 
1569
  should be skipped, by looking at the slave_skip_counter and the
 
1570
  server id.  The skip flag should be set when calling this from a
 
1571
  replication thread but not set when executing an explicit BINLOG
 
1572
  statement.
 
1573
 
 
1574
  @retval 0 OK.
 
1575
 
 
1576
  @retval 1 Error calling ev->apply_event().
 
1577
 
 
1578
  @retval 2 No error calling ev->apply_event(), but error calling
 
1579
  ev->update_pos().
 
1580
*/
 
1581
int32_t apply_event_and_update_pos(Log_event* ev, Session* session, Relay_log_info* rli,
 
1582
                               bool skip)
 
1583
{
 
1584
  int32_t exec_res= 0;
 
1585
 
 
1586
  /*
 
1587
    Execute the event to change the database and update the binary
 
1588
    log coordinates, but first we set some data that is needed for
 
1589
    the thread.
 
1590
 
 
1591
    The event will be executed unless it is supposed to be skipped.
 
1592
 
 
1593
    Queries originating from this server must be skipped.  Low-level
 
1594
    events (Format_description_log_event, Rotate_log_event,
 
1595
    Stop_log_event) from this server must also be skipped. But for
 
1596
    those we don't want to modify 'group_master_log_pos', because
 
1597
    these events did not exist on the master.
 
1598
    Format_description_log_event is not completely skipped.
 
1599
 
 
1600
    Skip queries specified by the user in 'slave_skip_counter'.  We
 
1601
    can't however skip events that has something to do with the log
 
1602
    files themselves.
 
1603
 
 
1604
    Filtering on own server id is extremely important, to ignore
 
1605
    execution of events created by the creation/rotation of the relay
 
1606
    log (remember that now the relay log starts with its Format_desc,
 
1607
    has a Rotate etc).
 
1608
  */
 
1609
 
 
1610
  session->server_id = ev->server_id; // use the original server id for logging
 
1611
  session->set_time();                            // time the query
 
1612
  session->lex->current_select= 0;
 
1613
  if (!ev->when)
 
1614
  {
 
1615
    ev->when= time(NULL);
 
1616
    if(ev->when == (time_t)-1)
 
1617
      return 2;
 
1618
  }
 
1619
 
 
1620
  ev->session = session; // because up to this point, ev->session == 0
 
1621
 
 
1622
  if (skip)
 
1623
  {
 
1624
    int32_t reason= ev->shall_skip(rli);
 
1625
    if (reason == Log_event::EVENT_SKIP_COUNT)
 
1626
      --rli->slave_skip_counter;
 
1627
    pthread_mutex_unlock(&rli->data_lock);
 
1628
    if (reason == Log_event::EVENT_SKIP_NOT)
 
1629
      exec_res= ev->apply_event(rli);
 
1630
  }
 
1631
  else
 
1632
    exec_res= ev->apply_event(rli);
 
1633
 
 
1634
  if (exec_res == 0)
 
1635
  {
 
1636
    int32_t error= ev->update_pos(rli);
 
1637
    /*
 
1638
      The update should not fail, so print an error message and
 
1639
      return an error code.
 
1640
 
 
1641
      TODO: Replace this with a decent error message when merged
 
1642
      with BUG#24954 (which adds several new error message).
 
1643
    */
 
1644
    if (error)
 
1645
    {
 
1646
      char buf[22];
 
1647
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
 
1648
                  _("It was not possible to update the positions"
 
1649
                  " of the relay log information: the slave may"
 
1650
                  " be in an inconsistent state."
 
1651
                  " Stopped in %s position %s"),
 
1652
                  rli->group_relay_log_name.c_str(),
 
1653
                  llstr(rli->group_relay_log_pos, buf));
 
1654
      return(2);
 
1655
    }
 
1656
  }
 
1657
 
 
1658
  return(exec_res ? 1 : 0);
 
1659
}
 
1660
 
 
1661
 
 
1662
/**
 
1663
  Top-level function for executing the next event from the relay log.
 
1664
 
 
1665
  This function reads the event from the relay log, executes it, and
 
1666
  advances the relay log position.  It also handles errors, etc.
 
1667
 
 
1668
  This function may fail to apply the event for the following reasons:
 
1669
 
 
1670
   - The position specfied by the UNTIL condition of the START SLAVE
 
1671
     command is reached.
 
1672
 
 
1673
   - It was not possible to read the event from the log.
 
1674
 
 
1675
   - The slave is killed.
 
1676
 
 
1677
   - An error occurred when applying the event, and the event has been
 
1678
     tried slave_trans_retries times.  If the event has been retried
 
1679
     fewer times, 0 is returned.
 
1680
 
 
1681
   - init_master_info or init_relay_log_pos failed. (These are called
 
1682
     if a failure occurs when applying the event.)</li>
 
1683
 
 
1684
   - An error occurred when updating the binlog position.
 
1685
 
 
1686
  @retval 0 The event was applied.
 
1687
 
 
1688
  @retval 1 The event was not applied.
 
1689
*/
 
1690
static int32_t exec_relay_log_event(Session* session, Relay_log_info* rli)
 
1691
{
 
1692
  /*
 
1693
     We acquire this mutex since we need it for all operations except
 
1694
     event execution. But we will release it in places where we will
 
1695
     wait for something for example inside of next_event().
 
1696
   */
 
1697
  pthread_mutex_lock(&rli->data_lock);
 
1698
 
 
1699
  Log_event * ev = next_event(rli);
 
1700
 
 
1701
  assert(rli->sql_session==session);
 
1702
 
 
1703
  if (sql_slave_killed(session,rli))
 
1704
  {
 
1705
    pthread_mutex_unlock(&rli->data_lock);
 
1706
    delete ev;
 
1707
    return(1);
 
1708
  }
 
1709
  if (ev)
 
1710
  {
 
1711
    int32_t exec_res;
 
1712
 
 
1713
    /*
 
1714
      This tests if the position of the beginning of the current event
 
1715
      hits the UNTIL barrier.
 
1716
    */
 
1717
    if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
 
1718
        rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
 
1719
                                rli->group_master_log_pos :
 
1720
                                ev->log_pos - ev->data_written))
 
1721
    {
 
1722
      char buf[22];
 
1723
      sql_print_information(_("Slave SQL thread stopped because it reached its"
 
1724
                              " UNTIL position %s"),
 
1725
                            llstr(rli->until_pos(), buf));
 
1726
      /*
 
1727
        Setting abort_slave flag because we do not want additional message about
 
1728
        error in query execution to be printed.
 
1729
      */
 
1730
      rli->abort_slave= 1;
 
1731
      pthread_mutex_unlock(&rli->data_lock);
 
1732
      delete ev;
 
1733
      return(1);
 
1734
    }
 
1735
    exec_res= apply_event_and_update_pos(ev, session, rli, true);
 
1736
 
 
1737
    /*
 
1738
      Format_description_log_event should not be deleted because it will be
 
1739
      used to read info about the relay log's format; it will be deleted when
 
1740
      the SQL thread does not need it, i.e. when this thread terminates.
 
1741
    */
 
1742
    if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
 
1743
    {
 
1744
      delete ev;
 
1745
    }
 
1746
 
 
1747
    /*
 
1748
      update_log_pos failed: this should not happen, so we don't
 
1749
      retry.
 
1750
    */
 
1751
    if (exec_res == 2)
 
1752
      return(1);
 
1753
 
 
1754
    if (slave_trans_retries)
 
1755
    {
 
1756
      int32_t temp_err= 0;
 
1757
      if (exec_res && (temp_err= has_temporary_error(session)))
 
1758
      {
 
1759
        const char *errmsg;
 
1760
        /*
 
1761
          We were in a transaction which has been rolled back because of a
 
1762
          temporary error;
 
1763
          let's seek back to BEGIN log event and retry it all again.
 
1764
          Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
 
1765
          there is no rollback since 5.0.13 (ref: manual).
 
1766
          We have to not only seek but also
 
1767
          a) init_master_info(), to seek back to hot relay log's start for later
 
1768
          (for when we will come back to this hot log after re-processing the
 
1769
          possibly existing old logs where BEGIN is: check_binlog_magic() will
 
1770
          then need the cache to be at position 0 (see comments at beginning of
 
1771
          init_master_info()).
 
1772
          b) init_relay_log_pos(), because the BEGIN may be an older relay log.
 
1773
        */
 
1774
        if (rli->trans_retries < slave_trans_retries)
 
1775
        {
 
1776
          if (rli->mi->init_master_info(0, 0, SLAVE_SQL))
 
1777
            sql_print_error(_("Failed to initialize the master info structure"));
 
1778
          else if (init_relay_log_pos(rli,
 
1779
                                      rli->group_relay_log_name.c_str(),
 
1780
                                      rli->group_relay_log_pos,
 
1781
                                      1, &errmsg, 1))
 
1782
            sql_print_error(_("Error initializing relay log position: %s"),
 
1783
                            errmsg);
 
1784
          else
 
1785
          {
 
1786
            exec_res= 0;
 
1787
            end_trans(session, ROLLBACK);
 
1788
            /* chance for concurrent connection to get more locks */
 
1789
            safe_sleep(session, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
 
1790
                       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
 
1791
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
 
1792
            rli->trans_retries++;
 
1793
            rli->retried_trans++;
 
1794
            pthread_mutex_unlock(&rli->data_lock);
 
1795
          }
 
1796
        }
 
1797
        else
 
1798
          sql_print_error(_("Slave SQL thread retried transaction %"PRIu64" time(s) "
 
1799
                            "in vain, giving up. Consider raising the value of "
 
1800
                            "the slave_transaction_retries variable."),
 
1801
                          slave_trans_retries);
 
1802
      }
 
1803
      else if ((exec_res && !temp_err) ||
 
1804
               (opt_using_transactions &&
 
1805
                rli->group_relay_log_pos == rli->event_relay_log_pos))
 
1806
      {
 
1807
        /*
 
1808
          Only reset the retry counter if the entire group succeeded
 
1809
          or failed with a non-transient error.  On a successful
 
1810
          event, the execution will proceed as usual; in the case of a
 
1811
          non-transient error, the slave will stop with an error.
 
1812
         */
 
1813
        rli->trans_retries= 0; // restart from fresh
 
1814
      }
 
1815
    }
 
1816
    return(exec_res);
 
1817
  }
 
1818
  pthread_mutex_unlock(&rli->data_lock);
 
1819
  rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
 
1820
              ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
 
1821
              _("Could not parse relay log event entry. The possible reasons "
 
1822
                "are: the master's binary log is corrupted (you can check this "
 
1823
                "by running 'mysqlbinlog' on the binary log), the slave's "
 
1824
                "relay log is corrupted (you can check this by running "
 
1825
                "'mysqlbinlog' on the relay log), a network problem, or a bug "
 
1826
                "in the master's or slave's DRIZZLE code. If you want to check "
 
1827
                "the master's binary log or slave's relay log, you will be "
 
1828
                "able to know their names by issuing 'SHOW SLAVE STATUS' "
 
1829
                "on this slave."));
 
1830
  return(1);
 
1831
}
 
1832
 
 
1833
 
 
1834
/**
 
1835
  @brief Try to reconnect slave IO thread.
 
1836
 
 
1837
  @details Terminates current connection to master, sleeps for
 
1838
  @c mi->connect_retry msecs and initiates new connection with
 
1839
  @c safe_reconnect(). Variable pointed by @c retry_count is increased -
 
1840
  if it exceeds @c master_retry_count then connection is not re-established
 
1841
  and function signals error.
 
1842
  Unless @c suppres_warnings is TRUE, a warning is put in the server error log
 
1843
  when reconnecting. The warning message and messages used to report errors
 
1844
  are taken from @c messages array. In case @c master_retry_count is exceeded,
 
1845
  no messages are added to the log.
 
1846
 
 
1847
  @param[in]     session                 Thread context.
 
1848
  @param[in]     DRIZZLE               DRIZZLE connection.
 
1849
  @param[in]     mi                  Master connection information.
 
1850
  @param[in,out] retry_count         Number of attempts to reconnect.
 
1851
  @param[in]     suppress_warnings   TRUE when a normal net read timeout
 
1852
                                     has caused to reconnecting.
 
1853
  @param[in]     messages            Messages to print/log, see
 
1854
                                     reconnect_messages[] array.
 
1855
 
 
1856
  @retval        0                   OK.
 
1857
  @retval        1                   There was an error.
 
1858
*/
 
1859
 
 
1860
static int32_t try_to_reconnect(Session *session, DRIZZLE *drizzle, Master_info *mi,
 
1861
                                uint32_t *retry_count, bool suppress_warnings,
 
1862
                                const char *messages[SLAVE_RECON_MSG_MAX])
 
1863
{
 
1864
  mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
 
1865
  session->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
 
1866
  drizzle_disconnect(drizzle);
 
1867
  if ((*retry_count)++)
 
1868
  {
 
1869
    if (*retry_count > master_retry_count)
 
1870
      return 1;                             // Don't retry forever
 
1871
    safe_sleep(session, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
 
1872
               (void *) mi);
 
1873
  }
 
1874
  if (check_io_slave_killed(session, mi,
 
1875
                            _(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
 
1876
    return 1;
 
1877
  session->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
 
1878
  if (!suppress_warnings)
 
1879
  {
 
1880
    char buf[256], llbuff[22];
 
1881
    snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
 
1882
             IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
 
1883
    /*
 
1884
      Raise a warining during registering on master/requesting dump.
 
1885
      Log a message reading event.
 
1886
    */
 
1887
    if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
 
1888
    {
 
1889
      mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
 
1890
                 ER(ER_SLAVE_MASTER_COM_FAILURE),
 
1891
                 _(messages[SLAVE_RECON_MSG_COMMAND]), buf);
 
1892
    }
 
1893
    else
 
1894
    {
 
1895
      sql_print_information("%s",buf);
 
1896
    }
 
1897
  }
 
1898
  if (safe_reconnect(session, drizzle, mi, 1) || io_slave_killed(session, mi))
 
1899
  {
 
1900
    if (global_system_variables.log_warnings)
 
1901
      sql_print_information("%s",_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
 
1902
    return 1;
 
1903
  }
 
1904
  return 0;
 
1905
}
 
1906
 
 
1907
 
 
1908
/* Slave I/O Thread entry point */
 
1909
 
 
1910
pthread_handler_t handle_slave_io(void *arg)
 
1911
{
 
1912
  Session *session; // needs to be first for thread_stack
 
1913
  DRIZZLE *drizzle;
 
1914
  Master_info *mi = (Master_info*)arg;
 
1915
  Relay_log_info *rli= &mi->rli;
 
1916
  char llbuff[22];
 
1917
  uint32_t retry_count;
 
1918
  bool suppress_warnings;
 
1919
  uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
 
1920
  my_thread_init();
 
1921
 
 
1922
  assert(mi->inited);
 
1923
  drizzle= NULL ;
 
1924
  retry_count= 0;
 
1925
 
 
1926
  pthread_mutex_lock(&mi->run_lock);
 
1927
  /* Inform waiting threads that slave has started */
 
1928
  mi->slave_run_id++;
 
1929
 
 
1930
  mi->events_till_disconnect = disconnect_slave_event_count;
 
1931
 
 
1932
  session= new Session;
 
1933
  Session_CHECK_SENTRY(session);
 
1934
  mi->io_session = session;
 
1935
 
 
1936
  pthread_detach_this_thread();
 
1937
  session->thread_stack= (char*) &session; // remember where our stack is
 
1938
  if (init_slave_thread(session, SLAVE_Session_IO))
 
1939
  {
 
1940
    pthread_cond_broadcast(&mi->start_cond);
 
1941
    pthread_mutex_unlock(&mi->run_lock);
 
1942
    sql_print_error(_("Failed during slave I/O thread initialization"));
 
1943
    goto err;
 
1944
  }
 
1945
  pthread_mutex_lock(&LOCK_thread_count);
 
1946
  threads.append(session);
 
1947
  pthread_mutex_unlock(&LOCK_thread_count);
 
1948
  mi->slave_running = 1;
 
1949
  mi->abort_slave = 0;
 
1950
  pthread_mutex_unlock(&mi->run_lock);
 
1951
  pthread_cond_broadcast(&mi->start_cond);
 
1952
 
 
1953
  if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
 
1954
  {
 
1955
    mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
 
1956
               ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
 
1957
    goto err;
 
1958
  }
 
1959
 
 
1960
  session->set_proc_info("Connecting to master");
 
1961
  // we can get killed during safe_connect
 
1962
  if (!safe_connect(session, drizzle, mi))
 
1963
  {
 
1964
    sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
 
1965
                            "replication started in log '%s' at position %s"),
 
1966
                          mi->getUsername(), mi->getHostname(), mi->getPort(),
 
1967
                          IO_RPL_LOG_NAME,
 
1968
                          llstr(mi->getLogPosition(), llbuff));
 
1969
  /*
 
1970
    Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
 
1971
    thread, since a replication event can become this much larger than
 
1972
    the corresponding packet (query) sent from client to master.
 
1973
  */
 
1974
    drizzle->net.max_packet_size= session->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
 
1975
  }
 
1976
  else
 
1977
  {
 
1978
    sql_print_information(_("Slave I/O thread killed while connecting to master"));
 
1979
    goto err;
 
1980
  }
 
1981
 
 
1982
connected:
 
1983
 
 
1984
  // TODO: the assignment below should be under mutex (5.0)
 
1985
  mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
 
1986
  session->slave_net = &drizzle->net;
 
1987
  session->set_proc_info("Checking master version");
 
1988
  if (get_master_version_and_clock(drizzle, mi))
 
1989
    goto err;
 
1990
 
 
1991
  if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
 
1992
  {
 
1993
    /*
 
1994
      Register ourselves with the master.
 
1995
    */
 
1996
    session->set_proc_info("Registering slave on master");
 
1997
    if (register_slave_on_master(drizzle, mi, &suppress_warnings))
 
1998
    {
 
1999
      if (!check_io_slave_killed(session, mi, "Slave I/O thread killed "
 
2000
                                 "while registering slave on master"))
 
2001
      {
 
2002
        sql_print_error(_("Slave I/O thread couldn't register on master"));
 
2003
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2004
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
 
2005
          goto err;
 
2006
      }
 
2007
      else
 
2008
        goto err;
 
2009
      goto connected;
 
2010
    }
 
2011
    if (!retry_count_reg)
 
2012
    {
 
2013
      retry_count_reg++;
 
2014
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2015
      if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2016
                           reconnect_messages[SLAVE_RECON_ACT_REG]))
 
2017
        goto err;
 
2018
      goto connected;
 
2019
    }
 
2020
  }
 
2021
 
 
2022
  while (!io_slave_killed(session,mi))
 
2023
  {
 
2024
    session->set_proc_info("Requesting binlog dump");
 
2025
    if (request_dump(drizzle, mi, &suppress_warnings))
 
2026
    {
 
2027
      sql_print_error(_("Failed on request_dump()"));
 
2028
      if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while \
 
2029
requesting master dump")) ||
 
2030
          try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2031
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
 
2032
        goto err;
 
2033
      goto connected;
 
2034
    }
 
2035
    if (!retry_count_dump)
 
2036
    {
 
2037
      retry_count_dump++;
 
2038
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2039
      if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2040
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
 
2041
        goto err;
 
2042
      goto connected;
 
2043
    }
 
2044
 
 
2045
    while (!io_slave_killed(session,mi))
 
2046
    {
 
2047
      uint32_t event_len;
 
2048
      /*
 
2049
        We say "waiting" because read_event() will wait if there's nothing to
 
2050
        read. But if there's something to read, it will not wait. The
 
2051
        important thing is to not confuse users by saying "reading" whereas
 
2052
        we're in fact receiving nothing.
 
2053
      */
 
2054
      session->set_proc_info(_("Waiting for master to send event"));
 
2055
      event_len= read_event(drizzle, mi, &suppress_warnings);
 
2056
      if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while "
 
2057
                                           "reading event")))
 
2058
        goto err;
 
2059
      if (!retry_count_event)
 
2060
      {
 
2061
        retry_count_event++;
 
2062
        sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2063
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2064
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
 
2065
          goto err;
 
2066
        goto connected;
 
2067
      }
 
2068
 
 
2069
      if (event_len == packet_error)
 
2070
      {
 
2071
        uint32_t drizzle_error_number= drizzle_errno(drizzle);
 
2072
        switch (drizzle_error_number) {
 
2073
        case CR_NET_PACKET_TOO_LARGE:
 
2074
          sql_print_error(_("Log entry on master is longer than "
 
2075
                            "max_allowed_packet (%u) on "
 
2076
                            "slave. If the entry is correct, restart the "
 
2077
                            "server with a higher value of "
 
2078
                            "max_allowed_packet"),
 
2079
                          session->variables.max_allowed_packet);
 
2080
          goto err;
 
2081
        case ER_MASTER_FATAL_ERROR_READING_BINLOG:
 
2082
          sql_print_error(ER(drizzle_error_number), drizzle_error_number,
 
2083
                          drizzle_error(drizzle));
 
2084
          goto err;
 
2085
        case EE_OUTOFMEMORY:
 
2086
        case ER_OUTOFMEMORY:
 
2087
          sql_print_error(
 
2088
       _("Stopping slave I/O thread due to out-of-memory error from master"));
 
2089
          goto err;
 
2090
        }
 
2091
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2092
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
 
2093
          goto err;
 
2094
        goto connected;
 
2095
      } // if (event_len == packet_error)
 
2096
 
 
2097
      retry_count=0;                    // ok event, reset retry counter
 
2098
      session->set_proc_info(_("Queuing master event to the relay log"));
 
2099
      if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
 
2100
      {
 
2101
        goto err;
 
2102
      }
 
2103
      if (mi->flush())
 
2104
      {
 
2105
        sql_print_error(_("Failed to flush master info file"));
 
2106
        goto err;
 
2107
      }
 
2108
      /*
 
2109
        See if the relay logs take too much space.
 
2110
        We don't lock mi->rli.log_space_lock here; this dirty read saves time
 
2111
        and does not introduce any problem:
 
2112
        - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
 
2113
        the clean value is 0), then we are reading only one more event as we
 
2114
        should, and we'll block only at the next event. No big deal.
 
2115
        - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
 
2116
        the clean value is 1), then we are going into wait_for_relay_log_space()
 
2117
        for no reason, but this function will do a clean read, notice the clean
 
2118
        value and exit immediately.
 
2119
      */
 
2120
      if (rli->log_space_limit && rli->log_space_limit <
 
2121
          rli->log_space_total &&
 
2122
          !rli->ignore_log_space_limit)
 
2123
        if (wait_for_relay_log_space(rli))
 
2124
        {
 
2125
          sql_print_error(_("Slave I/O thread aborted while waiting for "
 
2126
                            "relay log space"));
 
2127
          goto err;
 
2128
        }
 
2129
    }
 
2130
  }
 
2131
 
 
2132
// error = 0;
 
2133
err:
 
2134
// print the current replication position
 
2135
  sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
 
2136
                          "position %s"),
 
2137
                        IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
 
2138
  pthread_mutex_lock(&LOCK_thread_count);
 
2139
  session->query = session->db = 0; // extra safety
 
2140
  session->query_length= session->db_length= 0;
 
2141
  pthread_mutex_unlock(&LOCK_thread_count);
 
2142
  if (drizzle)
 
2143
  {
 
2144
    /*
 
2145
      Here we need to clear the active VIO before closing the
 
2146
      connection with the master.  The reason is that Session::awake()
 
2147
      might be called from terminate_slave_thread() because somebody
 
2148
      issued a STOP SLAVE.  If that happends, the close_active_vio()
 
2149
      can be called in the middle of closing the VIO associated with
 
2150
      the 'mysql' object, causing a crash.
 
2151
    */
 
2152
    drizzle_close(drizzle);
 
2153
    mi->drizzle=0;
 
2154
  }
 
2155
  write_ignored_events_info_to_relay_log(session, mi);
 
2156
  session->set_proc_info(_("Waiting for slave mutex on exit"));
 
2157
  pthread_mutex_lock(&mi->run_lock);
 
2158
 
 
2159
  /* Forget the relay log's format */
 
2160
  delete mi->rli.relay_log.description_event_for_queue;
 
2161
  mi->rli.relay_log.description_event_for_queue= 0;
 
2162
  assert(session->net.buff != 0);
 
2163
  net_end(&session->net); // destructor will not free it, because net.vio is 0
 
2164
  close_thread_tables(session);
 
2165
  pthread_mutex_lock(&LOCK_thread_count);
 
2166
  Session_CHECK_SENTRY(session);
 
2167
  delete session;
 
2168
  pthread_mutex_unlock(&LOCK_thread_count);
 
2169
  mi->abort_slave= 0;
 
2170
  mi->slave_running= 0;
 
2171
  mi->io_session= 0;
 
2172
  /*
 
2173
    Note: the order of the two following calls (first broadcast, then unlock)
 
2174
    is important. Otherwise a killer_thread can execute between the calls and
 
2175
    delete the mi structure leading to a crash! (see BUG#25306 for details)
 
2176
   */
 
2177
  pthread_cond_broadcast(&mi->stop_cond);       // tell the world we are done
 
2178
  pthread_mutex_unlock(&mi->run_lock);
 
2179
  my_thread_end();
 
2180
  pthread_exit(0);
 
2181
  return(0);                               // Can't return anything here
 
2182
}
 
2183
 
 
2184
 
 
2185
/* Slave SQL Thread entry point */
 
2186
 
 
2187
pthread_handler_t handle_slave_sql(void *arg)
 
2188
{
 
2189
  Session *session;                     /* needs to be first for thread_stack */
 
2190
  char llbuff[22],llbuff1[22];
 
2191
 
 
2192
  Relay_log_info* rli = &((Master_info*)arg)->rli;
 
2193
  const char *errmsg;
 
2194
 
 
2195
  my_thread_init();
 
2196
 
 
2197
  assert(rli->inited);
 
2198
  pthread_mutex_lock(&rli->run_lock);
 
2199
  assert(!rli->slave_running);
 
2200
  errmsg= 0;
 
2201
  rli->events_till_abort = abort_slave_event_count;
 
2202
 
 
2203
  session = new Session;
 
2204
  session->thread_stack = (char*)&session; // remember where our stack is
 
2205
  rli->sql_session= session;
 
2206
 
 
2207
  /* Inform waiting threads that slave has started */
 
2208
  rli->slave_run_id++;
 
2209
  rli->slave_running = 1;
 
2210
 
 
2211
  pthread_detach_this_thread();
 
2212
  if (init_slave_thread(session, SLAVE_Session_SQL))
 
2213
  {
 
2214
    /*
 
2215
      TODO: this is currently broken - slave start and change master
 
2216
      will be stuck if we fail here
 
2217
    */
 
2218
    pthread_cond_broadcast(&rli->start_cond);
 
2219
    pthread_mutex_unlock(&rli->run_lock);
 
2220
    sql_print_error(_("Failed during slave thread initialization"));
 
2221
    goto err;
 
2222
  }
 
2223
  session->init_for_queries();
 
2224
  session->temporary_tables = rli->save_temporary_tables; // restore temp tables
 
2225
  pthread_mutex_lock(&LOCK_thread_count);
 
2226
  threads.append(session);
 
2227
  pthread_mutex_unlock(&LOCK_thread_count);
 
2228
  /*
 
2229
    We are going to set slave_running to 1. Assuming slave I/O thread is
 
2230
    alive and connected, this is going to make Seconds_Behind_Master be 0
 
2231
    i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
 
2232
    the moment we start we can think we are caught up, and the next second we
 
2233
    start receiving data so we realize we are not caught up and
 
2234
    Seconds_Behind_Master grows. No big deal.
 
2235
  */
 
2236
  rli->abort_slave = 0;
 
2237
  pthread_mutex_unlock(&rli->run_lock);
 
2238
  pthread_cond_broadcast(&rli->start_cond);
 
2239
 
 
2240
  /*
 
2241
    Reset errors for a clean start (otherwise, if the master is idle, the SQL
 
2242
    thread may execute no Query_log_event, so the error will remain even
 
2243
    though there's no problem anymore). Do not reset the master timestamp
 
2244
    (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
 
2245
    as we are not sure that we are going to receive a query, we want to
 
2246
    remember the last master timestamp (to say how many seconds behind we are
 
2247
    now.
 
2248
    But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
 
2249
  */
 
2250
  rli->clear_error();
 
2251
 
 
2252
  //tell the I/O thread to take relay_log_space_limit into account from now on
 
2253
  pthread_mutex_lock(&rli->log_space_lock);
 
2254
  rli->ignore_log_space_limit= 0;
 
2255
  pthread_mutex_unlock(&rli->log_space_lock);
 
2256
  rli->trans_retries= 0; // start from "no error"
 
2257
 
 
2258
  if (init_relay_log_pos(rli,
 
2259
                         rli->group_relay_log_name.c_str(),
 
2260
                         rli->group_relay_log_pos,
 
2261
                         1 /*need data lock*/, &errmsg,
 
2262
                         1 /*look for a description_event*/))
 
2263
  {
 
2264
    sql_print_error(_("Error initializing relay log position: %s"),
 
2265
                    errmsg);
 
2266
    goto err;
 
2267
  }
 
2268
  Session_CHECK_SENTRY(session);
 
2269
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
 
2270
  /*
 
2271
    Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
 
2272
    correct position when it's called just after my_b_seek() (the questionable
 
2273
    stuff is those "seek is done on next read" comments in the my_b_seek()
 
2274
    source code).
 
2275
    The crude reality is that this assertion randomly fails whereas
 
2276
    replication seems to work fine. And there is no easy explanation why it
 
2277
    fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
 
2278
    init_relay_log_pos() called above). Maybe the assertion would be
 
2279
    meaningful if we held rli->data_lock between the my_b_seek() and the
 
2280
    assert().
 
2281
  */
 
2282
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
 
2283
  assert(rli->sql_session == session);
 
2284
 
 
2285
  if (global_system_variables.log_warnings)
 
2286
    sql_print_information(_("Slave SQL thread initialized, "
 
2287
                            "starting replication in log '%s' at "
 
2288
                            "position %s, relay log '%s' position: %s"),
 
2289
                            RPL_LOG_NAME,
 
2290
                          llstr(rli->group_master_log_pos,llbuff),
 
2291
                          rli->group_relay_log_name.c_str(),
 
2292
                          llstr(rli->group_relay_log_pos,llbuff1));
 
2293
 
 
2294
  /* execute init_slave variable */
 
2295
  if (sys_init_slave.value_length)
 
2296
  {
 
2297
    execute_init_command(session, &sys_init_slave, &LOCK_sys_init_slave);
 
2298
    if (session->is_slave_error)
 
2299
    {
 
2300
      sql_print_error(_("Slave SQL thread aborted. "
 
2301
                        "Can't execute init_slave query"));
 
2302
      goto err;
 
2303
    }
 
2304
  }
 
2305
 
 
2306
  /*
 
2307
    First check until condition - probably there is nothing to execute. We
 
2308
    do not want to wait for next event in this case.
 
2309
  */
 
2310
  pthread_mutex_lock(&rli->data_lock);
 
2311
  if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
 
2312
      rli->is_until_satisfied(rli->group_master_log_pos))
 
2313
  {
 
2314
    char buf[22];
 
2315
    sql_print_information(_("Slave SQL thread stopped because it reached its"
 
2316
                            " UNTIL position %s"), llstr(rli->until_pos(), buf));
 
2317
    pthread_mutex_unlock(&rli->data_lock);
 
2318
    goto err;
 
2319
  }
 
2320
  pthread_mutex_unlock(&rli->data_lock);
 
2321
 
 
2322
  /* Read queries from the IO/THREAD until this thread is killed */
 
2323
 
 
2324
  while (!sql_slave_killed(session,rli))
 
2325
  {
 
2326
    session->set_proc_info(_("Reading event from the relay log"));
 
2327
    assert(rli->sql_session == session);
 
2328
    Session_CHECK_SENTRY(session);
 
2329
    if (exec_relay_log_event(session,rli))
 
2330
    {
 
2331
      // do not scare the user if SQL thread was simply killed or stopped
 
2332
      if (!sql_slave_killed(session,rli))
 
2333
      {
 
2334
        /*
 
2335
          retrieve as much info as possible from the session and, error
 
2336
          codes and warnings and print this to the error log as to
 
2337
          allow the user to locate the error
 
2338
        */
 
2339
        uint32_t const last_errno= rli->last_error().number;
 
2340
 
 
2341
        if (session->is_error())
 
2342
        {
 
2343
          char const *const errmsg= session->main_da.message();
 
2344
 
 
2345
          if (last_errno == 0)
 
2346
          {
 
2347
            rli->report(ERROR_LEVEL, session->main_da.sql_errno(), "%s", errmsg);
 
2348
          }
 
2349
          else if (last_errno != session->main_da.sql_errno())
 
2350
          {
 
2351
            sql_print_error(_("Slave (additional info): %s Error_code: %d"),
 
2352
                            errmsg, session->main_da.sql_errno());
 
2353
          }
 
2354
        }
 
2355
 
 
2356
        /* Print any warnings issued */
 
2357
        List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
 
2358
        DRIZZLE_ERROR *err;
 
2359
        /*
 
2360
          Added controlled slave thread cancel for replication
 
2361
          of user-defined variables.
 
2362
        */
 
2363
        bool udf_error = false;
 
2364
        while ((err= it++))
 
2365
        {
 
2366
          if (err->code == ER_CANT_OPEN_LIBRARY)
 
2367
            udf_error = true;
 
2368
          sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
 
2369
        }
 
2370
        if (udf_error)
 
2371
          sql_print_error(_("Error loading user-defined library, slave SQL "
 
2372
                            "thread aborted. Install the missing library, "
 
2373
                            "and restart the slave SQL thread with "
 
2374
                            "\"SLAVE START\". We stopped at log '%s' "
 
2375
                            "position %s"),
 
2376
                          RPL_LOG_NAME, llstr(rli->group_master_log_pos,
 
2377
            llbuff));
 
2378
        else
 
2379
          sql_print_error(_("Error running query, slave SQL thread aborted. "
 
2380
                            "Fix the problem, and restart "
 
2381
                            "the slave SQL thread with \"SLAVE START\". "
 
2382
                            "We stopped at log '%s' position %s"),
 
2383
                          RPL_LOG_NAME,
 
2384
                          llstr(rli->group_master_log_pos, llbuff));
 
2385
      }
 
2386
      goto err;
 
2387
    }
 
2388
  }
 
2389
 
 
2390
  /* Thread stopped. Print the current replication position to the log */
 
2391
  sql_print_information(_("Slave SQL thread exiting, replication stopped in "
 
2392
                          "log '%s' at position %s"),
 
2393
                        RPL_LOG_NAME,
 
2394
                        llstr(rli->group_master_log_pos,llbuff));
 
2395
 
 
2396
 err:
 
2397
 
 
2398
  /*
 
2399
    Some events set some playgrounds, which won't be cleared because thread
 
2400
    stops. Stopping of this thread may not be known to these events ("stop"
 
2401
    request is detected only by the present function, not by events), so we
 
2402
    must "proactively" clear playgrounds:
 
2403
  */
 
2404
  rli->cleanup_context(session, 1);
 
2405
  pthread_mutex_lock(&LOCK_thread_count);
 
2406
  /*
 
2407
    Some extra safety, which should not been needed (normally, event deletion
 
2408
    should already have done these assignments (each event which sets these
 
2409
    variables is supposed to set them to 0 before terminating)).
 
2410
  */
 
2411
  session->query= session->db= session->catalog= 0;
 
2412
  session->query_length= session->db_length= 0;
 
2413
  pthread_mutex_unlock(&LOCK_thread_count);
 
2414
  session->set_proc_info("Waiting for slave mutex on exit");
 
2415
  pthread_mutex_lock(&rli->run_lock);
 
2416
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
 
2417
  pthread_mutex_lock(&rli->data_lock);
 
2418
  assert(rli->slave_running == 1); // tracking buffer overrun
 
2419
  /* When master_pos_wait() wakes up it will check this and terminate */
 
2420
  rli->slave_running= 0;
 
2421
  /* Forget the relay log's format */
 
2422
  delete rli->relay_log.description_event_for_exec;
 
2423
  rli->relay_log.description_event_for_exec= 0;
 
2424
  /* Wake up master_pos_wait() */
 
2425
  pthread_mutex_unlock(&rli->data_lock);
 
2426
  pthread_cond_broadcast(&rli->data_cond);
 
2427
  rli->ignore_log_space_limit= 0; /* don't need any lock */
 
2428
  rli->save_temporary_tables = session->temporary_tables;
 
2429
 
 
2430
  /*
 
2431
    TODO: see if we can do this conditionally in next_event() instead
 
2432
    to avoid unneeded position re-init
 
2433
  */
 
2434
  session->temporary_tables = 0; // remove tempation from destructor to close them
 
2435
  assert(session->net.buff != 0);
 
2436
  net_end(&session->net); // destructor will not free it, because we are weird
 
2437
  assert(rli->sql_session == session);
 
2438
  Session_CHECK_SENTRY(session);
 
2439
  rli->sql_session= 0;
 
2440
  pthread_mutex_lock(&LOCK_thread_count);
 
2441
  Session_CHECK_SENTRY(session);
 
2442
  delete session;
 
2443
  pthread_mutex_unlock(&LOCK_thread_count);
 
2444
 /*
 
2445
  Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
 
2446
  is important. Otherwise a killer_thread can execute between the calls and
 
2447
  delete the mi structure leading to a crash! (see BUG#25306 for details)
 
2448
 */
 
2449
  pthread_cond_broadcast(&rli->stop_cond);
 
2450
  pthread_mutex_unlock(&rli->run_lock);  // tell the world we are done
 
2451
 
 
2452
  my_thread_end();
 
2453
  pthread_exit(0);
 
2454
  return(0);                               // Can't return anything here
 
2455
}
 
2456
 
 
2457
 
 
2458
/*
 
2459
  process_io_create_file()
 
2460
*/
 
2461
 
 
2462
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
 
2463
{
 
2464
  int32_t error = 1;
 
2465
  uint32_t num_bytes;
 
2466
  bool cev_not_written;
 
2467
  Session *session = mi->io_session;
 
2468
  NET *net = &mi->drizzle->net;
 
2469
 
 
2470
  if (unlikely(!cev->is_valid()))
 
2471
    return(1);
 
2472
 
 
2473
  assert(cev->inited_from_old);
 
2474
  session->file_id = cev->file_id = mi->file_id++;
 
2475
  session->server_id = cev->server_id;
 
2476
  cev_not_written = 1;
 
2477
 
 
2478
  if (unlikely(net_request_file(net,cev->fname)))
 
2479
  {
 
2480
    sql_print_error(_("Slave I/O: failed requesting download of '%s'"),
 
2481
                    cev->fname);
 
2482
    goto err;
 
2483
  }
 
2484
 
 
2485
  /*
 
2486
    This dummy block is so we could instantiate Append_block_log_event
 
2487
    once and then modify it slightly instead of doing it multiple times
 
2488
    in the loop
 
2489
  */
 
2490
  {
 
2491
    Append_block_log_event aev(session,0,0,0,0);
 
2492
 
 
2493
    for (;;)
 
2494
    {
 
2495
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
 
2496
      {
 
2497
        sql_print_error(_("Network read error downloading '%s' from master"),
 
2498
                        cev->fname);
 
2499
        goto err;
 
2500
      }
 
2501
      if (unlikely(!num_bytes)) /* eof */
 
2502
      {
 
2503
        /* 3.23 master wants it */
 
2504
        net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0);
 
2505
        /*
 
2506
          If we wrote Create_file_log_event, then we need to write
 
2507
          Execute_load_log_event. If we did not write Create_file_log_event,
 
2508
          then this is an empty file and we can just do as if the LOAD DATA
 
2509
          INFILE had not existed, i.e. write nothing.
 
2510
        */
 
2511
        if (unlikely(cev_not_written))
 
2512
          break;
 
2513
        Execute_load_log_event xev(session,0,0);
 
2514
        xev.log_pos = cev->log_pos;
 
2515
        if (unlikely(mi->rli.relay_log.append(&xev)))
 
2516
        {
 
2517
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2518
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2519
                     _("error writing Exec_load event to relay log"));
 
2520
          goto err;
 
2521
        }
 
2522
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
 
2523
        break;
 
2524
      }
 
2525
      if (unlikely(cev_not_written))
 
2526
      {
 
2527
        cev->block = net->read_pos;
 
2528
        cev->block_len = num_bytes;
 
2529
        if (unlikely(mi->rli.relay_log.append(cev)))
 
2530
        {
 
2531
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2532
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2533
                     _("error writing Create_file event to relay log"));
 
2534
          goto err;
 
2535
        }
 
2536
        cev_not_written=0;
 
2537
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
 
2538
      }
 
2539
      else
 
2540
      {
 
2541
        aev.block = net->read_pos;
 
2542
        aev.block_len = num_bytes;
 
2543
        aev.log_pos = cev->log_pos;
 
2544
        if (unlikely(mi->rli.relay_log.append(&aev)))
 
2545
        {
 
2546
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2547
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2548
                     _("error writing Append_block event to relay log"));
 
2549
          goto err;
 
2550
        }
 
2551
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
 
2552
      }
 
2553
    }
 
2554
  }
 
2555
  error=0;
 
2556
err:
 
2557
  return(error);
 
2558
}
 
2559
 
 
2560
 
 
2561
/*
 
2562
  Start using a new binary log on the master
 
2563
 
 
2564
  SYNOPSIS
 
2565
    process_io_rotate()
 
2566
    mi                  master_info for the slave
 
2567
    rev                 The rotate log event read from the binary log
 
2568
 
 
2569
  DESCRIPTION
 
2570
    Updates the master info with the place in the next binary
 
2571
    log where we should start reading.
 
2572
    Rotate the relay log to avoid mixed-format relay logs.
 
2573
 
 
2574
  NOTES
 
2575
    We assume we already locked mi->data_lock
 
2576
 
 
2577
  RETURN VALUES
 
2578
    0           ok
 
2579
    1           Log event is illegal
 
2580
 
 
2581
*/
 
2582
 
 
2583
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
 
2584
{
 
2585
  safe_mutex_assert_owner(&mi->data_lock);
 
2586
 
 
2587
  if (unlikely(!rev->is_valid()))
 
2588
    return(1);
 
2589
 
 
2590
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
 
2591
  mi->setLogName(rev->new_log_ident.c_str());
 
2592
  mi->setLogPosition(rev->pos);
 
2593
  /*
 
2594
    If we do not do this, we will be getting the first
 
2595
    rotate event forever, so we need to not disconnect after one.
 
2596
  */
 
2597
  if (disconnect_slave_event_count)
 
2598
    mi->events_till_disconnect++;
 
2599
 
 
2600
  /*
 
2601
    If description_event_for_queue is format <4, there is conversion in the
 
2602
    relay log to the slave's format (4). And Rotate can mean upgrade or
 
2603
    nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
 
2604
    no need to reset description_event_for_queue now. And if it's nothing (same
 
2605
    master version as before), no need (still using the slave's format).
 
2606
  */
 
2607
  if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
 
2608
  {
 
2609
    delete mi->rli.relay_log.description_event_for_queue;
 
2610
    /* start from format 3 (DRIZZLE 4.0) again */
 
2611
    mi->rli.relay_log.description_event_for_queue= new
 
2612
      Format_description_log_event(3);
 
2613
  }
 
2614
  /*
 
2615
    Rotate the relay log makes binlog format detection easier (at next slave
 
2616
    start or mysqlbinlog)
 
2617
  */
 
2618
  rotate_relay_log(mi); /* will take the right mutexes */
 
2619
  return(0);
 
2620
}
 
2621
 
 
2622
/*
 
2623
  Reads a 3.23 event and converts it to the slave's format. This code was
 
2624
  copied from DRIZZLE 4.0.
 
2625
*/
 
2626
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
 
2627
                           uint32_t event_len)
 
2628
{
 
2629
  const char *errmsg = 0;
 
2630
  uint32_t inc_pos;
 
2631
  bool ignore_event= 0;
 
2632
  char *tmp_buf = 0;
 
2633
  Relay_log_info *rli= &mi->rli;
 
2634
 
 
2635
  /*
 
2636
    If we get Load event, we need to pass a non-reusable buffer
 
2637
    to read_log_event, so we do a trick
 
2638
  */
 
2639
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
 
2640
  {
 
2641
    if (unlikely(!(tmp_buf=(char*)malloc(event_len+1))))
 
2642
    {
 
2643
      mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
 
2644
                 ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
 
2645
      return(1);
 
2646
    }
 
2647
    memcpy(tmp_buf,buf,event_len);
 
2648
    /*
 
2649
      Create_file constructor wants a 0 as last char of buffer, this 0 will
 
2650
      serve as the string-termination char for the file's name (which is at the
 
2651
      end of the buffer)
 
2652
      We must increment event_len, otherwise the event constructor will not see
 
2653
      this end 0, which leads to segfault.
 
2654
    */
 
2655
    tmp_buf[event_len++]=0;
 
2656
    int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
 
2657
    buf = (const char*)tmp_buf;
 
2658
  }
 
2659
  /*
 
2660
    This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
 
2661
    send the loaded file, and write it to the relay log in the form of
 
2662
    Append_block/Exec_load (the SQL thread needs the data, as that thread is not
 
2663
    connected to the master).
 
2664
  */
 
2665
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
 
2666
                                            mi->rli.relay_log.description_event_for_queue);
 
2667
  if (unlikely(!ev))
 
2668
  {
 
2669
    sql_print_error(_("Read invalid event from master: '%s', "
 
2670
                      "master could be corrupt but a more likely cause "
 
2671
                      "of this is a bug"),
 
2672
                    errmsg);
 
2673
    free((char*) tmp_buf);
 
2674
    return(1);
 
2675
  }
 
2676
 
 
2677
  pthread_mutex_lock(&mi->data_lock);
 
2678
  ev->log_pos= mi->getLogPosition(); /* 3.23 events don't contain log_pos */
 
2679
  switch (ev->get_type_code()) {
 
2680
  case STOP_EVENT:
 
2681
    ignore_event= 1;
 
2682
    inc_pos= event_len;
 
2683
    break;
 
2684
  case ROTATE_EVENT:
 
2685
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
 
2686
    {
 
2687
      delete ev;
 
2688
      pthread_mutex_unlock(&mi->data_lock);
 
2689
      return(1);
 
2690
    }
 
2691
    inc_pos= 0;
 
2692
    break;
 
2693
  case CREATE_FILE_EVENT:
 
2694
    /*
 
2695
      Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
 
2696
      queue_old_event() which is for 3.23 events which don't comprise
 
2697
      CREATE_FILE_EVENT. This is because read_log_event() above has just
 
2698
      transformed LOAD_EVENT into CREATE_FILE_EVENT.
 
2699
    */
 
2700
  {
 
2701
    /* We come here when and only when tmp_buf != 0 */
 
2702
    assert(tmp_buf != 0);
 
2703
    inc_pos=event_len;
 
2704
    ev->log_pos+= inc_pos;
 
2705
    int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
 
2706
    delete ev;
 
2707
    mi->incrementLogPosition(inc_pos);
 
2708
    pthread_mutex_unlock(&mi->data_lock);
 
2709
    free((char*)tmp_buf);
 
2710
    return(error);
 
2711
  }
 
2712
  default:
 
2713
    inc_pos= event_len;
 
2714
    break;
 
2715
  }
 
2716
  if (likely(!ignore_event))
 
2717
  {
 
2718
    if (ev->log_pos)
 
2719
      /*
 
2720
         Don't do it for fake Rotate events (see comment in
 
2721
      Log_event::Log_event(const char* buf...) in log_event.cc).
 
2722
      */
 
2723
      ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
 
2724
    if (unlikely(rli->relay_log.append(ev)))
 
2725
    {
 
2726
      delete ev;
 
2727
      pthread_mutex_unlock(&mi->data_lock);
 
2728
      return(1);
 
2729
    }
 
2730
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
2731
  }
 
2732
  delete ev;
 
2733
  mi->incrementLogPosition(inc_pos);
 
2734
  pthread_mutex_unlock(&mi->data_lock);
 
2735
  return(0);
 
2736
}
 
2737
 
 
2738
/*
 
2739
  Reads a 4.0 event and converts it to the slave's format. This code was copied
 
2740
  from queue_binlog_ver_1_event(), with some affordable simplifications.
 
2741
*/
 
2742
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
 
2743
                           uint32_t event_len)
 
2744
{
 
2745
  const char *errmsg = 0;
 
2746
  uint32_t inc_pos;
 
2747
  char *tmp_buf = 0;
 
2748
  Relay_log_info *rli= &mi->rli;
 
2749
 
 
2750
  /* read_log_event() will adjust log_pos to be end_log_pos */
 
2751
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
 
2752
                                            mi->rli.relay_log.description_event_for_queue);
 
2753
  if (unlikely(!ev))
 
2754
  {
 
2755
    sql_print_error(_("Read invalid event from master: '%s', "
 
2756
                      "master could be corrupt but a more likely cause of "
 
2757
                      "this is a bug"),
 
2758
                    errmsg);
 
2759
    free((char*) tmp_buf);
 
2760
    return(1);
 
2761
  }
 
2762
  pthread_mutex_lock(&mi->data_lock);
 
2763
  switch (ev->get_type_code()) {
 
2764
  case STOP_EVENT:
 
2765
    goto err;
 
2766
  case ROTATE_EVENT:
 
2767
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
 
2768
    {
 
2769
      delete ev;
 
2770
      pthread_mutex_unlock(&mi->data_lock);
 
2771
      return(1);
 
2772
    }
 
2773
    inc_pos= 0;
 
2774
    break;
 
2775
  default:
 
2776
    inc_pos= event_len;
 
2777
    break;
 
2778
  }
 
2779
  if (unlikely(rli->relay_log.append(ev)))
 
2780
  {
 
2781
    delete ev;
 
2782
    pthread_mutex_unlock(&mi->data_lock);
 
2783
    return(1);
 
2784
  }
 
2785
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
2786
  delete ev;
 
2787
  mi->incrementLogPosition(inc_pos);
 
2788
err:
 
2789
  pthread_mutex_unlock(&mi->data_lock);
 
2790
  return(0);
 
2791
}
 
2792
 
 
2793
/*
 
2794
  queue_old_event()
 
2795
 
 
2796
  Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
 
2797
  (exactly, slave's) format. To do the conversion, we create a 5.0 event from
 
2798
  the 3.23/4.0 bytes, then write this event to the relay log.
 
2799
 
 
2800
  TODO:
 
2801
    Test this code before release - it has to be tested on a separate
 
2802
    setup with 3.23 master or 4.0 master
 
2803
*/
 
2804
 
 
2805
static int32_t queue_old_event(Master_info *mi, const char *buf,
 
2806
                           uint32_t event_len)
 
2807
{
 
2808
  switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
 
2809
  {
 
2810
  case 1:
 
2811
      return(queue_binlog_ver_1_event(mi,buf,event_len));
 
2812
  case 3:
 
2813
      return(queue_binlog_ver_3_event(mi,buf,event_len));
 
2814
  default: /* unsupported format; eg version 2 */
 
2815
    return(1);
 
2816
  }
 
2817
}
 
2818
 
 
2819
/*
 
2820
  queue_event()
 
2821
 
 
2822
  If the event is 3.23/4.0, passes it to queue_old_event() which will convert
 
2823
  it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
 
2824
  no format conversion, it's pure read/write of bytes.
 
2825
  So a 5.0.0 slave's relay log can contain events in the slave's format or in
 
2826
  any >=5.0.0 format.
 
2827
*/
 
2828
 
 
2829
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
 
2830
{
 
2831
  int32_t error= 0;
 
2832
  String error_msg;
 
2833
  uint32_t inc_pos= 0;
 
2834
  Relay_log_info *rli= &mi->rli;
 
2835
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
 
2836
 
 
2837
 
 
2838
  if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
 
2839
      buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
 
2840
    return(queue_old_event(mi,buf,event_len));
 
2841
 
 
2842
  pthread_mutex_lock(&mi->data_lock);
 
2843
 
 
2844
  switch (buf[EVENT_TYPE_OFFSET]) {
 
2845
  case STOP_EVENT:
 
2846
    /*
 
2847
      We needn't write this event to the relay log. Indeed, it just indicates a
 
2848
      master server shutdown. The only thing this does is cleaning. But
 
2849
      cleaning is already done on a per-master-thread basis (as the master
 
2850
      server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
 
2851
      prepared statements' deletion are TODO only when we binlog prep stmts).
 
2852
 
 
2853
      We don't even increment mi->master_log_pos, because we may be just after
 
2854
      a Rotate event. Btw, in a few milliseconds we are going to have a Start
 
2855
      event from the next binlog (unless the master is presently running
 
2856
      without --log-bin).
 
2857
    */
 
2858
    goto err;
 
2859
  case ROTATE_EVENT:
 
2860
  {
 
2861
    Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
 
2862
    if (unlikely(process_io_rotate(mi,&rev)))
 
2863
    {
 
2864
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
2865
      goto err;
 
2866
    }
 
2867
    /*
 
2868
      Now the I/O thread has just changed its mi->master_log_name, so
 
2869
      incrementing mi->master_log_pos is nonsense.
 
2870
    */
 
2871
    inc_pos= 0;
 
2872
    break;
 
2873
  }
 
2874
  case FORMAT_DESCRIPTION_EVENT:
 
2875
  {
 
2876
    /*
 
2877
      Create an event, and save it (when we rotate the relay log, we will have
 
2878
      to write this event again).
 
2879
    */
 
2880
    /*
 
2881
      We are the only thread which reads/writes description_event_for_queue.
 
2882
      The relay_log struct does not move (though some members of it can
 
2883
      change), so we needn't any lock (no rli->data_lock, no log lock).
 
2884
    */
 
2885
    Format_description_log_event* tmp;
 
2886
    const char* errmsg;
 
2887
    if (!(tmp= (Format_description_log_event*)
 
2888
          Log_event::read_log_event(buf, event_len, &errmsg,
 
2889
                                    mi->rli.relay_log.description_event_for_queue)))
 
2890
    {
 
2891
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
2892
      goto err;
 
2893
    }
 
2894
    delete mi->rli.relay_log.description_event_for_queue;
 
2895
    mi->rli.relay_log.description_event_for_queue= tmp;
 
2896
    /*
 
2897
       Though this does some conversion to the slave's format, this will
 
2898
       preserve the master's binlog format version, and number of event types.
 
2899
    */
 
2900
    /*
 
2901
       If the event was not requested by the slave (the slave did not ask for
 
2902
       it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
 
2903
    */
 
2904
    inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
 
2905
  }
 
2906
  break;
 
2907
 
 
2908
  case HEARTBEAT_LOG_EVENT:
 
2909
  {
 
2910
    /*
 
2911
      HB (heartbeat) cannot come before RL (Relay)
 
2912
    */
 
2913
    char  llbuf[22];
 
2914
    Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
 
2915
    if (!hb.is_valid())
 
2916
    {
 
2917
      error= ER_SLAVE_HEARTBEAT_FAILURE;
 
2918
      error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
 
2919
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
 
2920
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
 
2921
      error_msg.append(STRING_WITH_LEN(" log_pos "));
 
2922
      llstr(hb.log_pos, llbuf);
 
2923
      error_msg.append(llbuf, strlen(llbuf));
 
2924
      goto err;
 
2925
    }
 
2926
    mi->received_heartbeats++;
 
2927
    /*
 
2928
       compare local and event's versions of log_file, log_pos.
 
2929
 
 
2930
       Heartbeat is sent only after an event corresponding to the corrdinates
 
2931
       the heartbeat carries.
 
2932
       Slave can not have a difference in coordinates except in the only
 
2933
       special case when mi->master_log_name, master_log_pos have never
 
2934
       been updated by Rotate event i.e when slave does not have any history
 
2935
       with the master (and thereafter mi->master_log_pos is NULL).
 
2936
 
 
2937
       TODO: handling `when' for SHOW SLAVE STATUS' snds behind
 
2938
    */
 
2939
    if ((mi->setLogName(hb.get_log_ident()) && mi->getLogName() != NULL)
 
2940
        || mi->getLogPosition() != hb.log_pos)
 
2941
    {
 
2942
      /* missed events of heartbeat from the past */
 
2943
      error= ER_SLAVE_HEARTBEAT_FAILURE;
 
2944
      error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
 
2945
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
 
2946
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
 
2947
      error_msg.append(STRING_WITH_LEN(" log_pos "));
 
2948
      llstr(hb.log_pos, llbuf);
 
2949
      error_msg.append(llbuf, strlen(llbuf));
 
2950
      goto err;
 
2951
    }
 
2952
    goto skip_relay_logging;
 
2953
  }
 
2954
  break;
 
2955
 
 
2956
  default:
 
2957
    inc_pos= event_len;
 
2958
    break;
 
2959
  }
 
2960
 
 
2961
  /*
 
2962
     If this event is originating from this server, don't queue it.
 
2963
     We don't check this for 3.23 events because it's simpler like this; 3.23
 
2964
     will be filtered anyway by the SQL slave thread which also tests the
 
2965
     server id (we must also keep this test in the SQL thread, in case somebody
 
2966
     upgrades a 4.0 slave which has a not-filtered relay log).
 
2967
 
 
2968
     ANY event coming from ourselves can be ignored: it is obvious for queries;
 
2969
     for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
 
2970
     (--log-slave-updates would not log that) unless this slave is also its
 
2971
     direct master (an unsupported, useless setup!).
 
2972
  */
 
2973
 
 
2974
  pthread_mutex_lock(log_lock);
 
2975
 
 
2976
  if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
 
2977
      !mi->rli.replicate_same_server_id)
 
2978
  {
 
2979
    /*
 
2980
      Do not write it to the relay log.
 
2981
      a) We still want to increment mi->master_log_pos, so that we won't
 
2982
      re-read this event from the master if the slave IO thread is now
 
2983
      stopped/restarted (more efficient if the events we are ignoring are big
 
2984
      LOAD DATA INFILE).
 
2985
      b) We want to record that we are skipping events, for the information of
 
2986
      the slave SQL thread, otherwise that thread may let
 
2987
      rli->group_relay_log_pos stay too small if the last binlog's event is
 
2988
      ignored.
 
2989
      But events which were generated by this slave and which do not exist in
 
2990
      the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
 
2991
      mi->master_log_pos.
 
2992
    */
 
2993
    if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
 
2994
        buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
 
2995
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
 
2996
    {
 
2997
      mi->incrementLogPosition(inc_pos);
 
2998
      memcpy(rli->ign_master_log_name_end, mi->getLogName(), FN_REFLEN);
 
2999
      assert(rli->ign_master_log_name_end[0]);
 
3000
      rli->ign_master_log_pos_end= mi->getLogPosition();
 
3001
    }
 
3002
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
 
3003
  }
 
3004
  else
 
3005
  {
 
3006
    /* write the event to the relay log */
 
3007
    if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
 
3008
    {
 
3009
      mi->incrementLogPosition(inc_pos);
 
3010
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3011
    }
 
3012
    else
 
3013
    {
 
3014
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
3015
    }
 
3016
    rli->ign_master_log_name_end[0]= 0; // last event is not ignored
 
3017
  }
 
3018
  pthread_mutex_unlock(log_lock);
 
3019
 
 
3020
skip_relay_logging:
 
3021
 
 
3022
err:
 
3023
  pthread_mutex_unlock(&mi->data_lock);
 
3024
  if (error)
 
3025
    mi->report(ERROR_LEVEL, error, ER(error),
 
3026
               (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
 
3027
               _("could not queue event from master") :
 
3028
               error_msg.ptr());
 
3029
  return(error);
 
3030
}
 
3031
 
 
3032
 
 
3033
void end_relay_log_info(Relay_log_info* rli)
 
3034
{
 
3035
  if (!rli->inited)
 
3036
    return;
 
3037
  if (rli->info_fd >= 0)
 
3038
  {
 
3039
    end_io_cache(&rli->info_file);
 
3040
    (void) my_close(rli->info_fd, MYF(MY_WME));
 
3041
    rli->info_fd = -1;
 
3042
  }
 
3043
  if (rli->cur_log_fd >= 0)
 
3044
  {
 
3045
    end_io_cache(&rli->cache_buf);
 
3046
    (void)my_close(rli->cur_log_fd, MYF(MY_WME));
 
3047
    rli->cur_log_fd = -1;
 
3048
  }
 
3049
  rli->inited = 0;
 
3050
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
3051
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3052
  /*
 
3053
    Delete the slave's temporary tables from memory.
 
3054
    In the future there will be other actions than this, to ensure persistance
 
3055
    of slave's temp tables after shutdown.
 
3056
  */
 
3057
  rli->close_temporary_tables();
 
3058
  return;
 
3059
}
 
3060
 
 
3061
/*
 
3062
  Try to connect until successful or slave killed
 
3063
 
 
3064
  SYNPOSIS
 
3065
    safe_connect()
 
3066
    session                 Thread handler for slave
 
3067
    DRIZZLE               DRIZZLE connection handle
 
3068
    mi                  Replication handle
 
3069
 
 
3070
  RETURN
 
3071
    0   ok
 
3072
    #   Error
 
3073
*/
 
3074
 
 
3075
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi)
 
3076
{
 
3077
  return(connect_to_master(session, drizzle, mi, 0, 0));
 
3078
}
 
3079
 
 
3080
 
 
3081
/*
 
3082
  SYNPOSIS
 
3083
    connect_to_master()
 
3084
 
 
3085
  IMPLEMENTATION
 
3086
    Try to connect until successful or slave killed or we have retried
 
3087
    master_retry_count times
 
3088
*/
 
3089
 
 
3090
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
 
3091
                                 bool reconnect, bool suppress_warnings)
 
3092
{
 
3093
  int32_t slave_was_killed;
 
3094
  int32_t last_errno= -2;                           // impossible error
 
3095
  uint32_t err_count=0;
 
3096
  char llbuff[22];
 
3097
 
 
3098
  mi->events_till_disconnect = disconnect_slave_event_count;
 
3099
  uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
 
3100
  if (opt_slave_compressed_protocol)
 
3101
    client_flag=CLIENT_COMPRESS;                /* We will use compression */
 
3102
 
 
3103
  drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
 
3104
  drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
 
3105
 
 
3106
  while (!(slave_was_killed = io_slave_killed(session,mi)) &&
 
3107
         (reconnect ? drizzle_reconnect(drizzle) != 0 :
 
3108
          drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
 
3109
                             mi->getPort(), 0, client_flag) == 0))
 
3110
  {
 
3111
    /* Don't repeat last error */
 
3112
    if ((int32_t)drizzle_errno(drizzle) != last_errno)
 
3113
    {
 
3114
      last_errno=drizzle_errno(drizzle);
 
3115
      suppress_warnings= 0;
 
3116
      mi->report(ERROR_LEVEL, last_errno,
 
3117
                 _("error %s to master '%s@%s:%d'"
 
3118
                   " - retry-time: %d  retries: %u"),
 
3119
                 (reconnect ? _("reconnecting") : _("connecting")),
 
3120
                 mi->getUsername(), mi->getHostname(), mi->getPort(),
 
3121
                 mi->getConnectionRetry(), master_retry_count);
 
3122
    }
 
3123
    /*
 
3124
      By default we try forever. The reason is that failure will trigger
 
3125
      master election, so if the user did not set master_retry_count we
 
3126
      do not want to have election triggered on the first failure to
 
3127
      connect
 
3128
    */
 
3129
    if (++err_count == master_retry_count)
 
3130
    {
 
3131
      slave_was_killed=1;
 
3132
      break;
 
3133
    }
 
3134
    safe_sleep(session,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
 
3135
               (void*)mi);
 
3136
  }
 
3137
 
 
3138
  if (!slave_was_killed)
 
3139
  {
 
3140
    if (reconnect)
 
3141
    {
 
3142
      if (!suppress_warnings && global_system_variables.log_warnings)
 
3143
        sql_print_information(_("Slave: connected to master '%s@%s:%d', "
 
3144
                                "replication resumed in log '%s' at "
 
3145
                                "position %s"), mi->getUsername(),
 
3146
                                mi->getHostname(), mi->getPort(),
 
3147
                                IO_RPL_LOG_NAME,
 
3148
                                llstr(mi->getLogPosition(),llbuff));
 
3149
    }
 
3150
  }
 
3151
  drizzle->reconnect= 1;
 
3152
  return(slave_was_killed);
 
3153
}
 
3154
 
 
3155
 
 
3156
/*
 
3157
  safe_reconnect()
 
3158
 
 
3159
  IMPLEMENTATION
 
3160
    Try to connect until successful or slave killed or we have retried
 
3161
    master_retry_count times
 
3162
*/
 
3163
 
 
3164
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
 
3165
                          bool suppress_warnings)
 
3166
{
 
3167
  return(connect_to_master(session, drizzle, mi, 1, suppress_warnings));
 
3168
}
 
3169
 
 
3170
 
 
3171
/*
 
3172
  Store the file and position where the execute-slave thread are in the
 
3173
  relay log.
 
3174
 
 
3175
  SYNOPSIS
 
3176
    flush_relay_log_info()
 
3177
    rli                 Relay log information
 
3178
 
 
3179
  NOTES
 
3180
    - As this is only called by the slave thread, we don't need to
 
3181
      have a lock on this.
 
3182
    - If there is an active transaction, then we don't update the position
 
3183
      in the relay log.  This is to ensure that we re-execute statements
 
3184
      if we die in the middle of an transaction that was rolled back.
 
3185
    - As a transaction never spans binary logs, we don't have to handle the
 
3186
      case where we do a relay-log-rotation in the middle of the transaction.
 
3187
      If this would not be the case, we would have to ensure that we
 
3188
      don't delete the relay log file where the transaction started when
 
3189
      we switch to a new relay log file.
 
3190
 
 
3191
  TODO
 
3192
    - Change the log file information to a binary format to avoid calling
 
3193
      int64_t2str.
 
3194
 
 
3195
  RETURN VALUES
 
3196
    0   ok
 
3197
    1   write error
 
3198
*/
 
3199
 
 
3200
bool flush_relay_log_info(Relay_log_info* rli)
 
3201
{
 
3202
  bool error= 0;
 
3203
 
 
3204
  if (unlikely(rli->no_storage))
 
3205
    return(0);
 
3206
 
 
3207
  return(error);
 
3208
}
 
3209
 
 
3210
 
 
3211
/*
 
3212
  Called when we notice that the current "hot" log got rotated under our feet.
 
3213
*/
 
3214
 
 
3215
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
 
3216
{
 
3217
  assert(rli->cur_log != &rli->cache_buf);
 
3218
  assert(rli->cur_log_fd == -1);
 
3219
 
 
3220
  IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
 
3221
  if ((rli->cur_log_fd=open_binlog(cur_log, rli->event_relay_log_name.c_str(), errmsg)) < 0)
 
3222
    return(0);
 
3223
  /*
 
3224
    We want to start exactly where we was before:
 
3225
    relay_log_pos       Current log pos
 
3226
    pending             Number of bytes already processed from the event
 
3227
  */
 
3228
  rli->event_relay_log_pos= cmax(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
 
3229
  my_b_seek(cur_log,rli->event_relay_log_pos);
 
3230
  return(cur_log);
 
3231
}
 
3232
 
 
3233
 
 
3234
static Log_event* next_event(Relay_log_info* rli)
 
3235
{
 
3236
  Log_event* ev;
 
3237
  IO_CACHE* cur_log = rli->cur_log;
 
3238
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
 
3239
  const char* errmsg=0;
 
3240
  Session* session = rli->sql_session;
 
3241
 
 
3242
  assert(session != 0);
 
3243
 
 
3244
  if (abort_slave_event_count && !rli->events_till_abort--)
 
3245
    return(0);
 
3246
 
 
3247
  /*
 
3248
    For most operations we need to protect rli members with data_lock,
 
3249
    so we assume calling function acquired this mutex for us and we will
 
3250
    hold it for the most of the loop below However, we will release it
 
3251
    whenever it is worth the hassle,  and in the cases when we go into a
 
3252
    pthread_cond_wait() with the non-data_lock mutex
 
3253
  */
 
3254
  safe_mutex_assert_owner(&rli->data_lock);
 
3255
 
 
3256
  while (!sql_slave_killed(session,rli))
 
3257
  {
 
3258
    /*
 
3259
      We can have two kinds of log reading:
 
3260
      hot_log:
 
3261
        rli->cur_log points at the IO_CACHE of relay_log, which
 
3262
        is actively being updated by the I/O thread. We need to be careful
 
3263
        in this case and make sure that we are not looking at a stale log that
 
3264
        has already been rotated. If it has been, we reopen the log.
 
3265
 
 
3266
      The other case is much simpler:
 
3267
        We just have a read only log that nobody else will be updating.
 
3268
    */
 
3269
    bool hot_log;
 
3270
    if ((hot_log = (cur_log != &rli->cache_buf)))
 
3271
    {
 
3272
      assert(rli->cur_log_fd == -1); // foreign descriptor
 
3273
      pthread_mutex_lock(log_lock);
 
3274
 
 
3275
      /*
 
3276
        Reading xxx_file_id is safe because the log will only
 
3277
        be rotated when we hold relay_log.LOCK_log
 
3278
      */
 
3279
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
 
3280
      {
 
3281
        // The master has switched to a new log file; Reopen the old log file
 
3282
        cur_log=reopen_relay_log(rli, &errmsg);
 
3283
        pthread_mutex_unlock(log_lock);
 
3284
        if (!cur_log)                           // No more log files
 
3285
          goto err;
 
3286
        hot_log=0;                              // Using old binary log
 
3287
      }
 
3288
    }
 
3289
    /*
 
3290
      As there is no guarantee that the relay is open (for example, an I/O
 
3291
      error during a write by the slave I/O thread may have closed it), we
 
3292
      have to test it.
 
3293
    */
 
3294
    if (!my_b_inited(cur_log))
 
3295
      goto err;
 
3296
    assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
 
3297
    assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
 
3298
 
 
3299
    /*
 
3300
      Relay log is always in new format - if the master is 3.23, the
 
3301
      I/O thread will convert the format for us.
 
3302
      A problem: the description event may be in a previous relay log. So if
 
3303
      the slave has been shutdown meanwhile, we would have to look in old relay
 
3304
      logs, which may even have been deleted. So we need to write this
 
3305
      description event at the beginning of the relay log.
 
3306
      When the relay log is created when the I/O thread starts, easy: the
 
3307
      master will send the description event and we will queue it.
 
3308
      But if the relay log is created by new_file(): then the solution is:
 
3309
      DRIZZLE_BIN_LOG::open() will write the buffered description event.
 
3310
    */
 
3311
    if ((ev=Log_event::read_log_event(cur_log,0,
 
3312
                                      rli->relay_log.description_event_for_exec)))
 
3313
 
 
3314
    {
 
3315
      assert(session==rli->sql_session);
 
3316
      /*
 
3317
        read it while we have a lock, to avoid a mutex lock in
 
3318
        inc_event_relay_log_pos()
 
3319
      */
 
3320
      rli->future_event_relay_log_pos= my_b_tell(cur_log);
 
3321
      if (hot_log)
 
3322
        pthread_mutex_unlock(log_lock);
 
3323
      return(ev);
 
3324
    }
 
3325
    assert(session==rli->sql_session);
 
3326
    if (opt_reckless_slave)                     // For mysql-test
 
3327
      cur_log->error = 0;
 
3328
    if (cur_log->error < 0)
 
3329
    {
 
3330
      errmsg = "slave SQL thread aborted because of I/O error";
 
3331
      if (hot_log)
 
3332
        pthread_mutex_unlock(log_lock);
 
3333
      goto err;
 
3334
    }
 
3335
    if (!cur_log->error) /* EOF */
 
3336
    {
 
3337
      /*
 
3338
        On a hot log, EOF means that there are no more updates to
 
3339
        process and we must block until I/O thread adds some and
 
3340
        signals us to continue
 
3341
      */
 
3342
      if (hot_log)
 
3343
      {
 
3344
        /*
 
3345
          We say in Seconds_Behind_Master that we have "caught up". Note that
 
3346
          for example if network link is broken but I/O slave thread hasn't
 
3347
          noticed it (slave_net_timeout not elapsed), then we'll say "caught
 
3348
          up" whereas we're not really caught up. Fixing that would require
 
3349
          internally cutting timeout in smaller pieces in network read, no
 
3350
          thanks. Another example: SQL has caught up on I/O, now I/O has read
 
3351
          a new event and is queuing it; the false "0" will exist until SQL
 
3352
          finishes executing the new event; it will be look abnormal only if
 
3353
          the events have old timestamps (then you get "many", 0, "many").
 
3354
 
 
3355
          Transient phases like this can be fixed with implemeting
 
3356
          Heartbeat event which provides the slave the status of the
 
3357
          master at time the master does not have any new update to send.
 
3358
          Seconds_Behind_Master would be zero only when master has no
 
3359
          more updates in binlog for slave. The heartbeat can be sent
 
3360
          in a (small) fraction of slave_net_timeout. Until it's done
 
3361
          rli->last_master_timestamp is temporarely (for time of
 
3362
          waiting for the following event) reset whenever EOF is
 
3363
          reached.
 
3364
        */
 
3365
        time_t save_timestamp= rli->last_master_timestamp;
 
3366
        rli->last_master_timestamp= 0;
 
3367
 
 
3368
        assert(rli->relay_log.get_open_count() ==
 
3369
                    rli->cur_log_old_open_count);
 
3370
 
 
3371
        if (rli->ign_master_log_name_end[0])
 
3372
        {
 
3373
          /* We generate and return a Rotate, to make our positions advance */
 
3374
          ev= new Rotate_log_event(rli->ign_master_log_name_end,
 
3375
                                   0, rli->ign_master_log_pos_end,
 
3376
                                   Rotate_log_event::DUP_NAME);
 
3377
          rli->ign_master_log_name_end[0]= 0;
 
3378
          pthread_mutex_unlock(log_lock);
 
3379
          if (unlikely(!ev))
 
3380
          {
 
3381
            errmsg= "Slave SQL thread failed to create a Rotate event "
 
3382
              "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
 
3383
            goto err;
 
3384
          }
 
3385
          ev->server_id= 0; // don't be ignored by slave SQL thread
 
3386
          return(ev);
 
3387
        }
 
3388
 
 
3389
        /*
 
3390
          We can, and should release data_lock while we are waiting for
 
3391
          update. If we do not, show slave status will block
 
3392
        */
 
3393
        pthread_mutex_unlock(&rli->data_lock);
 
3394
 
 
3395
        /*
 
3396
          Possible deadlock :
 
3397
          - the I/O thread has reached log_space_limit
 
3398
          - the SQL thread has read all relay logs, but cannot purge for some
 
3399
          reason:
 
3400
            * it has already purged all logs except the current one
 
3401
            * there are other logs than the current one but they're involved in
 
3402
            a transaction that finishes in the current one (or is not finished)
 
3403
          Solution :
 
3404
          Wake up the possibly waiting I/O thread, and set a boolean asking
 
3405
          the I/O thread to temporarily ignore the log_space_limit
 
3406
          constraint, because we do not want the I/O thread to block because of
 
3407
          space (it's ok if it blocks for any other reason (e.g. because the
 
3408
          master does not send anything). Then the I/O thread stops waiting
 
3409
          and reads more events.
 
3410
          The SQL thread decides when the I/O thread should take log_space_limit
 
3411
          into account again : ignore_log_space_limit is reset to 0
 
3412
          in purge_first_log (when the SQL thread purges the just-read relay
 
3413
          log), and also when the SQL thread starts. We should also reset
 
3414
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
 
3415
          fact, no need as RESET SLAVE requires that the slave
 
3416
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
 
3417
          it stops.
 
3418
        */
 
3419
        pthread_mutex_lock(&rli->log_space_lock);
 
3420
        // prevent the I/O thread from blocking next times
 
3421
        rli->ignore_log_space_limit= 1;
 
3422
        /*
 
3423
          If the I/O thread is blocked, unblock it.  Ok to broadcast
 
3424
          after unlock, because the mutex is only destroyed in
 
3425
          ~Relay_log_info(), i.e. when rli is destroyed, and rli will
 
3426
          not be destroyed before we exit the present function.
 
3427
        */
 
3428
        pthread_mutex_unlock(&rli->log_space_lock);
 
3429
        pthread_cond_broadcast(&rli->log_space_cond);
 
3430
        // Note that wait_for_update_relay_log unlocks lock_log !
 
3431
        rli->relay_log.wait_for_update_relay_log(rli->sql_session);
 
3432
        // re-acquire data lock since we released it earlier
 
3433
        pthread_mutex_lock(&rli->data_lock);
 
3434
        rli->last_master_timestamp= save_timestamp;
 
3435
        continue;
 
3436
      }
 
3437
      /*
 
3438
        If the log was not hot, we need to move to the next log in
 
3439
        sequence. The next log could be hot or cold, we deal with both
 
3440
        cases separately after doing some common initialization
 
3441
      */
 
3442
      end_io_cache(cur_log);
 
3443
      assert(rli->cur_log_fd >= 0);
 
3444
      my_close(rli->cur_log_fd, MYF(MY_WME));
 
3445
      rli->cur_log_fd = -1;
 
3446
 
 
3447
      if (relay_log_purge)
 
3448
      {
 
3449
        /*
 
3450
          purge_first_log will properly set up relay log coordinates in rli.
 
3451
          If the group's coordinates are equal to the event's coordinates
 
3452
          (i.e. the relay log was not rotated in the middle of a group),
 
3453
          we can purge this relay log too.
 
3454
          We do uint64_t and string comparisons, this may be slow but
 
3455
          - purging the last relay log is nice (it can save 1GB of disk), so we
 
3456
          like to detect the case where we can do it, and given this,
 
3457
          - I see no better detection method
 
3458
          - purge_first_log is not called that often
 
3459
        */
 
3460
        if (rli->relay_log.purge_first_log
 
3461
            (rli,
 
3462
             rli->group_relay_log_pos == rli->event_relay_log_pos
 
3463
             && !strcmp(rli->group_relay_log_name.c_str(), rli->event_relay_log_name.c_str())))
 
3464
        {
 
3465
          errmsg = "Error purging processed logs";
 
3466
          goto err;
 
3467
        }
 
3468
      }
 
3469
      else
 
3470
      {
 
3471
        /*
 
3472
          If hot_log is set, then we already have a lock on
 
3473
          LOCK_log.  If not, we have to get the lock.
 
3474
 
 
3475
          According to Sasha, the only time this code will ever be executed
 
3476
          is if we are recovering from a bug.
 
3477
        */
 
3478
        if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
 
3479
        {
 
3480
          errmsg = "error switching to the next log";
 
3481
          goto err;
 
3482
        }
 
3483
        rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
 
3484
        rli->event_relay_log_name.assign(rli->linfo.log_file_name);
 
3485
        flush_relay_log_info(rli);
 
3486
      }
 
3487
 
 
3488
      /*
 
3489
        Now we want to open this next log. To know if it's a hot log (the one
 
3490
        being written by the I/O thread now) or a cold log, we can use
 
3491
        is_active(); if it is hot, we use the I/O cache; if it's cold we open
 
3492
        the file normally. But if is_active() reports that the log is hot, this
 
3493
        may change between the test and the consequence of the test. So we may
 
3494
        open the I/O cache whereas the log is now cold, which is nonsense.
 
3495
        To guard against this, we need to have LOCK_log.
 
3496
      */
 
3497
 
 
3498
      if (!hot_log) /* if hot_log, we already have this mutex */
 
3499
        pthread_mutex_lock(log_lock);
 
3500
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
 
3501
      {
 
3502
#ifdef EXTRA_DEBUG
 
3503
        if (global_system_variables.log_warnings)
 
3504
          sql_print_information(_("next log '%s' is currently active"),
 
3505
                                rli->linfo.log_file_name);
 
3506
#endif
 
3507
        rli->cur_log= cur_log= rli->relay_log.get_log_file();
 
3508
        rli->cur_log_old_open_count= rli->relay_log.get_open_count();
 
3509
        assert(rli->cur_log_fd == -1);
 
3510
 
 
3511
        /*
 
3512
          Read pointer has to be at the start since we are the only
 
3513
          reader.
 
3514
          We must keep the LOCK_log to read the 4 first bytes, as this is a hot
 
3515
          log (same as when we call read_log_event() above: for a hot log we
 
3516
          take the mutex).
 
3517
        */
 
3518
        if (check_binlog_magic(cur_log,&errmsg))
 
3519
        {
 
3520
          if (!hot_log) pthread_mutex_unlock(log_lock);
 
3521
          goto err;
 
3522
        }
 
3523
        if (!hot_log) pthread_mutex_unlock(log_lock);
 
3524
        continue;
 
3525
      }
 
3526
      if (!hot_log) pthread_mutex_unlock(log_lock);
 
3527
      /*
 
3528
        if we get here, the log was not hot, so we will have to open it
 
3529
        ourselves. We are sure that the log is still not hot now (a log can get
 
3530
        from hot to cold, but not from cold to hot). No need for LOCK_log.
 
3531
      */
 
3532
#ifdef EXTRA_DEBUG
 
3533
      if (global_system_variables.log_warnings)
 
3534
        sql_print_information(_("next log '%s' is not active"),
 
3535
                              rli->linfo.log_file_name);
 
3536
#endif
 
3537
      // open_binlog() will check the magic header
 
3538
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
 
3539
                                       &errmsg)) <0)
 
3540
        goto err;
 
3541
    }
 
3542
    else
 
3543
    {
 
3544
      /*
 
3545
        Read failed with a non-EOF error.
 
3546
        TODO: come up with something better to handle this error
 
3547
      */
 
3548
      if (hot_log)
 
3549
        pthread_mutex_unlock(log_lock);
 
3550
      sql_print_error(_("Slave SQL thread: I/O error reading "
 
3551
                        "event(errno: %d  cur_log->error: %d)"),
 
3552
                      my_errno,cur_log->error);
 
3553
      // set read position to the beginning of the event
 
3554
      my_b_seek(cur_log,rli->event_relay_log_pos);
 
3555
      /* otherwise, we have had a partial read */
 
3556
      errmsg = _("Aborting slave SQL thread because of partial event read");
 
3557
      break;                                    // To end of function
 
3558
    }
 
3559
  }
 
3560
  if (!errmsg && global_system_variables.log_warnings)
 
3561
  {
 
3562
    sql_print_information(_("Error reading relay log event: %s"),
 
3563
                          _("slave SQL thread was killed"));
 
3564
    return(0);
 
3565
  }
 
3566
 
 
3567
err:
 
3568
  if (errmsg)
 
3569
    sql_print_error(_("Error reading relay log event: %s"), errmsg);
 
3570
  return(0);
 
3571
}
 
3572
 
 
3573
/*
 
3574
  Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
 
3575
  because of size is simpler because when we do it we already have all relevant
 
3576
  locks; here we don't, so this function is mainly taking locks).
 
3577
  Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
 
3578
  is void).
 
3579
*/
 
3580
 
 
3581
void rotate_relay_log(Master_info* mi)
 
3582
{
 
3583
  Relay_log_info* rli= &mi->rli;
 
3584
 
 
3585
  /* We don't lock rli->run_lock. This would lead to deadlocks. */
 
3586
  pthread_mutex_lock(&mi->run_lock);
 
3587
 
 
3588
  /*
 
3589
     We need to test inited because otherwise, new_file() will attempt to lock
 
3590
     LOCK_log, which may not be inited (if we're not a slave).
 
3591
  */
 
3592
  if (!rli->inited)
 
3593
  {
 
3594
    goto end;
 
3595
  }
 
3596
 
 
3597
  /* If the relay log is closed, new_file() will do nothing. */
 
3598
  rli->relay_log.new_file();
 
3599
 
 
3600
  /*
 
3601
    We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
 
3602
    be counted, so imagine a succession of FLUSH LOGS  and assume the slave
 
3603
    threads are started:
 
3604
    relay_log_space decreases by the size of the deleted relay log, but does
 
3605
    not increase, so flush-after-flush we may become negative, which is wrong.
 
3606
    Even if this will be corrected as soon as a query is replicated on the
 
3607
    slave (because the I/O thread will then call harvest_bytes_written() which
 
3608
    will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
 
3609
    output in SHOW SLAVE STATUS meanwhile. So we harvest now.
 
3610
    If the log is closed, then this will just harvest the last writes, probably
 
3611
    0 as they probably have been harvested.
 
3612
  */
 
3613
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3614
end:
 
3615
  pthread_mutex_unlock(&mi->run_lock);
 
3616
  return;
 
3617
}
 
3618
 
 
3619
 
 
3620
/**
 
3621
   Detects, based on master's version (as found in the relay log), if master
 
3622
   has a certain bug.
 
3623
   @param rli Relay_log_info which tells the master's version
 
3624
   @param bug_id Number of the bug as found in bugs.mysql.com
 
3625
   @param report bool report error message, default TRUE
 
3626
   @return true if master has the bug, FALSE if it does not.
 
3627
*/
 
3628
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
 
3629
{
 
3630
  struct st_version_range_for_one_bug {
 
3631
    uint32_t        bug_id;
 
3632
    const unsigned char introduced_in[3]; // first version with bug
 
3633
    const unsigned char fixed_in[3];      // first version with fix
 
3634
  };
 
3635
  static struct st_version_range_for_one_bug versions_for_all_bugs[]=
 
3636
  {
 
3637
    {24432, { 5, 0, 24 }, { 5, 0, 38 } },
 
3638
    {24432, { 5, 1, 12 }, { 5, 1, 17 } },
 
3639
    {33029, { 5, 0,  0 }, { 5, 0, 58 } },
 
3640
    {33029, { 5, 1,  0 }, { 5, 1, 12 } },
 
3641
  };
 
3642
  const unsigned char *master_ver=
 
3643
    rli->relay_log.description_event_for_exec->server_version_split;
 
3644
 
 
3645
  assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
 
3646
 
 
3647
  for (uint32_t i= 0;
 
3648
       i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
 
3649
  {
 
3650
    const unsigned char *introduced_in= versions_for_all_bugs[i].introduced_in,
 
3651
      *fixed_in= versions_for_all_bugs[i].fixed_in;
 
3652
    if ((versions_for_all_bugs[i].bug_id == bug_id) &&
 
3653
        (memcmp(introduced_in, master_ver, 3) <= 0) &&
 
3654
        (memcmp(fixed_in,      master_ver, 3) >  0))
 
3655
    {
 
3656
      if (!report)
 
3657
        return true;
 
3658
 
 
3659
      // a short message for SHOW SLAVE STATUS (message length constraints)
 
3660
      my_printf_error(ER_UNKNOWN_ERROR,
 
3661
                      _("master may suffer from"
 
3662
                        " http://bugs.mysql.com/bug.php?id=%u"
 
3663
                        " so slave stops; check error log on slave"
 
3664
                        " for more info"), MYF(0), bug_id);
 
3665
      // a verbose message for the error log
 
3666
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
 
3667
                  _("According to the master's version ('%s'),"
 
3668
                    " it is probable that master suffers from this bug:"
 
3669
                    " http://bugs.mysql.com/bug.php?id=%u"
 
3670
                    " and thus replicating the current binary log event"
 
3671
                    " may make the slave's data become different from the"
 
3672
                    " master's data."
 
3673
                    " To take no risk, slave refuses to replicate"
 
3674
                    " this event and stops."
 
3675
                    " We recommend that all updates be stopped on the"
 
3676
                    " master and slave, that the data of both be"
 
3677
                    " manually synchronized,"
 
3678
                    " that master's binary logs be deleted,"
 
3679
                    " that master be upgraded to a version at least"
 
3680
                    " equal to '%d.%d.%d'. Then replication can be"
 
3681
                    " restarted."),
 
3682
                  rli->relay_log.description_event_for_exec->server_version,
 
3683
                  bug_id,
 
3684
                  fixed_in[0], fixed_in[1], fixed_in[2]);
 
3685
      return true;
 
3686
    }
 
3687
  }
 
3688
  return false;
 
3689
}
 
3690
 
 
3691
/**
 
3692
   BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
 
3693
   exclusive, if one statement in a SP generated AUTO_INCREMENT value
 
3694
   by the top statement, all statements after it would be considered
 
3695
   generated AUTO_INCREMENT value by the top statement, and a
 
3696
   erroneous INSERT_ID value might be associated with these statement,
 
3697
   which could cause duplicate entry error and stop the slave.
 
3698
 
 
3699
   Detect buggy master to work around.
 
3700
 */
 
3701
bool rpl_master_erroneous_autoinc(Session *session)
 
3702
{
 
3703
  if (active_mi && active_mi->rli.sql_session == session)
 
3704
  {
 
3705
    Relay_log_info *rli= &active_mi->rli;
 
3706
    return rpl_master_has_bug(rli, 33029, false);
 
3707
  }
 
3708
  return false;
 
3709
}
 
3710
 
 
3711
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
3712
template class I_List_iterator<i_string>;
 
3713
template class I_List_iterator<i_string_pair>;
 
3714
#endif
 
3715
 
 
3716
/**
 
3717
  @} (end of group Replication)
 
3718
*/