~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/slave.cc

  • Committer: Monty Taylor
  • Date: 2009-04-26 19:40:09 UTC
  • mto: (997.2.25 mordred)
  • mto: This revision was merged to the branch mainline in revision 1003.
  • Revision ID: mordred@inaugust.com-20090426194009-dhnrewc3dyc2zv5a
Fixed an or tatement.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2000-2003 DRIZZLE AB
2
 
 
3
 
   This program is free software; you can redistribute it and/or modify
4
 
   it under the terms of the GNU General Public License as published by
5
 
   the Free Software Foundation; version 2 of the License.
6
 
 
7
 
   This program is distributed in the hope that it will be useful,
8
 
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 
   GNU General Public License for more details.
11
 
 
12
 
   You should have received a copy of the GNU General Public License
13
 
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
 
 
16
 
 
17
 
/**
18
 
  @addtogroup Replication
19
 
  @{
20
 
 
21
 
  @file
22
 
 
23
 
  @brief Code to run the io thread and the sql thread on the
24
 
  replication slave.
25
 
*/
26
 
#include <drizzled/server_includes.h>
27
 
 
28
 
#include <storage/myisam/myisam.h>
29
 
#include <drizzled/rpl_mi.h>
30
 
#include <drizzled/rpl_rli.h>
31
 
#include <drizzled/sql_repl.h>
32
 
#include <drizzled/rpl_filter.h>
33
 
#include <mysys/thr_alarm.h>
34
 
#include <libdrizzle/errmsg.h>
35
 
#include <mysys/mysys_err.h>
36
 
#include <drizzled/error.h>
37
 
#include <drizzled/sql_parse.h>
38
 
#include <drizzled/gettext.h>
39
 
#include <signal.h>
40
 
 
41
 
#if TIME_WITH_SYS_TIME
42
 
# include <sys/time.h>
43
 
# include <time.h>
44
 
#else
45
 
# if HAVE_SYS_TIME_H
46
 
#  include <sys/time.h>
47
 
# else
48
 
#  include <time.h>
49
 
# endif
50
 
#endif
51
 
 
52
 
#include <drizzled/tztime.h>
53
 
 
54
 
#include "rpl_tblmap.h"
55
 
 
56
 
#define MAX_SLAVE_RETRY_PAUSE 5
57
 
bool use_slave_mask = 0;
58
 
MY_BITMAP slave_error_mask;
59
 
 
60
 
typedef bool (*CHECK_KILLED_FUNC)(Session*,void*);
61
 
 
62
 
char* slave_load_tmpdir = 0;
63
 
Master_info *active_mi= 0;
64
 
bool replicate_same_server_id;
65
 
uint64_t relay_log_space_limit = 0;
66
 
 
67
 
/*
68
 
  When slave thread exits, we need to remember the temporary tables so we
69
 
  can re-use them on slave start.
70
 
 
71
 
  TODO: move the vars below under Master_info
72
 
*/
73
 
 
74
 
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
75
 
int32_t events_till_abort = -1;
76
 
 
77
 
enum enum_slave_reconnect_actions
78
 
{
79
 
  SLAVE_RECON_ACT_REG= 0,
80
 
  SLAVE_RECON_ACT_DUMP= 1,
81
 
  SLAVE_RECON_ACT_EVENT= 2,
82
 
  SLAVE_RECON_ACT_MAX
83
 
};
84
 
 
85
 
enum enum_slave_reconnect_messages
86
 
{
87
 
  SLAVE_RECON_MSG_WAIT= 0,
88
 
  SLAVE_RECON_MSG_KILLED_WAITING= 1,
89
 
  SLAVE_RECON_MSG_AFTER= 2,
90
 
  SLAVE_RECON_MSG_FAILED= 3,
91
 
  SLAVE_RECON_MSG_COMMAND= 4,
92
 
  SLAVE_RECON_MSG_KILLED_AFTER= 5,
93
 
  SLAVE_RECON_MSG_MAX
94
 
};
95
 
 
96
 
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
97
 
{
98
 
  {
99
 
    N_("Waiting to reconnect after a failed registration on master"),
100
 
    N_("Slave I/O thread killed while waitnig to reconnect after a "
101
 
                 "failed registration on master"),
102
 
    N_("Reconnecting after a failed registration on master"),
103
 
    N_("failed registering on master, reconnecting to try again, "
104
 
                 "log '%s' at postion %s"),
105
 
    "COM_REGISTER_SLAVE",
106
 
    N_("Slave I/O thread killed during or after reconnect")
107
 
  },
108
 
  {
109
 
    N_("Waiting to reconnect after a failed binlog dump request"),
110
 
    N_("Slave I/O thread killed while retrying master dump"),
111
 
    N_("Reconnecting after a failed binlog dump request"),
112
 
    N_("failed dump request, reconnecting to try again, "
113
 
                 "log '%s' at postion %s"),
114
 
    "COM_BINLOG_DUMP",
115
 
    N_("Slave I/O thread killed during or after reconnect")
116
 
  },
117
 
  {
118
 
    N_("Waiting to reconnect after a failed master event read"),
119
 
    N_("Slave I/O thread killed while waiting to reconnect "
120
 
                 "after a failed read"),
121
 
    N_("Reconnecting after a failed master event read"),
122
 
    N_("Slave I/O thread: Failed reading log event, "
123
 
                 "reconnecting to retry, log '%s' at postion %s"),
124
 
    "",
125
 
    N_("Slave I/O thread killed during or after a "
126
 
                 "reconnect done to recover from failed read")
127
 
  }
128
 
};
129
 
 
130
 
 
131
 
typedef enum { SLAVE_Session_IO, SLAVE_Session_SQL} SLAVE_Session_TYPE;
132
 
 
133
 
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
134
 
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
135
 
static bool wait_for_relay_log_space(Relay_log_info* rli);
136
 
static inline bool io_slave_killed(Session* session,Master_info* mi);
137
 
static inline bool sql_slave_killed(Session* session,Relay_log_info* rli);
138
 
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type);
139
 
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi);
140
 
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
141
 
                          bool suppress_warnings);
142
 
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
143
 
                             bool reconnect, bool suppress_warnings);
144
 
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
145
 
                      void* thread_killed_arg);
146
 
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
147
 
static Log_event* next_event(Relay_log_info* rli);
148
 
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
149
 
static int32_t terminate_slave_thread(Session *session,
150
 
                                  pthread_mutex_t* term_lock,
151
 
                                  pthread_cond_t* term_cond,
152
 
                                  volatile uint32_t *slave_running,
153
 
                                  bool skip_lock);
154
 
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info);
155
 
 
156
 
/*
157
 
  Find out which replications threads are running
158
 
 
159
 
  SYNOPSIS
160
 
    init_thread_mask()
161
 
    mask                Return value here
162
 
    mi                  master_info for slave
163
 
    inverse             If set, returns which threads are not running
164
 
 
165
 
  IMPLEMENTATION
166
 
    Get a bit mask for which threads are running so that we can later restart
167
 
    these threads.
168
 
 
169
 
  RETURN
170
 
    mask        If inverse == 0, running threads
171
 
                If inverse == 1, stopped threads
172
 
*/
173
 
 
174
 
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
175
 
{
176
 
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
177
 
  register int32_t tmp_mask=0;
178
 
 
179
 
  if (set_io)
180
 
    tmp_mask |= SLAVE_IO;
181
 
  if (set_sql)
182
 
    tmp_mask |= SLAVE_SQL;
183
 
  if (inverse)
184
 
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
185
 
  *mask = tmp_mask;
186
 
  return;
187
 
}
188
 
 
189
 
 
190
 
/*
191
 
  lock_slave_threads()
192
 
*/
193
 
 
194
 
void lock_slave_threads(Master_info* mi)
195
 
{
196
 
  //TODO: see if we can do this without dual mutex
197
 
  pthread_mutex_lock(&mi->run_lock);
198
 
  pthread_mutex_lock(&mi->rli.run_lock);
199
 
  return;
200
 
}
201
 
 
202
 
 
203
 
/*
204
 
  unlock_slave_threads()
205
 
*/
206
 
 
207
 
void unlock_slave_threads(Master_info* mi)
208
 
{
209
 
  //TODO: see if we can do this without dual mutex
210
 
  pthread_mutex_unlock(&mi->rli.run_lock);
211
 
  pthread_mutex_unlock(&mi->run_lock);
212
 
  return;
213
 
}
214
 
 
215
 
 
216
 
/* Initialize slave structures */
217
 
 
218
 
int32_t init_slave()
219
 
{
220
 
  /*
221
 
    This is called when mysqld starts. Before client connections are
222
 
    accepted. However bootstrap may conflict with us if it does START SLAVE.
223
 
    So it's safer to take the lock.
224
 
  */
225
 
  pthread_mutex_lock(&LOCK_active_mi);
226
 
  /*
227
 
    TODO: re-write this to interate through the list of files
228
 
    for multi-master
229
 
  */
230
 
  active_mi= new Master_info;
231
 
 
232
 
  /*
233
 
    If master_host is not specified, try to read it from the master_info file.
234
 
    If master_host is specified, create the master_info file if it doesn't
235
 
    exists.
236
 
  */
237
 
  if (!active_mi)
238
 
  {
239
 
    sql_print_error(_("Failed to allocate memory for the master info structure"));
240
 
    goto err;
241
 
  }
242
 
 
243
 
  if (active_mi->init_master_info(master_info_file, relay_log_info_file, (SLAVE_IO | SLAVE_SQL)))
244
 
  {
245
 
    sql_print_error(_("Failed to initialize the master info structure"));
246
 
    goto err;
247
 
  }
248
 
 
249
 
  /* If server id is not set, start_slave_thread() will say it */
250
 
 
251
 
  if (active_mi->host[0] && !opt_skip_slave_start)
252
 
  {
253
 
    if (start_slave_threads(1 /* need mutex */,
254
 
                            0 /* no wait for start*/,
255
 
                            active_mi,
256
 
                            master_info_file,
257
 
                            relay_log_info_file,
258
 
                            SLAVE_IO | SLAVE_SQL))
259
 
    {
260
 
      sql_print_error(_("Failed to create slave threads"));
261
 
      goto err;
262
 
    }
263
 
  }
264
 
  pthread_mutex_unlock(&LOCK_active_mi);
265
 
  return(0);
266
 
 
267
 
err:
268
 
  pthread_mutex_unlock(&LOCK_active_mi);
269
 
  return(1);
270
 
}
271
 
 
272
 
 
273
 
/*
274
 
  Init function to set up array for errors that should be skipped for slave
275
 
 
276
 
  SYNOPSIS
277
 
    init_slave_skip_errors()
278
 
    arg         List of errors numbers to skip, separated with ','
279
 
 
280
 
  NOTES
281
 
    Called from get_options() in mysqld.cc on start-up
282
 
*/
283
 
 
284
 
void init_slave_skip_errors(const char* arg)
285
 
{
286
 
  const char *p;
287
 
 
288
 
  if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
289
 
  {
290
 
    fprintf(stderr, "Badly out of memory, please check your system status\n");
291
 
    exit(1);
292
 
  }
293
 
  use_slave_mask = 1;
294
 
  for (;my_isspace(system_charset_info,*arg);++arg)
295
 
    /* empty */;
296
 
  if (!my_strnncoll(system_charset_info,(unsigned char*)arg,4,(const unsigned char*)"all",4))
297
 
  {
298
 
    bitmap_set_all(&slave_error_mask);
299
 
    return;
300
 
  }
301
 
  for (p= arg ; *p; )
302
 
  {
303
 
    long err_code;
304
 
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
305
 
      break;
306
 
    if (err_code < MAX_SLAVE_ERROR)
307
 
       bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
308
 
    while (!my_isdigit(system_charset_info,*p) && *p)
309
 
      p++;
310
 
  }
311
 
  return;
312
 
}
313
 
 
314
 
 
315
 
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
316
 
{
317
 
  if (!mi->inited)
318
 
    return(0); /* successfully do nothing */
319
 
  int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
320
 
  pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
321
 
 
322
 
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
323
 
  {
324
 
    mi->abort_slave=1;
325
 
    if ((error=terminate_slave_thread(mi->io_session,io_lock,
326
 
                                      &mi->stop_cond,
327
 
                                      &mi->slave_running,
328
 
                                      skip_lock)) &&
329
 
        !force_all)
330
 
      return(error);
331
 
  }
332
 
  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
333
 
  {
334
 
    mi->rli.abort_slave=1;
335
 
    if ((error=terminate_slave_thread(mi->rli.sql_session,sql_lock,
336
 
                                      &mi->rli.stop_cond,
337
 
                                      &mi->rli.slave_running,
338
 
                                      skip_lock)) &&
339
 
        !force_all)
340
 
      return(error);
341
 
  }
342
 
  return(0);
343
 
}
344
 
 
345
 
 
346
 
/**
347
 
   Wait for a slave thread to terminate.
348
 
 
349
 
   This function is called after requesting the thread to terminate
350
 
   (by setting @c abort_slave member of @c Relay_log_info or @c
351
 
   Master_info structure to 1). Termination of the thread is
352
 
   controlled with the the predicate <code>*slave_running</code>.
353
 
 
354
 
   Function will acquire @c term_lock before waiting on the condition
355
 
   unless @c skip_lock is true in which case the mutex should be owned
356
 
   by the caller of this function and will remain acquired after
357
 
   return from the function.
358
 
 
359
 
   @param term_lock
360
 
          Associated lock to use when waiting for @c term_cond
361
 
 
362
 
   @param term_cond
363
 
          Condition that is signalled when the thread has terminated
364
 
 
365
 
   @param slave_running
366
 
          Pointer to predicate to check for slave thread termination
367
 
 
368
 
   @param skip_lock
369
 
          If @c true the lock will not be acquired before waiting on
370
 
          the condition. In this case, it is assumed that the calling
371
 
          function acquires the lock before calling this function.
372
 
 
373
 
   @retval 0 All OK
374
 
 */
375
 
static int32_t
376
 
terminate_slave_thread(Session *session,
377
 
                       pthread_mutex_t* term_lock,
378
 
                       pthread_cond_t* term_cond,
379
 
                       volatile uint32_t *slave_running,
380
 
                       bool skip_lock)
381
 
{
382
 
  int32_t error;
383
 
 
384
 
  if (!skip_lock)
385
 
    pthread_mutex_lock(term_lock);
386
 
 
387
 
  safe_mutex_assert_owner(term_lock);
388
 
 
389
 
  if (!*slave_running)
390
 
  {
391
 
    if (!skip_lock)
392
 
      pthread_mutex_unlock(term_lock);
393
 
    return(ER_SLAVE_NOT_RUNNING);
394
 
  }
395
 
  assert(session != 0);
396
 
  Session_CHECK_SENTRY(session);
397
 
 
398
 
  /*
399
 
    Is is critical to test if the slave is running. Otherwise, we might
400
 
    be referening freed memory trying to kick it
401
 
  */
402
 
 
403
 
  while (*slave_running)                        // Should always be true
404
 
  {
405
 
    pthread_mutex_lock(&session->LOCK_delete);
406
 
#ifndef DONT_USE_THR_ALARM
407
 
    /*
408
 
      Error codes from pthread_kill are:
409
 
      EINVAL: invalid signal number (can't happen)
410
 
      ESRCH: thread already killed (can happen, should be ignored)
411
 
    */
412
 
    int32_t err= pthread_kill(session->real_id, thr_client_alarm);
413
 
    assert(err != EINVAL);
414
 
#endif
415
 
    session->awake(Session::NOT_KILLED);
416
 
    pthread_mutex_unlock(&session->LOCK_delete);
417
 
 
418
 
    /*
419
 
      There is a small chance that slave thread might miss the first
420
 
      alarm. To protect againts it, resend the signal until it reacts
421
 
    */
422
 
    struct timespec abstime;
423
 
    set_timespec(abstime,2);
424
 
    error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
425
 
    assert(error == ETIMEDOUT || error == 0);
426
 
  }
427
 
 
428
 
  assert(*slave_running == 0);
429
 
 
430
 
  if (!skip_lock)
431
 
    pthread_mutex_unlock(term_lock);
432
 
  return(0);
433
 
}
434
 
 
435
 
 
436
 
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
437
 
                           pthread_mutex_t *cond_lock,
438
 
                           pthread_cond_t *start_cond,
439
 
                           volatile uint32_t *slave_running,
440
 
                           volatile uint32_t *slave_run_id,
441
 
                           Master_info* mi,
442
 
                           bool high_priority)
443
 
{
444
 
  pthread_t th;
445
 
  uint32_t start_id;
446
 
 
447
 
  assert(mi->inited);
448
 
 
449
 
  if (start_lock)
450
 
    pthread_mutex_lock(start_lock);
451
 
  if (!server_id)
452
 
  {
453
 
    if (start_cond)
454
 
      pthread_cond_broadcast(start_cond);
455
 
    if (start_lock)
456
 
      pthread_mutex_unlock(start_lock);
457
 
    sql_print_error(_("Server id not set, will not start slave"));
458
 
    return(ER_BAD_SLAVE);
459
 
  }
460
 
 
461
 
  if (*slave_running)
462
 
  {
463
 
    if (start_cond)
464
 
      pthread_cond_broadcast(start_cond);
465
 
    if (start_lock)
466
 
      pthread_mutex_unlock(start_lock);
467
 
    return(ER_SLAVE_MUST_STOP);
468
 
  }
469
 
  start_id= *slave_run_id;
470
 
  if (high_priority)
471
 
  {
472
 
    struct sched_param tmp_sched_param;
473
 
 
474
 
    memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
475
 
    tmp_sched_param.sched_priority= CONNECT_PRIOR;
476
 
    (void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
477
 
  }
478
 
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
479
 
  {
480
 
    if (start_lock)
481
 
      pthread_mutex_unlock(start_lock);
482
 
    return(ER_SLAVE_THREAD);
483
 
  }
484
 
  if (start_cond && cond_lock) // caller has cond_lock
485
 
  {
486
 
    Session* session = current_session;
487
 
    while (start_id == *slave_run_id)
488
 
    {
489
 
      const char* old_msg = session->enter_cond(start_cond,cond_lock,
490
 
                                            "Waiting for slave thread to start");
491
 
      pthread_cond_wait(start_cond,cond_lock);
492
 
      session->exit_cond(old_msg);
493
 
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
494
 
      if (session->killed)
495
 
        return(session->killed_errno());
496
 
    }
497
 
  }
498
 
  if (start_lock)
499
 
    pthread_mutex_unlock(start_lock);
500
 
  return(0);
501
 
}
502
 
 
503
 
 
504
 
/*
505
 
  start_slave_threads()
506
 
 
507
 
  NOTES
508
 
    SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
509
 
    sense to do that for starting a slave--we always care if it actually
510
 
    started the threads that were not previously running
511
 
*/
512
 
 
513
 
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
514
 
                        Master_info* mi, const char*, const char*,
515
 
                        int32_t thread_mask)
516
 
{
517
 
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
518
 
  pthread_cond_t* cond_io=0,*cond_sql=0;
519
 
  int32_t error=0;
520
 
 
521
 
  if (need_slave_mutex)
522
 
  {
523
 
    lock_io = &mi->run_lock;
524
 
    lock_sql = &mi->rli.run_lock;
525
 
  }
526
 
  if (wait_for_start)
527
 
  {
528
 
    cond_io = &mi->start_cond;
529
 
    cond_sql = &mi->rli.start_cond;
530
 
    lock_cond_io = &mi->run_lock;
531
 
    lock_cond_sql = &mi->rli.run_lock;
532
 
  }
533
 
 
534
 
  if (thread_mask & SLAVE_IO)
535
 
    error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
536
 
                              cond_io,
537
 
                              &mi->slave_running, &mi->slave_run_id,
538
 
                              mi, 1); //high priority, to read the most possible
539
 
  if (!error && (thread_mask & SLAVE_SQL))
540
 
  {
541
 
    error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
542
 
                              cond_sql,
543
 
                              &mi->rli.slave_running, &mi->rli.slave_run_id,
544
 
                              mi, 0);
545
 
    if (error)
546
 
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
547
 
  }
548
 
  return(error);
549
 
}
550
 
 
551
 
 
552
 
#ifdef NOT_USED_YET
553
 
static int32_t end_slave_on_walk(Master_info* mi, unsigned char* /*unused*/)
554
 
{
555
 
  end_master_info(mi);
556
 
  return(0);
557
 
}
558
 
#endif
559
 
 
560
 
 
561
 
/*
562
 
  Free all resources used by slave
563
 
 
564
 
  SYNOPSIS
565
 
    end_slave()
566
 
*/
567
 
 
568
 
void end_slave()
569
 
{
570
 
  /*
571
 
    This is called when the server terminates, in close_connections().
572
 
    It terminates slave threads. However, some CHANGE MASTER etc may still be
573
 
    running presently. If a START SLAVE was in progress, the mutex lock below
574
 
    will make us wait until slave threads have started, and START SLAVE
575
 
    returns, then we terminate them here.
576
 
  */
577
 
  pthread_mutex_lock(&LOCK_active_mi);
578
 
  if (active_mi)
579
 
  {
580
 
    /*
581
 
      TODO: replace the line below with
582
 
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
583
 
      once multi-master code is ready.
584
 
    */
585
 
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
586
 
    active_mi->end_master_info();
587
 
    delete active_mi;
588
 
    active_mi= 0;
589
 
  }
590
 
  pthread_mutex_unlock(&LOCK_active_mi);
591
 
  return;
592
 
}
593
 
 
594
 
 
595
 
static bool io_slave_killed(Session* session, Master_info* mi)
596
 
{
597
 
  assert(mi->io_session == session);
598
 
  assert(mi->slave_running); // tracking buffer overrun
599
 
  return(mi->abort_slave || abort_loop || session->killed);
600
 
}
601
 
 
602
 
 
603
 
static bool sql_slave_killed(Session* session, Relay_log_info* rli)
604
 
{
605
 
  assert(rli->sql_session == session);
606
 
  assert(rli->slave_running == 1);// tracking buffer overrun
607
 
  if (abort_loop || session->killed || rli->abort_slave)
608
 
  {
609
 
    /*
610
 
      If we are in an unsafe situation (stopping could corrupt replication),
611
 
      we give one minute to the slave SQL thread of grace before really
612
 
      terminating, in the hope that it will be able to read more events and
613
 
      the unsafe situation will soon be left. Note that this one minute starts
614
 
      from the last time anything happened in the slave SQL thread. So it's
615
 
      really one minute of idleness, we don't timeout if the slave SQL thread
616
 
      is actively working.
617
 
    */
618
 
    if (rli->last_event_start_time == 0)
619
 
      return(1);
620
 
    if (difftime(time(0), rli->last_event_start_time) > 60)
621
 
    {
622
 
      rli->report(ERROR_LEVEL, 0,
623
 
                  _("SQL thread had to stop in an unsafe situation, in "
624
 
                  "the middle of applying updates to a "
625
 
                  "non-transactional table without any primary key. "
626
 
                  "There is a risk of duplicate updates when the slave "
627
 
                  "SQL thread is restarted. Please check your tables' "
628
 
                  "contents after restart."));
629
 
      return(1);
630
 
    }
631
 
  }
632
 
  return(0);
633
 
}
634
 
 
635
 
 
636
 
/*
637
 
  skip_load_data_infile()
638
 
 
639
 
  NOTES
640
 
    This is used to tell a 3.23 master to break send_file()
641
 
*/
642
 
 
643
 
void skip_load_data_infile(NET *net)
644
 
{
645
 
  (void)net_request_file(net, "/dev/null");
646
 
  (void)my_net_read(net);                               // discard response
647
 
  (void)net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0); // ok
648
 
  return;
649
 
}
650
 
 
651
 
 
652
 
bool net_request_file(NET* net, const char* fname)
653
 
{
654
 
  return(net_write_command(net, 251, (unsigned char*) fname, strlen(fname),
655
 
                                (unsigned char*) "", 0));
656
 
}
657
 
 
658
 
/*
659
 
  From other comments and tests in code, it looks like
660
 
  sometimes Query_log_event and Load_log_event can have db == 0
661
 
  (see rewrite_db() above for example)
662
 
  (cases where this happens are unclear; it may be when the master is 3.23).
663
 
*/
664
 
 
665
 
const char *print_slave_db_safe(const char* db)
666
 
{
667
 
  return((db ? db : ""));
668
 
}
669
 
 
670
 
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
671
 
                                 const char *default_val)
672
 
{
673
 
  uint32_t length;
674
 
 
675
 
  if ((length=my_b_gets(f,var, max_size)))
676
 
  {
677
 
    char* last_p = var + length -1;
678
 
    if (*last_p == '\n')
679
 
      *last_p = 0; // if we stopped on newline, kill it
680
 
    else
681
 
    {
682
 
      /*
683
 
        If we truncated a line or stopped on last char, remove all chars
684
 
        up to and including newline.
685
 
      */
686
 
      int32_t c;
687
 
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
688
 
    }
689
 
    return(0);
690
 
  }
691
 
  else if (default_val)
692
 
  {
693
 
    strmake(var,  default_val, max_size-1);
694
 
    return(0);
695
 
  }
696
 
  return(1);
697
 
}
698
 
 
699
 
 
700
 
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
701
 
{
702
 
  char buf[32];
703
 
 
704
 
 
705
 
  if (my_b_gets(f, buf, sizeof(buf)))
706
 
  {
707
 
    *var = atoi(buf);
708
 
    return(0);
709
 
  }
710
 
  else if (default_val)
711
 
  {
712
 
    *var = default_val;
713
 
    return(0);
714
 
  }
715
 
  return(1);
716
 
}
717
 
 
718
 
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
719
 
{
720
 
  char buf[16];
721
 
 
722
 
 
723
 
  if (my_b_gets(f, buf, sizeof(buf)))
724
 
  {
725
 
    if (sscanf(buf, "%f", var) != 1)
726
 
      return(1);
727
 
    else
728
 
      return(0);
729
 
  }
730
 
  else if (default_val != 0.0)
731
 
  {
732
 
    *var = default_val;
733
 
    return(0);
734
 
  }
735
 
  return(1);
736
 
}
737
 
 
738
 
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info)
739
 
{
740
 
  if (io_slave_killed(session, mi))
741
 
  {
742
 
    if (info && global_system_variables.log_warnings)
743
 
      sql_print_information("%s",info);
744
 
    return true;
745
 
  }
746
 
  return false;
747
 
}
748
 
 
749
 
 
750
 
/*
751
 
  Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
752
 
  relying on the binlog's version. This is not perfect: imagine an upgrade
753
 
  of the master without waiting that all slaves are in sync with the master;
754
 
  then a slave could be fooled about the binlog's format. This is what happens
755
 
  when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
756
 
  slaves are fooled. So we do this only to distinguish between 3.23 and more
757
 
  recent masters (it's too late to change things for 3.23).
758
 
 
759
 
  RETURNS
760
 
  0       ok
761
 
  1       error
762
 
*/
763
 
 
764
 
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
765
 
{
766
 
  char error_buf[512];
767
 
  String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
768
 
  char err_buff[MAX_SLAVE_ERRMSG];
769
 
  const char* errmsg= 0;
770
 
  int32_t err_code= 0;
771
 
  DRIZZLE_RES *master_res= 0;
772
 
  DRIZZLE_ROW master_row;
773
 
 
774
 
  err_msg.length(0);
775
 
  /*
776
 
    Free old description_event_for_queue (that is needed if we are in
777
 
    a reconnection).
778
 
  */
779
 
  delete mi->rli.relay_log.description_event_for_queue;
780
 
  mi->rli.relay_log.description_event_for_queue= 0;
781
 
 
782
 
  if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
783
 
  {
784
 
    errmsg = _("Master reported unrecognized DRIZZLE version");
785
 
    err_code= ER_SLAVE_FATAL_ERROR;
786
 
    sprintf(err_buff, ER(err_code), errmsg);
787
 
    err_msg.append(err_buff);
788
 
  }
789
 
  else
790
 
  {
791
 
    /*
792
 
      Note the following switch will bug when we have DRIZZLE branch 30 ;)
793
 
    */
794
 
    switch (*drizzle->server_version)
795
 
    {
796
 
    case '0':
797
 
    case '1':
798
 
    case '2':
799
 
      errmsg = _("Master reported unrecognized DRIZZLE version");
800
 
      err_code= ER_SLAVE_FATAL_ERROR;
801
 
      sprintf(err_buff, ER(err_code), errmsg);
802
 
      err_msg.append(err_buff);
803
 
      break;
804
 
    case '3':
805
 
      mi->rli.relay_log.description_event_for_queue= new
806
 
        Format_description_log_event(1, drizzle->server_version);
807
 
      break;
808
 
    case '4':
809
 
      mi->rli.relay_log.description_event_for_queue= new
810
 
        Format_description_log_event(3, drizzle->server_version);
811
 
      break;
812
 
    default:
813
 
      /*
814
 
        Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
815
 
        take the early steps (like tests for "is this a 3.23 master") which we
816
 
        have to take before we receive the real master's Format_desc which will
817
 
        override this one. Note that the Format_desc we create below is garbage
818
 
        (it has the format of the *slave*); it's only good to help know if the
819
 
        master is 3.23, 4.0, etc.
820
 
      */
821
 
      mi->rli.relay_log.description_event_for_queue= new
822
 
        Format_description_log_event(4, drizzle->server_version);
823
 
      break;
824
 
    }
825
 
  }
826
 
 
827
 
  /*
828
 
     This does not mean that a 5.0 slave will be able to read a 6.0 master; but
829
 
     as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
830
 
     can't read a 6.0 master, this will show up when the slave can't read some
831
 
     events sent by the master, and there will be error messages.
832
 
  */
833
 
 
834
 
  if (err_msg.length() != 0)
835
 
    goto err;
836
 
 
837
 
  /* as we are here, we tried to allocate the event */
838
 
  if (!mi->rli.relay_log.description_event_for_queue)
839
 
  {
840
 
    errmsg= _("default Format_description_log_event");
841
 
    err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
842
 
    sprintf(err_buff, ER(err_code), errmsg);
843
 
    err_msg.append(err_buff);
844
 
    goto err;
845
 
  }
846
 
 
847
 
  /*
848
 
    Compare the master and slave's clock. Do not die if master's clock is
849
 
    unavailable (very old master not supporting UNIX_TIMESTAMP()?).
850
 
  */
851
 
 
852
 
  if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
853
 
      (master_res= drizzle_store_result(drizzle)) &&
854
 
      (master_row= drizzle_fetch_row(master_res)))
855
 
  {
856
 
    mi->clock_diff_with_master=
857
 
      (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
858
 
  }
859
 
  else if (!check_io_slave_killed(mi->io_session, mi, NULL))
860
 
  {
861
 
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
862
 
    sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
863
 
                        "do not trust column Seconds_Behind_Master of SHOW "
864
 
                        "SLAVE STATUS. Error: %s (%d)"),
865
 
                      drizzle_error(drizzle), drizzle_errno(drizzle));
866
 
  }
867
 
  if (master_res)
868
 
    drizzle_free_result(master_res);
869
 
 
870
 
  /*
871
 
    Check that the master's server id and ours are different. Because if they
872
 
    are equal (which can result from a simple copy of master's datadir to slave,
873
 
    thus copying some drizzle.cnf), replication will work but all events will be
874
 
    skipped.
875
 
    Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
876
 
    master?).
877
 
    Note: we could have put a @@SERVER_ID in the previous SELECT
878
 
    UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
879
 
  */
880
 
  if (!drizzle_real_query(drizzle,
881
 
                        STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
882
 
      (master_res= drizzle_store_result(drizzle)))
883
 
  {
884
 
    if ((master_row= drizzle_fetch_row(master_res)) &&
885
 
        (::server_id == strtoul(master_row[1], 0, 10)) &&
886
 
        !mi->rli.replicate_same_server_id)
887
 
    {
888
 
      errmsg=
889
 
        _("The slave I/O thread stops because master and slave have equal "
890
 
          "DRIZZLE server ids; these ids must be different "
891
 
          "for replication to work (or "
892
 
          "the --replicate-same-server-id option must be used "
893
 
          "on slave but this does"
894
 
          "not always make sense; please check the manual before using it).");
895
 
      err_code= ER_SLAVE_FATAL_ERROR;
896
 
      sprintf(err_buff, ER(err_code), errmsg);
897
 
      err_msg.append(err_buff);
898
 
    }
899
 
    drizzle_free_result(master_res);
900
 
    if (errmsg)
901
 
      goto err;
902
 
  }
903
 
 
904
 
  /*
905
 
    Check that the master's global character_set_server and ours are the same.
906
 
    Not fatal if query fails (old master?).
907
 
    Note that we don't check for equality of global character_set_client and
908
 
    collation_connection (neither do we prevent their setting in
909
 
    set_var.cc). That's because from what I (Guilhem) have tested, the global
910
 
    values of these 2 are never used (new connections don't use them).
911
 
    We don't test equality of global collation_database either as it's is
912
 
    going to be deprecated (made read-only) in 4.1 very soon.
913
 
    The test is only relevant if master < 5.0.3 (we'll test only if it's older
914
 
    than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
915
 
    charset info in each binlog event.
916
 
    We don't do it for 3.23 because masters <3.23.50 hang on
917
 
    SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
918
 
    test only if master is 4.x.
919
 
  */
920
 
 
921
 
  /* redundant with rest of code but safer against later additions */
922
 
  if (*drizzle->server_version == '3')
923
 
    goto err;
924
 
 
925
 
  if ((*drizzle->server_version == '4') &&
926
 
      !drizzle_real_query(drizzle,
927
 
                        STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
928
 
      (master_res= drizzle_store_result(drizzle)))
929
 
  {
930
 
    if ((master_row= drizzle_fetch_row(master_res)) &&
931
 
        strcmp(master_row[0], global_system_variables.collation_server->name))
932
 
    {
933
 
      errmsg=
934
 
        _("The slave I/O thread stops because master and slave have"
935
 
          " different values for the COLLATION_SERVER global variable."
936
 
          " The values must be equal for replication to work");
937
 
      err_code= ER_SLAVE_FATAL_ERROR;
938
 
      sprintf(err_buff, ER(err_code), errmsg);
939
 
      err_msg.append(err_buff);
940
 
    }
941
 
    drizzle_free_result(master_res);
942
 
    if (errmsg)
943
 
      goto err;
944
 
  }
945
 
 
946
 
  /*
947
 
    Perform analogous check for time zone. Theoretically we also should
948
 
    perform check here to verify that SYSTEM time zones are the same on
949
 
    slave and master, but we can't rely on value of @@system_time_zone
950
 
    variable (it is time zone abbreviation) since it determined at start
951
 
    time and so could differ for slave and master even if they are really
952
 
    in the same system time zone. So we are omiting this check and just
953
 
    relying on documentation. Also according to Monty there are many users
954
 
    who are using replication between servers in various time zones. Hence
955
 
    such check will broke everything for them. (And now everything will
956
 
    work for them because by default both their master and slave will have
957
 
    'SYSTEM' time zone).
958
 
    This check is only necessary for 4.x masters (and < 5.0.4 masters but
959
 
    those were alpha).
960
 
  */
961
 
  if ((*drizzle->server_version == '4') &&
962
 
      !drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
963
 
      (master_res= drizzle_store_result(drizzle)))
964
 
  {
965
 
    if ((master_row= drizzle_fetch_row(master_res)) &&
966
 
        strcmp(master_row[0],
967
 
               global_system_variables.time_zone->get_name()->ptr()))
968
 
    {
969
 
      errmsg=
970
 
        _("The slave I/O thread stops because master and slave have"
971
 
          " different values for the TIME_ZONE global variable."
972
 
          " The values must be equal for replication to work");
973
 
      err_code= ER_SLAVE_FATAL_ERROR;
974
 
      sprintf(err_buff, ER(err_code), errmsg);
975
 
      err_msg.append(err_buff);
976
 
    }
977
 
    drizzle_free_result(master_res);
978
 
 
979
 
    if (errmsg)
980
 
      goto err;
981
 
  }
982
 
 
983
 
  if (mi->heartbeat_period != 0.0)
984
 
  {
985
 
    char llbuf[22];
986
 
    const char query_format[]= "SET @master_heartbeat_period= %s";
987
 
    char query[sizeof(query_format) - 2 + sizeof(llbuf)];
988
 
    /* 
989
 
       the period is an uint64_t of nano-secs. 
990
 
    */
991
 
    llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
992
 
    sprintf(query, query_format, llbuf);
993
 
 
994
 
    if (drizzle_real_query(drizzle, query, strlen(query))
995
 
        && !check_io_slave_killed(mi->io_session, mi, NULL))
996
 
    {
997
 
      err_msg.append("The slave I/O thread stops because querying master with '");
998
 
      err_msg.append(query);
999
 
      err_msg.append("' failed;");
1000
 
      err_msg.append(" error: ");
1001
 
      err_code= drizzle_errno(drizzle);
1002
 
      err_msg.qs_append(err_code);
1003
 
      err_msg.append("  '");
1004
 
      err_msg.append(drizzle_error(drizzle));
1005
 
      err_msg.append("'");
1006
 
      drizzle_free_result(drizzle_store_result(drizzle));
1007
 
      goto err;
1008
 
    }
1009
 
    drizzle_free_result(drizzle_store_result(drizzle));
1010
 
  }
1011
 
  
1012
 
err:
1013
 
  if (err_msg.length() != 0)
1014
 
  {
1015
 
    sql_print_error("%s",err_msg.ptr());
1016
 
    assert(err_code != 0);
1017
 
    mi->report(ERROR_LEVEL, err_code, "%s",err_msg.ptr());
1018
 
    return(1);
1019
 
  }
1020
 
 
1021
 
  return(0);
1022
 
}
1023
 
 
1024
 
 
1025
 
static bool wait_for_relay_log_space(Relay_log_info* rli)
1026
 
{
1027
 
  bool slave_killed=0;
1028
 
  Master_info* mi = rli->mi;
1029
 
  const char *save_proc_info;
1030
 
  Session* session = mi->io_session;
1031
 
 
1032
 
  pthread_mutex_lock(&rli->log_space_lock);
1033
 
  save_proc_info= session->enter_cond(&rli->log_space_cond,
1034
 
                                  &rli->log_space_lock,
1035
 
                                  _("Waiting for the slave SQL thread "
1036
 
                                    "to free enough relay log space"));
1037
 
  while (rli->log_space_limit < rli->log_space_total &&
1038
 
         !(slave_killed=io_slave_killed(session,mi)) &&
1039
 
         !rli->ignore_log_space_limit)
1040
 
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1041
 
  session->exit_cond(save_proc_info);
1042
 
  return(slave_killed);
1043
 
}
1044
 
 
1045
 
 
1046
 
/*
1047
 
  Builds a Rotate from the ignored events' info and writes it to relay log.
1048
 
 
1049
 
  SYNOPSIS
1050
 
  write_ignored_events_info_to_relay_log()
1051
 
    session             pointer to I/O thread's session
1052
 
    mi
1053
 
 
1054
 
  DESCRIPTION
1055
 
    Slave I/O thread, going to die, must leave a durable trace of the
1056
 
    ignored events' end position for the use of the slave SQL thread, by
1057
 
    calling this function. Only that thread can call it (see assertion).
1058
 
 */
1059
 
static void write_ignored_events_info_to_relay_log(Session *session,
1060
 
                                                   Master_info *mi)
1061
 
{
1062
 
  Relay_log_info *rli= &mi->rli;
1063
 
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1064
 
 
1065
 
  assert(session == mi->io_session);
1066
 
  pthread_mutex_lock(log_lock);
1067
 
  if (rli->ign_master_log_name_end[0])
1068
 
  {
1069
 
    Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
1070
 
                                               0, rli->ign_master_log_pos_end,
1071
 
                                               Rotate_log_event::DUP_NAME);
1072
 
    rli->ign_master_log_name_end[0]= 0;
1073
 
    /* can unlock before writing as slave SQL session will soon see our Rotate */
1074
 
    pthread_mutex_unlock(log_lock);
1075
 
    if (likely((bool)ev))
1076
 
    {
1077
 
      ev->server_id= 0; // don't be ignored by slave SQL thread
1078
 
      if (unlikely(rli->relay_log.append(ev)))
1079
 
        mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
1080
 
                   ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
1081
 
                   _("failed to write a Rotate event"
1082
 
                     " to the relay log, SHOW SLAVE STATUS may be"
1083
 
                     " inaccurate"));
1084
 
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1085
 
      if (mi->flush())
1086
 
        sql_print_error(_("Failed to flush master info file"));
1087
 
      delete ev;
1088
 
    }
1089
 
    else
1090
 
      mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
1091
 
                 ER(ER_SLAVE_CREATE_EVENT_FAILURE),
1092
 
                 _("Rotate_event (out of memory?),"
1093
 
                   " SHOW SLAVE STATUS may be inaccurate"));
1094
 
  }
1095
 
  else
1096
 
    pthread_mutex_unlock(log_lock);
1097
 
  return;
1098
 
}
1099
 
 
1100
 
 
1101
 
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
1102
 
                             bool *suppress_warnings)
1103
 
{
1104
 
  unsigned char buf[1024], *pos= buf;
1105
 
  uint32_t report_host_len, report_user_len=0, report_password_len=0;
1106
 
 
1107
 
  *suppress_warnings= false;
1108
 
  if (!report_host)
1109
 
    return(0);
1110
 
  report_host_len= strlen(report_host);
1111
 
  /* 30 is a good safety margin */
1112
 
  if (report_host_len + report_user_len + report_password_len + 30 >
1113
 
      sizeof(buf))
1114
 
    return(0);                                     // safety
1115
 
 
1116
 
  int4store(pos, server_id); pos+= 4;
1117
 
  pos= net_store_data(pos, (unsigned char*) report_host, report_host_len);
1118
 
  pos= net_store_data(pos, NULL, report_user_len);
1119
 
  pos= net_store_data(pos, NULL, report_password_len);
1120
 
  int2store(pos, (uint16_t) report_port); pos+= 2;
1121
 
  int4store(pos, 0);    pos+= 4;
1122
 
  /* The master will fill in master_id */
1123
 
  int4store(pos, 0);                    pos+= 4;
1124
 
 
1125
 
  if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
1126
 
  {
1127
 
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1128
 
    {
1129
 
      *suppress_warnings= true;                 // Suppress reconnect warning
1130
 
    }
1131
 
    else if (!check_io_slave_killed(mi->io_session, mi, NULL))
1132
 
    {
1133
 
      char buf[256];
1134
 
      snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle), 
1135
 
               drizzle_errno(drizzle));
1136
 
      mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1137
 
                 ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1138
 
    }
1139
 
    return(1);
1140
 
  }
1141
 
  return(0);
1142
 
}
1143
 
 
1144
 
 
1145
 
bool show_master_info(Session* session, Master_info* mi)
1146
 
{
1147
 
  // TODO: fix this for multi-master
1148
 
  List<Item> field_list;
1149
 
  Protocol *protocol= session->protocol;
1150
 
 
1151
 
  field_list.push_back(new Item_empty_string("Slave_IO_State",
1152
 
                                                     14));
1153
 
  field_list.push_back(new Item_empty_string("Master_Host",
1154
 
                                                     sizeof(mi->host)));
1155
 
  field_list.push_back(new Item_empty_string("Master_User",
1156
 
                                                     sizeof(mi->user)));
1157
 
  field_list.push_back(new Item_return_int("Master_Port", 7,
1158
 
                                           DRIZZLE_TYPE_LONG));
1159
 
  field_list.push_back(new Item_return_int("Connect_Retry", 10,
1160
 
                                           DRIZZLE_TYPE_LONG));
1161
 
  field_list.push_back(new Item_empty_string("Master_Log_File",
1162
 
                                             FN_REFLEN));
1163
 
  field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
1164
 
                                           DRIZZLE_TYPE_LONGLONG));
1165
 
  field_list.push_back(new Item_empty_string("Relay_Log_File",
1166
 
                                             FN_REFLEN));
1167
 
  field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
1168
 
                                           DRIZZLE_TYPE_LONGLONG));
1169
 
  field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
1170
 
                                             FN_REFLEN));
1171
 
  field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
1172
 
  field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1173
 
  field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
1174
 
  field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
1175
 
  field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
1176
 
  field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
1177
 
  field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
1178
 
  field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
1179
 
                                             28));
1180
 
  field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
1181
 
  field_list.push_back(new Item_empty_string("Last_Error", 20));
1182
 
  field_list.push_back(new Item_return_int("Skip_Counter", 10,
1183
 
                                           DRIZZLE_TYPE_LONG));
1184
 
  field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
1185
 
                                           DRIZZLE_TYPE_LONGLONG));
1186
 
  field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
1187
 
                                           DRIZZLE_TYPE_LONGLONG));
1188
 
  field_list.push_back(new Item_empty_string("Until_Condition", 6));
1189
 
  field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1190
 
  field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1191
 
                                           DRIZZLE_TYPE_LONGLONG));
1192
 
  field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1193
 
                                           DRIZZLE_TYPE_LONGLONG));
1194
 
  field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
1195
 
  field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1196
 
  field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
1197
 
  field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
1198
 
 
1199
 
  if (protocol->send_fields(&field_list,
1200
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1201
 
    return(true);
1202
 
 
1203
 
  if (mi->host[0])
1204
 
  {
1205
 
    String *packet= &session->packet;
1206
 
    protocol->prepare_for_resend();
1207
 
 
1208
 
    /*
1209
 
      slave_running can be accessed without run_lock but not other
1210
 
      non-volotile members like mi->io_session, which is guarded by the mutex.
1211
 
    */
1212
 
    pthread_mutex_lock(&mi->run_lock);
1213
 
    protocol->store(mi->io_session ? mi->io_session->get_proc_info() : "", &my_charset_bin);
1214
 
    pthread_mutex_unlock(&mi->run_lock);
1215
 
 
1216
 
    pthread_mutex_lock(&mi->data_lock);
1217
 
    pthread_mutex_lock(&mi->rli.data_lock);
1218
 
    protocol->store(mi->getHostname(), &my_charset_bin);
1219
 
    protocol->store(mi->getUsername(), &my_charset_bin);
1220
 
    protocol->store((uint32_t) mi->getPort());
1221
 
    protocol->store(mi->getConnectionRetry());
1222
 
    protocol->store(mi->getLogName(), &my_charset_bin);
1223
 
    protocol->store((uint64_t) mi->getLogPosition());
1224
 
    protocol->store(mi->rli.group_relay_log_name.c_str() +
1225
 
                    dirname_length(mi->rli.group_relay_log_name.c_str()),
1226
 
                    &my_charset_bin);
1227
 
    protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1228
 
    protocol->store(mi->rli.group_master_log_name.c_str(), &my_charset_bin);
1229
 
    protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1230
 
                    "Yes" : "No", &my_charset_bin);
1231
 
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1232
 
    protocol->store(rpl_filter->get_do_db());
1233
 
    protocol->store(rpl_filter->get_ignore_db());
1234
 
 
1235
 
    char buf[256];
1236
 
    String tmp(buf, sizeof(buf), &my_charset_bin);
1237
 
    rpl_filter->get_do_table(&tmp);
1238
 
    protocol->store(&tmp);
1239
 
    rpl_filter->get_ignore_table(&tmp);
1240
 
    protocol->store(&tmp);
1241
 
    rpl_filter->get_wild_do_table(&tmp);
1242
 
    protocol->store(&tmp);
1243
 
    rpl_filter->get_wild_ignore_table(&tmp);
1244
 
    protocol->store(&tmp);
1245
 
 
1246
 
    protocol->store(mi->rli.last_error().number);
1247
 
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
1248
 
    protocol->store((uint32_t) mi->rli.slave_skip_counter);
1249
 
    protocol->store((uint64_t) mi->rli.group_master_log_pos);
1250
 
    protocol->store((uint64_t) mi->rli.log_space_total);
1251
 
 
1252
 
    protocol->store(
1253
 
      mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
1254
 
        ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
1255
 
          "Relay"), &my_charset_bin);
1256
 
    protocol->store(mi->rli.until_log_name, &my_charset_bin);
1257
 
    protocol->store((uint64_t) mi->rli.until_log_pos);
1258
 
 
1259
 
    /*
1260
 
      Seconds_Behind_Master: if SQL thread is running and I/O thread is
1261
 
      connected, we can compute it otherwise show NULL (i.e. unknown).
1262
 
    */
1263
 
    if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
1264
 
        mi->rli.slave_running)
1265
 
    {
1266
 
      long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
1267
 
                       - mi->clock_diff_with_master);
1268
 
      /*
1269
 
        Apparently on some systems time_diff can be <0. Here are possible
1270
 
        reasons related to MySQL:
1271
 
        - the master is itself a slave of another master whose time is ahead.
1272
 
        - somebody used an explicit SET TIMESTAMP on the master.
1273
 
        Possible reason related to granularity-to-second of time functions
1274
 
        (nothing to do with MySQL), which can explain a value of -1:
1275
 
        assume the master's and slave's time are perfectly synchronized, and
1276
 
        that at slave's connection time, when the master's timestamp is read,
1277
 
        it is at the very end of second 1, and (a very short time later) when
1278
 
        the slave's timestamp is read it is at the very beginning of second
1279
 
        2. Then the recorded value for master is 1 and the recorded value for
1280
 
        slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1281
 
        between timestamp of slave and rli->last_master_timestamp is 0
1282
 
        (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1283
 
        This confuses users, so we don't go below 0: hence the cmax().
1284
 
 
1285
 
        last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1286
 
        special marker to say "consider we have caught up".
1287
 
      */
1288
 
      protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1289
 
                                 cmax((long)0, time_diff) : 0));
1290
 
    }
1291
 
    else
1292
 
    {
1293
 
      protocol->store_null();
1294
 
    }
1295
 
 
1296
 
    // Last_IO_Errno
1297
 
    protocol->store(mi->last_error().number);
1298
 
    // Last_IO_Error
1299
 
    protocol->store(mi->last_error().message, &my_charset_bin);
1300
 
    // Last_SQL_Errno
1301
 
    protocol->store(mi->rli.last_error().number);
1302
 
    // Last_SQL_Error
1303
 
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
1304
 
 
1305
 
    pthread_mutex_unlock(&mi->rli.data_lock);
1306
 
    pthread_mutex_unlock(&mi->data_lock);
1307
 
 
1308
 
    if (my_net_write(&session->net, (unsigned char*) session->packet.ptr(), packet->length()))
1309
 
      return(true);
1310
 
  }
1311
 
  my_eof(session);
1312
 
  return(false);
1313
 
}
1314
 
 
1315
 
 
1316
 
void set_slave_thread_options(Session* session)
1317
 
{
1318
 
  /*
1319
 
     It's nonsense to constrain the slave threads with max_join_size; if a
1320
 
     query succeeded on master, we HAVE to execute it. So set
1321
 
     OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
1322
 
     (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
1323
 
     SELECT examining more than 4 billion rows would still fail (yes, because
1324
 
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1325
 
     only for client threads.
1326
 
  */
1327
 
  uint64_t options= session->options | OPTION_BIG_SELECTS;
1328
 
  if (opt_log_slave_updates)
1329
 
    options|= OPTION_BIN_LOG;
1330
 
  else
1331
 
    options&= ~OPTION_BIN_LOG;
1332
 
  session->options= options;
1333
 
  session->variables.completion_type= 0;
1334
 
  return;
1335
 
}
1336
 
 
1337
 
/*
1338
 
  init_slave_thread()
1339
 
*/
1340
 
 
1341
 
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type)
1342
 
{
1343
 
  int32_t simulate_error= 0;
1344
 
  session->system_thread = (session_type == SLAVE_Session_SQL) ?
1345
 
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1346
 
  session->security_ctx->skip_grants();
1347
 
  my_net_init(&session->net, 0);
1348
 
/*
1349
 
  Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1350
 
  slave threads, since a replication event can become this much larger
1351
 
  than the corresponding packet (query) sent from client to master.
1352
 
*/
1353
 
  session->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1354
 
    + MAX_LOG_EVENT_HEADER;  /* note, incr over the global not session var */
1355
 
  session->slave_thread = 1;
1356
 
  set_slave_thread_options(session);
1357
 
  session->client_capabilities = CLIENT_LOCAL_FILES;
1358
 
  pthread_mutex_lock(&LOCK_thread_count);
1359
 
  session->thread_id= session->variables.pseudo_thread_id= thread_id++;
1360
 
  pthread_mutex_unlock(&LOCK_thread_count);
1361
 
 
1362
 
 simulate_error|= (1 << SLAVE_Session_IO);
1363
 
 simulate_error|= (1 << SLAVE_Session_SQL);
1364
 
  if (init_thr_lock() || session->store_globals() || simulate_error & (1<< session_type))
1365
 
  {
1366
 
    session->cleanup();
1367
 
    return(-1);
1368
 
  }
1369
 
  lex_start(session);
1370
 
 
1371
 
  if (session_type == SLAVE_Session_SQL)
1372
 
    session->set_proc_info("Waiting for the next event in relay log");
1373
 
  else
1374
 
    session->set_proc_info("Waiting for master update");
1375
 
  session->version=refresh_version;
1376
 
  session->set_time();
1377
 
  return(0);
1378
 
}
1379
 
 
1380
 
 
1381
 
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1382
 
                      void* thread_killed_arg)
1383
 
{
1384
 
  int32_t nap_time;
1385
 
  thr_alarm_t alarmed;
1386
 
 
1387
 
  thr_alarm_init(&alarmed);
1388
 
  time_t start_time= my_time(0);
1389
 
  time_t end_time= start_time+sec;
1390
 
 
1391
 
  while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1392
 
  {
1393
 
    ALARM alarm_buff;
1394
 
    /*
1395
 
      The only reason we are asking for alarm is so that
1396
 
      we will be woken up in case of murder, so if we do not get killed,
1397
 
      set the alarm so it goes off after we wake up naturally
1398
 
    */
1399
 
    thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
1400
 
    sleep(nap_time);
1401
 
    thr_end_alarm(&alarmed);
1402
 
 
1403
 
    if ((*thread_killed)(session,thread_killed_arg))
1404
 
      return(1);
1405
 
    start_time= my_time(0);
1406
 
  }
1407
 
  return(0);
1408
 
}
1409
 
 
1410
 
 
1411
 
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
1412
 
                        bool *suppress_warnings)
1413
 
{
1414
 
  unsigned char buf[FN_REFLEN + 10];
1415
 
  int32_t len;
1416
 
  int32_t binlog_flags = 0; // for now
1417
 
  const char* logname = mi->getLogName();
1418
 
  
1419
 
  *suppress_warnings= false;
1420
 
 
1421
 
  // TODO if big log files: Change next to int8store()
1422
 
  int4store(buf, (uint32_t) mi->getLogPosition());
1423
 
  int2store(buf + 4, binlog_flags);
1424
 
  int4store(buf + 6, server_id);
1425
 
  len = (uint32_t) strlen(logname);
1426
 
  memcpy(buf + 10, logname,len);
1427
 
  if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
1428
 
  {
1429
 
    /*
1430
 
      Something went wrong, so we will just reconnect and retry later
1431
 
      in the future, we should do a better error analysis, but for
1432
 
      now we just fill up the error log :-)
1433
 
    */
1434
 
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1435
 
      *suppress_warnings= true;                 // Suppress reconnect warning
1436
 
    else
1437
 
      sql_print_error(_("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs"),
1438
 
                      drizzle_errno(drizzle), drizzle_error(drizzle),
1439
 
                      mi->connect_retry);
1440
 
    return(1);
1441
 
  }
1442
 
 
1443
 
  return(0);
1444
 
}
1445
 
 
1446
 
/*
1447
 
  Read one event from the master
1448
 
 
1449
 
  SYNOPSIS
1450
 
    read_event()
1451
 
    DRIZZLE               DRIZZLE connection
1452
 
    mi                  Master connection information
1453
 
    suppress_warnings   TRUE when a normal net read timeout has caused us to
1454
 
                        try a reconnect.  We do not want to print anything to
1455
 
                        the error log in this case because this a anormal
1456
 
                        event in an idle server.
1457
 
 
1458
 
    RETURN VALUES
1459
 
    'packet_error'      Error
1460
 
    number              Length of packet
1461
 
*/
1462
 
 
1463
 
static uint32_t read_event(DRIZZLE *drizzle,
1464
 
                           Master_info *mi,
1465
 
                           bool* suppress_warnings)
1466
 
{
1467
 
  uint32_t len;
1468
 
 
1469
 
  *suppress_warnings= false;
1470
 
  /*
1471
 
    my_real_read() will time us out
1472
 
    We check if we were told to die, and if not, try reading again
1473
 
  */
1474
 
  if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
1475
 
    return(packet_error);
1476
 
 
1477
 
  len = cli_safe_read(drizzle);
1478
 
  if (len == packet_error || (int32_t) len < 1)
1479
 
  {
1480
 
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
1481
 
    {
1482
 
      /*
1483
 
        We are trying a normal reconnect after a read timeout;
1484
 
        we suppress prints to .err file as long as the reconnect
1485
 
        happens without problems
1486
 
      */
1487
 
      *suppress_warnings= true;
1488
 
    }
1489
 
    else
1490
 
      sql_print_error(_("Error reading packet from server: %s ( server_errno=%d)"),
1491
 
                      drizzle_error(drizzle), drizzle_errno(drizzle));
1492
 
    return(packet_error);
1493
 
  }
1494
 
 
1495
 
  /* Check if eof packet */
1496
 
  if (len < 8 && drizzle->net.read_pos[0] == 254)
1497
 
  {
1498
 
    sql_print_information(_("Slave: received end packet from server, apparent "
1499
 
                            "master shutdown: %s"),
1500
 
                     drizzle_error(drizzle));
1501
 
     return(packet_error);
1502
 
  }
1503
 
 
1504
 
  return(len - 1);
1505
 
}
1506
 
 
1507
 
 
1508
 
int32_t check_expected_error(Session*, Relay_log_info const *,
1509
 
                             int32_t expected_error)
1510
 
{
1511
 
  switch (expected_error) {
1512
 
  case ER_NET_READ_ERROR:
1513
 
  case ER_NET_ERROR_ON_WRITE:
1514
 
  case ER_QUERY_INTERRUPTED:
1515
 
  case ER_SERVER_SHUTDOWN:
1516
 
  case ER_NEW_ABORTING_CONNECTION:
1517
 
    return(1);
1518
 
  default:
1519
 
    return(0);
1520
 
  }
1521
 
}
1522
 
 
1523
 
 
1524
 
/*
1525
 
  Check if the current error is of temporary nature of not.
1526
 
  Some errors are temporary in nature, such as
1527
 
  ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT.  Ndb also signals
1528
 
  that the error is temporary by pushing a warning with the error code
1529
 
  ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1530
 
*/
1531
 
static int32_t has_temporary_error(Session *session)
1532
 
{
1533
 
  if (session->is_fatal_error)
1534
 
    return(0);
1535
 
 
1536
 
  if (session->main_da.is_error())
1537
 
  {
1538
 
    session->clear_error();
1539
 
    my_error(ER_LOCK_DEADLOCK, MYF(0));
1540
 
  }
1541
 
 
1542
 
  /*
1543
 
    If there is no message in Session, we can't say if it's a temporary
1544
 
    error or not. This is currently the case for Incident_log_event,
1545
 
    which sets no message. Return FALSE.
1546
 
  */
1547
 
  if (!session->is_error())
1548
 
    return(0);
1549
 
 
1550
 
  /*
1551
 
    Temporary error codes:
1552
 
    currently, InnoDB deadlock detected by InnoDB or lock
1553
 
    wait timeout (innodb_lock_wait_timeout exceeded
1554
 
  */
1555
 
  if (session->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
1556
 
      session->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
1557
 
    return(1);
1558
 
 
1559
 
  return(0);
1560
 
}
1561
 
 
1562
 
 
1563
 
/**
1564
 
  Applies the given event and advances the relay log position.
1565
 
 
1566
 
  In essence, this function does:
1567
 
 
1568
 
  @code
1569
 
    ev->apply_event(rli);
1570
 
    ev->update_pos(rli);
1571
 
  @endcode
1572
 
 
1573
 
  But it also does some maintainance, such as skipping events if
1574
 
  needed and reporting errors.
1575
 
 
1576
 
  If the @c skip flag is set, then it is tested whether the event
1577
 
  should be skipped, by looking at the slave_skip_counter and the
1578
 
  server id.  The skip flag should be set when calling this from a
1579
 
  replication thread but not set when executing an explicit BINLOG
1580
 
  statement.
1581
 
 
1582
 
  @retval 0 OK.
1583
 
 
1584
 
  @retval 1 Error calling ev->apply_event().
1585
 
 
1586
 
  @retval 2 No error calling ev->apply_event(), but error calling
1587
 
  ev->update_pos().
1588
 
*/
1589
 
int32_t apply_event_and_update_pos(Log_event* ev, Session* session, Relay_log_info* rli,
1590
 
                               bool skip)
1591
 
{
1592
 
  int32_t exec_res= 0;
1593
 
 
1594
 
  /*
1595
 
    Execute the event to change the database and update the binary
1596
 
    log coordinates, but first we set some data that is needed for
1597
 
    the thread.
1598
 
 
1599
 
    The event will be executed unless it is supposed to be skipped.
1600
 
 
1601
 
    Queries originating from this server must be skipped.  Low-level
1602
 
    events (Format_description_log_event, Rotate_log_event,
1603
 
    Stop_log_event) from this server must also be skipped. But for
1604
 
    those we don't want to modify 'group_master_log_pos', because
1605
 
    these events did not exist on the master.
1606
 
    Format_description_log_event is not completely skipped.
1607
 
 
1608
 
    Skip queries specified by the user in 'slave_skip_counter'.  We
1609
 
    can't however skip events that has something to do with the log
1610
 
    files themselves.
1611
 
 
1612
 
    Filtering on own server id is extremely important, to ignore
1613
 
    execution of events created by the creation/rotation of the relay
1614
 
    log (remember that now the relay log starts with its Format_desc,
1615
 
    has a Rotate etc).
1616
 
  */
1617
 
 
1618
 
  session->server_id = ev->server_id; // use the original server id for logging
1619
 
  session->set_time();                            // time the query
1620
 
  session->lex->current_select= 0;
1621
 
  if (!ev->when)
1622
 
    ev->when= my_time(0);
1623
 
  ev->session = session; // because up to this point, ev->session == 0
1624
 
 
1625
 
  if (skip)
1626
 
  {
1627
 
    int32_t reason= ev->shall_skip(rli);
1628
 
    if (reason == Log_event::EVENT_SKIP_COUNT)
1629
 
      --rli->slave_skip_counter;
1630
 
    pthread_mutex_unlock(&rli->data_lock);
1631
 
    if (reason == Log_event::EVENT_SKIP_NOT)
1632
 
      exec_res= ev->apply_event(rli);
1633
 
  }
1634
 
  else
1635
 
    exec_res= ev->apply_event(rli);
1636
 
 
1637
 
  if (exec_res == 0)
1638
 
  {
1639
 
    int32_t error= ev->update_pos(rli);
1640
 
    /*
1641
 
      The update should not fail, so print an error message and
1642
 
      return an error code.
1643
 
 
1644
 
      TODO: Replace this with a decent error message when merged
1645
 
      with BUG#24954 (which adds several new error message).
1646
 
    */
1647
 
    if (error)
1648
 
    {
1649
 
      char buf[22];
1650
 
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
1651
 
                  _("It was not possible to update the positions"
1652
 
                  " of the relay log information: the slave may"
1653
 
                  " be in an inconsistent state."
1654
 
                  " Stopped in %s position %s"),
1655
 
                  rli->group_relay_log_name.c_str(),
1656
 
                  llstr(rli->group_relay_log_pos, buf));
1657
 
      return(2);
1658
 
    }
1659
 
  }
1660
 
 
1661
 
  return(exec_res ? 1 : 0);
1662
 
}
1663
 
 
1664
 
 
1665
 
/**
1666
 
  Top-level function for executing the next event from the relay log.
1667
 
 
1668
 
  This function reads the event from the relay log, executes it, and
1669
 
  advances the relay log position.  It also handles errors, etc.
1670
 
 
1671
 
  This function may fail to apply the event for the following reasons:
1672
 
 
1673
 
   - The position specfied by the UNTIL condition of the START SLAVE
1674
 
     command is reached.
1675
 
 
1676
 
   - It was not possible to read the event from the log.
1677
 
 
1678
 
   - The slave is killed.
1679
 
 
1680
 
   - An error occurred when applying the event, and the event has been
1681
 
     tried slave_trans_retries times.  If the event has been retried
1682
 
     fewer times, 0 is returned.
1683
 
 
1684
 
   - init_master_info or init_relay_log_pos failed. (These are called
1685
 
     if a failure occurs when applying the event.)</li>
1686
 
 
1687
 
   - An error occurred when updating the binlog position.
1688
 
 
1689
 
  @retval 0 The event was applied.
1690
 
 
1691
 
  @retval 1 The event was not applied.
1692
 
*/
1693
 
static int32_t exec_relay_log_event(Session* session, Relay_log_info* rli)
1694
 
{
1695
 
  /*
1696
 
     We acquire this mutex since we need it for all operations except
1697
 
     event execution. But we will release it in places where we will
1698
 
     wait for something for example inside of next_event().
1699
 
   */
1700
 
  pthread_mutex_lock(&rli->data_lock);
1701
 
 
1702
 
  Log_event * ev = next_event(rli);
1703
 
 
1704
 
  assert(rli->sql_session==session);
1705
 
 
1706
 
  if (sql_slave_killed(session,rli))
1707
 
  {
1708
 
    pthread_mutex_unlock(&rli->data_lock);
1709
 
    delete ev;
1710
 
    return(1);
1711
 
  }
1712
 
  if (ev)
1713
 
  {
1714
 
    int32_t exec_res;
1715
 
 
1716
 
    /*
1717
 
      This tests if the position of the beginning of the current event
1718
 
      hits the UNTIL barrier.
1719
 
    */
1720
 
    if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
1721
 
        rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
1722
 
                                rli->group_master_log_pos :
1723
 
                                ev->log_pos - ev->data_written))
1724
 
    {
1725
 
      char buf[22];
1726
 
      sql_print_information(_("Slave SQL thread stopped because it reached its"
1727
 
                              " UNTIL position %s"),
1728
 
                            llstr(rli->until_pos(), buf));
1729
 
      /*
1730
 
        Setting abort_slave flag because we do not want additional message about
1731
 
        error in query execution to be printed.
1732
 
      */
1733
 
      rli->abort_slave= 1;
1734
 
      pthread_mutex_unlock(&rli->data_lock);
1735
 
      delete ev;
1736
 
      return(1);
1737
 
    }
1738
 
    exec_res= apply_event_and_update_pos(ev, session, rli, true);
1739
 
 
1740
 
    /*
1741
 
      Format_description_log_event should not be deleted because it will be
1742
 
      used to read info about the relay log's format; it will be deleted when
1743
 
      the SQL thread does not need it, i.e. when this thread terminates.
1744
 
    */
1745
 
    if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
1746
 
    {
1747
 
      delete ev;
1748
 
    }
1749
 
 
1750
 
    /*
1751
 
      update_log_pos failed: this should not happen, so we don't
1752
 
      retry.
1753
 
    */
1754
 
    if (exec_res == 2)
1755
 
      return(1);
1756
 
 
1757
 
    if (slave_trans_retries)
1758
 
    {
1759
 
      int32_t temp_err= 0;
1760
 
      if (exec_res && (temp_err= has_temporary_error(session)))
1761
 
      {
1762
 
        const char *errmsg;
1763
 
        /*
1764
 
          We were in a transaction which has been rolled back because of a
1765
 
          temporary error;
1766
 
          let's seek back to BEGIN log event and retry it all again.
1767
 
          Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
1768
 
          there is no rollback since 5.0.13 (ref: manual).
1769
 
          We have to not only seek but also
1770
 
          a) init_master_info(), to seek back to hot relay log's start for later
1771
 
          (for when we will come back to this hot log after re-processing the
1772
 
          possibly existing old logs where BEGIN is: check_binlog_magic() will
1773
 
          then need the cache to be at position 0 (see comments at beginning of
1774
 
          init_master_info()).
1775
 
          b) init_relay_log_pos(), because the BEGIN may be an older relay log.
1776
 
        */
1777
 
        if (rli->trans_retries < slave_trans_retries)
1778
 
        {
1779
 
          if (rli->mi->init_master_info(0, 0, SLAVE_SQL))
1780
 
            sql_print_error(_("Failed to initialize the master info structure"));
1781
 
          else if (init_relay_log_pos(rli,
1782
 
                                      rli->group_relay_log_name.c_str(),
1783
 
                                      rli->group_relay_log_pos,
1784
 
                                      1, &errmsg, 1))
1785
 
            sql_print_error(_("Error initializing relay log position: %s"),
1786
 
                            errmsg);
1787
 
          else
1788
 
          {
1789
 
            exec_res= 0;
1790
 
            end_trans(session, ROLLBACK);
1791
 
            /* chance for concurrent connection to get more locks */
1792
 
            safe_sleep(session, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1793
 
                       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1794
 
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1795
 
            rli->trans_retries++;
1796
 
            rli->retried_trans++;
1797
 
            pthread_mutex_unlock(&rli->data_lock);
1798
 
          }
1799
 
        }
1800
 
        else
1801
 
          sql_print_error(_("Slave SQL thread retried transaction %lu time(s) "
1802
 
                            "in vain, giving up. Consider raising the value of "
1803
 
                            "the slave_transaction_retries variable."),
1804
 
                          slave_trans_retries);
1805
 
      }
1806
 
      else if ((exec_res && !temp_err) ||
1807
 
               (opt_using_transactions &&
1808
 
                rli->group_relay_log_pos == rli->event_relay_log_pos))
1809
 
      {
1810
 
        /*
1811
 
          Only reset the retry counter if the entire group succeeded
1812
 
          or failed with a non-transient error.  On a successful
1813
 
          event, the execution will proceed as usual; in the case of a
1814
 
          non-transient error, the slave will stop with an error.
1815
 
         */
1816
 
        rli->trans_retries= 0; // restart from fresh
1817
 
      }
1818
 
    }
1819
 
    return(exec_res);
1820
 
  }
1821
 
  pthread_mutex_unlock(&rli->data_lock);
1822
 
  rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
1823
 
              ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
1824
 
              _("Could not parse relay log event entry. The possible reasons "
1825
 
                "are: the master's binary log is corrupted (you can check this "
1826
 
                "by running 'mysqlbinlog' on the binary log), the slave's "
1827
 
                "relay log is corrupted (you can check this by running "
1828
 
                "'mysqlbinlog' on the relay log), a network problem, or a bug "
1829
 
                "in the master's or slave's DRIZZLE code. If you want to check "
1830
 
                "the master's binary log or slave's relay log, you will be "
1831
 
                "able to know their names by issuing 'SHOW SLAVE STATUS' "
1832
 
                "on this slave."));
1833
 
  return(1);
1834
 
}
1835
 
 
1836
 
 
1837
 
/**
1838
 
  @brief Try to reconnect slave IO thread.
1839
 
 
1840
 
  @details Terminates current connection to master, sleeps for
1841
 
  @c mi->connect_retry msecs and initiates new connection with
1842
 
  @c safe_reconnect(). Variable pointed by @c retry_count is increased -
1843
 
  if it exceeds @c master_retry_count then connection is not re-established
1844
 
  and function signals error.
1845
 
  Unless @c suppres_warnings is TRUE, a warning is put in the server error log
1846
 
  when reconnecting. The warning message and messages used to report errors
1847
 
  are taken from @c messages array. In case @c master_retry_count is exceeded,
1848
 
  no messages are added to the log.
1849
 
 
1850
 
  @param[in]     session                 Thread context.
1851
 
  @param[in]     DRIZZLE               DRIZZLE connection.
1852
 
  @param[in]     mi                  Master connection information.
1853
 
  @param[in,out] retry_count         Number of attempts to reconnect.
1854
 
  @param[in]     suppress_warnings   TRUE when a normal net read timeout 
1855
 
                                     has caused to reconnecting.
1856
 
  @param[in]     messages            Messages to print/log, see 
1857
 
                                     reconnect_messages[] array.
1858
 
 
1859
 
  @retval        0                   OK.
1860
 
  @retval        1                   There was an error.
1861
 
*/
1862
 
 
1863
 
static int32_t try_to_reconnect(Session *session, DRIZZLE *drizzle, Master_info *mi,
1864
 
                                uint32_t *retry_count, bool suppress_warnings,
1865
 
                                const char *messages[SLAVE_RECON_MSG_MAX])
1866
 
{
1867
 
  mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1868
 
  session->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
1869
 
  drizzle_disconnect(drizzle);
1870
 
  if ((*retry_count)++)
1871
 
  {
1872
 
    if (*retry_count > master_retry_count)
1873
 
      return 1;                             // Don't retry forever
1874
 
    safe_sleep(session, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1875
 
               (void *) mi);
1876
 
  }
1877
 
  if (check_io_slave_killed(session, mi,
1878
 
                            _(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1879
 
    return 1;
1880
 
  session->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
1881
 
  if (!suppress_warnings)
1882
 
  {
1883
 
    char buf[256], llbuff[22];
1884
 
    snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1885
 
             IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
1886
 
    /*
1887
 
      Raise a warining during registering on master/requesting dump.
1888
 
      Log a message reading event.
1889
 
    */
1890
 
    if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
1891
 
    {
1892
 
      mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1893
 
                 ER(ER_SLAVE_MASTER_COM_FAILURE),
1894
 
                 _(messages[SLAVE_RECON_MSG_COMMAND]), buf);
1895
 
    }
1896
 
    else
1897
 
    {
1898
 
      sql_print_information("%s",buf);
1899
 
    }
1900
 
  }
1901
 
  if (safe_reconnect(session, drizzle, mi, 1) || io_slave_killed(session, mi))
1902
 
  {
1903
 
    if (global_system_variables.log_warnings)
1904
 
      sql_print_information("%s",_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1905
 
    return 1;
1906
 
  }
1907
 
  return 0;
1908
 
}
1909
 
 
1910
 
 
1911
 
/* Slave I/O Thread entry point */
1912
 
 
1913
 
pthread_handler_t handle_slave_io(void *arg)
1914
 
{
1915
 
  Session *session; // needs to be first for thread_stack
1916
 
  DRIZZLE *drizzle;
1917
 
  Master_info *mi = (Master_info*)arg;
1918
 
  Relay_log_info *rli= &mi->rli;
1919
 
  char llbuff[22];
1920
 
  uint32_t retry_count;
1921
 
  bool suppress_warnings;
1922
 
  uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
1923
 
  my_thread_init();
1924
 
 
1925
 
  assert(mi->inited);
1926
 
  drizzle= NULL ;
1927
 
  retry_count= 0;
1928
 
 
1929
 
  pthread_mutex_lock(&mi->run_lock);
1930
 
  /* Inform waiting threads that slave has started */
1931
 
  mi->slave_run_id++;
1932
 
 
1933
 
  mi->events_till_disconnect = disconnect_slave_event_count;
1934
 
 
1935
 
  session= new Session;
1936
 
  Session_CHECK_SENTRY(session);
1937
 
  mi->io_session = session;
1938
 
 
1939
 
  pthread_detach_this_thread();
1940
 
  session->thread_stack= (char*) &session; // remember where our stack is
1941
 
  if (init_slave_thread(session, SLAVE_Session_IO))
1942
 
  {
1943
 
    pthread_cond_broadcast(&mi->start_cond);
1944
 
    pthread_mutex_unlock(&mi->run_lock);
1945
 
    sql_print_error(_("Failed during slave I/O thread initialization"));
1946
 
    goto err;
1947
 
  }
1948
 
  pthread_mutex_lock(&LOCK_thread_count);
1949
 
  threads.append(session);
1950
 
  pthread_mutex_unlock(&LOCK_thread_count);
1951
 
  mi->slave_running = 1;
1952
 
  mi->abort_slave = 0;
1953
 
  pthread_mutex_unlock(&mi->run_lock);
1954
 
  pthread_cond_broadcast(&mi->start_cond);
1955
 
 
1956
 
  if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
1957
 
  {
1958
 
    mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
1959
 
               ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
1960
 
    goto err;
1961
 
  }
1962
 
 
1963
 
  session->set_proc_info("Connecting to master");
1964
 
  // we can get killed during safe_connect
1965
 
  if (!safe_connect(session, drizzle, mi))
1966
 
  {
1967
 
    sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
1968
 
                            "replication started in log '%s' at position %s"),
1969
 
                          mi->getUsername(), mi->getHostname(), mi->getPort(),
1970
 
                          IO_RPL_LOG_NAME,
1971
 
                          llstr(mi->getLogPosition(), llbuff));
1972
 
  /*
1973
 
    Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
1974
 
    thread, since a replication event can become this much larger than
1975
 
    the corresponding packet (query) sent from client to master.
1976
 
  */
1977
 
    drizzle->net.max_packet_size= session->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
1978
 
  }
1979
 
  else
1980
 
  {
1981
 
    sql_print_information(_("Slave I/O thread killed while connecting to master"));
1982
 
    goto err;
1983
 
  }
1984
 
 
1985
 
connected:
1986
 
 
1987
 
  // TODO: the assignment below should be under mutex (5.0)
1988
 
  mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
1989
 
  session->slave_net = &drizzle->net;
1990
 
  session->set_proc_info("Checking master version");
1991
 
  if (get_master_version_and_clock(drizzle, mi))
1992
 
    goto err;
1993
 
  
1994
 
  if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
1995
 
  {
1996
 
    /*
1997
 
      Register ourselves with the master.
1998
 
    */
1999
 
    session->set_proc_info("Registering slave on master");
2000
 
    if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2001
 
    {
2002
 
      if (!check_io_slave_killed(session, mi, "Slave I/O thread killed "
2003
 
                                 "while registering slave on master"))
2004
 
      {
2005
 
        sql_print_error(_("Slave I/O thread couldn't register on master"));
2006
 
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2007
 
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
2008
 
          goto err;
2009
 
      }
2010
 
      else
2011
 
        goto err;
2012
 
      goto connected;
2013
 
    }
2014
 
    if (!retry_count_reg)
2015
 
    {
2016
 
      retry_count_reg++;
2017
 
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
2018
 
      if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2019
 
                           reconnect_messages[SLAVE_RECON_ACT_REG]))
2020
 
        goto err;
2021
 
      goto connected;
2022
 
    }
2023
 
  }
2024
 
 
2025
 
  while (!io_slave_killed(session,mi))
2026
 
  {
2027
 
    session->set_proc_info("Requesting binlog dump");
2028
 
    if (request_dump(drizzle, mi, &suppress_warnings))
2029
 
    {
2030
 
      sql_print_error(_("Failed on request_dump()"));
2031
 
      if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while \
2032
 
requesting master dump")) ||
2033
 
          try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2034
 
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2035
 
        goto err;
2036
 
      goto connected;
2037
 
    }
2038
 
    if (!retry_count_dump)
2039
 
    {
2040
 
      retry_count_dump++;
2041
 
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
2042
 
      if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2043
 
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2044
 
        goto err;
2045
 
      goto connected;
2046
 
    }
2047
 
 
2048
 
    while (!io_slave_killed(session,mi))
2049
 
    {
2050
 
      uint32_t event_len;
2051
 
      /*
2052
 
        We say "waiting" because read_event() will wait if there's nothing to
2053
 
        read. But if there's something to read, it will not wait. The
2054
 
        important thing is to not confuse users by saying "reading" whereas
2055
 
        we're in fact receiving nothing.
2056
 
      */
2057
 
      session->set_proc_info(_("Waiting for master to send event"));
2058
 
      event_len= read_event(drizzle, mi, &suppress_warnings);
2059
 
      if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while "
2060
 
                                           "reading event")))
2061
 
        goto err;
2062
 
      if (!retry_count_event)
2063
 
      {
2064
 
        retry_count_event++;
2065
 
        sql_print_information(_("Forcing to reconnect slave I/O thread"));
2066
 
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2067
 
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2068
 
          goto err;
2069
 
        goto connected;
2070
 
      }
2071
 
 
2072
 
      if (event_len == packet_error)
2073
 
      {
2074
 
        uint32_t drizzle_error_number= drizzle_errno(drizzle);
2075
 
        switch (drizzle_error_number) {
2076
 
        case CR_NET_PACKET_TOO_LARGE:
2077
 
          sql_print_error(_("Log entry on master is longer than "
2078
 
                            "max_allowed_packet (%ld) on "
2079
 
                            "slave. If the entry is correct, restart the "
2080
 
                            "server with a higher value of "
2081
 
                            "max_allowed_packet"),
2082
 
                          session->variables.max_allowed_packet);
2083
 
          goto err;
2084
 
        case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2085
 
          sql_print_error(ER(drizzle_error_number), drizzle_error_number,
2086
 
                          drizzle_error(drizzle));
2087
 
          goto err;
2088
 
        case EE_OUTOFMEMORY:
2089
 
        case ER_OUTOFMEMORY:
2090
 
          sql_print_error(
2091
 
       _("Stopping slave I/O thread due to out-of-memory error from master"));
2092
 
          goto err;
2093
 
        }
2094
 
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2095
 
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2096
 
          goto err;
2097
 
        goto connected;
2098
 
      } // if (event_len == packet_error)
2099
 
 
2100
 
      retry_count=0;                    // ok event, reset retry counter
2101
 
      session->set_proc_info(_("Queueing master event to the relay log"));
2102
 
      if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2103
 
      {
2104
 
        goto err;
2105
 
      }
2106
 
      if (mi->flush())
2107
 
      {
2108
 
        sql_print_error(_("Failed to flush master info file"));
2109
 
        goto err;
2110
 
      }
2111
 
      /*
2112
 
        See if the relay logs take too much space.
2113
 
        We don't lock mi->rli.log_space_lock here; this dirty read saves time
2114
 
        and does not introduce any problem:
2115
 
        - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
2116
 
        the clean value is 0), then we are reading only one more event as we
2117
 
        should, and we'll block only at the next event. No big deal.
2118
 
        - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
2119
 
        the clean value is 1), then we are going into wait_for_relay_log_space()
2120
 
        for no reason, but this function will do a clean read, notice the clean
2121
 
        value and exit immediately.
2122
 
      */
2123
 
      if (rli->log_space_limit && rli->log_space_limit <
2124
 
          rli->log_space_total &&
2125
 
          !rli->ignore_log_space_limit)
2126
 
        if (wait_for_relay_log_space(rli))
2127
 
        {
2128
 
          sql_print_error(_("Slave I/O thread aborted while waiting for "
2129
 
                            "relay log space"));
2130
 
          goto err;
2131
 
        }
2132
 
    }
2133
 
  }
2134
 
 
2135
 
// error = 0;
2136
 
err:
2137
 
// print the current replication position
2138
 
  sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2139
 
                          "position %s"),
2140
 
                        IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
2141
 
  pthread_mutex_lock(&LOCK_thread_count);
2142
 
  session->query = session->db = 0; // extra safety
2143
 
  session->query_length= session->db_length= 0;
2144
 
  pthread_mutex_unlock(&LOCK_thread_count);
2145
 
  if (drizzle)
2146
 
  {
2147
 
    /*
2148
 
      Here we need to clear the active VIO before closing the
2149
 
      connection with the master.  The reason is that Session::awake()
2150
 
      might be called from terminate_slave_thread() because somebody
2151
 
      issued a STOP SLAVE.  If that happends, the close_active_vio()
2152
 
      can be called in the middle of closing the VIO associated with
2153
 
      the 'mysql' object, causing a crash.
2154
 
    */
2155
 
    drizzle_close(drizzle);
2156
 
    mi->drizzle=0;
2157
 
  }
2158
 
  write_ignored_events_info_to_relay_log(session, mi);
2159
 
  session->set_proc_info(_("Waiting for slave mutex on exit"));
2160
 
  pthread_mutex_lock(&mi->run_lock);
2161
 
 
2162
 
  /* Forget the relay log's format */
2163
 
  delete mi->rli.relay_log.description_event_for_queue;
2164
 
  mi->rli.relay_log.description_event_for_queue= 0;
2165
 
  assert(session->net.buff != 0);
2166
 
  net_end(&session->net); // destructor will not free it, because net.vio is 0
2167
 
  close_thread_tables(session);
2168
 
  pthread_mutex_lock(&LOCK_thread_count);
2169
 
  Session_CHECK_SENTRY(session);
2170
 
  delete session;
2171
 
  pthread_mutex_unlock(&LOCK_thread_count);
2172
 
  mi->abort_slave= 0;
2173
 
  mi->slave_running= 0;
2174
 
  mi->io_session= 0;
2175
 
  /*
2176
 
    Note: the order of the two following calls (first broadcast, then unlock)
2177
 
    is important. Otherwise a killer_thread can execute between the calls and
2178
 
    delete the mi structure leading to a crash! (see BUG#25306 for details)
2179
 
   */ 
2180
 
  pthread_cond_broadcast(&mi->stop_cond);       // tell the world we are done
2181
 
  pthread_mutex_unlock(&mi->run_lock);
2182
 
  my_thread_end();
2183
 
  pthread_exit(0);
2184
 
  return(0);                               // Can't return anything here
2185
 
}
2186
 
 
2187
 
 
2188
 
/* Slave SQL Thread entry point */
2189
 
 
2190
 
pthread_handler_t handle_slave_sql(void *arg)
2191
 
{
2192
 
  Session *session;                     /* needs to be first for thread_stack */
2193
 
  char llbuff[22],llbuff1[22];
2194
 
 
2195
 
  Relay_log_info* rli = &((Master_info*)arg)->rli;
2196
 
  const char *errmsg;
2197
 
 
2198
 
  my_thread_init();
2199
 
 
2200
 
  assert(rli->inited);
2201
 
  pthread_mutex_lock(&rli->run_lock);
2202
 
  assert(!rli->slave_running);
2203
 
  errmsg= 0;
2204
 
  rli->events_till_abort = abort_slave_event_count;
2205
 
 
2206
 
  session = new Session;
2207
 
  session->thread_stack = (char*)&session; // remember where our stack is
2208
 
  rli->sql_session= session;
2209
 
  
2210
 
  /* Inform waiting threads that slave has started */
2211
 
  rli->slave_run_id++;
2212
 
  rli->slave_running = 1;
2213
 
 
2214
 
  pthread_detach_this_thread();
2215
 
  if (init_slave_thread(session, SLAVE_Session_SQL))
2216
 
  {
2217
 
    /*
2218
 
      TODO: this is currently broken - slave start and change master
2219
 
      will be stuck if we fail here
2220
 
    */
2221
 
    pthread_cond_broadcast(&rli->start_cond);
2222
 
    pthread_mutex_unlock(&rli->run_lock);
2223
 
    sql_print_error(_("Failed during slave thread initialization"));
2224
 
    goto err;
2225
 
  }
2226
 
  session->init_for_queries();
2227
 
  session->temporary_tables = rli->save_temporary_tables; // restore temp tables
2228
 
  pthread_mutex_lock(&LOCK_thread_count);
2229
 
  threads.append(session);
2230
 
  pthread_mutex_unlock(&LOCK_thread_count);
2231
 
  /*
2232
 
    We are going to set slave_running to 1. Assuming slave I/O thread is
2233
 
    alive and connected, this is going to make Seconds_Behind_Master be 0
2234
 
    i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
2235
 
    the moment we start we can think we are caught up, and the next second we
2236
 
    start receiving data so we realize we are not caught up and
2237
 
    Seconds_Behind_Master grows. No big deal.
2238
 
  */
2239
 
  rli->abort_slave = 0;
2240
 
  pthread_mutex_unlock(&rli->run_lock);
2241
 
  pthread_cond_broadcast(&rli->start_cond);
2242
 
 
2243
 
  /*
2244
 
    Reset errors for a clean start (otherwise, if the master is idle, the SQL
2245
 
    thread may execute no Query_log_event, so the error will remain even
2246
 
    though there's no problem anymore). Do not reset the master timestamp
2247
 
    (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
2248
 
    as we are not sure that we are going to receive a query, we want to
2249
 
    remember the last master timestamp (to say how many seconds behind we are
2250
 
    now.
2251
 
    But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
2252
 
  */
2253
 
  rli->clear_error();
2254
 
 
2255
 
  //tell the I/O thread to take relay_log_space_limit into account from now on
2256
 
  pthread_mutex_lock(&rli->log_space_lock);
2257
 
  rli->ignore_log_space_limit= 0;
2258
 
  pthread_mutex_unlock(&rli->log_space_lock);
2259
 
  rli->trans_retries= 0; // start from "no error"
2260
 
 
2261
 
  if (init_relay_log_pos(rli,
2262
 
                         rli->group_relay_log_name.c_str(),
2263
 
                         rli->group_relay_log_pos,
2264
 
                         1 /*need data lock*/, &errmsg,
2265
 
                         1 /*look for a description_event*/))
2266
 
  {
2267
 
    sql_print_error(_("Error initializing relay log position: %s"),
2268
 
                    errmsg);
2269
 
    goto err;
2270
 
  }
2271
 
  Session_CHECK_SENTRY(session);
2272
 
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2273
 
  /*
2274
 
    Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2275
 
    correct position when it's called just after my_b_seek() (the questionable
2276
 
    stuff is those "seek is done on next read" comments in the my_b_seek()
2277
 
    source code).
2278
 
    The crude reality is that this assertion randomly fails whereas
2279
 
    replication seems to work fine. And there is no easy explanation why it
2280
 
    fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
2281
 
    init_relay_log_pos() called above). Maybe the assertion would be
2282
 
    meaningful if we held rli->data_lock between the my_b_seek() and the
2283
 
    assert().
2284
 
  */
2285
 
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2286
 
  assert(rli->sql_session == session);
2287
 
 
2288
 
  if (global_system_variables.log_warnings)
2289
 
    sql_print_information(_("Slave SQL thread initialized, "
2290
 
                            "starting replication in log '%s' at "
2291
 
                            "position %s, relay log '%s' position: %s"),
2292
 
                            RPL_LOG_NAME,
2293
 
                          llstr(rli->group_master_log_pos,llbuff),
2294
 
                          rli->group_relay_log_name.c_str(),
2295
 
                          llstr(rli->group_relay_log_pos,llbuff1));
2296
 
 
2297
 
  /* execute init_slave variable */
2298
 
  if (sys_init_slave.value_length)
2299
 
  {
2300
 
    execute_init_command(session, &sys_init_slave, &LOCK_sys_init_slave);
2301
 
    if (session->is_slave_error)
2302
 
    {
2303
 
      sql_print_error(_("Slave SQL thread aborted. "
2304
 
                        "Can't execute init_slave query"));
2305
 
      goto err;
2306
 
    }
2307
 
  }
2308
 
 
2309
 
  /*
2310
 
    First check until condition - probably there is nothing to execute. We
2311
 
    do not want to wait for next event in this case.
2312
 
  */
2313
 
  pthread_mutex_lock(&rli->data_lock);
2314
 
  if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
2315
 
      rli->is_until_satisfied(rli->group_master_log_pos))
2316
 
  {
2317
 
    char buf[22];
2318
 
    sql_print_information(_("Slave SQL thread stopped because it reached its"
2319
 
                            " UNTIL position %s"), llstr(rli->until_pos(), buf));
2320
 
    pthread_mutex_unlock(&rli->data_lock);
2321
 
    goto err;
2322
 
  }
2323
 
  pthread_mutex_unlock(&rli->data_lock);
2324
 
 
2325
 
  /* Read queries from the IO/THREAD until this thread is killed */
2326
 
 
2327
 
  while (!sql_slave_killed(session,rli))
2328
 
  {
2329
 
    session->set_proc_info(_("Reading event from the relay log"));
2330
 
    assert(rli->sql_session == session);
2331
 
    Session_CHECK_SENTRY(session);
2332
 
    if (exec_relay_log_event(session,rli))
2333
 
    {
2334
 
      // do not scare the user if SQL thread was simply killed or stopped
2335
 
      if (!sql_slave_killed(session,rli))
2336
 
      {
2337
 
        /*
2338
 
          retrieve as much info as possible from the session and, error
2339
 
          codes and warnings and print this to the error log as to
2340
 
          allow the user to locate the error
2341
 
        */
2342
 
        uint32_t const last_errno= rli->last_error().number;
2343
 
 
2344
 
        if (session->is_error())
2345
 
        {
2346
 
          char const *const errmsg= session->main_da.message();
2347
 
 
2348
 
          if (last_errno == 0)
2349
 
          {
2350
 
            rli->report(ERROR_LEVEL, session->main_da.sql_errno(), "%s", errmsg);
2351
 
          }
2352
 
          else if (last_errno != session->main_da.sql_errno())
2353
 
          {
2354
 
            sql_print_error(_("Slave (additional info): %s Error_code: %d"),
2355
 
                            errmsg, session->main_da.sql_errno());
2356
 
          }
2357
 
        }
2358
 
 
2359
 
        /* Print any warnings issued */
2360
 
        List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
2361
 
        DRIZZLE_ERROR *err;
2362
 
        /*
2363
 
          Added controlled slave thread cancel for replication
2364
 
          of user-defined variables.
2365
 
        */
2366
 
        bool udf_error = false;
2367
 
        while ((err= it++))
2368
 
        {
2369
 
          if (err->code == ER_CANT_OPEN_LIBRARY)
2370
 
            udf_error = true;
2371
 
          sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
2372
 
        }
2373
 
        if (udf_error)
2374
 
          sql_print_error(_("Error loading user-defined library, slave SQL "
2375
 
                            "thread aborted. Install the missing library, "
2376
 
                            "and restart the slave SQL thread with "
2377
 
                            "\"SLAVE START\". We stopped at log '%s' "
2378
 
                            "position %s"),
2379
 
                          RPL_LOG_NAME, llstr(rli->group_master_log_pos,
2380
 
            llbuff));
2381
 
        else
2382
 
          sql_print_error(_("Error running query, slave SQL thread aborted. "
2383
 
                            "Fix the problem, and restart "
2384
 
                            "the slave SQL thread with \"SLAVE START\". "
2385
 
                            "We stopped at log '%s' position %s"),
2386
 
                          RPL_LOG_NAME,
2387
 
                          llstr(rli->group_master_log_pos, llbuff));
2388
 
      }
2389
 
      goto err;
2390
 
    }
2391
 
  }
2392
 
 
2393
 
  /* Thread stopped. Print the current replication position to the log */
2394
 
  sql_print_information(_("Slave SQL thread exiting, replication stopped in "
2395
 
                          "log '%s' at position %s"),
2396
 
                        RPL_LOG_NAME,
2397
 
                        llstr(rli->group_master_log_pos,llbuff));
2398
 
 
2399
 
 err:
2400
 
 
2401
 
  /*
2402
 
    Some events set some playgrounds, which won't be cleared because thread
2403
 
    stops. Stopping of this thread may not be known to these events ("stop"
2404
 
    request is detected only by the present function, not by events), so we
2405
 
    must "proactively" clear playgrounds:
2406
 
  */
2407
 
  rli->cleanup_context(session, 1);
2408
 
  pthread_mutex_lock(&LOCK_thread_count);
2409
 
  /*
2410
 
    Some extra safety, which should not been needed (normally, event deletion
2411
 
    should already have done these assignments (each event which sets these
2412
 
    variables is supposed to set them to 0 before terminating)).
2413
 
  */
2414
 
  session->query= session->db= session->catalog= 0;
2415
 
  session->query_length= session->db_length= 0;
2416
 
  pthread_mutex_unlock(&LOCK_thread_count);
2417
 
  session->set_proc_info("Waiting for slave mutex on exit");
2418
 
  pthread_mutex_lock(&rli->run_lock);
2419
 
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
2420
 
  pthread_mutex_lock(&rli->data_lock);
2421
 
  assert(rli->slave_running == 1); // tracking buffer overrun
2422
 
  /* When master_pos_wait() wakes up it will check this and terminate */
2423
 
  rli->slave_running= 0;
2424
 
  /* Forget the relay log's format */
2425
 
  delete rli->relay_log.description_event_for_exec;
2426
 
  rli->relay_log.description_event_for_exec= 0;
2427
 
  /* Wake up master_pos_wait() */
2428
 
  pthread_mutex_unlock(&rli->data_lock);
2429
 
  pthread_cond_broadcast(&rli->data_cond);
2430
 
  rli->ignore_log_space_limit= 0; /* don't need any lock */
2431
 
  rli->save_temporary_tables = session->temporary_tables;
2432
 
 
2433
 
  /*
2434
 
    TODO: see if we can do this conditionally in next_event() instead
2435
 
    to avoid unneeded position re-init
2436
 
  */
2437
 
  session->temporary_tables = 0; // remove tempation from destructor to close them
2438
 
  assert(session->net.buff != 0);
2439
 
  net_end(&session->net); // destructor will not free it, because we are weird
2440
 
  assert(rli->sql_session == session);
2441
 
  Session_CHECK_SENTRY(session);
2442
 
  rli->sql_session= 0;
2443
 
  pthread_mutex_lock(&LOCK_thread_count);
2444
 
  Session_CHECK_SENTRY(session);
2445
 
  delete session;
2446
 
  pthread_mutex_unlock(&LOCK_thread_count);
2447
 
 /*
2448
 
  Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
2449
 
  is important. Otherwise a killer_thread can execute between the calls and
2450
 
  delete the mi structure leading to a crash! (see BUG#25306 for details)
2451
 
 */ 
2452
 
  pthread_cond_broadcast(&rli->stop_cond);
2453
 
  pthread_mutex_unlock(&rli->run_lock);  // tell the world we are done
2454
 
  
2455
 
  my_thread_end();
2456
 
  pthread_exit(0);
2457
 
  return(0);                               // Can't return anything here
2458
 
}
2459
 
 
2460
 
 
2461
 
/*
2462
 
  process_io_create_file()
2463
 
*/
2464
 
 
2465
 
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
2466
 
{
2467
 
  int32_t error = 1;
2468
 
  uint32_t num_bytes;
2469
 
  bool cev_not_written;
2470
 
  Session *session = mi->io_session;
2471
 
  NET *net = &mi->drizzle->net;
2472
 
 
2473
 
  if (unlikely(!cev->is_valid()))
2474
 
    return(1);
2475
 
 
2476
 
  if (!rpl_filter->db_ok(cev->db))
2477
 
  {
2478
 
    skip_load_data_infile(net);
2479
 
    return(0);
2480
 
  }
2481
 
  assert(cev->inited_from_old);
2482
 
  session->file_id = cev->file_id = mi->file_id++;
2483
 
  session->server_id = cev->server_id;
2484
 
  cev_not_written = 1;
2485
 
 
2486
 
  if (unlikely(net_request_file(net,cev->fname)))
2487
 
  {
2488
 
    sql_print_error(_("Slave I/O: failed requesting download of '%s'"),
2489
 
                    cev->fname);
2490
 
    goto err;
2491
 
  }
2492
 
 
2493
 
  /*
2494
 
    This dummy block is so we could instantiate Append_block_log_event
2495
 
    once and then modify it slightly instead of doing it multiple times
2496
 
    in the loop
2497
 
  */
2498
 
  {
2499
 
    Append_block_log_event aev(session,0,0,0,0);
2500
 
 
2501
 
    for (;;)
2502
 
    {
2503
 
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
2504
 
      {
2505
 
        sql_print_error(_("Network read error downloading '%s' from master"),
2506
 
                        cev->fname);
2507
 
        goto err;
2508
 
      }
2509
 
      if (unlikely(!num_bytes)) /* eof */
2510
 
      {
2511
 
        /* 3.23 master wants it */
2512
 
        net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0);
2513
 
        /*
2514
 
          If we wrote Create_file_log_event, then we need to write
2515
 
          Execute_load_log_event. If we did not write Create_file_log_event,
2516
 
          then this is an empty file and we can just do as if the LOAD DATA
2517
 
          INFILE had not existed, i.e. write nothing.
2518
 
        */
2519
 
        if (unlikely(cev_not_written))
2520
 
          break;
2521
 
        Execute_load_log_event xev(session,0,0);
2522
 
        xev.log_pos = cev->log_pos;
2523
 
        if (unlikely(mi->rli.relay_log.append(&xev)))
2524
 
        {
2525
 
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2526
 
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2527
 
                     _("error writing Exec_load event to relay log"));
2528
 
          goto err;
2529
 
        }
2530
 
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2531
 
        break;
2532
 
      }
2533
 
      if (unlikely(cev_not_written))
2534
 
      {
2535
 
        cev->block = net->read_pos;
2536
 
        cev->block_len = num_bytes;
2537
 
        if (unlikely(mi->rli.relay_log.append(cev)))
2538
 
        {
2539
 
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2540
 
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2541
 
                     _("error writing Create_file event to relay log"));
2542
 
          goto err;
2543
 
        }
2544
 
        cev_not_written=0;
2545
 
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
2546
 
      }
2547
 
      else
2548
 
      {
2549
 
        aev.block = net->read_pos;
2550
 
        aev.block_len = num_bytes;
2551
 
        aev.log_pos = cev->log_pos;
2552
 
        if (unlikely(mi->rli.relay_log.append(&aev)))
2553
 
        {
2554
 
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2555
 
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2556
 
                     _("error writing Append_block event to relay log"));
2557
 
          goto err;
2558
 
        }
2559
 
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
2560
 
      }
2561
 
    }
2562
 
  }
2563
 
  error=0;
2564
 
err:
2565
 
  return(error);
2566
 
}
2567
 
 
2568
 
 
2569
 
/*
2570
 
  Start using a new binary log on the master
2571
 
 
2572
 
  SYNOPSIS
2573
 
    process_io_rotate()
2574
 
    mi                  master_info for the slave
2575
 
    rev                 The rotate log event read from the binary log
2576
 
 
2577
 
  DESCRIPTION
2578
 
    Updates the master info with the place in the next binary
2579
 
    log where we should start reading.
2580
 
    Rotate the relay log to avoid mixed-format relay logs.
2581
 
 
2582
 
  NOTES
2583
 
    We assume we already locked mi->data_lock
2584
 
 
2585
 
  RETURN VALUES
2586
 
    0           ok
2587
 
    1           Log event is illegal
2588
 
 
2589
 
*/
2590
 
 
2591
 
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
2592
 
{
2593
 
  safe_mutex_assert_owner(&mi->data_lock);
2594
 
 
2595
 
  if (unlikely(!rev->is_valid()))
2596
 
    return(1);
2597
 
 
2598
 
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2599
 
  mi->setLogName(rev->new_log_ident);
2600
 
  mi->setLogPosition(rev->pos);
2601
 
  /*
2602
 
    If we do not do this, we will be getting the first
2603
 
    rotate event forever, so we need to not disconnect after one.
2604
 
  */
2605
 
  if (disconnect_slave_event_count)
2606
 
    mi->events_till_disconnect++;
2607
 
 
2608
 
  /*
2609
 
    If description_event_for_queue is format <4, there is conversion in the
2610
 
    relay log to the slave's format (4). And Rotate can mean upgrade or
2611
 
    nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
2612
 
    no need to reset description_event_for_queue now. And if it's nothing (same
2613
 
    master version as before), no need (still using the slave's format).
2614
 
  */
2615
 
  if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
2616
 
  {
2617
 
    delete mi->rli.relay_log.description_event_for_queue;
2618
 
    /* start from format 3 (DRIZZLE 4.0) again */
2619
 
    mi->rli.relay_log.description_event_for_queue= new
2620
 
      Format_description_log_event(3);
2621
 
  }
2622
 
  /*
2623
 
    Rotate the relay log makes binlog format detection easier (at next slave
2624
 
    start or mysqlbinlog)
2625
 
  */
2626
 
  rotate_relay_log(mi); /* will take the right mutexes */
2627
 
  return(0);
2628
 
}
2629
 
 
2630
 
/*
2631
 
  Reads a 3.23 event and converts it to the slave's format. This code was
2632
 
  copied from DRIZZLE 4.0.
2633
 
*/
2634
 
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
2635
 
                           uint32_t event_len)
2636
 
{
2637
 
  const char *errmsg = 0;
2638
 
  uint32_t inc_pos;
2639
 
  bool ignore_event= 0;
2640
 
  char *tmp_buf = 0;
2641
 
  Relay_log_info *rli= &mi->rli;
2642
 
 
2643
 
  /*
2644
 
    If we get Load event, we need to pass a non-reusable buffer
2645
 
    to read_log_event, so we do a trick
2646
 
  */
2647
 
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
2648
 
  {
2649
 
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
2650
 
    {
2651
 
      mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2652
 
                 ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
2653
 
      return(1);
2654
 
    }
2655
 
    memcpy(tmp_buf,buf,event_len);
2656
 
    /*
2657
 
      Create_file constructor wants a 0 as last char of buffer, this 0 will
2658
 
      serve as the string-termination char for the file's name (which is at the
2659
 
      end of the buffer)
2660
 
      We must increment event_len, otherwise the event constructor will not see
2661
 
      this end 0, which leads to segfault.
2662
 
    */
2663
 
    tmp_buf[event_len++]=0;
2664
 
    int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
2665
 
    buf = (const char*)tmp_buf;
2666
 
  }
2667
 
  /*
2668
 
    This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
2669
 
    send the loaded file, and write it to the relay log in the form of
2670
 
    Append_block/Exec_load (the SQL thread needs the data, as that thread is not
2671
 
    connected to the master).
2672
 
  */
2673
 
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2674
 
                                            mi->rli.relay_log.description_event_for_queue);
2675
 
  if (unlikely(!ev))
2676
 
  {
2677
 
    sql_print_error(_("Read invalid event from master: '%s', "
2678
 
                      "master could be corrupt but a more likely cause "
2679
 
                      "of this is a bug"),
2680
 
                    errmsg);
2681
 
    free((char*) tmp_buf);
2682
 
    return(1);
2683
 
  }
2684
 
 
2685
 
  pthread_mutex_lock(&mi->data_lock);
2686
 
  ev->log_pos= mi->getLogPosition(); /* 3.23 events don't contain log_pos */
2687
 
  switch (ev->get_type_code()) {
2688
 
  case STOP_EVENT:
2689
 
    ignore_event= 1;
2690
 
    inc_pos= event_len;
2691
 
    break;
2692
 
  case ROTATE_EVENT:
2693
 
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2694
 
    {
2695
 
      delete ev;
2696
 
      pthread_mutex_unlock(&mi->data_lock);
2697
 
      return(1);
2698
 
    }
2699
 
    inc_pos= 0;
2700
 
    break;
2701
 
  case CREATE_FILE_EVENT:
2702
 
    /*
2703
 
      Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
2704
 
      queue_old_event() which is for 3.23 events which don't comprise
2705
 
      CREATE_FILE_EVENT. This is because read_log_event() above has just
2706
 
      transformed LOAD_EVENT into CREATE_FILE_EVENT.
2707
 
    */
2708
 
  {
2709
 
    /* We come here when and only when tmp_buf != 0 */
2710
 
    assert(tmp_buf != 0);
2711
 
    inc_pos=event_len;
2712
 
    ev->log_pos+= inc_pos;
2713
 
    int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2714
 
    delete ev;
2715
 
    mi->incrementLogPosition(inc_pos);
2716
 
    pthread_mutex_unlock(&mi->data_lock);
2717
 
    free((char*)tmp_buf);
2718
 
    return(error);
2719
 
  }
2720
 
  default:
2721
 
    inc_pos= event_len;
2722
 
    break;
2723
 
  }
2724
 
  if (likely(!ignore_event))
2725
 
  {
2726
 
    if (ev->log_pos)
2727
 
      /*
2728
 
         Don't do it for fake Rotate events (see comment in
2729
 
      Log_event::Log_event(const char* buf...) in log_event.cc).
2730
 
      */
2731
 
      ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
2732
 
    if (unlikely(rli->relay_log.append(ev)))
2733
 
    {
2734
 
      delete ev;
2735
 
      pthread_mutex_unlock(&mi->data_lock);
2736
 
      return(1);
2737
 
    }
2738
 
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2739
 
  }
2740
 
  delete ev;
2741
 
  mi->incrementLogPosition(inc_pos);
2742
 
  pthread_mutex_unlock(&mi->data_lock);
2743
 
  return(0);
2744
 
}
2745
 
 
2746
 
/*
2747
 
  Reads a 4.0 event and converts it to the slave's format. This code was copied
2748
 
  from queue_binlog_ver_1_event(), with some affordable simplifications.
2749
 
*/
2750
 
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
2751
 
                           uint32_t event_len)
2752
 
{
2753
 
  const char *errmsg = 0;
2754
 
  uint32_t inc_pos;
2755
 
  char *tmp_buf = 0;
2756
 
  Relay_log_info *rli= &mi->rli;
2757
 
 
2758
 
  /* read_log_event() will adjust log_pos to be end_log_pos */
2759
 
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
2760
 
                                            mi->rli.relay_log.description_event_for_queue);
2761
 
  if (unlikely(!ev))
2762
 
  {
2763
 
    sql_print_error(_("Read invalid event from master: '%s', "
2764
 
                      "master could be corrupt but a more likely cause of "
2765
 
                      "this is a bug"),
2766
 
                    errmsg);
2767
 
    free((char*) tmp_buf);
2768
 
    return(1);
2769
 
  }
2770
 
  pthread_mutex_lock(&mi->data_lock);
2771
 
  switch (ev->get_type_code()) {
2772
 
  case STOP_EVENT:
2773
 
    goto err;
2774
 
  case ROTATE_EVENT:
2775
 
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
2776
 
    {
2777
 
      delete ev;
2778
 
      pthread_mutex_unlock(&mi->data_lock);
2779
 
      return(1);
2780
 
    }
2781
 
    inc_pos= 0;
2782
 
    break;
2783
 
  default:
2784
 
    inc_pos= event_len;
2785
 
    break;
2786
 
  }
2787
 
  if (unlikely(rli->relay_log.append(ev)))
2788
 
  {
2789
 
    delete ev;
2790
 
    pthread_mutex_unlock(&mi->data_lock);
2791
 
    return(1);
2792
 
  }
2793
 
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2794
 
  delete ev;
2795
 
  mi->incrementLogPosition(inc_pos);
2796
 
err:
2797
 
  pthread_mutex_unlock(&mi->data_lock);
2798
 
  return(0);
2799
 
}
2800
 
 
2801
 
/*
2802
 
  queue_old_event()
2803
 
 
2804
 
  Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
2805
 
  (exactly, slave's) format. To do the conversion, we create a 5.0 event from
2806
 
  the 3.23/4.0 bytes, then write this event to the relay log.
2807
 
 
2808
 
  TODO:
2809
 
    Test this code before release - it has to be tested on a separate
2810
 
    setup with 3.23 master or 4.0 master
2811
 
*/
2812
 
 
2813
 
static int32_t queue_old_event(Master_info *mi, const char *buf,
2814
 
                           uint32_t event_len)
2815
 
{
2816
 
  switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
2817
 
  {
2818
 
  case 1:
2819
 
      return(queue_binlog_ver_1_event(mi,buf,event_len));
2820
 
  case 3:
2821
 
      return(queue_binlog_ver_3_event(mi,buf,event_len));
2822
 
  default: /* unsupported format; eg version 2 */
2823
 
    return(1);
2824
 
  }
2825
 
}
2826
 
 
2827
 
/*
2828
 
  queue_event()
2829
 
 
2830
 
  If the event is 3.23/4.0, passes it to queue_old_event() which will convert
2831
 
  it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
2832
 
  no format conversion, it's pure read/write of bytes.
2833
 
  So a 5.0.0 slave's relay log can contain events in the slave's format or in
2834
 
  any >=5.0.0 format.
2835
 
*/
2836
 
 
2837
 
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
2838
 
{
2839
 
  int32_t error= 0;
2840
 
  String error_msg;
2841
 
  uint32_t inc_pos= 0;
2842
 
  Relay_log_info *rli= &mi->rli;
2843
 
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
2844
 
 
2845
 
 
2846
 
  if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
2847
 
      buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
2848
 
    return(queue_old_event(mi,buf,event_len));
2849
 
 
2850
 
  pthread_mutex_lock(&mi->data_lock);
2851
 
 
2852
 
  switch (buf[EVENT_TYPE_OFFSET]) {
2853
 
  case STOP_EVENT:
2854
 
    /*
2855
 
      We needn't write this event to the relay log. Indeed, it just indicates a
2856
 
      master server shutdown. The only thing this does is cleaning. But
2857
 
      cleaning is already done on a per-master-thread basis (as the master
2858
 
      server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
2859
 
      prepared statements' deletion are TODO only when we binlog prep stmts).
2860
 
 
2861
 
      We don't even increment mi->master_log_pos, because we may be just after
2862
 
      a Rotate event. Btw, in a few milliseconds we are going to have a Start
2863
 
      event from the next binlog (unless the master is presently running
2864
 
      without --log-bin).
2865
 
    */
2866
 
    goto err;
2867
 
  case ROTATE_EVENT:
2868
 
  {
2869
 
    Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
2870
 
    if (unlikely(process_io_rotate(mi,&rev)))
2871
 
    {
2872
 
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2873
 
      goto err;
2874
 
    }
2875
 
    /*
2876
 
      Now the I/O thread has just changed its mi->master_log_name, so
2877
 
      incrementing mi->master_log_pos is nonsense.
2878
 
    */
2879
 
    inc_pos= 0;
2880
 
    break;
2881
 
  }
2882
 
  case FORMAT_DESCRIPTION_EVENT:
2883
 
  {
2884
 
    /*
2885
 
      Create an event, and save it (when we rotate the relay log, we will have
2886
 
      to write this event again).
2887
 
    */
2888
 
    /*
2889
 
      We are the only thread which reads/writes description_event_for_queue.
2890
 
      The relay_log struct does not move (though some members of it can
2891
 
      change), so we needn't any lock (no rli->data_lock, no log lock).
2892
 
    */
2893
 
    Format_description_log_event* tmp;
2894
 
    const char* errmsg;
2895
 
    if (!(tmp= (Format_description_log_event*)
2896
 
          Log_event::read_log_event(buf, event_len, &errmsg,
2897
 
                                    mi->rli.relay_log.description_event_for_queue)))
2898
 
    {
2899
 
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
2900
 
      goto err;
2901
 
    }
2902
 
    delete mi->rli.relay_log.description_event_for_queue;
2903
 
    mi->rli.relay_log.description_event_for_queue= tmp;
2904
 
    /*
2905
 
       Though this does some conversion to the slave's format, this will
2906
 
       preserve the master's binlog format version, and number of event types.
2907
 
    */
2908
 
    /*
2909
 
       If the event was not requested by the slave (the slave did not ask for
2910
 
       it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
2911
 
    */
2912
 
    inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
2913
 
  }
2914
 
  break;
2915
 
 
2916
 
  case HEARTBEAT_LOG_EVENT:
2917
 
  {
2918
 
    /*
2919
 
      HB (heartbeat) cannot come before RL (Relay)
2920
 
    */
2921
 
    char  llbuf[22];
2922
 
    Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
2923
 
    if (!hb.is_valid())
2924
 
    {
2925
 
      error= ER_SLAVE_HEARTBEAT_FAILURE;
2926
 
      error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
2927
 
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2928
 
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2929
 
      error_msg.append(STRING_WITH_LEN(" log_pos "));
2930
 
      llstr(hb.log_pos, llbuf);
2931
 
      error_msg.append(llbuf, strlen(llbuf));
2932
 
      goto err;
2933
 
    }
2934
 
    mi->received_heartbeats++;
2935
 
    /* 
2936
 
       compare local and event's versions of log_file, log_pos.
2937
 
       
2938
 
       Heartbeat is sent only after an event corresponding to the corrdinates
2939
 
       the heartbeat carries.
2940
 
       Slave can not have a difference in coordinates except in the only
2941
 
       special case when mi->master_log_name, master_log_pos have never
2942
 
       been updated by Rotate event i.e when slave does not have any history
2943
 
       with the master (and thereafter mi->master_log_pos is NULL).
2944
 
 
2945
 
       TODO: handling `when' for SHOW SLAVE STATUS' snds behind
2946
 
    */
2947
 
    if ((mi->setLogName(hb.get_log_ident()) && mi->getLogName() != NULL)
2948
 
        || mi->getLogPosition() != hb.log_pos)
2949
 
    {
2950
 
      /* missed events of heartbeat from the past */
2951
 
      error= ER_SLAVE_HEARTBEAT_FAILURE;
2952
 
      error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
2953
 
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
2954
 
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
2955
 
      error_msg.append(STRING_WITH_LEN(" log_pos "));
2956
 
      llstr(hb.log_pos, llbuf);
2957
 
      error_msg.append(llbuf, strlen(llbuf));
2958
 
      goto err;
2959
 
    }
2960
 
    goto skip_relay_logging;
2961
 
  }
2962
 
  break;
2963
 
    
2964
 
  default:
2965
 
    inc_pos= event_len;
2966
 
    break;
2967
 
  }
2968
 
 
2969
 
  /*
2970
 
     If this event is originating from this server, don't queue it.
2971
 
     We don't check this for 3.23 events because it's simpler like this; 3.23
2972
 
     will be filtered anyway by the SQL slave thread which also tests the
2973
 
     server id (we must also keep this test in the SQL thread, in case somebody
2974
 
     upgrades a 4.0 slave which has a not-filtered relay log).
2975
 
 
2976
 
     ANY event coming from ourselves can be ignored: it is obvious for queries;
2977
 
     for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
2978
 
     (--log-slave-updates would not log that) unless this slave is also its
2979
 
     direct master (an unsupported, useless setup!).
2980
 
  */
2981
 
 
2982
 
  pthread_mutex_lock(log_lock);
2983
 
 
2984
 
  if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
2985
 
      !mi->rli.replicate_same_server_id)
2986
 
  {
2987
 
    /*
2988
 
      Do not write it to the relay log.
2989
 
      a) We still want to increment mi->master_log_pos, so that we won't
2990
 
      re-read this event from the master if the slave IO thread is now
2991
 
      stopped/restarted (more efficient if the events we are ignoring are big
2992
 
      LOAD DATA INFILE).
2993
 
      b) We want to record that we are skipping events, for the information of
2994
 
      the slave SQL thread, otherwise that thread may let
2995
 
      rli->group_relay_log_pos stay too small if the last binlog's event is
2996
 
      ignored.
2997
 
      But events which were generated by this slave and which do not exist in
2998
 
      the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
2999
 
      mi->master_log_pos.
3000
 
    */
3001
 
    if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
3002
 
        buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3003
 
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3004
 
    {
3005
 
      mi->incrementLogPosition(inc_pos);
3006
 
      memcpy(rli->ign_master_log_name_end, mi->getLogName(), FN_REFLEN);
3007
 
      assert(rli->ign_master_log_name_end[0]);
3008
 
      rli->ign_master_log_pos_end= mi->getLogPosition();
3009
 
    }
3010
 
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3011
 
  }
3012
 
  else
3013
 
  {
3014
 
    /* write the event to the relay log */
3015
 
    if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3016
 
    {
3017
 
      mi->incrementLogPosition(inc_pos);
3018
 
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3019
 
    }
3020
 
    else
3021
 
    {
3022
 
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
3023
 
    }
3024
 
    rli->ign_master_log_name_end[0]= 0; // last event is not ignored
3025
 
  }
3026
 
  pthread_mutex_unlock(log_lock);
3027
 
 
3028
 
skip_relay_logging:
3029
 
  
3030
 
err:
3031
 
  pthread_mutex_unlock(&mi->data_lock);
3032
 
  if (error)
3033
 
    mi->report(ERROR_LEVEL, error, ER(error),
3034
 
               (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
3035
 
               _("could not queue event from master") :
3036
 
               error_msg.ptr());
3037
 
  return(error);
3038
 
}
3039
 
 
3040
 
 
3041
 
void end_relay_log_info(Relay_log_info* rli)
3042
 
{
3043
 
  if (!rli->inited)
3044
 
    return;
3045
 
  if (rli->info_fd >= 0)
3046
 
  {
3047
 
    end_io_cache(&rli->info_file);
3048
 
    (void) my_close(rli->info_fd, MYF(MY_WME));
3049
 
    rli->info_fd = -1;
3050
 
  }
3051
 
  if (rli->cur_log_fd >= 0)
3052
 
  {
3053
 
    end_io_cache(&rli->cache_buf);
3054
 
    (void)my_close(rli->cur_log_fd, MYF(MY_WME));
3055
 
    rli->cur_log_fd = -1;
3056
 
  }
3057
 
  rli->inited = 0;
3058
 
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
3059
 
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3060
 
  /*
3061
 
    Delete the slave's temporary tables from memory.
3062
 
    In the future there will be other actions than this, to ensure persistance
3063
 
    of slave's temp tables after shutdown.
3064
 
  */
3065
 
  rli->close_temporary_tables();
3066
 
  return;
3067
 
}
3068
 
 
3069
 
/*
3070
 
  Try to connect until successful or slave killed
3071
 
 
3072
 
  SYNPOSIS
3073
 
    safe_connect()
3074
 
    session                 Thread handler for slave
3075
 
    DRIZZLE               DRIZZLE connection handle
3076
 
    mi                  Replication handle
3077
 
 
3078
 
  RETURN
3079
 
    0   ok
3080
 
    #   Error
3081
 
*/
3082
 
 
3083
 
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi)
3084
 
{
3085
 
  return(connect_to_master(session, drizzle, mi, 0, 0));
3086
 
}
3087
 
 
3088
 
 
3089
 
/*
3090
 
  SYNPOSIS
3091
 
    connect_to_master()
3092
 
 
3093
 
  IMPLEMENTATION
3094
 
    Try to connect until successful or slave killed or we have retried
3095
 
    master_retry_count times
3096
 
*/
3097
 
 
3098
 
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
3099
 
                                 bool reconnect, bool suppress_warnings)
3100
 
{
3101
 
  int32_t slave_was_killed;
3102
 
  int32_t last_errno= -2;                           // impossible error
3103
 
  uint32_t err_count=0;
3104
 
  char llbuff[22];
3105
 
 
3106
 
  mi->events_till_disconnect = disconnect_slave_event_count;
3107
 
  uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
3108
 
  if (opt_slave_compressed_protocol)
3109
 
    client_flag=CLIENT_COMPRESS;                /* We will use compression */
3110
 
 
3111
 
  drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3112
 
  drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3113
 
 
3114
 
  while (!(slave_was_killed = io_slave_killed(session,mi)) &&
3115
 
         (reconnect ? drizzle_reconnect(drizzle) != 0 :
3116
 
          drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
3117
 
                             mi->getPort(), 0, client_flag) == 0))
3118
 
  {
3119
 
    /* Don't repeat last error */
3120
 
    if ((int32_t)drizzle_errno(drizzle) != last_errno)
3121
 
    {
3122
 
      last_errno=drizzle_errno(drizzle);
3123
 
      suppress_warnings= 0;
3124
 
      mi->report(ERROR_LEVEL, last_errno,
3125
 
                 _("error %s to master '%s@%s:%d'"
3126
 
                   " - retry-time: %d  retries: %u"),
3127
 
                 (reconnect ? _("reconnecting") : _("connecting")),
3128
 
                 mi->getUsername(), mi->getHostname(), mi->getPort(),
3129
 
                 mi->getConnectionRetry(), master_retry_count);
3130
 
    }
3131
 
    /*
3132
 
      By default we try forever. The reason is that failure will trigger
3133
 
      master election, so if the user did not set master_retry_count we
3134
 
      do not want to have election triggered on the first failure to
3135
 
      connect
3136
 
    */
3137
 
    if (++err_count == master_retry_count)
3138
 
    {
3139
 
      slave_was_killed=1;
3140
 
      break;
3141
 
    }
3142
 
    safe_sleep(session,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3143
 
               (void*)mi);
3144
 
  }
3145
 
 
3146
 
  if (!slave_was_killed)
3147
 
  {
3148
 
    if (reconnect)
3149
 
    {
3150
 
      if (!suppress_warnings && global_system_variables.log_warnings)
3151
 
        sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3152
 
                                "replication resumed in log '%s' at "
3153
 
                                "position %s"), mi->getUsername(),
3154
 
                                mi->getHostname(), mi->getPort(),
3155
 
                                IO_RPL_LOG_NAME,
3156
 
                                llstr(mi->getLogPosition(),llbuff));
3157
 
    }
3158
 
  }
3159
 
  drizzle->reconnect= 1;
3160
 
  return(slave_was_killed);
3161
 
}
3162
 
 
3163
 
 
3164
 
/*
3165
 
  safe_reconnect()
3166
 
 
3167
 
  IMPLEMENTATION
3168
 
    Try to connect until successful or slave killed or we have retried
3169
 
    master_retry_count times
3170
 
*/
3171
 
 
3172
 
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
3173
 
                          bool suppress_warnings)
3174
 
{
3175
 
  return(connect_to_master(session, drizzle, mi, 1, suppress_warnings));
3176
 
}
3177
 
 
3178
 
 
3179
 
/*
3180
 
  Store the file and position where the execute-slave thread are in the
3181
 
  relay log.
3182
 
 
3183
 
  SYNOPSIS
3184
 
    flush_relay_log_info()
3185
 
    rli                 Relay log information
3186
 
 
3187
 
  NOTES
3188
 
    - As this is only called by the slave thread, we don't need to
3189
 
      have a lock on this.
3190
 
    - If there is an active transaction, then we don't update the position
3191
 
      in the relay log.  This is to ensure that we re-execute statements
3192
 
      if we die in the middle of an transaction that was rolled back.
3193
 
    - As a transaction never spans binary logs, we don't have to handle the
3194
 
      case where we do a relay-log-rotation in the middle of the transaction.
3195
 
      If this would not be the case, we would have to ensure that we
3196
 
      don't delete the relay log file where the transaction started when
3197
 
      we switch to a new relay log file.
3198
 
 
3199
 
  TODO
3200
 
    - Change the log file information to a binary format to avoid calling
3201
 
      int64_t2str.
3202
 
 
3203
 
  RETURN VALUES
3204
 
    0   ok
3205
 
    1   write error
3206
 
*/
3207
 
 
3208
 
bool flush_relay_log_info(Relay_log_info* rli)
3209
 
{
3210
 
  bool error= 0;
3211
 
 
3212
 
  if (unlikely(rli->no_storage))
3213
 
    return(0);
3214
 
 
3215
 
  return(error);
3216
 
}
3217
 
 
3218
 
 
3219
 
/*
3220
 
  Called when we notice that the current "hot" log got rotated under our feet.
3221
 
*/
3222
 
 
3223
 
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
3224
 
{
3225
 
  assert(rli->cur_log != &rli->cache_buf);
3226
 
  assert(rli->cur_log_fd == -1);
3227
 
 
3228
 
  IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3229
 
  if ((rli->cur_log_fd=open_binlog(cur_log, rli->event_relay_log_name.c_str(), errmsg)) < 0)
3230
 
    return(0);
3231
 
  /*
3232
 
    We want to start exactly where we was before:
3233
 
    relay_log_pos       Current log pos
3234
 
    pending             Number of bytes already processed from the event
3235
 
  */
3236
 
  rli->event_relay_log_pos= cmax(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3237
 
  my_b_seek(cur_log,rli->event_relay_log_pos);
3238
 
  return(cur_log);
3239
 
}
3240
 
 
3241
 
 
3242
 
static Log_event* next_event(Relay_log_info* rli)
3243
 
{
3244
 
  Log_event* ev;
3245
 
  IO_CACHE* cur_log = rli->cur_log;
3246
 
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3247
 
  const char* errmsg=0;
3248
 
  Session* session = rli->sql_session;
3249
 
 
3250
 
  assert(session != 0);
3251
 
 
3252
 
  if (abort_slave_event_count && !rli->events_till_abort--)
3253
 
    return(0);
3254
 
 
3255
 
  /*
3256
 
    For most operations we need to protect rli members with data_lock,
3257
 
    so we assume calling function acquired this mutex for us and we will
3258
 
    hold it for the most of the loop below However, we will release it
3259
 
    whenever it is worth the hassle,  and in the cases when we go into a
3260
 
    pthread_cond_wait() with the non-data_lock mutex
3261
 
  */
3262
 
  safe_mutex_assert_owner(&rli->data_lock);
3263
 
 
3264
 
  while (!sql_slave_killed(session,rli))
3265
 
  {
3266
 
    /*
3267
 
      We can have two kinds of log reading:
3268
 
      hot_log:
3269
 
        rli->cur_log points at the IO_CACHE of relay_log, which
3270
 
        is actively being updated by the I/O thread. We need to be careful
3271
 
        in this case and make sure that we are not looking at a stale log that
3272
 
        has already been rotated. If it has been, we reopen the log.
3273
 
 
3274
 
      The other case is much simpler:
3275
 
        We just have a read only log that nobody else will be updating.
3276
 
    */
3277
 
    bool hot_log;
3278
 
    if ((hot_log = (cur_log != &rli->cache_buf)))
3279
 
    {
3280
 
      assert(rli->cur_log_fd == -1); // foreign descriptor
3281
 
      pthread_mutex_lock(log_lock);
3282
 
 
3283
 
      /*
3284
 
        Reading xxx_file_id is safe because the log will only
3285
 
        be rotated when we hold relay_log.LOCK_log
3286
 
      */
3287
 
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
3288
 
      {
3289
 
        // The master has switched to a new log file; Reopen the old log file
3290
 
        cur_log=reopen_relay_log(rli, &errmsg);
3291
 
        pthread_mutex_unlock(log_lock);
3292
 
        if (!cur_log)                           // No more log files
3293
 
          goto err;
3294
 
        hot_log=0;                              // Using old binary log
3295
 
      }
3296
 
    }
3297
 
    /* 
3298
 
      As there is no guarantee that the relay is open (for example, an I/O
3299
 
      error during a write by the slave I/O thread may have closed it), we
3300
 
      have to test it.
3301
 
    */
3302
 
    if (!my_b_inited(cur_log))
3303
 
      goto err;
3304
 
    assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
3305
 
    assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
3306
 
 
3307
 
    /*
3308
 
      Relay log is always in new format - if the master is 3.23, the
3309
 
      I/O thread will convert the format for us.
3310
 
      A problem: the description event may be in a previous relay log. So if
3311
 
      the slave has been shutdown meanwhile, we would have to look in old relay
3312
 
      logs, which may even have been deleted. So we need to write this
3313
 
      description event at the beginning of the relay log.
3314
 
      When the relay log is created when the I/O thread starts, easy: the
3315
 
      master will send the description event and we will queue it.
3316
 
      But if the relay log is created by new_file(): then the solution is:
3317
 
      DRIZZLE_BIN_LOG::open() will write the buffered description event.
3318
 
    */
3319
 
    if ((ev=Log_event::read_log_event(cur_log,0,
3320
 
                                      rli->relay_log.description_event_for_exec)))
3321
 
 
3322
 
    {
3323
 
      assert(session==rli->sql_session);
3324
 
      /*
3325
 
        read it while we have a lock, to avoid a mutex lock in
3326
 
        inc_event_relay_log_pos()
3327
 
      */
3328
 
      rli->future_event_relay_log_pos= my_b_tell(cur_log);
3329
 
      if (hot_log)
3330
 
        pthread_mutex_unlock(log_lock);
3331
 
      return(ev);
3332
 
    }
3333
 
    assert(session==rli->sql_session);
3334
 
    if (opt_reckless_slave)                     // For mysql-test
3335
 
      cur_log->error = 0;
3336
 
    if (cur_log->error < 0)
3337
 
    {
3338
 
      errmsg = "slave SQL thread aborted because of I/O error";
3339
 
      if (hot_log)
3340
 
        pthread_mutex_unlock(log_lock);
3341
 
      goto err;
3342
 
    }
3343
 
    if (!cur_log->error) /* EOF */
3344
 
    {
3345
 
      /*
3346
 
        On a hot log, EOF means that there are no more updates to
3347
 
        process and we must block until I/O thread adds some and
3348
 
        signals us to continue
3349
 
      */
3350
 
      if (hot_log)
3351
 
      {
3352
 
        /*
3353
 
          We say in Seconds_Behind_Master that we have "caught up". Note that
3354
 
          for example if network link is broken but I/O slave thread hasn't
3355
 
          noticed it (slave_net_timeout not elapsed), then we'll say "caught
3356
 
          up" whereas we're not really caught up. Fixing that would require
3357
 
          internally cutting timeout in smaller pieces in network read, no
3358
 
          thanks. Another example: SQL has caught up on I/O, now I/O has read
3359
 
          a new event and is queuing it; the false "0" will exist until SQL
3360
 
          finishes executing the new event; it will be look abnormal only if
3361
 
          the events have old timestamps (then you get "many", 0, "many").
3362
 
 
3363
 
          Transient phases like this can be fixed with implemeting
3364
 
          Heartbeat event which provides the slave the status of the
3365
 
          master at time the master does not have any new update to send.
3366
 
          Seconds_Behind_Master would be zero only when master has no
3367
 
          more updates in binlog for slave. The heartbeat can be sent
3368
 
          in a (small) fraction of slave_net_timeout. Until it's done
3369
 
          rli->last_master_timestamp is temporarely (for time of
3370
 
          waiting for the following event) reset whenever EOF is
3371
 
          reached.
3372
 
        */
3373
 
        time_t save_timestamp= rli->last_master_timestamp;
3374
 
        rli->last_master_timestamp= 0;
3375
 
 
3376
 
        assert(rli->relay_log.get_open_count() ==
3377
 
                    rli->cur_log_old_open_count);
3378
 
 
3379
 
        if (rli->ign_master_log_name_end[0])
3380
 
        {
3381
 
          /* We generate and return a Rotate, to make our positions advance */
3382
 
          ev= new Rotate_log_event(rli->ign_master_log_name_end,
3383
 
                                   0, rli->ign_master_log_pos_end,
3384
 
                                   Rotate_log_event::DUP_NAME);
3385
 
          rli->ign_master_log_name_end[0]= 0;
3386
 
          pthread_mutex_unlock(log_lock);
3387
 
          if (unlikely(!ev))
3388
 
          {
3389
 
            errmsg= "Slave SQL thread failed to create a Rotate event "
3390
 
              "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
3391
 
            goto err;
3392
 
          }
3393
 
          ev->server_id= 0; // don't be ignored by slave SQL thread
3394
 
          return(ev);
3395
 
        }
3396
 
 
3397
 
        /*
3398
 
          We can, and should release data_lock while we are waiting for
3399
 
          update. If we do not, show slave status will block
3400
 
        */
3401
 
        pthread_mutex_unlock(&rli->data_lock);
3402
 
 
3403
 
        /*
3404
 
          Possible deadlock :
3405
 
          - the I/O thread has reached log_space_limit
3406
 
          - the SQL thread has read all relay logs, but cannot purge for some
3407
 
          reason:
3408
 
            * it has already purged all logs except the current one
3409
 
            * there are other logs than the current one but they're involved in
3410
 
            a transaction that finishes in the current one (or is not finished)
3411
 
          Solution :
3412
 
          Wake up the possibly waiting I/O thread, and set a boolean asking
3413
 
          the I/O thread to temporarily ignore the log_space_limit
3414
 
          constraint, because we do not want the I/O thread to block because of
3415
 
          space (it's ok if it blocks for any other reason (e.g. because the
3416
 
          master does not send anything). Then the I/O thread stops waiting
3417
 
          and reads more events.
3418
 
          The SQL thread decides when the I/O thread should take log_space_limit
3419
 
          into account again : ignore_log_space_limit is reset to 0
3420
 
          in purge_first_log (when the SQL thread purges the just-read relay
3421
 
          log), and also when the SQL thread starts. We should also reset
3422
 
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
3423
 
          fact, no need as RESET SLAVE requires that the slave
3424
 
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
3425
 
          it stops.
3426
 
        */
3427
 
        pthread_mutex_lock(&rli->log_space_lock);
3428
 
        // prevent the I/O thread from blocking next times
3429
 
        rli->ignore_log_space_limit= 1;
3430
 
        /*
3431
 
          If the I/O thread is blocked, unblock it.  Ok to broadcast
3432
 
          after unlock, because the mutex is only destroyed in
3433
 
          ~Relay_log_info(), i.e. when rli is destroyed, and rli will
3434
 
          not be destroyed before we exit the present function.
3435
 
        */
3436
 
        pthread_mutex_unlock(&rli->log_space_lock);
3437
 
        pthread_cond_broadcast(&rli->log_space_cond);
3438
 
        // Note that wait_for_update_relay_log unlocks lock_log !
3439
 
        rli->relay_log.wait_for_update_relay_log(rli->sql_session);
3440
 
        // re-acquire data lock since we released it earlier
3441
 
        pthread_mutex_lock(&rli->data_lock);
3442
 
        rli->last_master_timestamp= save_timestamp;
3443
 
        continue;
3444
 
      }
3445
 
      /*
3446
 
        If the log was not hot, we need to move to the next log in
3447
 
        sequence. The next log could be hot or cold, we deal with both
3448
 
        cases separately after doing some common initialization
3449
 
      */
3450
 
      end_io_cache(cur_log);
3451
 
      assert(rli->cur_log_fd >= 0);
3452
 
      my_close(rli->cur_log_fd, MYF(MY_WME));
3453
 
      rli->cur_log_fd = -1;
3454
 
 
3455
 
      if (relay_log_purge)
3456
 
      {
3457
 
        /*
3458
 
          purge_first_log will properly set up relay log coordinates in rli.
3459
 
          If the group's coordinates are equal to the event's coordinates
3460
 
          (i.e. the relay log was not rotated in the middle of a group),
3461
 
          we can purge this relay log too.
3462
 
          We do uint64_t and string comparisons, this may be slow but
3463
 
          - purging the last relay log is nice (it can save 1GB of disk), so we
3464
 
          like to detect the case where we can do it, and given this,
3465
 
          - I see no better detection method
3466
 
          - purge_first_log is not called that often
3467
 
        */
3468
 
        if (rli->relay_log.purge_first_log
3469
 
            (rli,
3470
 
             rli->group_relay_log_pos == rli->event_relay_log_pos
3471
 
             && !strcmp(rli->group_relay_log_name.c_str(), rli->event_relay_log_name.c_str())))
3472
 
        {
3473
 
          errmsg = "Error purging processed logs";
3474
 
          goto err;
3475
 
        }
3476
 
      }
3477
 
      else
3478
 
      {
3479
 
        /*
3480
 
          If hot_log is set, then we already have a lock on
3481
 
          LOCK_log.  If not, we have to get the lock.
3482
 
 
3483
 
          According to Sasha, the only time this code will ever be executed
3484
 
          is if we are recovering from a bug.
3485
 
        */
3486
 
        if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
3487
 
        {
3488
 
          errmsg = "error switching to the next log";
3489
 
          goto err;
3490
 
        }
3491
 
        rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3492
 
        rli->event_relay_log_name.assign(rli->linfo.log_file_name);
3493
 
        flush_relay_log_info(rli);
3494
 
      }
3495
 
 
3496
 
      /*
3497
 
        Now we want to open this next log. To know if it's a hot log (the one
3498
 
        being written by the I/O thread now) or a cold log, we can use
3499
 
        is_active(); if it is hot, we use the I/O cache; if it's cold we open
3500
 
        the file normally. But if is_active() reports that the log is hot, this
3501
 
        may change between the test and the consequence of the test. So we may
3502
 
        open the I/O cache whereas the log is now cold, which is nonsense.
3503
 
        To guard against this, we need to have LOCK_log.
3504
 
      */
3505
 
 
3506
 
      if (!hot_log) /* if hot_log, we already have this mutex */
3507
 
        pthread_mutex_lock(log_lock);
3508
 
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
3509
 
      {
3510
 
#ifdef EXTRA_DEBUG
3511
 
        if (global_system_variables.log_warnings)
3512
 
          sql_print_information(_("next log '%s' is currently active"),
3513
 
                                rli->linfo.log_file_name);
3514
 
#endif
3515
 
        rli->cur_log= cur_log= rli->relay_log.get_log_file();
3516
 
        rli->cur_log_old_open_count= rli->relay_log.get_open_count();
3517
 
        assert(rli->cur_log_fd == -1);
3518
 
 
3519
 
        /*
3520
 
          Read pointer has to be at the start since we are the only
3521
 
          reader.
3522
 
          We must keep the LOCK_log to read the 4 first bytes, as this is a hot
3523
 
          log (same as when we call read_log_event() above: for a hot log we
3524
 
          take the mutex).
3525
 
        */
3526
 
        if (check_binlog_magic(cur_log,&errmsg))
3527
 
        {
3528
 
          if (!hot_log) pthread_mutex_unlock(log_lock);
3529
 
          goto err;
3530
 
        }
3531
 
        if (!hot_log) pthread_mutex_unlock(log_lock);
3532
 
        continue;
3533
 
      }
3534
 
      if (!hot_log) pthread_mutex_unlock(log_lock);
3535
 
      /*
3536
 
        if we get here, the log was not hot, so we will have to open it
3537
 
        ourselves. We are sure that the log is still not hot now (a log can get
3538
 
        from hot to cold, but not from cold to hot). No need for LOCK_log.
3539
 
      */
3540
 
#ifdef EXTRA_DEBUG
3541
 
      if (global_system_variables.log_warnings)
3542
 
        sql_print_information(_("next log '%s' is not active"),
3543
 
                              rli->linfo.log_file_name);
3544
 
#endif
3545
 
      // open_binlog() will check the magic header
3546
 
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
3547
 
                                       &errmsg)) <0)
3548
 
        goto err;
3549
 
    }
3550
 
    else
3551
 
    {
3552
 
      /*
3553
 
        Read failed with a non-EOF error.
3554
 
        TODO: come up with something better to handle this error
3555
 
      */
3556
 
      if (hot_log)
3557
 
        pthread_mutex_unlock(log_lock);
3558
 
      sql_print_error(_("Slave SQL thread: I/O error reading "
3559
 
                        "event(errno: %d  cur_log->error: %d)"),
3560
 
                      my_errno,cur_log->error);
3561
 
      // set read position to the beginning of the event
3562
 
      my_b_seek(cur_log,rli->event_relay_log_pos);
3563
 
      /* otherwise, we have had a partial read */
3564
 
      errmsg = _("Aborting slave SQL thread because of partial event read");
3565
 
      break;                                    // To end of function
3566
 
    }
3567
 
  }
3568
 
  if (!errmsg && global_system_variables.log_warnings)
3569
 
  {
3570
 
    sql_print_information(_("Error reading relay log event: %s"),
3571
 
                          _("slave SQL thread was killed"));
3572
 
    return(0);
3573
 
  }
3574
 
 
3575
 
err:
3576
 
  if (errmsg)
3577
 
    sql_print_error(_("Error reading relay log event: %s"), errmsg);
3578
 
  return(0);
3579
 
}
3580
 
 
3581
 
/*
3582
 
  Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
3583
 
  because of size is simpler because when we do it we already have all relevant
3584
 
  locks; here we don't, so this function is mainly taking locks).
3585
 
  Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
3586
 
  is void).
3587
 
*/
3588
 
 
3589
 
void rotate_relay_log(Master_info* mi)
3590
 
{
3591
 
  Relay_log_info* rli= &mi->rli;
3592
 
 
3593
 
  /* We don't lock rli->run_lock. This would lead to deadlocks. */
3594
 
  pthread_mutex_lock(&mi->run_lock);
3595
 
 
3596
 
  /*
3597
 
     We need to test inited because otherwise, new_file() will attempt to lock
3598
 
     LOCK_log, which may not be inited (if we're not a slave).
3599
 
  */
3600
 
  if (!rli->inited)
3601
 
  {
3602
 
    goto end;
3603
 
  }
3604
 
 
3605
 
  /* If the relay log is closed, new_file() will do nothing. */
3606
 
  rli->relay_log.new_file();
3607
 
 
3608
 
  /*
3609
 
    We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
3610
 
    be counted, so imagine a succession of FLUSH LOGS  and assume the slave
3611
 
    threads are started:
3612
 
    relay_log_space decreases by the size of the deleted relay log, but does
3613
 
    not increase, so flush-after-flush we may become negative, which is wrong.
3614
 
    Even if this will be corrected as soon as a query is replicated on the
3615
 
    slave (because the I/O thread will then call harvest_bytes_written() which
3616
 
    will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
3617
 
    output in SHOW SLAVE STATUS meanwhile. So we harvest now.
3618
 
    If the log is closed, then this will just harvest the last writes, probably
3619
 
    0 as they probably have been harvested.
3620
 
  */
3621
 
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3622
 
end:
3623
 
  pthread_mutex_unlock(&mi->run_lock);
3624
 
  return;
3625
 
}
3626
 
 
3627
 
 
3628
 
/**
3629
 
   Detects, based on master's version (as found in the relay log), if master
3630
 
   has a certain bug.
3631
 
   @param rli Relay_log_info which tells the master's version
3632
 
   @param bug_id Number of the bug as found in bugs.mysql.com
3633
 
   @param report bool report error message, default TRUE
3634
 
   @return true if master has the bug, FALSE if it does not.
3635
 
*/
3636
 
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
3637
 
{
3638
 
  struct st_version_range_for_one_bug {
3639
 
    uint32_t        bug_id;
3640
 
    const unsigned char introduced_in[3]; // first version with bug
3641
 
    const unsigned char fixed_in[3];      // first version with fix
3642
 
  };
3643
 
  static struct st_version_range_for_one_bug versions_for_all_bugs[]=
3644
 
  {
3645
 
    {24432, { 5, 0, 24 }, { 5, 0, 38 } },
3646
 
    {24432, { 5, 1, 12 }, { 5, 1, 17 } },
3647
 
    {33029, { 5, 0,  0 }, { 5, 0, 58 } },
3648
 
    {33029, { 5, 1,  0 }, { 5, 1, 12 } },
3649
 
  };
3650
 
  const unsigned char *master_ver=
3651
 
    rli->relay_log.description_event_for_exec->server_version_split;
3652
 
 
3653
 
  assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3654
 
 
3655
 
  for (uint32_t i= 0;
3656
 
       i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3657
 
  {
3658
 
    const unsigned char *introduced_in= versions_for_all_bugs[i].introduced_in,
3659
 
      *fixed_in= versions_for_all_bugs[i].fixed_in;
3660
 
    if ((versions_for_all_bugs[i].bug_id == bug_id) &&
3661
 
        (memcmp(introduced_in, master_ver, 3) <= 0) &&
3662
 
        (memcmp(fixed_in,      master_ver, 3) >  0))
3663
 
    {
3664
 
      if (!report)
3665
 
        return true;
3666
 
 
3667
 
      // a short message for SHOW SLAVE STATUS (message length constraints)
3668
 
      my_printf_error(ER_UNKNOWN_ERROR,
3669
 
                      _("master may suffer from"
3670
 
                        " http://bugs.mysql.com/bug.php?id=%u"
3671
 
                        " so slave stops; check error log on slave"
3672
 
                        " for more info"), MYF(0), bug_id);
3673
 
      // a verbose message for the error log
3674
 
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
3675
 
                  _("According to the master's version ('%s'),"
3676
 
                    " it is probable that master suffers from this bug:"
3677
 
                    " http://bugs.mysql.com/bug.php?id=%u"
3678
 
                    " and thus replicating the current binary log event"
3679
 
                    " may make the slave's data become different from the"
3680
 
                    " master's data."
3681
 
                    " To take no risk, slave refuses to replicate"
3682
 
                    " this event and stops."
3683
 
                    " We recommend that all updates be stopped on the"
3684
 
                    " master and slave, that the data of both be"
3685
 
                    " manually synchronized,"
3686
 
                    " that master's binary logs be deleted,"
3687
 
                    " that master be upgraded to a version at least"
3688
 
                    " equal to '%d.%d.%d'. Then replication can be"
3689
 
                    " restarted."),
3690
 
                  rli->relay_log.description_event_for_exec->server_version,
3691
 
                  bug_id,
3692
 
                  fixed_in[0], fixed_in[1], fixed_in[2]);
3693
 
      return true;
3694
 
    }
3695
 
  }
3696
 
  return false;
3697
 
}
3698
 
 
3699
 
/**
3700
 
   BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
3701
 
   exclusive, if one statement in a SP generated AUTO_INCREMENT value
3702
 
   by the top statement, all statements after it would be considered
3703
 
   generated AUTO_INCREMENT value by the top statement, and a
3704
 
   erroneous INSERT_ID value might be associated with these statement,
3705
 
   which could cause duplicate entry error and stop the slave.
3706
 
 
3707
 
   Detect buggy master to work around.
3708
 
 */
3709
 
bool rpl_master_erroneous_autoinc(Session *session)
3710
 
{
3711
 
  if (active_mi && active_mi->rli.sql_session == session)
3712
 
  {
3713
 
    Relay_log_info *rli= &active_mi->rli;
3714
 
    return rpl_master_has_bug(rli, 33029, false);
3715
 
  }
3716
 
  return false;
3717
 
}
3718
 
 
3719
 
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
3720
 
template class I_List_iterator<i_string>;
3721
 
template class I_List_iterator<i_string_pair>;
3722
 
#endif
3723
 
 
3724
 
/**
3725
 
  @} (end of group Replication)
3726
 
*/