~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/slave.cc

  • Committer: Monty Taylor
  • Date: 2008-10-30 18:44:27 UTC
  • mto: (520.4.35 devel)
  • mto: This revision was merged to the branch mainline in revision 572.
  • Revision ID: monty@inaugust.com-20081030184427-7cr1v6r5cqnuqm3v
Removed global sql_array.h.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2003 DRIZZLE AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/**
 
18
  @addtogroup Replication
 
19
  @{
 
20
 
 
21
  @file
 
22
 
 
23
  @brief Code to run the io thread and the sql thread on the
 
24
  replication slave.
 
25
*/
 
26
#include <drizzled/server_includes.h>
 
27
 
 
28
#include <storage/myisam/myisam.h>
 
29
#include <drizzled/rpl_mi.h>
 
30
#include <drizzled/rpl_rli.h>
 
31
#include <drizzled/sql_repl.h>
 
32
#include <drizzled/rpl_filter.h>
 
33
#include <mysys/thr_alarm.h>
 
34
#include <libdrizzle/errmsg.h>
 
35
#include <mysys/mysys_err.h>
 
36
#include <drizzled/error.h>
 
37
#include <drizzled/sql_parse.h>
 
38
#include <drizzled/gettext.h>
 
39
#include <signal.h>
 
40
 
 
41
#if TIME_WITH_SYS_TIME
 
42
# include <sys/time.h>
 
43
# include <time.h>
 
44
#else
 
45
# if HAVE_SYS_TIME_H
 
46
#  include <sys/time.h>
 
47
# else
 
48
#  include <time.h>
 
49
# endif
 
50
#endif
 
51
 
 
52
#include <drizzled/tztime.h>
 
53
 
 
54
#include "rpl_tblmap.h"
 
55
 
 
56
#define MAX_SLAVE_RETRY_PAUSE 5
 
57
bool use_slave_mask = 0;
 
58
MY_BITMAP slave_error_mask;
 
59
 
 
60
typedef bool (*CHECK_KILLED_FUNC)(Session*,void*);
 
61
 
 
62
char* slave_load_tmpdir = 0;
 
63
Master_info *active_mi= 0;
 
64
bool replicate_same_server_id;
 
65
uint64_t relay_log_space_limit = 0;
 
66
 
 
67
/*
 
68
  When slave thread exits, we need to remember the temporary tables so we
 
69
  can re-use them on slave start.
 
70
 
 
71
  TODO: move the vars below under Master_info
 
72
*/
 
73
 
 
74
int32_t disconnect_slave_event_count = 0, abort_slave_event_count = 0;
 
75
int32_t events_till_abort = -1;
 
76
 
 
77
enum enum_slave_reconnect_actions
 
78
{
 
79
  SLAVE_RECON_ACT_REG= 0,
 
80
  SLAVE_RECON_ACT_DUMP= 1,
 
81
  SLAVE_RECON_ACT_EVENT= 2,
 
82
  SLAVE_RECON_ACT_MAX
 
83
};
 
84
 
 
85
enum enum_slave_reconnect_messages
 
86
{
 
87
  SLAVE_RECON_MSG_WAIT= 0,
 
88
  SLAVE_RECON_MSG_KILLED_WAITING= 1,
 
89
  SLAVE_RECON_MSG_AFTER= 2,
 
90
  SLAVE_RECON_MSG_FAILED= 3,
 
91
  SLAVE_RECON_MSG_COMMAND= 4,
 
92
  SLAVE_RECON_MSG_KILLED_AFTER= 5,
 
93
  SLAVE_RECON_MSG_MAX
 
94
};
 
95
 
 
96
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
 
97
{
 
98
  {
 
99
    N_("Waiting to reconnect after a failed registration on master"),
 
100
    N_("Slave I/O thread killed while waitnig to reconnect after a "
 
101
                 "failed registration on master"),
 
102
    N_("Reconnecting after a failed registration on master"),
 
103
    N_("failed registering on master, reconnecting to try again, "
 
104
                 "log '%s' at postion %s"),
 
105
    "COM_REGISTER_SLAVE",
 
106
    N_("Slave I/O thread killed during or after reconnect")
 
107
  },
 
108
  {
 
109
    N_("Waiting to reconnect after a failed binlog dump request"),
 
110
    N_("Slave I/O thread killed while retrying master dump"),
 
111
    N_("Reconnecting after a failed binlog dump request"),
 
112
    N_("failed dump request, reconnecting to try again, "
 
113
                 "log '%s' at postion %s"),
 
114
    "COM_BINLOG_DUMP",
 
115
    N_("Slave I/O thread killed during or after reconnect")
 
116
  },
 
117
  {
 
118
    N_("Waiting to reconnect after a failed master event read"),
 
119
    N_("Slave I/O thread killed while waiting to reconnect "
 
120
                 "after a failed read"),
 
121
    N_("Reconnecting after a failed master event read"),
 
122
    N_("Slave I/O thread: Failed reading log event, "
 
123
                 "reconnecting to retry, log '%s' at postion %s"),
 
124
    "",
 
125
    N_("Slave I/O thread killed during or after a "
 
126
                 "reconnect done to recover from failed read")
 
127
  }
 
128
};
 
129
 
 
130
 
 
131
typedef enum { SLAVE_Session_IO, SLAVE_Session_SQL} SLAVE_Session_TYPE;
 
132
 
 
133
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
 
134
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
 
135
static bool wait_for_relay_log_space(Relay_log_info* rli);
 
136
static inline bool io_slave_killed(Session* session,Master_info* mi);
 
137
static inline bool sql_slave_killed(Session* session,Relay_log_info* rli);
 
138
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type);
 
139
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi);
 
140
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
 
141
                          bool suppress_warnings);
 
142
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
 
143
                             bool reconnect, bool suppress_warnings);
 
144
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
145
                      void* thread_killed_arg);
 
146
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
 
147
static Log_event* next_event(Relay_log_info* rli);
 
148
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
 
149
static int32_t terminate_slave_thread(Session *session,
 
150
                                  pthread_mutex_t* term_lock,
 
151
                                  pthread_cond_t* term_cond,
 
152
                                  volatile uint32_t *slave_running,
 
153
                                  bool skip_lock);
 
154
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info);
 
155
 
 
156
/*
 
157
  Find out which replications threads are running
 
158
 
 
159
  SYNOPSIS
 
160
    init_thread_mask()
 
161
    mask                Return value here
 
162
    mi                  master_info for slave
 
163
    inverse             If set, returns which threads are not running
 
164
 
 
165
  IMPLEMENTATION
 
166
    Get a bit mask for which threads are running so that we can later restart
 
167
    these threads.
 
168
 
 
169
  RETURN
 
170
    mask        If inverse == 0, running threads
 
171
                If inverse == 1, stopped threads
 
172
*/
 
173
 
 
174
void init_thread_mask(int32_t* mask,Master_info* mi,bool inverse)
 
175
{
 
176
  bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
 
177
  register int32_t tmp_mask=0;
 
178
 
 
179
  if (set_io)
 
180
    tmp_mask |= SLAVE_IO;
 
181
  if (set_sql)
 
182
    tmp_mask |= SLAVE_SQL;
 
183
  if (inverse)
 
184
    tmp_mask^= (SLAVE_IO | SLAVE_SQL);
 
185
  *mask = tmp_mask;
 
186
  return;
 
187
}
 
188
 
 
189
 
 
190
/*
 
191
  lock_slave_threads()
 
192
*/
 
193
 
 
194
void lock_slave_threads(Master_info* mi)
 
195
{
 
196
  //TODO: see if we can do this without dual mutex
 
197
  pthread_mutex_lock(&mi->run_lock);
 
198
  pthread_mutex_lock(&mi->rli.run_lock);
 
199
  return;
 
200
}
 
201
 
 
202
 
 
203
/*
 
204
  unlock_slave_threads()
 
205
*/
 
206
 
 
207
void unlock_slave_threads(Master_info* mi)
 
208
{
 
209
  //TODO: see if we can do this without dual mutex
 
210
  pthread_mutex_unlock(&mi->rli.run_lock);
 
211
  pthread_mutex_unlock(&mi->run_lock);
 
212
  return;
 
213
}
 
214
 
 
215
 
 
216
/* Initialize slave structures */
 
217
 
 
218
int32_t init_slave()
 
219
{
 
220
  /*
 
221
    This is called when mysqld starts. Before client connections are
 
222
    accepted. However bootstrap may conflict with us if it does START SLAVE.
 
223
    So it's safer to take the lock.
 
224
  */
 
225
  pthread_mutex_lock(&LOCK_active_mi);
 
226
  /*
 
227
    TODO: re-write this to interate through the list of files
 
228
    for multi-master
 
229
  */
 
230
  active_mi= new Master_info;
 
231
 
 
232
  /*
 
233
    If master_host is not specified, try to read it from the master_info file.
 
234
    If master_host is specified, create the master_info file if it doesn't
 
235
    exists.
 
236
  */
 
237
  if (!active_mi)
 
238
  {
 
239
    sql_print_error(_("Failed to allocate memory for the master info structure"));
 
240
    goto err;
 
241
  }
 
242
 
 
243
  if (active_mi->init_master_info(master_info_file, relay_log_info_file, (SLAVE_IO | SLAVE_SQL)))
 
244
  {
 
245
    sql_print_error(_("Failed to initialize the master info structure"));
 
246
    goto err;
 
247
  }
 
248
 
 
249
  /* If server id is not set, start_slave_thread() will say it */
 
250
 
 
251
  if (active_mi->host[0] && !opt_skip_slave_start)
 
252
  {
 
253
    if (start_slave_threads(1 /* need mutex */,
 
254
                            0 /* no wait for start*/,
 
255
                            active_mi,
 
256
                            master_info_file,
 
257
                            relay_log_info_file,
 
258
                            SLAVE_IO | SLAVE_SQL))
 
259
    {
 
260
      sql_print_error(_("Failed to create slave threads"));
 
261
      goto err;
 
262
    }
 
263
  }
 
264
  pthread_mutex_unlock(&LOCK_active_mi);
 
265
  return(0);
 
266
 
 
267
err:
 
268
  pthread_mutex_unlock(&LOCK_active_mi);
 
269
  return(1);
 
270
}
 
271
 
 
272
 
 
273
/*
 
274
  Init function to set up array for errors that should be skipped for slave
 
275
 
 
276
  SYNOPSIS
 
277
    init_slave_skip_errors()
 
278
    arg         List of errors numbers to skip, separated with ','
 
279
 
 
280
  NOTES
 
281
    Called from get_options() in mysqld.cc on start-up
 
282
*/
 
283
 
 
284
void init_slave_skip_errors(const char* arg)
 
285
{
 
286
  const char *p;
 
287
 
 
288
  if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
 
289
  {
 
290
    fprintf(stderr, "Badly out of memory, please check your system status\n");
 
291
    exit(1);
 
292
  }
 
293
  use_slave_mask = 1;
 
294
  for (;my_isspace(system_charset_info,*arg);++arg)
 
295
    /* empty */;
 
296
  if (!my_strnncoll(system_charset_info,(unsigned char*)arg,4,(const unsigned char*)"all",4))
 
297
  {
 
298
    bitmap_set_all(&slave_error_mask);
 
299
    return;
 
300
  }
 
301
  for (p= arg ; *p; )
 
302
  {
 
303
    long err_code;
 
304
    if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
 
305
      break;
 
306
    if (err_code < MAX_SLAVE_ERROR)
 
307
       bitmap_set_bit(&slave_error_mask,(uint32_t)err_code);
 
308
    while (!my_isdigit(system_charset_info,*p) && *p)
 
309
      p++;
 
310
  }
 
311
  return;
 
312
}
 
313
 
 
314
 
 
315
int32_t terminate_slave_threads(Master_info* mi,int32_t thread_mask,bool skip_lock)
 
316
{
 
317
  if (!mi->inited)
 
318
    return(0); /* successfully do nothing */
 
319
  int32_t error,force_all = (thread_mask & SLAVE_FORCE_ALL);
 
320
  pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
 
321
 
 
322
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
 
323
  {
 
324
    mi->abort_slave=1;
 
325
    if ((error=terminate_slave_thread(mi->io_session,io_lock,
 
326
                                      &mi->stop_cond,
 
327
                                      &mi->slave_running,
 
328
                                      skip_lock)) &&
 
329
        !force_all)
 
330
      return(error);
 
331
  }
 
332
  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
 
333
  {
 
334
    mi->rli.abort_slave=1;
 
335
    if ((error=terminate_slave_thread(mi->rli.sql_session,sql_lock,
 
336
                                      &mi->rli.stop_cond,
 
337
                                      &mi->rli.slave_running,
 
338
                                      skip_lock)) &&
 
339
        !force_all)
 
340
      return(error);
 
341
  }
 
342
  return(0);
 
343
}
 
344
 
 
345
 
 
346
/**
 
347
   Wait for a slave thread to terminate.
 
348
 
 
349
   This function is called after requesting the thread to terminate
 
350
   (by setting @c abort_slave member of @c Relay_log_info or @c
 
351
   Master_info structure to 1). Termination of the thread is
 
352
   controlled with the the predicate <code>*slave_running</code>.
 
353
 
 
354
   Function will acquire @c term_lock before waiting on the condition
 
355
   unless @c skip_lock is true in which case the mutex should be owned
 
356
   by the caller of this function and will remain acquired after
 
357
   return from the function.
 
358
 
 
359
   @param term_lock
 
360
          Associated lock to use when waiting for @c term_cond
 
361
 
 
362
   @param term_cond
 
363
          Condition that is signalled when the thread has terminated
 
364
 
 
365
   @param slave_running
 
366
          Pointer to predicate to check for slave thread termination
 
367
 
 
368
   @param skip_lock
 
369
          If @c true the lock will not be acquired before waiting on
 
370
          the condition. In this case, it is assumed that the calling
 
371
          function acquires the lock before calling this function.
 
372
 
 
373
   @retval 0 All OK
 
374
 */
 
375
static int32_t
 
376
terminate_slave_thread(Session *session,
 
377
                       pthread_mutex_t* term_lock,
 
378
                       pthread_cond_t* term_cond,
 
379
                       volatile uint32_t *slave_running,
 
380
                       bool skip_lock)
 
381
{
 
382
  int32_t error;
 
383
 
 
384
  if (!skip_lock)
 
385
    pthread_mutex_lock(term_lock);
 
386
 
 
387
  safe_mutex_assert_owner(term_lock);
 
388
 
 
389
  if (!*slave_running)
 
390
  {
 
391
    if (!skip_lock)
 
392
      pthread_mutex_unlock(term_lock);
 
393
    return(ER_SLAVE_NOT_RUNNING);
 
394
  }
 
395
  assert(session != 0);
 
396
  Session_CHECK_SENTRY(session);
 
397
 
 
398
  /*
 
399
    Is is critical to test if the slave is running. Otherwise, we might
 
400
    be referening freed memory trying to kick it
 
401
  */
 
402
 
 
403
  while (*slave_running)                        // Should always be true
 
404
  {
 
405
    pthread_mutex_lock(&session->LOCK_delete);
 
406
#ifndef DONT_USE_THR_ALARM
 
407
    /*
 
408
      Error codes from pthread_kill are:
 
409
      EINVAL: invalid signal number (can't happen)
 
410
      ESRCH: thread already killed (can happen, should be ignored)
 
411
    */
 
412
    int32_t err= pthread_kill(session->real_id, thr_client_alarm);
 
413
    assert(err != EINVAL);
 
414
#endif
 
415
    session->awake(Session::NOT_KILLED);
 
416
    pthread_mutex_unlock(&session->LOCK_delete);
 
417
 
 
418
    /*
 
419
      There is a small chance that slave thread might miss the first
 
420
      alarm. To protect againts it, resend the signal until it reacts
 
421
    */
 
422
    struct timespec abstime;
 
423
    set_timespec(abstime,2);
 
424
    error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
 
425
    assert(error == ETIMEDOUT || error == 0);
 
426
  }
 
427
 
 
428
  assert(*slave_running == 0);
 
429
 
 
430
  if (!skip_lock)
 
431
    pthread_mutex_unlock(term_lock);
 
432
  return(0);
 
433
}
 
434
 
 
435
 
 
436
int32_t start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
 
437
                           pthread_mutex_t *cond_lock,
 
438
                           pthread_cond_t *start_cond,
 
439
                           volatile uint32_t *slave_running,
 
440
                           volatile uint32_t *slave_run_id,
 
441
                           Master_info* mi,
 
442
                           bool high_priority)
 
443
{
 
444
  pthread_t th;
 
445
  uint32_t start_id;
 
446
 
 
447
  assert(mi->inited);
 
448
 
 
449
  if (start_lock)
 
450
    pthread_mutex_lock(start_lock);
 
451
  if (!server_id)
 
452
  {
 
453
    if (start_cond)
 
454
      pthread_cond_broadcast(start_cond);
 
455
    if (start_lock)
 
456
      pthread_mutex_unlock(start_lock);
 
457
    sql_print_error(_("Server id not set, will not start slave"));
 
458
    return(ER_BAD_SLAVE);
 
459
  }
 
460
 
 
461
  if (*slave_running)
 
462
  {
 
463
    if (start_cond)
 
464
      pthread_cond_broadcast(start_cond);
 
465
    if (start_lock)
 
466
      pthread_mutex_unlock(start_lock);
 
467
    return(ER_SLAVE_MUST_STOP);
 
468
  }
 
469
  start_id= *slave_run_id;
 
470
  if (high_priority)
 
471
  {
 
472
    struct sched_param tmp_sched_param;
 
473
 
 
474
    memset(&tmp_sched_param, 0, sizeof(tmp_sched_param));
 
475
    tmp_sched_param.sched_priority= CONNECT_PRIOR;
 
476
    (void)pthread_attr_setschedparam(&connection_attrib, &tmp_sched_param);
 
477
  }
 
478
  if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
 
479
  {
 
480
    if (start_lock)
 
481
      pthread_mutex_unlock(start_lock);
 
482
    return(ER_SLAVE_THREAD);
 
483
  }
 
484
  if (start_cond && cond_lock) // caller has cond_lock
 
485
  {
 
486
    Session* session = current_session;
 
487
    while (start_id == *slave_run_id)
 
488
    {
 
489
      const char* old_msg = session->enter_cond(start_cond,cond_lock,
 
490
                                            "Waiting for slave thread to start");
 
491
      pthread_cond_wait(start_cond,cond_lock);
 
492
      session->exit_cond(old_msg);
 
493
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
 
494
      if (session->killed)
 
495
        return(session->killed_errno());
 
496
    }
 
497
  }
 
498
  if (start_lock)
 
499
    pthread_mutex_unlock(start_lock);
 
500
  return(0);
 
501
}
 
502
 
 
503
 
 
504
/*
 
505
  start_slave_threads()
 
506
 
 
507
  NOTES
 
508
    SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
 
509
    sense to do that for starting a slave--we always care if it actually
 
510
    started the threads that were not previously running
 
511
*/
 
512
 
 
513
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
 
514
                        Master_info* mi,
 
515
                        const char* master_info_fname __attribute__((unused)),
 
516
                        const char* slave_info_fname __attribute__((unused)),
 
517
                        int32_t thread_mask)
 
518
{
 
519
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
 
520
  pthread_cond_t* cond_io=0,*cond_sql=0;
 
521
  int32_t error=0;
 
522
 
 
523
  if (need_slave_mutex)
 
524
  {
 
525
    lock_io = &mi->run_lock;
 
526
    lock_sql = &mi->rli.run_lock;
 
527
  }
 
528
  if (wait_for_start)
 
529
  {
 
530
    cond_io = &mi->start_cond;
 
531
    cond_sql = &mi->rli.start_cond;
 
532
    lock_cond_io = &mi->run_lock;
 
533
    lock_cond_sql = &mi->rli.run_lock;
 
534
  }
 
535
 
 
536
  if (thread_mask & SLAVE_IO)
 
537
    error= start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
 
538
                              cond_io,
 
539
                              &mi->slave_running, &mi->slave_run_id,
 
540
                              mi, 1); //high priority, to read the most possible
 
541
  if (!error && (thread_mask & SLAVE_SQL))
 
542
  {
 
543
    error= start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
 
544
                              cond_sql,
 
545
                              &mi->rli.slave_running, &mi->rli.slave_run_id,
 
546
                              mi, 0);
 
547
    if (error)
 
548
      terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
 
549
  }
 
550
  return(error);
 
551
}
 
552
 
 
553
 
 
554
#ifdef NOT_USED_YET
 
555
static int32_t end_slave_on_walk(Master_info* mi, unsigned char* /*unused*/)
 
556
{
 
557
  end_master_info(mi);
 
558
  return(0);
 
559
}
 
560
#endif
 
561
 
 
562
 
 
563
/*
 
564
  Free all resources used by slave
 
565
 
 
566
  SYNOPSIS
 
567
    end_slave()
 
568
*/
 
569
 
 
570
void end_slave()
 
571
{
 
572
  /*
 
573
    This is called when the server terminates, in close_connections().
 
574
    It terminates slave threads. However, some CHANGE MASTER etc may still be
 
575
    running presently. If a START SLAVE was in progress, the mutex lock below
 
576
    will make us wait until slave threads have started, and START SLAVE
 
577
    returns, then we terminate them here.
 
578
  */
 
579
  pthread_mutex_lock(&LOCK_active_mi);
 
580
  if (active_mi)
 
581
  {
 
582
    /*
 
583
      TODO: replace the line below with
 
584
      list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
 
585
      once multi-master code is ready.
 
586
    */
 
587
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
 
588
    active_mi->end_master_info();
 
589
    delete active_mi;
 
590
    active_mi= 0;
 
591
  }
 
592
  pthread_mutex_unlock(&LOCK_active_mi);
 
593
  return;
 
594
}
 
595
 
 
596
 
 
597
static bool io_slave_killed(Session* session, Master_info* mi)
 
598
{
 
599
  assert(mi->io_session == session);
 
600
  assert(mi->slave_running); // tracking buffer overrun
 
601
  return(mi->abort_slave || abort_loop || session->killed);
 
602
}
 
603
 
 
604
 
 
605
static bool sql_slave_killed(Session* session, Relay_log_info* rli)
 
606
{
 
607
  assert(rli->sql_session == session);
 
608
  assert(rli->slave_running == 1);// tracking buffer overrun
 
609
  if (abort_loop || session->killed || rli->abort_slave)
 
610
  {
 
611
    /*
 
612
      If we are in an unsafe situation (stopping could corrupt replication),
 
613
      we give one minute to the slave SQL thread of grace before really
 
614
      terminating, in the hope that it will be able to read more events and
 
615
      the unsafe situation will soon be left. Note that this one minute starts
 
616
      from the last time anything happened in the slave SQL thread. So it's
 
617
      really one minute of idleness, we don't timeout if the slave SQL thread
 
618
      is actively working.
 
619
    */
 
620
    if (rli->last_event_start_time == 0)
 
621
      return(1);
 
622
    if (difftime(time(0), rli->last_event_start_time) > 60)
 
623
    {
 
624
      rli->report(ERROR_LEVEL, 0,
 
625
                  _("SQL thread had to stop in an unsafe situation, in "
 
626
                  "the middle of applying updates to a "
 
627
                  "non-transactional table without any primary key. "
 
628
                  "There is a risk of duplicate updates when the slave "
 
629
                  "SQL thread is restarted. Please check your tables' "
 
630
                  "contents after restart."));
 
631
      return(1);
 
632
    }
 
633
  }
 
634
  return(0);
 
635
}
 
636
 
 
637
 
 
638
/*
 
639
  skip_load_data_infile()
 
640
 
 
641
  NOTES
 
642
    This is used to tell a 3.23 master to break send_file()
 
643
*/
 
644
 
 
645
void skip_load_data_infile(NET *net)
 
646
{
 
647
  (void)net_request_file(net, "/dev/null");
 
648
  (void)my_net_read(net);                               // discard response
 
649
  (void)net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0); // ok
 
650
  return;
 
651
}
 
652
 
 
653
 
 
654
bool net_request_file(NET* net, const char* fname)
 
655
{
 
656
  return(net_write_command(net, 251, (unsigned char*) fname, strlen(fname),
 
657
                                (unsigned char*) "", 0));
 
658
}
 
659
 
 
660
/*
 
661
  From other comments and tests in code, it looks like
 
662
  sometimes Query_log_event and Load_log_event can have db == 0
 
663
  (see rewrite_db() above for example)
 
664
  (cases where this happens are unclear; it may be when the master is 3.23).
 
665
*/
 
666
 
 
667
const char *print_slave_db_safe(const char* db)
 
668
{
 
669
  return((db ? db : ""));
 
670
}
 
671
 
 
672
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
 
673
                                 const char *default_val)
 
674
{
 
675
  uint32_t length;
 
676
 
 
677
  if ((length=my_b_gets(f,var, max_size)))
 
678
  {
 
679
    char* last_p = var + length -1;
 
680
    if (*last_p == '\n')
 
681
      *last_p = 0; // if we stopped on newline, kill it
 
682
    else
 
683
    {
 
684
      /*
 
685
        If we truncated a line or stopped on last char, remove all chars
 
686
        up to and including newline.
 
687
      */
 
688
      int32_t c;
 
689
      while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) {};
 
690
    }
 
691
    return(0);
 
692
  }
 
693
  else if (default_val)
 
694
  {
 
695
    strmake(var,  default_val, max_size-1);
 
696
    return(0);
 
697
  }
 
698
  return(1);
 
699
}
 
700
 
 
701
 
 
702
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val)
 
703
{
 
704
  char buf[32];
 
705
 
 
706
 
 
707
  if (my_b_gets(f, buf, sizeof(buf)))
 
708
  {
 
709
    *var = atoi(buf);
 
710
    return(0);
 
711
  }
 
712
  else if (default_val)
 
713
  {
 
714
    *var = default_val;
 
715
    return(0);
 
716
  }
 
717
  return(1);
 
718
}
 
719
 
 
720
int32_t init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
 
721
{
 
722
  char buf[16];
 
723
 
 
724
 
 
725
  if (my_b_gets(f, buf, sizeof(buf)))
 
726
  {
 
727
    if (sscanf(buf, "%f", var) != 1)
 
728
      return(1);
 
729
    else
 
730
      return(0);
 
731
  }
 
732
  else if (default_val != 0.0)
 
733
  {
 
734
    *var = default_val;
 
735
    return(0);
 
736
  }
 
737
  return(1);
 
738
}
 
739
 
 
740
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info)
 
741
{
 
742
  if (io_slave_killed(session, mi))
 
743
  {
 
744
    if (info && global_system_variables.log_warnings)
 
745
      sql_print_information("%s",info);
 
746
    return true;
 
747
  }
 
748
  return false;
 
749
}
 
750
 
 
751
 
 
752
/*
 
753
  Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
 
754
  relying on the binlog's version. This is not perfect: imagine an upgrade
 
755
  of the master without waiting that all slaves are in sync with the master;
 
756
  then a slave could be fooled about the binlog's format. This is what happens
 
757
  when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
 
758
  slaves are fooled. So we do this only to distinguish between 3.23 and more
 
759
  recent masters (it's too late to change things for 3.23).
 
760
 
 
761
  RETURNS
 
762
  0       ok
 
763
  1       error
 
764
*/
 
765
 
 
766
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi)
 
767
{
 
768
  char error_buf[512];
 
769
  String err_msg(error_buf, sizeof(error_buf), &my_charset_bin);
 
770
  char err_buff[MAX_SLAVE_ERRMSG];
 
771
  const char* errmsg= 0;
 
772
  int32_t err_code= 0;
 
773
  DRIZZLE_RES *master_res= 0;
 
774
  DRIZZLE_ROW master_row;
 
775
 
 
776
  err_msg.length(0);
 
777
  /*
 
778
    Free old description_event_for_queue (that is needed if we are in
 
779
    a reconnection).
 
780
  */
 
781
  delete mi->rli.relay_log.description_event_for_queue;
 
782
  mi->rli.relay_log.description_event_for_queue= 0;
 
783
 
 
784
  if (!my_isdigit(&my_charset_bin,*drizzle->server_version))
 
785
  {
 
786
    errmsg = _("Master reported unrecognized DRIZZLE version");
 
787
    err_code= ER_SLAVE_FATAL_ERROR;
 
788
    sprintf(err_buff, ER(err_code), errmsg);
 
789
    err_msg.append(err_buff);
 
790
  }
 
791
  else
 
792
  {
 
793
    /*
 
794
      Note the following switch will bug when we have DRIZZLE branch 30 ;)
 
795
    */
 
796
    switch (*drizzle->server_version)
 
797
    {
 
798
    case '0':
 
799
    case '1':
 
800
    case '2':
 
801
      errmsg = _("Master reported unrecognized DRIZZLE version");
 
802
      err_code= ER_SLAVE_FATAL_ERROR;
 
803
      sprintf(err_buff, ER(err_code), errmsg);
 
804
      err_msg.append(err_buff);
 
805
      break;
 
806
    case '3':
 
807
      mi->rli.relay_log.description_event_for_queue= new
 
808
        Format_description_log_event(1, drizzle->server_version);
 
809
      break;
 
810
    case '4':
 
811
      mi->rli.relay_log.description_event_for_queue= new
 
812
        Format_description_log_event(3, drizzle->server_version);
 
813
      break;
 
814
    default:
 
815
      /*
 
816
        Master is DRIZZLE >=5.0. Give a default Format_desc event, so that we can
 
817
        take the early steps (like tests for "is this a 3.23 master") which we
 
818
        have to take before we receive the real master's Format_desc which will
 
819
        override this one. Note that the Format_desc we create below is garbage
 
820
        (it has the format of the *slave*); it's only good to help know if the
 
821
        master is 3.23, 4.0, etc.
 
822
      */
 
823
      mi->rli.relay_log.description_event_for_queue= new
 
824
        Format_description_log_event(4, drizzle->server_version);
 
825
      break;
 
826
    }
 
827
  }
 
828
 
 
829
  /*
 
830
     This does not mean that a 5.0 slave will be able to read a 6.0 master; but
 
831
     as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
 
832
     can't read a 6.0 master, this will show up when the slave can't read some
 
833
     events sent by the master, and there will be error messages.
 
834
  */
 
835
 
 
836
  if (err_msg.length() != 0)
 
837
    goto err;
 
838
 
 
839
  /* as we are here, we tried to allocate the event */
 
840
  if (!mi->rli.relay_log.description_event_for_queue)
 
841
  {
 
842
    errmsg= _("default Format_description_log_event");
 
843
    err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
 
844
    sprintf(err_buff, ER(err_code), errmsg);
 
845
    err_msg.append(err_buff);
 
846
    goto err;
 
847
  }
 
848
 
 
849
  /*
 
850
    Compare the master and slave's clock. Do not die if master's clock is
 
851
    unavailable (very old master not supporting UNIX_TIMESTAMP()?).
 
852
  */
 
853
 
 
854
  if (!drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
 
855
      (master_res= drizzle_store_result(drizzle)) &&
 
856
      (master_row= drizzle_fetch_row(master_res)))
 
857
  {
 
858
    mi->clock_diff_with_master=
 
859
      (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
 
860
  }
 
861
  else if (!check_io_slave_killed(mi->io_session, mi, NULL))
 
862
  {
 
863
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
 
864
    sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
 
865
                        "do not trust column Seconds_Behind_Master of SHOW "
 
866
                        "SLAVE STATUS. Error: %s (%d)"),
 
867
                      drizzle_error(drizzle), drizzle_errno(drizzle));
 
868
  }
 
869
  if (master_res)
 
870
    drizzle_free_result(master_res);
 
871
 
 
872
  /*
 
873
    Check that the master's server id and ours are different. Because if they
 
874
    are equal (which can result from a simple copy of master's datadir to slave,
 
875
    thus copying some drizzle.cnf), replication will work but all events will be
 
876
    skipped.
 
877
    Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
 
878
    master?).
 
879
    Note: we could have put a @@SERVER_ID in the previous SELECT
 
880
    UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
 
881
  */
 
882
  if (!drizzle_real_query(drizzle,
 
883
                        STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
 
884
      (master_res= drizzle_store_result(drizzle)))
 
885
  {
 
886
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
887
        (::server_id == strtoul(master_row[1], 0, 10)) &&
 
888
        !mi->rli.replicate_same_server_id)
 
889
    {
 
890
      errmsg=
 
891
        _("The slave I/O thread stops because master and slave have equal "
 
892
          "DRIZZLE server ids; these ids must be different "
 
893
          "for replication to work (or "
 
894
          "the --replicate-same-server-id option must be used "
 
895
          "on slave but this does"
 
896
          "not always make sense; please check the manual before using it).");
 
897
      err_code= ER_SLAVE_FATAL_ERROR;
 
898
      sprintf(err_buff, ER(err_code), errmsg);
 
899
      err_msg.append(err_buff);
 
900
    }
 
901
    drizzle_free_result(master_res);
 
902
    if (errmsg)
 
903
      goto err;
 
904
  }
 
905
 
 
906
  /*
 
907
    Check that the master's global character_set_server and ours are the same.
 
908
    Not fatal if query fails (old master?).
 
909
    Note that we don't check for equality of global character_set_client and
 
910
    collation_connection (neither do we prevent their setting in
 
911
    set_var.cc). That's because from what I (Guilhem) have tested, the global
 
912
    values of these 2 are never used (new connections don't use them).
 
913
    We don't test equality of global collation_database either as it's is
 
914
    going to be deprecated (made read-only) in 4.1 very soon.
 
915
    The test is only relevant if master < 5.0.3 (we'll test only if it's older
 
916
    than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
 
917
    charset info in each binlog event.
 
918
    We don't do it for 3.23 because masters <3.23.50 hang on
 
919
    SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
 
920
    test only if master is 4.x.
 
921
  */
 
922
 
 
923
  /* redundant with rest of code but safer against later additions */
 
924
  if (*drizzle->server_version == '3')
 
925
    goto err;
 
926
 
 
927
  if ((*drizzle->server_version == '4') &&
 
928
      !drizzle_real_query(drizzle,
 
929
                        STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
 
930
      (master_res= drizzle_store_result(drizzle)))
 
931
  {
 
932
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
933
        strcmp(master_row[0], global_system_variables.collation_server->name))
 
934
    {
 
935
      errmsg=
 
936
        _("The slave I/O thread stops because master and slave have"
 
937
          " different values for the COLLATION_SERVER global variable."
 
938
          " The values must be equal for replication to work");
 
939
      err_code= ER_SLAVE_FATAL_ERROR;
 
940
      sprintf(err_buff, ER(err_code), errmsg);
 
941
      err_msg.append(err_buff);
 
942
    }
 
943
    drizzle_free_result(master_res);
 
944
    if (errmsg)
 
945
      goto err;
 
946
  }
 
947
 
 
948
  /*
 
949
    Perform analogous check for time zone. Theoretically we also should
 
950
    perform check here to verify that SYSTEM time zones are the same on
 
951
    slave and master, but we can't rely on value of @@system_time_zone
 
952
    variable (it is time zone abbreviation) since it determined at start
 
953
    time and so could differ for slave and master even if they are really
 
954
    in the same system time zone. So we are omiting this check and just
 
955
    relying on documentation. Also according to Monty there are many users
 
956
    who are using replication between servers in various time zones. Hence
 
957
    such check will broke everything for them. (And now everything will
 
958
    work for them because by default both their master and slave will have
 
959
    'SYSTEM' time zone).
 
960
    This check is only necessary for 4.x masters (and < 5.0.4 masters but
 
961
    those were alpha).
 
962
  */
 
963
  if ((*drizzle->server_version == '4') &&
 
964
      !drizzle_real_query(drizzle, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
 
965
      (master_res= drizzle_store_result(drizzle)))
 
966
  {
 
967
    if ((master_row= drizzle_fetch_row(master_res)) &&
 
968
        strcmp(master_row[0],
 
969
               global_system_variables.time_zone->get_name()->ptr()))
 
970
    {
 
971
      errmsg=
 
972
        _("The slave I/O thread stops because master and slave have"
 
973
          " different values for the TIME_ZONE global variable."
 
974
          " The values must be equal for replication to work");
 
975
      err_code= ER_SLAVE_FATAL_ERROR;
 
976
      sprintf(err_buff, ER(err_code), errmsg);
 
977
      err_msg.append(err_buff);
 
978
    }
 
979
    drizzle_free_result(master_res);
 
980
 
 
981
    if (errmsg)
 
982
      goto err;
 
983
  }
 
984
 
 
985
  if (mi->heartbeat_period != 0.0)
 
986
  {
 
987
    char llbuf[22];
 
988
    const char query_format[]= "SET @master_heartbeat_period= %s";
 
989
    char query[sizeof(query_format) - 2 + sizeof(llbuf)];
 
990
    /* 
 
991
       the period is an uint64_t of nano-secs. 
 
992
    */
 
993
    llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
 
994
    sprintf(query, query_format, llbuf);
 
995
 
 
996
    if (drizzle_real_query(drizzle, query, strlen(query))
 
997
        && !check_io_slave_killed(mi->io_session, mi, NULL))
 
998
    {
 
999
      err_msg.append("The slave I/O thread stops because querying master with '");
 
1000
      err_msg.append(query);
 
1001
      err_msg.append("' failed;");
 
1002
      err_msg.append(" error: ");
 
1003
      err_code= drizzle_errno(drizzle);
 
1004
      err_msg.qs_append(err_code);
 
1005
      err_msg.append("  '");
 
1006
      err_msg.append(drizzle_error(drizzle));
 
1007
      err_msg.append("'");
 
1008
      drizzle_free_result(drizzle_store_result(drizzle));
 
1009
      goto err;
 
1010
    }
 
1011
    drizzle_free_result(drizzle_store_result(drizzle));
 
1012
  }
 
1013
  
 
1014
err:
 
1015
  if (err_msg.length() != 0)
 
1016
  {
 
1017
    sql_print_error("%s",err_msg.ptr());
 
1018
    assert(err_code != 0);
 
1019
    mi->report(ERROR_LEVEL, err_code, "%s",err_msg.ptr());
 
1020
    return(1);
 
1021
  }
 
1022
 
 
1023
  return(0);
 
1024
}
 
1025
 
 
1026
 
 
1027
static bool wait_for_relay_log_space(Relay_log_info* rli)
 
1028
{
 
1029
  bool slave_killed=0;
 
1030
  Master_info* mi = rli->mi;
 
1031
  const char *save_proc_info;
 
1032
  Session* session = mi->io_session;
 
1033
 
 
1034
  pthread_mutex_lock(&rli->log_space_lock);
 
1035
  save_proc_info= session->enter_cond(&rli->log_space_cond,
 
1036
                                  &rli->log_space_lock,
 
1037
                                  _("Waiting for the slave SQL thread "
 
1038
                                    "to free enough relay log space"));
 
1039
  while (rli->log_space_limit < rli->log_space_total &&
 
1040
         !(slave_killed=io_slave_killed(session,mi)) &&
 
1041
         !rli->ignore_log_space_limit)
 
1042
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
 
1043
  session->exit_cond(save_proc_info);
 
1044
  return(slave_killed);
 
1045
}
 
1046
 
 
1047
 
 
1048
/*
 
1049
  Builds a Rotate from the ignored events' info and writes it to relay log.
 
1050
 
 
1051
  SYNOPSIS
 
1052
  write_ignored_events_info_to_relay_log()
 
1053
    session             pointer to I/O thread's session
 
1054
    mi
 
1055
 
 
1056
  DESCRIPTION
 
1057
    Slave I/O thread, going to die, must leave a durable trace of the
 
1058
    ignored events' end position for the use of the slave SQL thread, by
 
1059
    calling this function. Only that thread can call it (see assertion).
 
1060
 */
 
1061
static void write_ignored_events_info_to_relay_log(Session *session __attribute__((unused)),
 
1062
                                                   Master_info *mi)
 
1063
{
 
1064
  Relay_log_info *rli= &mi->rli;
 
1065
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
 
1066
 
 
1067
  assert(session == mi->io_session);
 
1068
  pthread_mutex_lock(log_lock);
 
1069
  if (rli->ign_master_log_name_end[0])
 
1070
  {
 
1071
    Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
 
1072
                                               0, rli->ign_master_log_pos_end,
 
1073
                                               Rotate_log_event::DUP_NAME);
 
1074
    rli->ign_master_log_name_end[0]= 0;
 
1075
    /* can unlock before writing as slave SQL session will soon see our Rotate */
 
1076
    pthread_mutex_unlock(log_lock);
 
1077
    if (likely((bool)ev))
 
1078
    {
 
1079
      ev->server_id= 0; // don't be ignored by slave SQL thread
 
1080
      if (unlikely(rli->relay_log.append(ev)))
 
1081
        mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
1082
                   ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
1083
                   _("failed to write a Rotate event"
 
1084
                     " to the relay log, SHOW SLAVE STATUS may be"
 
1085
                     " inaccurate"));
 
1086
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
1087
      if (mi->flush())
 
1088
        sql_print_error(_("Failed to flush master info file"));
 
1089
      delete ev;
 
1090
    }
 
1091
    else
 
1092
      mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
 
1093
                 ER(ER_SLAVE_CREATE_EVENT_FAILURE),
 
1094
                 _("Rotate_event (out of memory?),"
 
1095
                   " SHOW SLAVE STATUS may be inaccurate"));
 
1096
  }
 
1097
  else
 
1098
    pthread_mutex_unlock(log_lock);
 
1099
  return;
 
1100
}
 
1101
 
 
1102
 
 
1103
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
 
1104
                             bool *suppress_warnings)
 
1105
{
 
1106
  unsigned char buf[1024], *pos= buf;
 
1107
  uint32_t report_host_len, report_user_len=0, report_password_len=0;
 
1108
 
 
1109
  *suppress_warnings= false;
 
1110
  if (!report_host)
 
1111
    return(0);
 
1112
  report_host_len= strlen(report_host);
 
1113
  if (report_user)
 
1114
    report_user_len= strlen(report_user);
 
1115
  if (report_password)
 
1116
    report_password_len= strlen(report_password);
 
1117
  /* 30 is a good safety margin */
 
1118
  if (report_host_len + report_user_len + report_password_len + 30 >
 
1119
      sizeof(buf))
 
1120
    return(0);                                     // safety
 
1121
 
 
1122
  int4store(pos, server_id); pos+= 4;
 
1123
  pos= net_store_data(pos, (unsigned char*) report_host, report_host_len);
 
1124
  pos= net_store_data(pos, (unsigned char*) report_user, report_user_len);
 
1125
  pos= net_store_data(pos, (unsigned char*) report_password, report_password_len);
 
1126
  int2store(pos, (uint16_t) report_port); pos+= 2;
 
1127
  int4store(pos, 0);    pos+= 4;
 
1128
  /* The master will fill in master_id */
 
1129
  int4store(pos, 0);                    pos+= 4;
 
1130
 
 
1131
  if (simple_command(drizzle, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
 
1132
  {
 
1133
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1134
    {
 
1135
      *suppress_warnings= true;                 // Suppress reconnect warning
 
1136
    }
 
1137
    else if (!check_io_slave_killed(mi->io_session, mi, NULL))
 
1138
    {
 
1139
      char buf[256];
 
1140
      snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle), 
 
1141
               drizzle_errno(drizzle));
 
1142
      mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
 
1143
                 ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
 
1144
    }
 
1145
    return(1);
 
1146
  }
 
1147
  return(0);
 
1148
}
 
1149
 
 
1150
 
 
1151
bool show_master_info(Session* session, Master_info* mi)
 
1152
{
 
1153
  // TODO: fix this for multi-master
 
1154
  List<Item> field_list;
 
1155
  Protocol *protocol= session->protocol;
 
1156
 
 
1157
  field_list.push_back(new Item_empty_string("Slave_IO_State",
 
1158
                                                     14));
 
1159
  field_list.push_back(new Item_empty_string("Master_Host",
 
1160
                                                     sizeof(mi->host)));
 
1161
  field_list.push_back(new Item_empty_string("Master_User",
 
1162
                                                     sizeof(mi->user)));
 
1163
  field_list.push_back(new Item_return_int("Master_Port", 7,
 
1164
                                           DRIZZLE_TYPE_LONG));
 
1165
  field_list.push_back(new Item_return_int("Connect_Retry", 10,
 
1166
                                           DRIZZLE_TYPE_LONG));
 
1167
  field_list.push_back(new Item_empty_string("Master_Log_File",
 
1168
                                             FN_REFLEN));
 
1169
  field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
 
1170
                                           DRIZZLE_TYPE_LONGLONG));
 
1171
  field_list.push_back(new Item_empty_string("Relay_Log_File",
 
1172
                                             FN_REFLEN));
 
1173
  field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
 
1174
                                           DRIZZLE_TYPE_LONGLONG));
 
1175
  field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
 
1176
                                             FN_REFLEN));
 
1177
  field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
 
1178
  field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
 
1179
  field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
 
1180
  field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
 
1181
  field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
 
1182
  field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
 
1183
  field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
 
1184
  field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
 
1185
                                             28));
 
1186
  field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
 
1187
  field_list.push_back(new Item_empty_string("Last_Error", 20));
 
1188
  field_list.push_back(new Item_return_int("Skip_Counter", 10,
 
1189
                                           DRIZZLE_TYPE_LONG));
 
1190
  field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
 
1191
                                           DRIZZLE_TYPE_LONGLONG));
 
1192
  field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
 
1193
                                           DRIZZLE_TYPE_LONGLONG));
 
1194
  field_list.push_back(new Item_empty_string("Until_Condition", 6));
 
1195
  field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
 
1196
  field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
 
1197
                                           DRIZZLE_TYPE_LONGLONG));
 
1198
  field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
 
1199
                                           DRIZZLE_TYPE_LONGLONG));
 
1200
  field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
 
1201
  field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
 
1202
  field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
 
1203
  field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
 
1204
 
 
1205
  if (protocol->send_fields(&field_list,
 
1206
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1207
    return(true);
 
1208
 
 
1209
  if (mi->host[0])
 
1210
  {
 
1211
    String *packet= &session->packet;
 
1212
    protocol->prepare_for_resend();
 
1213
 
 
1214
    /*
 
1215
      slave_running can be accessed without run_lock but not other
 
1216
      non-volotile members like mi->io_session, which is guarded by the mutex.
 
1217
    */
 
1218
    pthread_mutex_lock(&mi->run_lock);
 
1219
    protocol->store(mi->io_session ? mi->io_session->get_proc_info() : "", &my_charset_bin);
 
1220
    pthread_mutex_unlock(&mi->run_lock);
 
1221
 
 
1222
    pthread_mutex_lock(&mi->data_lock);
 
1223
    pthread_mutex_lock(&mi->rli.data_lock);
 
1224
    protocol->store(mi->getHostname(), &my_charset_bin);
 
1225
    protocol->store(mi->getUsername(), &my_charset_bin);
 
1226
    protocol->store((uint32_t) mi->getPort());
 
1227
    protocol->store(mi->getConnectionRetry());
 
1228
    protocol->store(mi->getLogName(), &my_charset_bin);
 
1229
    protocol->store((uint64_t) mi->getLogPosition());
 
1230
    protocol->store(mi->rli.group_relay_log_name.c_str() +
 
1231
                    dirname_length(mi->rli.group_relay_log_name.c_str()),
 
1232
                    &my_charset_bin);
 
1233
    protocol->store((uint64_t) mi->rli.group_relay_log_pos);
 
1234
    protocol->store(mi->rli.group_master_log_name.c_str(), &my_charset_bin);
 
1235
    protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
 
1236
                    "Yes" : "No", &my_charset_bin);
 
1237
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
 
1238
    protocol->store(rpl_filter->get_do_db());
 
1239
    protocol->store(rpl_filter->get_ignore_db());
 
1240
 
 
1241
    char buf[256];
 
1242
    String tmp(buf, sizeof(buf), &my_charset_bin);
 
1243
    rpl_filter->get_do_table(&tmp);
 
1244
    protocol->store(&tmp);
 
1245
    rpl_filter->get_ignore_table(&tmp);
 
1246
    protocol->store(&tmp);
 
1247
    rpl_filter->get_wild_do_table(&tmp);
 
1248
    protocol->store(&tmp);
 
1249
    rpl_filter->get_wild_ignore_table(&tmp);
 
1250
    protocol->store(&tmp);
 
1251
 
 
1252
    protocol->store(mi->rli.last_error().number);
 
1253
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
 
1254
    protocol->store((uint32_t) mi->rli.slave_skip_counter);
 
1255
    protocol->store((uint64_t) mi->rli.group_master_log_pos);
 
1256
    protocol->store((uint64_t) mi->rli.log_space_total);
 
1257
 
 
1258
    protocol->store(
 
1259
      mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
 
1260
        ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
 
1261
          "Relay"), &my_charset_bin);
 
1262
    protocol->store(mi->rli.until_log_name, &my_charset_bin);
 
1263
    protocol->store((uint64_t) mi->rli.until_log_pos);
 
1264
 
 
1265
    /*
 
1266
      Seconds_Behind_Master: if SQL thread is running and I/O thread is
 
1267
      connected, we can compute it otherwise show NULL (i.e. unknown).
 
1268
    */
 
1269
    if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
 
1270
        mi->rli.slave_running)
 
1271
    {
 
1272
      long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
 
1273
                       - mi->clock_diff_with_master);
 
1274
      /*
 
1275
        Apparently on some systems time_diff can be <0. Here are possible
 
1276
        reasons related to MySQL:
 
1277
        - the master is itself a slave of another master whose time is ahead.
 
1278
        - somebody used an explicit SET TIMESTAMP on the master.
 
1279
        Possible reason related to granularity-to-second of time functions
 
1280
        (nothing to do with MySQL), which can explain a value of -1:
 
1281
        assume the master's and slave's time are perfectly synchronized, and
 
1282
        that at slave's connection time, when the master's timestamp is read,
 
1283
        it is at the very end of second 1, and (a very short time later) when
 
1284
        the slave's timestamp is read it is at the very beginning of second
 
1285
        2. Then the recorded value for master is 1 and the recorded value for
 
1286
        slave is 2. At SHOW SLAVE STATUS time, assume that the difference
 
1287
        between timestamp of slave and rli->last_master_timestamp is 0
 
1288
        (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
 
1289
        This confuses users, so we don't go below 0: hence the cmax().
 
1290
 
 
1291
        last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
 
1292
        special marker to say "consider we have caught up".
 
1293
      */
 
1294
      protocol->store((int64_t)(mi->rli.last_master_timestamp ?
 
1295
                                 cmax((long)0, time_diff) : 0));
 
1296
    }
 
1297
    else
 
1298
    {
 
1299
      protocol->store_null();
 
1300
    }
 
1301
 
 
1302
    // Last_IO_Errno
 
1303
    protocol->store(mi->last_error().number);
 
1304
    // Last_IO_Error
 
1305
    protocol->store(mi->last_error().message, &my_charset_bin);
 
1306
    // Last_SQL_Errno
 
1307
    protocol->store(mi->rli.last_error().number);
 
1308
    // Last_SQL_Error
 
1309
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
 
1310
 
 
1311
    pthread_mutex_unlock(&mi->rli.data_lock);
 
1312
    pthread_mutex_unlock(&mi->data_lock);
 
1313
 
 
1314
    if (my_net_write(&session->net, (unsigned char*) session->packet.ptr(), packet->length()))
 
1315
      return(true);
 
1316
  }
 
1317
  my_eof(session);
 
1318
  return(false);
 
1319
}
 
1320
 
 
1321
 
 
1322
void set_slave_thread_options(Session* session)
 
1323
{
 
1324
  /*
 
1325
     It's nonsense to constrain the slave threads with max_join_size; if a
 
1326
     query succeeded on master, we HAVE to execute it. So set
 
1327
     OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
 
1328
     (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
 
1329
     SELECT examining more than 4 billion rows would still fail (yes, because
 
1330
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
 
1331
     only for client threads.
 
1332
  */
 
1333
  uint64_t options= session->options | OPTION_BIG_SELECTS;
 
1334
  if (opt_log_slave_updates)
 
1335
    options|= OPTION_BIN_LOG;
 
1336
  else
 
1337
    options&= ~OPTION_BIN_LOG;
 
1338
  session->options= options;
 
1339
  session->variables.completion_type= 0;
 
1340
  return;
 
1341
}
 
1342
 
 
1343
/*
 
1344
  init_slave_thread()
 
1345
*/
 
1346
 
 
1347
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type)
 
1348
{
 
1349
  int32_t simulate_error= 0;
 
1350
  session->system_thread = (session_type == SLAVE_Session_SQL) ?
 
1351
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
 
1352
  session->security_ctx->skip_grants();
 
1353
  my_net_init(&session->net, 0);
 
1354
/*
 
1355
  Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
 
1356
  slave threads, since a replication event can become this much larger
 
1357
  than the corresponding packet (query) sent from client to master.
 
1358
*/
 
1359
  session->variables.max_allowed_packet= global_system_variables.max_allowed_packet
 
1360
    + MAX_LOG_EVENT_HEADER;  /* note, incr over the global not session var */
 
1361
  session->slave_thread = 1;
 
1362
  set_slave_thread_options(session);
 
1363
  session->client_capabilities = CLIENT_LOCAL_FILES;
 
1364
  pthread_mutex_lock(&LOCK_thread_count);
 
1365
  session->thread_id= session->variables.pseudo_thread_id= thread_id++;
 
1366
  pthread_mutex_unlock(&LOCK_thread_count);
 
1367
 
 
1368
 simulate_error|= (1 << SLAVE_Session_IO);
 
1369
 simulate_error|= (1 << SLAVE_Session_SQL);
 
1370
  if (init_thr_lock() || session->store_globals() || simulate_error & (1<< session_type))
 
1371
  {
 
1372
    session->cleanup();
 
1373
    return(-1);
 
1374
  }
 
1375
  lex_start(session);
 
1376
 
 
1377
  if (session_type == SLAVE_Session_SQL)
 
1378
    session->set_proc_info("Waiting for the next event in relay log");
 
1379
  else
 
1380
    session->set_proc_info("Waiting for master update");
 
1381
  session->version=refresh_version;
 
1382
  session->set_time();
 
1383
  return(0);
 
1384
}
 
1385
 
 
1386
 
 
1387
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
1388
                      void* thread_killed_arg)
 
1389
{
 
1390
  int32_t nap_time;
 
1391
  thr_alarm_t alarmed;
 
1392
 
 
1393
  thr_alarm_init(&alarmed);
 
1394
  time_t start_time= my_time(0);
 
1395
  time_t end_time= start_time+sec;
 
1396
 
 
1397
  while ((nap_time= (int32_t) (end_time - start_time)) > 0)
 
1398
  {
 
1399
    ALARM alarm_buff;
 
1400
    /*
 
1401
      The only reason we are asking for alarm is so that
 
1402
      we will be woken up in case of murder, so if we do not get killed,
 
1403
      set the alarm so it goes off after we wake up naturally
 
1404
    */
 
1405
    thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
 
1406
    sleep(nap_time);
 
1407
    thr_end_alarm(&alarmed);
 
1408
 
 
1409
    if ((*thread_killed)(session,thread_killed_arg))
 
1410
      return(1);
 
1411
    start_time= my_time(0);
 
1412
  }
 
1413
  return(0);
 
1414
}
 
1415
 
 
1416
 
 
1417
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
 
1418
                        bool *suppress_warnings)
 
1419
{
 
1420
  unsigned char buf[FN_REFLEN + 10];
 
1421
  int32_t len;
 
1422
  int32_t binlog_flags = 0; // for now
 
1423
  const char* logname = mi->getLogName();
 
1424
  
 
1425
  *suppress_warnings= false;
 
1426
 
 
1427
  // TODO if big log files: Change next to int8store()
 
1428
  int4store(buf, (uint32_t) mi->getLogPosition());
 
1429
  int2store(buf + 4, binlog_flags);
 
1430
  int4store(buf + 6, server_id);
 
1431
  len = (uint32_t) strlen(logname);
 
1432
  memcpy(buf + 10, logname,len);
 
1433
  if (simple_command(drizzle, COM_BINLOG_DUMP, buf, len + 10, 1))
 
1434
  {
 
1435
    /*
 
1436
      Something went wrong, so we will just reconnect and retry later
 
1437
      in the future, we should do a better error analysis, but for
 
1438
      now we just fill up the error log :-)
 
1439
    */
 
1440
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1441
      *suppress_warnings= true;                 // Suppress reconnect warning
 
1442
    else
 
1443
      sql_print_error(_("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs"),
 
1444
                      drizzle_errno(drizzle), drizzle_error(drizzle),
 
1445
                      mi->connect_retry);
 
1446
    return(1);
 
1447
  }
 
1448
 
 
1449
  return(0);
 
1450
}
 
1451
 
 
1452
/*
 
1453
  Read one event from the master
 
1454
 
 
1455
  SYNOPSIS
 
1456
    read_event()
 
1457
    DRIZZLE               DRIZZLE connection
 
1458
    mi                  Master connection information
 
1459
    suppress_warnings   TRUE when a normal net read timeout has caused us to
 
1460
                        try a reconnect.  We do not want to print anything to
 
1461
                        the error log in this case because this a anormal
 
1462
                        event in an idle server.
 
1463
 
 
1464
    RETURN VALUES
 
1465
    'packet_error'      Error
 
1466
    number              Length of packet
 
1467
*/
 
1468
 
 
1469
static uint32_t read_event(DRIZZLE *drizzle,
 
1470
                        Master_info *mi __attribute__((unused)),
 
1471
                        bool* suppress_warnings)
 
1472
{
 
1473
  uint32_t len;
 
1474
 
 
1475
  *suppress_warnings= false;
 
1476
  /*
 
1477
    my_real_read() will time us out
 
1478
    We check if we were told to die, and if not, try reading again
 
1479
  */
 
1480
  if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
 
1481
    return(packet_error);
 
1482
 
 
1483
  len = cli_safe_read(drizzle);
 
1484
  if (len == packet_error || (int32_t) len < 1)
 
1485
  {
 
1486
    if (drizzle_errno(drizzle) == ER_NET_READ_INTERRUPTED)
 
1487
    {
 
1488
      /*
 
1489
        We are trying a normal reconnect after a read timeout;
 
1490
        we suppress prints to .err file as long as the reconnect
 
1491
        happens without problems
 
1492
      */
 
1493
      *suppress_warnings= true;
 
1494
    }
 
1495
    else
 
1496
      sql_print_error(_("Error reading packet from server: %s ( server_errno=%d)"),
 
1497
                      drizzle_error(drizzle), drizzle_errno(drizzle));
 
1498
    return(packet_error);
 
1499
  }
 
1500
 
 
1501
  /* Check if eof packet */
 
1502
  if (len < 8 && drizzle->net.read_pos[0] == 254)
 
1503
  {
 
1504
    sql_print_information(_("Slave: received end packet from server, apparent "
 
1505
                            "master shutdown: %s"),
 
1506
                     drizzle_error(drizzle));
 
1507
     return(packet_error);
 
1508
  }
 
1509
 
 
1510
  return(len - 1);
 
1511
}
 
1512
 
 
1513
 
 
1514
int32_t check_expected_error(Session* session __attribute__((unused)),
 
1515
                         Relay_log_info const *rli __attribute__((unused)),
 
1516
                         int32_t expected_error)
 
1517
{
 
1518
  switch (expected_error) {
 
1519
  case ER_NET_READ_ERROR:
 
1520
  case ER_NET_ERROR_ON_WRITE:
 
1521
  case ER_QUERY_INTERRUPTED:
 
1522
  case ER_SERVER_SHUTDOWN:
 
1523
  case ER_NEW_ABORTING_CONNECTION:
 
1524
    return(1);
 
1525
  default:
 
1526
    return(0);
 
1527
  }
 
1528
}
 
1529
 
 
1530
 
 
1531
/*
 
1532
  Check if the current error is of temporary nature of not.
 
1533
  Some errors are temporary in nature, such as
 
1534
  ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT.  Ndb also signals
 
1535
  that the error is temporary by pushing a warning with the error code
 
1536
  ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
 
1537
*/
 
1538
static int32_t has_temporary_error(Session *session)
 
1539
{
 
1540
  if (session->is_fatal_error)
 
1541
    return(0);
 
1542
 
 
1543
  if (session->main_da.is_error())
 
1544
  {
 
1545
    session->clear_error();
 
1546
    my_error(ER_LOCK_DEADLOCK, MYF(0));
 
1547
  }
 
1548
 
 
1549
  /*
 
1550
    If there is no message in Session, we can't say if it's a temporary
 
1551
    error or not. This is currently the case for Incident_log_event,
 
1552
    which sets no message. Return FALSE.
 
1553
  */
 
1554
  if (!session->is_error())
 
1555
    return(0);
 
1556
 
 
1557
  /*
 
1558
    Temporary error codes:
 
1559
    currently, InnoDB deadlock detected by InnoDB or lock
 
1560
    wait timeout (innodb_lock_wait_timeout exceeded
 
1561
  */
 
1562
  if (session->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
 
1563
      session->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
 
1564
    return(1);
 
1565
 
 
1566
  return(0);
 
1567
}
 
1568
 
 
1569
 
 
1570
/**
 
1571
  Applies the given event and advances the relay log position.
 
1572
 
 
1573
  In essence, this function does:
 
1574
 
 
1575
  @code
 
1576
    ev->apply_event(rli);
 
1577
    ev->update_pos(rli);
 
1578
  @endcode
 
1579
 
 
1580
  But it also does some maintainance, such as skipping events if
 
1581
  needed and reporting errors.
 
1582
 
 
1583
  If the @c skip flag is set, then it is tested whether the event
 
1584
  should be skipped, by looking at the slave_skip_counter and the
 
1585
  server id.  The skip flag should be set when calling this from a
 
1586
  replication thread but not set when executing an explicit BINLOG
 
1587
  statement.
 
1588
 
 
1589
  @retval 0 OK.
 
1590
 
 
1591
  @retval 1 Error calling ev->apply_event().
 
1592
 
 
1593
  @retval 2 No error calling ev->apply_event(), but error calling
 
1594
  ev->update_pos().
 
1595
*/
 
1596
int32_t apply_event_and_update_pos(Log_event* ev, Session* session, Relay_log_info* rli,
 
1597
                               bool skip)
 
1598
{
 
1599
  int32_t exec_res= 0;
 
1600
 
 
1601
  /*
 
1602
    Execute the event to change the database and update the binary
 
1603
    log coordinates, but first we set some data that is needed for
 
1604
    the thread.
 
1605
 
 
1606
    The event will be executed unless it is supposed to be skipped.
 
1607
 
 
1608
    Queries originating from this server must be skipped.  Low-level
 
1609
    events (Format_description_log_event, Rotate_log_event,
 
1610
    Stop_log_event) from this server must also be skipped. But for
 
1611
    those we don't want to modify 'group_master_log_pos', because
 
1612
    these events did not exist on the master.
 
1613
    Format_description_log_event is not completely skipped.
 
1614
 
 
1615
    Skip queries specified by the user in 'slave_skip_counter'.  We
 
1616
    can't however skip events that has something to do with the log
 
1617
    files themselves.
 
1618
 
 
1619
    Filtering on own server id is extremely important, to ignore
 
1620
    execution of events created by the creation/rotation of the relay
 
1621
    log (remember that now the relay log starts with its Format_desc,
 
1622
    has a Rotate etc).
 
1623
  */
 
1624
 
 
1625
  session->server_id = ev->server_id; // use the original server id for logging
 
1626
  session->set_time();                            // time the query
 
1627
  session->lex->current_select= 0;
 
1628
  if (!ev->when)
 
1629
    ev->when= my_time(0);
 
1630
  ev->session = session; // because up to this point, ev->session == 0
 
1631
 
 
1632
  if (skip)
 
1633
  {
 
1634
    int32_t reason= ev->shall_skip(rli);
 
1635
    if (reason == Log_event::EVENT_SKIP_COUNT)
 
1636
      --rli->slave_skip_counter;
 
1637
    pthread_mutex_unlock(&rli->data_lock);
 
1638
    if (reason == Log_event::EVENT_SKIP_NOT)
 
1639
      exec_res= ev->apply_event(rli);
 
1640
  }
 
1641
  else
 
1642
    exec_res= ev->apply_event(rli);
 
1643
 
 
1644
  if (exec_res == 0)
 
1645
  {
 
1646
    int32_t error= ev->update_pos(rli);
 
1647
    /*
 
1648
      The update should not fail, so print an error message and
 
1649
      return an error code.
 
1650
 
 
1651
      TODO: Replace this with a decent error message when merged
 
1652
      with BUG#24954 (which adds several new error message).
 
1653
    */
 
1654
    if (error)
 
1655
    {
 
1656
      char buf[22];
 
1657
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
 
1658
                  _("It was not possible to update the positions"
 
1659
                  " of the relay log information: the slave may"
 
1660
                  " be in an inconsistent state."
 
1661
                  " Stopped in %s position %s"),
 
1662
                  rli->group_relay_log_name.c_str(),
 
1663
                  llstr(rli->group_relay_log_pos, buf));
 
1664
      return(2);
 
1665
    }
 
1666
  }
 
1667
 
 
1668
  return(exec_res ? 1 : 0);
 
1669
}
 
1670
 
 
1671
 
 
1672
/**
 
1673
  Top-level function for executing the next event from the relay log.
 
1674
 
 
1675
  This function reads the event from the relay log, executes it, and
 
1676
  advances the relay log position.  It also handles errors, etc.
 
1677
 
 
1678
  This function may fail to apply the event for the following reasons:
 
1679
 
 
1680
   - The position specfied by the UNTIL condition of the START SLAVE
 
1681
     command is reached.
 
1682
 
 
1683
   - It was not possible to read the event from the log.
 
1684
 
 
1685
   - The slave is killed.
 
1686
 
 
1687
   - An error occurred when applying the event, and the event has been
 
1688
     tried slave_trans_retries times.  If the event has been retried
 
1689
     fewer times, 0 is returned.
 
1690
 
 
1691
   - init_master_info or init_relay_log_pos failed. (These are called
 
1692
     if a failure occurs when applying the event.)</li>
 
1693
 
 
1694
   - An error occurred when updating the binlog position.
 
1695
 
 
1696
  @retval 0 The event was applied.
 
1697
 
 
1698
  @retval 1 The event was not applied.
 
1699
*/
 
1700
static int32_t exec_relay_log_event(Session* session, Relay_log_info* rli)
 
1701
{
 
1702
  /*
 
1703
     We acquire this mutex since we need it for all operations except
 
1704
     event execution. But we will release it in places where we will
 
1705
     wait for something for example inside of next_event().
 
1706
   */
 
1707
  pthread_mutex_lock(&rli->data_lock);
 
1708
 
 
1709
  Log_event * ev = next_event(rli);
 
1710
 
 
1711
  assert(rli->sql_session==session);
 
1712
 
 
1713
  if (sql_slave_killed(session,rli))
 
1714
  {
 
1715
    pthread_mutex_unlock(&rli->data_lock);
 
1716
    delete ev;
 
1717
    return(1);
 
1718
  }
 
1719
  if (ev)
 
1720
  {
 
1721
    int32_t exec_res;
 
1722
 
 
1723
    /*
 
1724
      This tests if the position of the beginning of the current event
 
1725
      hits the UNTIL barrier.
 
1726
    */
 
1727
    if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
 
1728
        rli->is_until_satisfied((rli->is_in_group() || !ev->log_pos) ?
 
1729
                                rli->group_master_log_pos :
 
1730
                                ev->log_pos - ev->data_written))
 
1731
    {
 
1732
      char buf[22];
 
1733
      sql_print_information(_("Slave SQL thread stopped because it reached its"
 
1734
                              " UNTIL position %s"),
 
1735
                            llstr(rli->until_pos(), buf));
 
1736
      /*
 
1737
        Setting abort_slave flag because we do not want additional message about
 
1738
        error in query execution to be printed.
 
1739
      */
 
1740
      rli->abort_slave= 1;
 
1741
      pthread_mutex_unlock(&rli->data_lock);
 
1742
      delete ev;
 
1743
      return(1);
 
1744
    }
 
1745
    exec_res= apply_event_and_update_pos(ev, session, rli, true);
 
1746
 
 
1747
    /*
 
1748
      Format_description_log_event should not be deleted because it will be
 
1749
      used to read info about the relay log's format; it will be deleted when
 
1750
      the SQL thread does not need it, i.e. when this thread terminates.
 
1751
    */
 
1752
    if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
 
1753
    {
 
1754
      delete ev;
 
1755
    }
 
1756
 
 
1757
    /*
 
1758
      update_log_pos failed: this should not happen, so we don't
 
1759
      retry.
 
1760
    */
 
1761
    if (exec_res == 2)
 
1762
      return(1);
 
1763
 
 
1764
    if (slave_trans_retries)
 
1765
    {
 
1766
      int32_t temp_err= 0;
 
1767
      if (exec_res && (temp_err= has_temporary_error(session)))
 
1768
      {
 
1769
        const char *errmsg;
 
1770
        /*
 
1771
          We were in a transaction which has been rolled back because of a
 
1772
          temporary error;
 
1773
          let's seek back to BEGIN log event and retry it all again.
 
1774
          Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
 
1775
          there is no rollback since 5.0.13 (ref: manual).
 
1776
          We have to not only seek but also
 
1777
          a) init_master_info(), to seek back to hot relay log's start for later
 
1778
          (for when we will come back to this hot log after re-processing the
 
1779
          possibly existing old logs where BEGIN is: check_binlog_magic() will
 
1780
          then need the cache to be at position 0 (see comments at beginning of
 
1781
          init_master_info()).
 
1782
          b) init_relay_log_pos(), because the BEGIN may be an older relay log.
 
1783
        */
 
1784
        if (rli->trans_retries < slave_trans_retries)
 
1785
        {
 
1786
          if (rli->mi->init_master_info(0, 0, SLAVE_SQL))
 
1787
            sql_print_error(_("Failed to initialize the master info structure"));
 
1788
          else if (init_relay_log_pos(rli,
 
1789
                                      rli->group_relay_log_name.c_str(),
 
1790
                                      rli->group_relay_log_pos,
 
1791
                                      1, &errmsg, 1))
 
1792
            sql_print_error(_("Error initializing relay log position: %s"),
 
1793
                            errmsg);
 
1794
          else
 
1795
          {
 
1796
            exec_res= 0;
 
1797
            end_trans(session, ROLLBACK);
 
1798
            /* chance for concurrent connection to get more locks */
 
1799
            safe_sleep(session, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
 
1800
                       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
 
1801
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
 
1802
            rli->trans_retries++;
 
1803
            rli->retried_trans++;
 
1804
            pthread_mutex_unlock(&rli->data_lock);
 
1805
          }
 
1806
        }
 
1807
        else
 
1808
          sql_print_error(_("Slave SQL thread retried transaction %lu time(s) "
 
1809
                            "in vain, giving up. Consider raising the value of "
 
1810
                            "the slave_transaction_retries variable."),
 
1811
                          slave_trans_retries);
 
1812
      }
 
1813
      else if ((exec_res && !temp_err) ||
 
1814
               (opt_using_transactions &&
 
1815
                rli->group_relay_log_pos == rli->event_relay_log_pos))
 
1816
      {
 
1817
        /*
 
1818
          Only reset the retry counter if the entire group succeeded
 
1819
          or failed with a non-transient error.  On a successful
 
1820
          event, the execution will proceed as usual; in the case of a
 
1821
          non-transient error, the slave will stop with an error.
 
1822
         */
 
1823
        rli->trans_retries= 0; // restart from fresh
 
1824
      }
 
1825
    }
 
1826
    return(exec_res);
 
1827
  }
 
1828
  pthread_mutex_unlock(&rli->data_lock);
 
1829
  rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
 
1830
              ER(ER_SLAVE_RELAY_LOG_READ_FAILURE),
 
1831
              _("Could not parse relay log event entry. The possible reasons "
 
1832
                "are: the master's binary log is corrupted (you can check this "
 
1833
                "by running 'mysqlbinlog' on the binary log), the slave's "
 
1834
                "relay log is corrupted (you can check this by running "
 
1835
                "'mysqlbinlog' on the relay log), a network problem, or a bug "
 
1836
                "in the master's or slave's DRIZZLE code. If you want to check "
 
1837
                "the master's binary log or slave's relay log, you will be "
 
1838
                "able to know their names by issuing 'SHOW SLAVE STATUS' "
 
1839
                "on this slave."));
 
1840
  return(1);
 
1841
}
 
1842
 
 
1843
 
 
1844
/**
 
1845
  @brief Try to reconnect slave IO thread.
 
1846
 
 
1847
  @details Terminates current connection to master, sleeps for
 
1848
  @c mi->connect_retry msecs and initiates new connection with
 
1849
  @c safe_reconnect(). Variable pointed by @c retry_count is increased -
 
1850
  if it exceeds @c master_retry_count then connection is not re-established
 
1851
  and function signals error.
 
1852
  Unless @c suppres_warnings is TRUE, a warning is put in the server error log
 
1853
  when reconnecting. The warning message and messages used to report errors
 
1854
  are taken from @c messages array. In case @c master_retry_count is exceeded,
 
1855
  no messages are added to the log.
 
1856
 
 
1857
  @param[in]     session                 Thread context.
 
1858
  @param[in]     DRIZZLE               DRIZZLE connection.
 
1859
  @param[in]     mi                  Master connection information.
 
1860
  @param[in,out] retry_count         Number of attempts to reconnect.
 
1861
  @param[in]     suppress_warnings   TRUE when a normal net read timeout 
 
1862
                                     has caused to reconnecting.
 
1863
  @param[in]     messages            Messages to print/log, see 
 
1864
                                     reconnect_messages[] array.
 
1865
 
 
1866
  @retval        0                   OK.
 
1867
  @retval        1                   There was an error.
 
1868
*/
 
1869
 
 
1870
static int32_t try_to_reconnect(Session *session, DRIZZLE *drizzle, Master_info *mi,
 
1871
                                uint32_t *retry_count, bool suppress_warnings,
 
1872
                                const char *messages[SLAVE_RECON_MSG_MAX])
 
1873
{
 
1874
  mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
 
1875
  session->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
 
1876
  drizzle_disconnect(drizzle);
 
1877
  if ((*retry_count)++)
 
1878
  {
 
1879
    if (*retry_count > master_retry_count)
 
1880
      return 1;                             // Don't retry forever
 
1881
    safe_sleep(session, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
 
1882
               (void *) mi);
 
1883
  }
 
1884
  if (check_io_slave_killed(session, mi,
 
1885
                            _(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
 
1886
    return 1;
 
1887
  session->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
 
1888
  if (!suppress_warnings)
 
1889
  {
 
1890
    char buf[256], llbuff[22];
 
1891
    snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
 
1892
             IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
 
1893
    /*
 
1894
      Raise a warining during registering on master/requesting dump.
 
1895
      Log a message reading event.
 
1896
    */
 
1897
    if (_(messages[SLAVE_RECON_MSG_COMMAND])[0])
 
1898
    {
 
1899
      mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
 
1900
                 ER(ER_SLAVE_MASTER_COM_FAILURE),
 
1901
                 _(messages[SLAVE_RECON_MSG_COMMAND]), buf);
 
1902
    }
 
1903
    else
 
1904
    {
 
1905
      sql_print_information("%s",buf);
 
1906
    }
 
1907
  }
 
1908
  if (safe_reconnect(session, drizzle, mi, 1) || io_slave_killed(session, mi))
 
1909
  {
 
1910
    if (global_system_variables.log_warnings)
 
1911
      sql_print_information("%s",_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
 
1912
    return 1;
 
1913
  }
 
1914
  return 0;
 
1915
}
 
1916
 
 
1917
 
 
1918
/* Slave I/O Thread entry point */
 
1919
 
 
1920
pthread_handler_t handle_slave_io(void *arg)
 
1921
{
 
1922
  Session *session; // needs to be first for thread_stack
 
1923
  DRIZZLE *drizzle;
 
1924
  Master_info *mi = (Master_info*)arg;
 
1925
  Relay_log_info *rli= &mi->rli;
 
1926
  char llbuff[22];
 
1927
  uint32_t retry_count;
 
1928
  bool suppress_warnings;
 
1929
  uint32_t retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
 
1930
  my_thread_init();
 
1931
 
 
1932
  assert(mi->inited);
 
1933
  drizzle= NULL ;
 
1934
  retry_count= 0;
 
1935
 
 
1936
  pthread_mutex_lock(&mi->run_lock);
 
1937
  /* Inform waiting threads that slave has started */
 
1938
  mi->slave_run_id++;
 
1939
 
 
1940
  mi->events_till_disconnect = disconnect_slave_event_count;
 
1941
 
 
1942
  session= new Session;
 
1943
  Session_CHECK_SENTRY(session);
 
1944
  mi->io_session = session;
 
1945
 
 
1946
  pthread_detach_this_thread();
 
1947
  session->thread_stack= (char*) &session; // remember where our stack is
 
1948
  if (init_slave_thread(session, SLAVE_Session_IO))
 
1949
  {
 
1950
    pthread_cond_broadcast(&mi->start_cond);
 
1951
    pthread_mutex_unlock(&mi->run_lock);
 
1952
    sql_print_error(_("Failed during slave I/O thread initialization"));
 
1953
    goto err;
 
1954
  }
 
1955
  pthread_mutex_lock(&LOCK_thread_count);
 
1956
  threads.append(session);
 
1957
  pthread_mutex_unlock(&LOCK_thread_count);
 
1958
  mi->slave_running = 1;
 
1959
  mi->abort_slave = 0;
 
1960
  pthread_mutex_unlock(&mi->run_lock);
 
1961
  pthread_cond_broadcast(&mi->start_cond);
 
1962
 
 
1963
  if (!(mi->drizzle= drizzle = drizzle_create(NULL)))
 
1964
  {
 
1965
    mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
 
1966
               ER(ER_SLAVE_FATAL_ERROR), _("error in drizzle_create()"));
 
1967
    goto err;
 
1968
  }
 
1969
 
 
1970
  session->set_proc_info("Connecting to master");
 
1971
  // we can get killed during safe_connect
 
1972
  if (!safe_connect(session, drizzle, mi))
 
1973
  {
 
1974
    sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
 
1975
                            "replication started in log '%s' at position %s"),
 
1976
                          mi->getUsername(), mi->getHostname(), mi->getPort(),
 
1977
                          IO_RPL_LOG_NAME,
 
1978
                          llstr(mi->getLogPosition(), llbuff));
 
1979
  /*
 
1980
    Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
 
1981
    thread, since a replication event can become this much larger than
 
1982
    the corresponding packet (query) sent from client to master.
 
1983
  */
 
1984
    drizzle->net.max_packet_size= session->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
 
1985
  }
 
1986
  else
 
1987
  {
 
1988
    sql_print_information(_("Slave I/O thread killed while connecting to master"));
 
1989
    goto err;
 
1990
  }
 
1991
 
 
1992
connected:
 
1993
 
 
1994
  // TODO: the assignment below should be under mutex (5.0)
 
1995
  mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
 
1996
  session->slave_net = &drizzle->net;
 
1997
  session->set_proc_info("Checking master version");
 
1998
  if (get_master_version_and_clock(drizzle, mi))
 
1999
    goto err;
 
2000
  
 
2001
  if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
 
2002
  {
 
2003
    /*
 
2004
      Register ourselves with the master.
 
2005
    */
 
2006
    session->set_proc_info("Registering slave on master");
 
2007
    if (register_slave_on_master(drizzle, mi, &suppress_warnings))
 
2008
    {
 
2009
      if (!check_io_slave_killed(session, mi, "Slave I/O thread killed "
 
2010
                                 "while registering slave on master"))
 
2011
      {
 
2012
        sql_print_error(_("Slave I/O thread couldn't register on master"));
 
2013
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2014
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
 
2015
          goto err;
 
2016
      }
 
2017
      else
 
2018
        goto err;
 
2019
      goto connected;
 
2020
    }
 
2021
    if (!retry_count_reg)
 
2022
    {
 
2023
      retry_count_reg++;
 
2024
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2025
      if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2026
                           reconnect_messages[SLAVE_RECON_ACT_REG]))
 
2027
        goto err;
 
2028
      goto connected;
 
2029
    }
 
2030
  }
 
2031
 
 
2032
  while (!io_slave_killed(session,mi))
 
2033
  {
 
2034
    session->set_proc_info("Requesting binlog dump");
 
2035
    if (request_dump(drizzle, mi, &suppress_warnings))
 
2036
    {
 
2037
      sql_print_error(_("Failed on request_dump()"));
 
2038
      if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while \
 
2039
requesting master dump")) ||
 
2040
          try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2041
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
 
2042
        goto err;
 
2043
      goto connected;
 
2044
    }
 
2045
    if (!retry_count_dump)
 
2046
    {
 
2047
      retry_count_dump++;
 
2048
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2049
      if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2050
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
 
2051
        goto err;
 
2052
      goto connected;
 
2053
    }
 
2054
 
 
2055
    while (!io_slave_killed(session,mi))
 
2056
    {
 
2057
      uint32_t event_len;
 
2058
      /*
 
2059
        We say "waiting" because read_event() will wait if there's nothing to
 
2060
        read. But if there's something to read, it will not wait. The
 
2061
        important thing is to not confuse users by saying "reading" whereas
 
2062
        we're in fact receiving nothing.
 
2063
      */
 
2064
      session->set_proc_info(_("Waiting for master to send event"));
 
2065
      event_len= read_event(drizzle, mi, &suppress_warnings);
 
2066
      if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while "
 
2067
                                           "reading event")))
 
2068
        goto err;
 
2069
      if (!retry_count_event)
 
2070
      {
 
2071
        retry_count_event++;
 
2072
        sql_print_information(_("Forcing to reconnect slave I/O thread"));
 
2073
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2074
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
 
2075
          goto err;
 
2076
        goto connected;
 
2077
      }
 
2078
 
 
2079
      if (event_len == packet_error)
 
2080
      {
 
2081
        uint32_t drizzle_error_number= drizzle_errno(drizzle);
 
2082
        switch (drizzle_error_number) {
 
2083
        case CR_NET_PACKET_TOO_LARGE:
 
2084
          sql_print_error(_("Log entry on master is longer than "
 
2085
                            "max_allowed_packet (%ld) on "
 
2086
                            "slave. If the entry is correct, restart the "
 
2087
                            "server with a higher value of "
 
2088
                            "max_allowed_packet"),
 
2089
                          session->variables.max_allowed_packet);
 
2090
          goto err;
 
2091
        case ER_MASTER_FATAL_ERROR_READING_BINLOG:
 
2092
          sql_print_error(ER(drizzle_error_number), drizzle_error_number,
 
2093
                          drizzle_error(drizzle));
 
2094
          goto err;
 
2095
        case EE_OUTOFMEMORY:
 
2096
        case ER_OUTOFMEMORY:
 
2097
          sql_print_error(
 
2098
       _("Stopping slave I/O thread due to out-of-memory error from master"));
 
2099
          goto err;
 
2100
        }
 
2101
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
 
2102
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
 
2103
          goto err;
 
2104
        goto connected;
 
2105
      } // if (event_len == packet_error)
 
2106
 
 
2107
      retry_count=0;                    // ok event, reset retry counter
 
2108
      session->set_proc_info(_("Queueing master event to the relay log"));
 
2109
      if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
 
2110
      {
 
2111
        goto err;
 
2112
      }
 
2113
      if (mi->flush())
 
2114
      {
 
2115
        sql_print_error(_("Failed to flush master info file"));
 
2116
        goto err;
 
2117
      }
 
2118
      /*
 
2119
        See if the relay logs take too much space.
 
2120
        We don't lock mi->rli.log_space_lock here; this dirty read saves time
 
2121
        and does not introduce any problem:
 
2122
        - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
 
2123
        the clean value is 0), then we are reading only one more event as we
 
2124
        should, and we'll block only at the next event. No big deal.
 
2125
        - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
 
2126
        the clean value is 1), then we are going into wait_for_relay_log_space()
 
2127
        for no reason, but this function will do a clean read, notice the clean
 
2128
        value and exit immediately.
 
2129
      */
 
2130
      if (rli->log_space_limit && rli->log_space_limit <
 
2131
          rli->log_space_total &&
 
2132
          !rli->ignore_log_space_limit)
 
2133
        if (wait_for_relay_log_space(rli))
 
2134
        {
 
2135
          sql_print_error(_("Slave I/O thread aborted while waiting for "
 
2136
                            "relay log space"));
 
2137
          goto err;
 
2138
        }
 
2139
    }
 
2140
  }
 
2141
 
 
2142
// error = 0;
 
2143
err:
 
2144
// print the current replication position
 
2145
  sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
 
2146
                          "position %s"),
 
2147
                        IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
 
2148
  pthread_mutex_lock(&LOCK_thread_count);
 
2149
  session->query = session->db = 0; // extra safety
 
2150
  session->query_length= session->db_length= 0;
 
2151
  pthread_mutex_unlock(&LOCK_thread_count);
 
2152
  if (drizzle)
 
2153
  {
 
2154
    /*
 
2155
      Here we need to clear the active VIO before closing the
 
2156
      connection with the master.  The reason is that Session::awake()
 
2157
      might be called from terminate_slave_thread() because somebody
 
2158
      issued a STOP SLAVE.  If that happends, the close_active_vio()
 
2159
      can be called in the middle of closing the VIO associated with
 
2160
      the 'mysql' object, causing a crash.
 
2161
    */
 
2162
    drizzle_close(drizzle);
 
2163
    mi->drizzle=0;
 
2164
  }
 
2165
  write_ignored_events_info_to_relay_log(session, mi);
 
2166
  session->set_proc_info(_("Waiting for slave mutex on exit"));
 
2167
  pthread_mutex_lock(&mi->run_lock);
 
2168
 
 
2169
  /* Forget the relay log's format */
 
2170
  delete mi->rli.relay_log.description_event_for_queue;
 
2171
  mi->rli.relay_log.description_event_for_queue= 0;
 
2172
  assert(session->net.buff != 0);
 
2173
  net_end(&session->net); // destructor will not free it, because net.vio is 0
 
2174
  close_thread_tables(session);
 
2175
  pthread_mutex_lock(&LOCK_thread_count);
 
2176
  Session_CHECK_SENTRY(session);
 
2177
  delete session;
 
2178
  pthread_mutex_unlock(&LOCK_thread_count);
 
2179
  mi->abort_slave= 0;
 
2180
  mi->slave_running= 0;
 
2181
  mi->io_session= 0;
 
2182
  /*
 
2183
    Note: the order of the two following calls (first broadcast, then unlock)
 
2184
    is important. Otherwise a killer_thread can execute between the calls and
 
2185
    delete the mi structure leading to a crash! (see BUG#25306 for details)
 
2186
   */ 
 
2187
  pthread_cond_broadcast(&mi->stop_cond);       // tell the world we are done
 
2188
  pthread_mutex_unlock(&mi->run_lock);
 
2189
  my_thread_end();
 
2190
  pthread_exit(0);
 
2191
  return(0);                               // Can't return anything here
 
2192
}
 
2193
 
 
2194
 
 
2195
/* Slave SQL Thread entry point */
 
2196
 
 
2197
pthread_handler_t handle_slave_sql(void *arg)
 
2198
{
 
2199
  Session *session;                     /* needs to be first for thread_stack */
 
2200
  char llbuff[22],llbuff1[22];
 
2201
 
 
2202
  Relay_log_info* rli = &((Master_info*)arg)->rli;
 
2203
  const char *errmsg;
 
2204
 
 
2205
  my_thread_init();
 
2206
 
 
2207
  assert(rli->inited);
 
2208
  pthread_mutex_lock(&rli->run_lock);
 
2209
  assert(!rli->slave_running);
 
2210
  errmsg= 0;
 
2211
  rli->events_till_abort = abort_slave_event_count;
 
2212
 
 
2213
  session = new Session;
 
2214
  session->thread_stack = (char*)&session; // remember where our stack is
 
2215
  rli->sql_session= session;
 
2216
  
 
2217
  /* Inform waiting threads that slave has started */
 
2218
  rli->slave_run_id++;
 
2219
  rli->slave_running = 1;
 
2220
 
 
2221
  pthread_detach_this_thread();
 
2222
  if (init_slave_thread(session, SLAVE_Session_SQL))
 
2223
  {
 
2224
    /*
 
2225
      TODO: this is currently broken - slave start and change master
 
2226
      will be stuck if we fail here
 
2227
    */
 
2228
    pthread_cond_broadcast(&rli->start_cond);
 
2229
    pthread_mutex_unlock(&rli->run_lock);
 
2230
    sql_print_error(_("Failed during slave thread initialization"));
 
2231
    goto err;
 
2232
  }
 
2233
  session->init_for_queries();
 
2234
  session->temporary_tables = rli->save_temporary_tables; // restore temp tables
 
2235
  pthread_mutex_lock(&LOCK_thread_count);
 
2236
  threads.append(session);
 
2237
  pthread_mutex_unlock(&LOCK_thread_count);
 
2238
  /*
 
2239
    We are going to set slave_running to 1. Assuming slave I/O thread is
 
2240
    alive and connected, this is going to make Seconds_Behind_Master be 0
 
2241
    i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
 
2242
    the moment we start we can think we are caught up, and the next second we
 
2243
    start receiving data so we realize we are not caught up and
 
2244
    Seconds_Behind_Master grows. No big deal.
 
2245
  */
 
2246
  rli->abort_slave = 0;
 
2247
  pthread_mutex_unlock(&rli->run_lock);
 
2248
  pthread_cond_broadcast(&rli->start_cond);
 
2249
 
 
2250
  /*
 
2251
    Reset errors for a clean start (otherwise, if the master is idle, the SQL
 
2252
    thread may execute no Query_log_event, so the error will remain even
 
2253
    though there's no problem anymore). Do not reset the master timestamp
 
2254
    (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
 
2255
    as we are not sure that we are going to receive a query, we want to
 
2256
    remember the last master timestamp (to say how many seconds behind we are
 
2257
    now.
 
2258
    But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
 
2259
  */
 
2260
  rli->clear_error();
 
2261
 
 
2262
  //tell the I/O thread to take relay_log_space_limit into account from now on
 
2263
  pthread_mutex_lock(&rli->log_space_lock);
 
2264
  rli->ignore_log_space_limit= 0;
 
2265
  pthread_mutex_unlock(&rli->log_space_lock);
 
2266
  rli->trans_retries= 0; // start from "no error"
 
2267
 
 
2268
  if (init_relay_log_pos(rli,
 
2269
                         rli->group_relay_log_name.c_str(),
 
2270
                         rli->group_relay_log_pos,
 
2271
                         1 /*need data lock*/, &errmsg,
 
2272
                         1 /*look for a description_event*/))
 
2273
  {
 
2274
    sql_print_error(_("Error initializing relay log position: %s"),
 
2275
                    errmsg);
 
2276
    goto err;
 
2277
  }
 
2278
  Session_CHECK_SENTRY(session);
 
2279
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
 
2280
  /*
 
2281
    Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
 
2282
    correct position when it's called just after my_b_seek() (the questionable
 
2283
    stuff is those "seek is done on next read" comments in the my_b_seek()
 
2284
    source code).
 
2285
    The crude reality is that this assertion randomly fails whereas
 
2286
    replication seems to work fine. And there is no easy explanation why it
 
2287
    fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
 
2288
    init_relay_log_pos() called above). Maybe the assertion would be
 
2289
    meaningful if we held rli->data_lock between the my_b_seek() and the
 
2290
    assert().
 
2291
  */
 
2292
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
 
2293
  assert(rli->sql_session == session);
 
2294
 
 
2295
  if (global_system_variables.log_warnings)
 
2296
    sql_print_information(_("Slave SQL thread initialized, "
 
2297
                            "starting replication in log '%s' at "
 
2298
                            "position %s, relay log '%s' position: %s"),
 
2299
                            RPL_LOG_NAME,
 
2300
                          llstr(rli->group_master_log_pos,llbuff),
 
2301
                          rli->group_relay_log_name.c_str(),
 
2302
                          llstr(rli->group_relay_log_pos,llbuff1));
 
2303
 
 
2304
  /* execute init_slave variable */
 
2305
  if (sys_init_slave.value_length)
 
2306
  {
 
2307
    execute_init_command(session, &sys_init_slave, &LOCK_sys_init_slave);
 
2308
    if (session->is_slave_error)
 
2309
    {
 
2310
      sql_print_error(_("Slave SQL thread aborted. "
 
2311
                        "Can't execute init_slave query"));
 
2312
      goto err;
 
2313
    }
 
2314
  }
 
2315
 
 
2316
  /*
 
2317
    First check until condition - probably there is nothing to execute. We
 
2318
    do not want to wait for next event in this case.
 
2319
  */
 
2320
  pthread_mutex_lock(&rli->data_lock);
 
2321
  if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
 
2322
      rli->is_until_satisfied(rli->group_master_log_pos))
 
2323
  {
 
2324
    char buf[22];
 
2325
    sql_print_information(_("Slave SQL thread stopped because it reached its"
 
2326
                            " UNTIL position %s"), llstr(rli->until_pos(), buf));
 
2327
    pthread_mutex_unlock(&rli->data_lock);
 
2328
    goto err;
 
2329
  }
 
2330
  pthread_mutex_unlock(&rli->data_lock);
 
2331
 
 
2332
  /* Read queries from the IO/THREAD until this thread is killed */
 
2333
 
 
2334
  while (!sql_slave_killed(session,rli))
 
2335
  {
 
2336
    session->set_proc_info(_("Reading event from the relay log"));
 
2337
    assert(rli->sql_session == session);
 
2338
    Session_CHECK_SENTRY(session);
 
2339
    if (exec_relay_log_event(session,rli))
 
2340
    {
 
2341
      // do not scare the user if SQL thread was simply killed or stopped
 
2342
      if (!sql_slave_killed(session,rli))
 
2343
      {
 
2344
        /*
 
2345
          retrieve as much info as possible from the session and, error
 
2346
          codes and warnings and print this to the error log as to
 
2347
          allow the user to locate the error
 
2348
        */
 
2349
        uint32_t const last_errno= rli->last_error().number;
 
2350
 
 
2351
        if (session->is_error())
 
2352
        {
 
2353
          char const *const errmsg= session->main_da.message();
 
2354
 
 
2355
          if (last_errno == 0)
 
2356
          {
 
2357
            rli->report(ERROR_LEVEL, session->main_da.sql_errno(), "%s", errmsg);
 
2358
          }
 
2359
          else if (last_errno != session->main_da.sql_errno())
 
2360
          {
 
2361
            sql_print_error(_("Slave (additional info): %s Error_code: %d"),
 
2362
                            errmsg, session->main_da.sql_errno());
 
2363
          }
 
2364
        }
 
2365
 
 
2366
        /* Print any warnings issued */
 
2367
        List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
 
2368
        DRIZZLE_ERROR *err;
 
2369
        /*
 
2370
          Added controlled slave thread cancel for replication
 
2371
          of user-defined variables.
 
2372
        */
 
2373
        bool udf_error = false;
 
2374
        while ((err= it++))
 
2375
        {
 
2376
          if (err->code == ER_CANT_OPEN_LIBRARY)
 
2377
            udf_error = true;
 
2378
          sql_print_warning(_("Slave: %s Error_code: %d"),err->msg, err->code);
 
2379
        }
 
2380
        if (udf_error)
 
2381
          sql_print_error(_("Error loading user-defined library, slave SQL "
 
2382
                            "thread aborted. Install the missing library, "
 
2383
                            "and restart the slave SQL thread with "
 
2384
                            "\"SLAVE START\". We stopped at log '%s' "
 
2385
                            "position %s"),
 
2386
                          RPL_LOG_NAME, llstr(rli->group_master_log_pos,
 
2387
            llbuff));
 
2388
        else
 
2389
          sql_print_error(_("Error running query, slave SQL thread aborted. "
 
2390
                            "Fix the problem, and restart "
 
2391
                            "the slave SQL thread with \"SLAVE START\". "
 
2392
                            "We stopped at log '%s' position %s"),
 
2393
                          RPL_LOG_NAME,
 
2394
                          llstr(rli->group_master_log_pos, llbuff));
 
2395
      }
 
2396
      goto err;
 
2397
    }
 
2398
  }
 
2399
 
 
2400
  /* Thread stopped. Print the current replication position to the log */
 
2401
  sql_print_information(_("Slave SQL thread exiting, replication stopped in "
 
2402
                          "log '%s' at position %s"),
 
2403
                        RPL_LOG_NAME,
 
2404
                        llstr(rli->group_master_log_pos,llbuff));
 
2405
 
 
2406
 err:
 
2407
 
 
2408
  /*
 
2409
    Some events set some playgrounds, which won't be cleared because thread
 
2410
    stops. Stopping of this thread may not be known to these events ("stop"
 
2411
    request is detected only by the present function, not by events), so we
 
2412
    must "proactively" clear playgrounds:
 
2413
  */
 
2414
  rli->cleanup_context(session, 1);
 
2415
  pthread_mutex_lock(&LOCK_thread_count);
 
2416
  /*
 
2417
    Some extra safety, which should not been needed (normally, event deletion
 
2418
    should already have done these assignments (each event which sets these
 
2419
    variables is supposed to set them to 0 before terminating)).
 
2420
  */
 
2421
  session->query= session->db= session->catalog= 0;
 
2422
  session->query_length= session->db_length= 0;
 
2423
  pthread_mutex_unlock(&LOCK_thread_count);
 
2424
  session->set_proc_info("Waiting for slave mutex on exit");
 
2425
  pthread_mutex_lock(&rli->run_lock);
 
2426
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
 
2427
  pthread_mutex_lock(&rli->data_lock);
 
2428
  assert(rli->slave_running == 1); // tracking buffer overrun
 
2429
  /* When master_pos_wait() wakes up it will check this and terminate */
 
2430
  rli->slave_running= 0;
 
2431
  /* Forget the relay log's format */
 
2432
  delete rli->relay_log.description_event_for_exec;
 
2433
  rli->relay_log.description_event_for_exec= 0;
 
2434
  /* Wake up master_pos_wait() */
 
2435
  pthread_mutex_unlock(&rli->data_lock);
 
2436
  pthread_cond_broadcast(&rli->data_cond);
 
2437
  rli->ignore_log_space_limit= 0; /* don't need any lock */
 
2438
  rli->save_temporary_tables = session->temporary_tables;
 
2439
 
 
2440
  /*
 
2441
    TODO: see if we can do this conditionally in next_event() instead
 
2442
    to avoid unneeded position re-init
 
2443
  */
 
2444
  session->temporary_tables = 0; // remove tempation from destructor to close them
 
2445
  assert(session->net.buff != 0);
 
2446
  net_end(&session->net); // destructor will not free it, because we are weird
 
2447
  assert(rli->sql_session == session);
 
2448
  Session_CHECK_SENTRY(session);
 
2449
  rli->sql_session= 0;
 
2450
  pthread_mutex_lock(&LOCK_thread_count);
 
2451
  Session_CHECK_SENTRY(session);
 
2452
  delete session;
 
2453
  pthread_mutex_unlock(&LOCK_thread_count);
 
2454
 /*
 
2455
  Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
 
2456
  is important. Otherwise a killer_thread can execute between the calls and
 
2457
  delete the mi structure leading to a crash! (see BUG#25306 for details)
 
2458
 */ 
 
2459
  pthread_cond_broadcast(&rli->stop_cond);
 
2460
  pthread_mutex_unlock(&rli->run_lock);  // tell the world we are done
 
2461
  
 
2462
  my_thread_end();
 
2463
  pthread_exit(0);
 
2464
  return(0);                               // Can't return anything here
 
2465
}
 
2466
 
 
2467
 
 
2468
/*
 
2469
  process_io_create_file()
 
2470
*/
 
2471
 
 
2472
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev)
 
2473
{
 
2474
  int32_t error = 1;
 
2475
  uint32_t num_bytes;
 
2476
  bool cev_not_written;
 
2477
  Session *session = mi->io_session;
 
2478
  NET *net = &mi->drizzle->net;
 
2479
 
 
2480
  if (unlikely(!cev->is_valid()))
 
2481
    return(1);
 
2482
 
 
2483
  if (!rpl_filter->db_ok(cev->db))
 
2484
  {
 
2485
    skip_load_data_infile(net);
 
2486
    return(0);
 
2487
  }
 
2488
  assert(cev->inited_from_old);
 
2489
  session->file_id = cev->file_id = mi->file_id++;
 
2490
  session->server_id = cev->server_id;
 
2491
  cev_not_written = 1;
 
2492
 
 
2493
  if (unlikely(net_request_file(net,cev->fname)))
 
2494
  {
 
2495
    sql_print_error(_("Slave I/O: failed requesting download of '%s'"),
 
2496
                    cev->fname);
 
2497
    goto err;
 
2498
  }
 
2499
 
 
2500
  /*
 
2501
    This dummy block is so we could instantiate Append_block_log_event
 
2502
    once and then modify it slightly instead of doing it multiple times
 
2503
    in the loop
 
2504
  */
 
2505
  {
 
2506
    Append_block_log_event aev(session,0,0,0,0);
 
2507
 
 
2508
    for (;;)
 
2509
    {
 
2510
      if (unlikely((num_bytes=my_net_read(net)) == packet_error))
 
2511
      {
 
2512
        sql_print_error(_("Network read error downloading '%s' from master"),
 
2513
                        cev->fname);
 
2514
        goto err;
 
2515
      }
 
2516
      if (unlikely(!num_bytes)) /* eof */
 
2517
      {
 
2518
        /* 3.23 master wants it */
 
2519
        net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0);
 
2520
        /*
 
2521
          If we wrote Create_file_log_event, then we need to write
 
2522
          Execute_load_log_event. If we did not write Create_file_log_event,
 
2523
          then this is an empty file and we can just do as if the LOAD DATA
 
2524
          INFILE had not existed, i.e. write nothing.
 
2525
        */
 
2526
        if (unlikely(cev_not_written))
 
2527
          break;
 
2528
        Execute_load_log_event xev(session,0,0);
 
2529
        xev.log_pos = cev->log_pos;
 
2530
        if (unlikely(mi->rli.relay_log.append(&xev)))
 
2531
        {
 
2532
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2533
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2534
                     _("error writing Exec_load event to relay log"));
 
2535
          goto err;
 
2536
        }
 
2537
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
 
2538
        break;
 
2539
      }
 
2540
      if (unlikely(cev_not_written))
 
2541
      {
 
2542
        cev->block = net->read_pos;
 
2543
        cev->block_len = num_bytes;
 
2544
        if (unlikely(mi->rli.relay_log.append(cev)))
 
2545
        {
 
2546
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2547
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2548
                     _("error writing Create_file event to relay log"));
 
2549
          goto err;
 
2550
        }
 
2551
        cev_not_written=0;
 
2552
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
 
2553
      }
 
2554
      else
 
2555
      {
 
2556
        aev.block = net->read_pos;
 
2557
        aev.block_len = num_bytes;
 
2558
        aev.log_pos = cev->log_pos;
 
2559
        if (unlikely(mi->rli.relay_log.append(&aev)))
 
2560
        {
 
2561
          mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
 
2562
                     ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
 
2563
                     _("error writing Append_block event to relay log"));
 
2564
          goto err;
 
2565
        }
 
2566
        mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
 
2567
      }
 
2568
    }
 
2569
  }
 
2570
  error=0;
 
2571
err:
 
2572
  return(error);
 
2573
}
 
2574
 
 
2575
 
 
2576
/*
 
2577
  Start using a new binary log on the master
 
2578
 
 
2579
  SYNOPSIS
 
2580
    process_io_rotate()
 
2581
    mi                  master_info for the slave
 
2582
    rev                 The rotate log event read from the binary log
 
2583
 
 
2584
  DESCRIPTION
 
2585
    Updates the master info with the place in the next binary
 
2586
    log where we should start reading.
 
2587
    Rotate the relay log to avoid mixed-format relay logs.
 
2588
 
 
2589
  NOTES
 
2590
    We assume we already locked mi->data_lock
 
2591
 
 
2592
  RETURN VALUES
 
2593
    0           ok
 
2594
    1           Log event is illegal
 
2595
 
 
2596
*/
 
2597
 
 
2598
static int32_t process_io_rotate(Master_info *mi, Rotate_log_event *rev)
 
2599
{
 
2600
  safe_mutex_assert_owner(&mi->data_lock);
 
2601
 
 
2602
  if (unlikely(!rev->is_valid()))
 
2603
    return(1);
 
2604
 
 
2605
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
 
2606
  mi->setLogName(rev->new_log_ident);
 
2607
  mi->setLogPosition(rev->pos);
 
2608
  /*
 
2609
    If we do not do this, we will be getting the first
 
2610
    rotate event forever, so we need to not disconnect after one.
 
2611
  */
 
2612
  if (disconnect_slave_event_count)
 
2613
    mi->events_till_disconnect++;
 
2614
 
 
2615
  /*
 
2616
    If description_event_for_queue is format <4, there is conversion in the
 
2617
    relay log to the slave's format (4). And Rotate can mean upgrade or
 
2618
    nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
 
2619
    no need to reset description_event_for_queue now. And if it's nothing (same
 
2620
    master version as before), no need (still using the slave's format).
 
2621
  */
 
2622
  if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
 
2623
  {
 
2624
    delete mi->rli.relay_log.description_event_for_queue;
 
2625
    /* start from format 3 (DRIZZLE 4.0) again */
 
2626
    mi->rli.relay_log.description_event_for_queue= new
 
2627
      Format_description_log_event(3);
 
2628
  }
 
2629
  /*
 
2630
    Rotate the relay log makes binlog format detection easier (at next slave
 
2631
    start or mysqlbinlog)
 
2632
  */
 
2633
  rotate_relay_log(mi); /* will take the right mutexes */
 
2634
  return(0);
 
2635
}
 
2636
 
 
2637
/*
 
2638
  Reads a 3.23 event and converts it to the slave's format. This code was
 
2639
  copied from DRIZZLE 4.0.
 
2640
*/
 
2641
static int32_t queue_binlog_ver_1_event(Master_info *mi, const char *buf,
 
2642
                           uint32_t event_len)
 
2643
{
 
2644
  const char *errmsg = 0;
 
2645
  uint32_t inc_pos;
 
2646
  bool ignore_event= 0;
 
2647
  char *tmp_buf = 0;
 
2648
  Relay_log_info *rli= &mi->rli;
 
2649
 
 
2650
  /*
 
2651
    If we get Load event, we need to pass a non-reusable buffer
 
2652
    to read_log_event, so we do a trick
 
2653
  */
 
2654
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
 
2655
  {
 
2656
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
 
2657
    {
 
2658
      mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
 
2659
                 ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
 
2660
      return(1);
 
2661
    }
 
2662
    memcpy(tmp_buf,buf,event_len);
 
2663
    /*
 
2664
      Create_file constructor wants a 0 as last char of buffer, this 0 will
 
2665
      serve as the string-termination char for the file's name (which is at the
 
2666
      end of the buffer)
 
2667
      We must increment event_len, otherwise the event constructor will not see
 
2668
      this end 0, which leads to segfault.
 
2669
    */
 
2670
    tmp_buf[event_len++]=0;
 
2671
    int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
 
2672
    buf = (const char*)tmp_buf;
 
2673
  }
 
2674
  /*
 
2675
    This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
 
2676
    send the loaded file, and write it to the relay log in the form of
 
2677
    Append_block/Exec_load (the SQL thread needs the data, as that thread is not
 
2678
    connected to the master).
 
2679
  */
 
2680
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
 
2681
                                            mi->rli.relay_log.description_event_for_queue);
 
2682
  if (unlikely(!ev))
 
2683
  {
 
2684
    sql_print_error(_("Read invalid event from master: '%s', "
 
2685
                      "master could be corrupt but a more likely cause "
 
2686
                      "of this is a bug"),
 
2687
                    errmsg);
 
2688
    free((char*) tmp_buf);
 
2689
    return(1);
 
2690
  }
 
2691
 
 
2692
  pthread_mutex_lock(&mi->data_lock);
 
2693
  ev->log_pos= mi->getLogPosition(); /* 3.23 events don't contain log_pos */
 
2694
  switch (ev->get_type_code()) {
 
2695
  case STOP_EVENT:
 
2696
    ignore_event= 1;
 
2697
    inc_pos= event_len;
 
2698
    break;
 
2699
  case ROTATE_EVENT:
 
2700
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
 
2701
    {
 
2702
      delete ev;
 
2703
      pthread_mutex_unlock(&mi->data_lock);
 
2704
      return(1);
 
2705
    }
 
2706
    inc_pos= 0;
 
2707
    break;
 
2708
  case CREATE_FILE_EVENT:
 
2709
    /*
 
2710
      Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
 
2711
      queue_old_event() which is for 3.23 events which don't comprise
 
2712
      CREATE_FILE_EVENT. This is because read_log_event() above has just
 
2713
      transformed LOAD_EVENT into CREATE_FILE_EVENT.
 
2714
    */
 
2715
  {
 
2716
    /* We come here when and only when tmp_buf != 0 */
 
2717
    assert(tmp_buf != 0);
 
2718
    inc_pos=event_len;
 
2719
    ev->log_pos+= inc_pos;
 
2720
    int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
 
2721
    delete ev;
 
2722
    mi->incrementLogPosition(inc_pos);
 
2723
    pthread_mutex_unlock(&mi->data_lock);
 
2724
    free((char*)tmp_buf);
 
2725
    return(error);
 
2726
  }
 
2727
  default:
 
2728
    inc_pos= event_len;
 
2729
    break;
 
2730
  }
 
2731
  if (likely(!ignore_event))
 
2732
  {
 
2733
    if (ev->log_pos)
 
2734
      /*
 
2735
         Don't do it for fake Rotate events (see comment in
 
2736
      Log_event::Log_event(const char* buf...) in log_event.cc).
 
2737
      */
 
2738
      ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
 
2739
    if (unlikely(rli->relay_log.append(ev)))
 
2740
    {
 
2741
      delete ev;
 
2742
      pthread_mutex_unlock(&mi->data_lock);
 
2743
      return(1);
 
2744
    }
 
2745
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
2746
  }
 
2747
  delete ev;
 
2748
  mi->incrementLogPosition(inc_pos);
 
2749
  pthread_mutex_unlock(&mi->data_lock);
 
2750
  return(0);
 
2751
}
 
2752
 
 
2753
/*
 
2754
  Reads a 4.0 event and converts it to the slave's format. This code was copied
 
2755
  from queue_binlog_ver_1_event(), with some affordable simplifications.
 
2756
*/
 
2757
static int32_t queue_binlog_ver_3_event(Master_info *mi, const char *buf,
 
2758
                           uint32_t event_len)
 
2759
{
 
2760
  const char *errmsg = 0;
 
2761
  uint32_t inc_pos;
 
2762
  char *tmp_buf = 0;
 
2763
  Relay_log_info *rli= &mi->rli;
 
2764
 
 
2765
  /* read_log_event() will adjust log_pos to be end_log_pos */
 
2766
  Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
 
2767
                                            mi->rli.relay_log.description_event_for_queue);
 
2768
  if (unlikely(!ev))
 
2769
  {
 
2770
    sql_print_error(_("Read invalid event from master: '%s', "
 
2771
                      "master could be corrupt but a more likely cause of "
 
2772
                      "this is a bug"),
 
2773
                    errmsg);
 
2774
    free((char*) tmp_buf);
 
2775
    return(1);
 
2776
  }
 
2777
  pthread_mutex_lock(&mi->data_lock);
 
2778
  switch (ev->get_type_code()) {
 
2779
  case STOP_EVENT:
 
2780
    goto err;
 
2781
  case ROTATE_EVENT:
 
2782
    if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
 
2783
    {
 
2784
      delete ev;
 
2785
      pthread_mutex_unlock(&mi->data_lock);
 
2786
      return(1);
 
2787
    }
 
2788
    inc_pos= 0;
 
2789
    break;
 
2790
  default:
 
2791
    inc_pos= event_len;
 
2792
    break;
 
2793
  }
 
2794
  if (unlikely(rli->relay_log.append(ev)))
 
2795
  {
 
2796
    delete ev;
 
2797
    pthread_mutex_unlock(&mi->data_lock);
 
2798
    return(1);
 
2799
  }
 
2800
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
2801
  delete ev;
 
2802
  mi->incrementLogPosition(inc_pos);
 
2803
err:
 
2804
  pthread_mutex_unlock(&mi->data_lock);
 
2805
  return(0);
 
2806
}
 
2807
 
 
2808
/*
 
2809
  queue_old_event()
 
2810
 
 
2811
  Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
 
2812
  (exactly, slave's) format. To do the conversion, we create a 5.0 event from
 
2813
  the 3.23/4.0 bytes, then write this event to the relay log.
 
2814
 
 
2815
  TODO:
 
2816
    Test this code before release - it has to be tested on a separate
 
2817
    setup with 3.23 master or 4.0 master
 
2818
*/
 
2819
 
 
2820
static int32_t queue_old_event(Master_info *mi, const char *buf,
 
2821
                           uint32_t event_len)
 
2822
{
 
2823
  switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
 
2824
  {
 
2825
  case 1:
 
2826
      return(queue_binlog_ver_1_event(mi,buf,event_len));
 
2827
  case 3:
 
2828
      return(queue_binlog_ver_3_event(mi,buf,event_len));
 
2829
  default: /* unsupported format; eg version 2 */
 
2830
    return(1);
 
2831
  }
 
2832
}
 
2833
 
 
2834
/*
 
2835
  queue_event()
 
2836
 
 
2837
  If the event is 3.23/4.0, passes it to queue_old_event() which will convert
 
2838
  it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
 
2839
  no format conversion, it's pure read/write of bytes.
 
2840
  So a 5.0.0 slave's relay log can contain events in the slave's format or in
 
2841
  any >=5.0.0 format.
 
2842
*/
 
2843
 
 
2844
static int32_t queue_event(Master_info* mi,const char* buf, uint32_t event_len)
 
2845
{
 
2846
  int32_t error= 0;
 
2847
  String error_msg;
 
2848
  uint32_t inc_pos= 0;
 
2849
  Relay_log_info *rli= &mi->rli;
 
2850
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
 
2851
 
 
2852
 
 
2853
  if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
 
2854
      buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
 
2855
    return(queue_old_event(mi,buf,event_len));
 
2856
 
 
2857
  pthread_mutex_lock(&mi->data_lock);
 
2858
 
 
2859
  switch (buf[EVENT_TYPE_OFFSET]) {
 
2860
  case STOP_EVENT:
 
2861
    /*
 
2862
      We needn't write this event to the relay log. Indeed, it just indicates a
 
2863
      master server shutdown. The only thing this does is cleaning. But
 
2864
      cleaning is already done on a per-master-thread basis (as the master
 
2865
      server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
 
2866
      prepared statements' deletion are TODO only when we binlog prep stmts).
 
2867
 
 
2868
      We don't even increment mi->master_log_pos, because we may be just after
 
2869
      a Rotate event. Btw, in a few milliseconds we are going to have a Start
 
2870
      event from the next binlog (unless the master is presently running
 
2871
      without --log-bin).
 
2872
    */
 
2873
    goto err;
 
2874
  case ROTATE_EVENT:
 
2875
  {
 
2876
    Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
 
2877
    if (unlikely(process_io_rotate(mi,&rev)))
 
2878
    {
 
2879
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
2880
      goto err;
 
2881
    }
 
2882
    /*
 
2883
      Now the I/O thread has just changed its mi->master_log_name, so
 
2884
      incrementing mi->master_log_pos is nonsense.
 
2885
    */
 
2886
    inc_pos= 0;
 
2887
    break;
 
2888
  }
 
2889
  case FORMAT_DESCRIPTION_EVENT:
 
2890
  {
 
2891
    /*
 
2892
      Create an event, and save it (when we rotate the relay log, we will have
 
2893
      to write this event again).
 
2894
    */
 
2895
    /*
 
2896
      We are the only thread which reads/writes description_event_for_queue.
 
2897
      The relay_log struct does not move (though some members of it can
 
2898
      change), so we needn't any lock (no rli->data_lock, no log lock).
 
2899
    */
 
2900
    Format_description_log_event* tmp;
 
2901
    const char* errmsg;
 
2902
    if (!(tmp= (Format_description_log_event*)
 
2903
          Log_event::read_log_event(buf, event_len, &errmsg,
 
2904
                                    mi->rli.relay_log.description_event_for_queue)))
 
2905
    {
 
2906
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
2907
      goto err;
 
2908
    }
 
2909
    delete mi->rli.relay_log.description_event_for_queue;
 
2910
    mi->rli.relay_log.description_event_for_queue= tmp;
 
2911
    /*
 
2912
       Though this does some conversion to the slave's format, this will
 
2913
       preserve the master's binlog format version, and number of event types.
 
2914
    */
 
2915
    /*
 
2916
       If the event was not requested by the slave (the slave did not ask for
 
2917
       it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
 
2918
    */
 
2919
    inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
 
2920
  }
 
2921
  break;
 
2922
 
 
2923
  case HEARTBEAT_LOG_EVENT:
 
2924
  {
 
2925
    /*
 
2926
      HB (heartbeat) cannot come before RL (Relay)
 
2927
    */
 
2928
    char  llbuf[22];
 
2929
    Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
 
2930
    if (!hb.is_valid())
 
2931
    {
 
2932
      error= ER_SLAVE_HEARTBEAT_FAILURE;
 
2933
      error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
 
2934
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
 
2935
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
 
2936
      error_msg.append(STRING_WITH_LEN(" log_pos "));
 
2937
      llstr(hb.log_pos, llbuf);
 
2938
      error_msg.append(llbuf, strlen(llbuf));
 
2939
      goto err;
 
2940
    }
 
2941
    mi->received_heartbeats++;
 
2942
    /* 
 
2943
       compare local and event's versions of log_file, log_pos.
 
2944
       
 
2945
       Heartbeat is sent only after an event corresponding to the corrdinates
 
2946
       the heartbeat carries.
 
2947
       Slave can not have a difference in coordinates except in the only
 
2948
       special case when mi->master_log_name, master_log_pos have never
 
2949
       been updated by Rotate event i.e when slave does not have any history
 
2950
       with the master (and thereafter mi->master_log_pos is NULL).
 
2951
 
 
2952
       TODO: handling `when' for SHOW SLAVE STATUS' snds behind
 
2953
    */
 
2954
    if ((mi->setLogName(hb.get_log_ident()) && mi->getLogName() != NULL)
 
2955
        || mi->getLogPosition() != hb.log_pos)
 
2956
    {
 
2957
      /* missed events of heartbeat from the past */
 
2958
      error= ER_SLAVE_HEARTBEAT_FAILURE;
 
2959
      error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
 
2960
      error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
 
2961
      error_msg.append(hb.get_log_ident(), (uint32_t) strlen(hb.get_log_ident()));
 
2962
      error_msg.append(STRING_WITH_LEN(" log_pos "));
 
2963
      llstr(hb.log_pos, llbuf);
 
2964
      error_msg.append(llbuf, strlen(llbuf));
 
2965
      goto err;
 
2966
    }
 
2967
    goto skip_relay_logging;
 
2968
  }
 
2969
  break;
 
2970
    
 
2971
  default:
 
2972
    inc_pos= event_len;
 
2973
    break;
 
2974
  }
 
2975
 
 
2976
  /*
 
2977
     If this event is originating from this server, don't queue it.
 
2978
     We don't check this for 3.23 events because it's simpler like this; 3.23
 
2979
     will be filtered anyway by the SQL slave thread which also tests the
 
2980
     server id (we must also keep this test in the SQL thread, in case somebody
 
2981
     upgrades a 4.0 slave which has a not-filtered relay log).
 
2982
 
 
2983
     ANY event coming from ourselves can be ignored: it is obvious for queries;
 
2984
     for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
 
2985
     (--log-slave-updates would not log that) unless this slave is also its
 
2986
     direct master (an unsupported, useless setup!).
 
2987
  */
 
2988
 
 
2989
  pthread_mutex_lock(log_lock);
 
2990
 
 
2991
  if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
 
2992
      !mi->rli.replicate_same_server_id)
 
2993
  {
 
2994
    /*
 
2995
      Do not write it to the relay log.
 
2996
      a) We still want to increment mi->master_log_pos, so that we won't
 
2997
      re-read this event from the master if the slave IO thread is now
 
2998
      stopped/restarted (more efficient if the events we are ignoring are big
 
2999
      LOAD DATA INFILE).
 
3000
      b) We want to record that we are skipping events, for the information of
 
3001
      the slave SQL thread, otherwise that thread may let
 
3002
      rli->group_relay_log_pos stay too small if the last binlog's event is
 
3003
      ignored.
 
3004
      But events which were generated by this slave and which do not exist in
 
3005
      the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
 
3006
      mi->master_log_pos.
 
3007
    */
 
3008
    if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
 
3009
        buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
 
3010
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
 
3011
    {
 
3012
      mi->incrementLogPosition(inc_pos);
 
3013
      memcpy(rli->ign_master_log_name_end, mi->getLogName(), FN_REFLEN);
 
3014
      assert(rli->ign_master_log_name_end[0]);
 
3015
      rli->ign_master_log_pos_end= mi->getLogPosition();
 
3016
    }
 
3017
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
 
3018
  }
 
3019
  else
 
3020
  {
 
3021
    /* write the event to the relay log */
 
3022
    if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
 
3023
    {
 
3024
      mi->incrementLogPosition(inc_pos);
 
3025
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3026
    }
 
3027
    else
 
3028
    {
 
3029
      error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
 
3030
    }
 
3031
    rli->ign_master_log_name_end[0]= 0; // last event is not ignored
 
3032
  }
 
3033
  pthread_mutex_unlock(log_lock);
 
3034
 
 
3035
skip_relay_logging:
 
3036
  
 
3037
err:
 
3038
  pthread_mutex_unlock(&mi->data_lock);
 
3039
  if (error)
 
3040
    mi->report(ERROR_LEVEL, error, ER(error),
 
3041
               (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
 
3042
               _("could not queue event from master") :
 
3043
               error_msg.ptr());
 
3044
  return(error);
 
3045
}
 
3046
 
 
3047
 
 
3048
void end_relay_log_info(Relay_log_info* rli)
 
3049
{
 
3050
  if (!rli->inited)
 
3051
    return;
 
3052
  if (rli->info_fd >= 0)
 
3053
  {
 
3054
    end_io_cache(&rli->info_file);
 
3055
    (void) my_close(rli->info_fd, MYF(MY_WME));
 
3056
    rli->info_fd = -1;
 
3057
  }
 
3058
  if (rli->cur_log_fd >= 0)
 
3059
  {
 
3060
    end_io_cache(&rli->cache_buf);
 
3061
    (void)my_close(rli->cur_log_fd, MYF(MY_WME));
 
3062
    rli->cur_log_fd = -1;
 
3063
  }
 
3064
  rli->inited = 0;
 
3065
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
3066
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3067
  /*
 
3068
    Delete the slave's temporary tables from memory.
 
3069
    In the future there will be other actions than this, to ensure persistance
 
3070
    of slave's temp tables after shutdown.
 
3071
  */
 
3072
  rli->close_temporary_tables();
 
3073
  return;
 
3074
}
 
3075
 
 
3076
/*
 
3077
  Try to connect until successful or slave killed
 
3078
 
 
3079
  SYNPOSIS
 
3080
    safe_connect()
 
3081
    session                 Thread handler for slave
 
3082
    DRIZZLE               DRIZZLE connection handle
 
3083
    mi                  Replication handle
 
3084
 
 
3085
  RETURN
 
3086
    0   ok
 
3087
    #   Error
 
3088
*/
 
3089
 
 
3090
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi)
 
3091
{
 
3092
  return(connect_to_master(session, drizzle, mi, 0, 0));
 
3093
}
 
3094
 
 
3095
 
 
3096
/*
 
3097
  SYNPOSIS
 
3098
    connect_to_master()
 
3099
 
 
3100
  IMPLEMENTATION
 
3101
    Try to connect until successful or slave killed or we have retried
 
3102
    master_retry_count times
 
3103
*/
 
3104
 
 
3105
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
 
3106
                                 bool reconnect, bool suppress_warnings)
 
3107
{
 
3108
  int32_t slave_was_killed;
 
3109
  int32_t last_errno= -2;                           // impossible error
 
3110
  uint32_t err_count=0;
 
3111
  char llbuff[22];
 
3112
 
 
3113
  mi->events_till_disconnect = disconnect_slave_event_count;
 
3114
  uint32_t client_flag= CLIENT_REMEMBER_OPTIONS;
 
3115
  if (opt_slave_compressed_protocol)
 
3116
    client_flag=CLIENT_COMPRESS;                /* We will use compression */
 
3117
 
 
3118
  drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
 
3119
  drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
 
3120
 
 
3121
  while (!(slave_was_killed = io_slave_killed(session,mi)) &&
 
3122
         (reconnect ? drizzle_reconnect(drizzle) != 0 :
 
3123
          drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
 
3124
                             mi->getPort(), 0, client_flag) == 0))
 
3125
  {
 
3126
    /* Don't repeat last error */
 
3127
    if ((int32_t)drizzle_errno(drizzle) != last_errno)
 
3128
    {
 
3129
      last_errno=drizzle_errno(drizzle);
 
3130
      suppress_warnings= 0;
 
3131
      mi->report(ERROR_LEVEL, last_errno,
 
3132
                 _("error %s to master '%s@%s:%d'"
 
3133
                   " - retry-time: %d  retries: %u"),
 
3134
                 (reconnect ? _("reconnecting") : _("connecting")),
 
3135
                 mi->getUsername(), mi->getHostname(), mi->getPort(),
 
3136
                 mi->getConnectionRetry(), master_retry_count);
 
3137
    }
 
3138
    /*
 
3139
      By default we try forever. The reason is that failure will trigger
 
3140
      master election, so if the user did not set master_retry_count we
 
3141
      do not want to have election triggered on the first failure to
 
3142
      connect
 
3143
    */
 
3144
    if (++err_count == master_retry_count)
 
3145
    {
 
3146
      slave_was_killed=1;
 
3147
      break;
 
3148
    }
 
3149
    safe_sleep(session,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
 
3150
               (void*)mi);
 
3151
  }
 
3152
 
 
3153
  if (!slave_was_killed)
 
3154
  {
 
3155
    if (reconnect)
 
3156
    {
 
3157
      if (!suppress_warnings && global_system_variables.log_warnings)
 
3158
        sql_print_information(_("Slave: connected to master '%s@%s:%d', "
 
3159
                                "replication resumed in log '%s' at "
 
3160
                                "position %s"), mi->getUsername(),
 
3161
                                mi->getHostname(), mi->getPort(),
 
3162
                                IO_RPL_LOG_NAME,
 
3163
                                llstr(mi->getLogPosition(),llbuff));
 
3164
    }
 
3165
  }
 
3166
  drizzle->reconnect= 1;
 
3167
  return(slave_was_killed);
 
3168
}
 
3169
 
 
3170
 
 
3171
/*
 
3172
  safe_reconnect()
 
3173
 
 
3174
  IMPLEMENTATION
 
3175
    Try to connect until successful or slave killed or we have retried
 
3176
    master_retry_count times
 
3177
*/
 
3178
 
 
3179
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
 
3180
                          bool suppress_warnings)
 
3181
{
 
3182
  return(connect_to_master(session, drizzle, mi, 1, suppress_warnings));
 
3183
}
 
3184
 
 
3185
 
 
3186
/*
 
3187
  Store the file and position where the execute-slave thread are in the
 
3188
  relay log.
 
3189
 
 
3190
  SYNOPSIS
 
3191
    flush_relay_log_info()
 
3192
    rli                 Relay log information
 
3193
 
 
3194
  NOTES
 
3195
    - As this is only called by the slave thread, we don't need to
 
3196
      have a lock on this.
 
3197
    - If there is an active transaction, then we don't update the position
 
3198
      in the relay log.  This is to ensure that we re-execute statements
 
3199
      if we die in the middle of an transaction that was rolled back.
 
3200
    - As a transaction never spans binary logs, we don't have to handle the
 
3201
      case where we do a relay-log-rotation in the middle of the transaction.
 
3202
      If this would not be the case, we would have to ensure that we
 
3203
      don't delete the relay log file where the transaction started when
 
3204
      we switch to a new relay log file.
 
3205
 
 
3206
  TODO
 
3207
    - Change the log file information to a binary format to avoid calling
 
3208
      int64_t2str.
 
3209
 
 
3210
  RETURN VALUES
 
3211
    0   ok
 
3212
    1   write error
 
3213
*/
 
3214
 
 
3215
bool flush_relay_log_info(Relay_log_info* rli)
 
3216
{
 
3217
  bool error= 0;
 
3218
 
 
3219
  if (unlikely(rli->no_storage))
 
3220
    return(0);
 
3221
 
 
3222
  return(error);
 
3223
}
 
3224
 
 
3225
 
 
3226
/*
 
3227
  Called when we notice that the current "hot" log got rotated under our feet.
 
3228
*/
 
3229
 
 
3230
static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
 
3231
{
 
3232
  assert(rli->cur_log != &rli->cache_buf);
 
3233
  assert(rli->cur_log_fd == -1);
 
3234
 
 
3235
  IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
 
3236
  if ((rli->cur_log_fd=open_binlog(cur_log, rli->event_relay_log_name.c_str(), errmsg)) < 0)
 
3237
    return(0);
 
3238
  /*
 
3239
    We want to start exactly where we was before:
 
3240
    relay_log_pos       Current log pos
 
3241
    pending             Number of bytes already processed from the event
 
3242
  */
 
3243
  rli->event_relay_log_pos= cmax(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
 
3244
  my_b_seek(cur_log,rli->event_relay_log_pos);
 
3245
  return(cur_log);
 
3246
}
 
3247
 
 
3248
 
 
3249
static Log_event* next_event(Relay_log_info* rli)
 
3250
{
 
3251
  Log_event* ev;
 
3252
  IO_CACHE* cur_log = rli->cur_log;
 
3253
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
 
3254
  const char* errmsg=0;
 
3255
  Session* session = rli->sql_session;
 
3256
 
 
3257
  assert(session != 0);
 
3258
 
 
3259
  if (abort_slave_event_count && !rli->events_till_abort--)
 
3260
    return(0);
 
3261
 
 
3262
  /*
 
3263
    For most operations we need to protect rli members with data_lock,
 
3264
    so we assume calling function acquired this mutex for us and we will
 
3265
    hold it for the most of the loop below However, we will release it
 
3266
    whenever it is worth the hassle,  and in the cases when we go into a
 
3267
    pthread_cond_wait() with the non-data_lock mutex
 
3268
  */
 
3269
  safe_mutex_assert_owner(&rli->data_lock);
 
3270
 
 
3271
  while (!sql_slave_killed(session,rli))
 
3272
  {
 
3273
    /*
 
3274
      We can have two kinds of log reading:
 
3275
      hot_log:
 
3276
        rli->cur_log points at the IO_CACHE of relay_log, which
 
3277
        is actively being updated by the I/O thread. We need to be careful
 
3278
        in this case and make sure that we are not looking at a stale log that
 
3279
        has already been rotated. If it has been, we reopen the log.
 
3280
 
 
3281
      The other case is much simpler:
 
3282
        We just have a read only log that nobody else will be updating.
 
3283
    */
 
3284
    bool hot_log;
 
3285
    if ((hot_log = (cur_log != &rli->cache_buf)))
 
3286
    {
 
3287
      assert(rli->cur_log_fd == -1); // foreign descriptor
 
3288
      pthread_mutex_lock(log_lock);
 
3289
 
 
3290
      /*
 
3291
        Reading xxx_file_id is safe because the log will only
 
3292
        be rotated when we hold relay_log.LOCK_log
 
3293
      */
 
3294
      if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
 
3295
      {
 
3296
        // The master has switched to a new log file; Reopen the old log file
 
3297
        cur_log=reopen_relay_log(rli, &errmsg);
 
3298
        pthread_mutex_unlock(log_lock);
 
3299
        if (!cur_log)                           // No more log files
 
3300
          goto err;
 
3301
        hot_log=0;                              // Using old binary log
 
3302
      }
 
3303
    }
 
3304
    /* 
 
3305
      As there is no guarantee that the relay is open (for example, an I/O
 
3306
      error during a write by the slave I/O thread may have closed it), we
 
3307
      have to test it.
 
3308
    */
 
3309
    if (!my_b_inited(cur_log))
 
3310
      goto err;
 
3311
    assert(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
 
3312
    assert(my_b_tell(cur_log) == rli->event_relay_log_pos);
 
3313
 
 
3314
    /*
 
3315
      Relay log is always in new format - if the master is 3.23, the
 
3316
      I/O thread will convert the format for us.
 
3317
      A problem: the description event may be in a previous relay log. So if
 
3318
      the slave has been shutdown meanwhile, we would have to look in old relay
 
3319
      logs, which may even have been deleted. So we need to write this
 
3320
      description event at the beginning of the relay log.
 
3321
      When the relay log is created when the I/O thread starts, easy: the
 
3322
      master will send the description event and we will queue it.
 
3323
      But if the relay log is created by new_file(): then the solution is:
 
3324
      DRIZZLE_BIN_LOG::open() will write the buffered description event.
 
3325
    */
 
3326
    if ((ev=Log_event::read_log_event(cur_log,0,
 
3327
                                      rli->relay_log.description_event_for_exec)))
 
3328
 
 
3329
    {
 
3330
      assert(session==rli->sql_session);
 
3331
      /*
 
3332
        read it while we have a lock, to avoid a mutex lock in
 
3333
        inc_event_relay_log_pos()
 
3334
      */
 
3335
      rli->future_event_relay_log_pos= my_b_tell(cur_log);
 
3336
      if (hot_log)
 
3337
        pthread_mutex_unlock(log_lock);
 
3338
      return(ev);
 
3339
    }
 
3340
    assert(session==rli->sql_session);
 
3341
    if (opt_reckless_slave)                     // For mysql-test
 
3342
      cur_log->error = 0;
 
3343
    if (cur_log->error < 0)
 
3344
    {
 
3345
      errmsg = "slave SQL thread aborted because of I/O error";
 
3346
      if (hot_log)
 
3347
        pthread_mutex_unlock(log_lock);
 
3348
      goto err;
 
3349
    }
 
3350
    if (!cur_log->error) /* EOF */
 
3351
    {
 
3352
      /*
 
3353
        On a hot log, EOF means that there are no more updates to
 
3354
        process and we must block until I/O thread adds some and
 
3355
        signals us to continue
 
3356
      */
 
3357
      if (hot_log)
 
3358
      {
 
3359
        /*
 
3360
          We say in Seconds_Behind_Master that we have "caught up". Note that
 
3361
          for example if network link is broken but I/O slave thread hasn't
 
3362
          noticed it (slave_net_timeout not elapsed), then we'll say "caught
 
3363
          up" whereas we're not really caught up. Fixing that would require
 
3364
          internally cutting timeout in smaller pieces in network read, no
 
3365
          thanks. Another example: SQL has caught up on I/O, now I/O has read
 
3366
          a new event and is queuing it; the false "0" will exist until SQL
 
3367
          finishes executing the new event; it will be look abnormal only if
 
3368
          the events have old timestamps (then you get "many", 0, "many").
 
3369
 
 
3370
          Transient phases like this can be fixed with implemeting
 
3371
          Heartbeat event which provides the slave the status of the
 
3372
          master at time the master does not have any new update to send.
 
3373
          Seconds_Behind_Master would be zero only when master has no
 
3374
          more updates in binlog for slave. The heartbeat can be sent
 
3375
          in a (small) fraction of slave_net_timeout. Until it's done
 
3376
          rli->last_master_timestamp is temporarely (for time of
 
3377
          waiting for the following event) reset whenever EOF is
 
3378
          reached.
 
3379
        */
 
3380
        time_t save_timestamp= rli->last_master_timestamp;
 
3381
        rli->last_master_timestamp= 0;
 
3382
 
 
3383
        assert(rli->relay_log.get_open_count() ==
 
3384
                    rli->cur_log_old_open_count);
 
3385
 
 
3386
        if (rli->ign_master_log_name_end[0])
 
3387
        {
 
3388
          /* We generate and return a Rotate, to make our positions advance */
 
3389
          ev= new Rotate_log_event(rli->ign_master_log_name_end,
 
3390
                                   0, rli->ign_master_log_pos_end,
 
3391
                                   Rotate_log_event::DUP_NAME);
 
3392
          rli->ign_master_log_name_end[0]= 0;
 
3393
          pthread_mutex_unlock(log_lock);
 
3394
          if (unlikely(!ev))
 
3395
          {
 
3396
            errmsg= "Slave SQL thread failed to create a Rotate event "
 
3397
              "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
 
3398
            goto err;
 
3399
          }
 
3400
          ev->server_id= 0; // don't be ignored by slave SQL thread
 
3401
          return(ev);
 
3402
        }
 
3403
 
 
3404
        /*
 
3405
          We can, and should release data_lock while we are waiting for
 
3406
          update. If we do not, show slave status will block
 
3407
        */
 
3408
        pthread_mutex_unlock(&rli->data_lock);
 
3409
 
 
3410
        /*
 
3411
          Possible deadlock :
 
3412
          - the I/O thread has reached log_space_limit
 
3413
          - the SQL thread has read all relay logs, but cannot purge for some
 
3414
          reason:
 
3415
            * it has already purged all logs except the current one
 
3416
            * there are other logs than the current one but they're involved in
 
3417
            a transaction that finishes in the current one (or is not finished)
 
3418
          Solution :
 
3419
          Wake up the possibly waiting I/O thread, and set a boolean asking
 
3420
          the I/O thread to temporarily ignore the log_space_limit
 
3421
          constraint, because we do not want the I/O thread to block because of
 
3422
          space (it's ok if it blocks for any other reason (e.g. because the
 
3423
          master does not send anything). Then the I/O thread stops waiting
 
3424
          and reads more events.
 
3425
          The SQL thread decides when the I/O thread should take log_space_limit
 
3426
          into account again : ignore_log_space_limit is reset to 0
 
3427
          in purge_first_log (when the SQL thread purges the just-read relay
 
3428
          log), and also when the SQL thread starts. We should also reset
 
3429
          ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
 
3430
          fact, no need as RESET SLAVE requires that the slave
 
3431
          be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
 
3432
          it stops.
 
3433
        */
 
3434
        pthread_mutex_lock(&rli->log_space_lock);
 
3435
        // prevent the I/O thread from blocking next times
 
3436
        rli->ignore_log_space_limit= 1;
 
3437
        /*
 
3438
          If the I/O thread is blocked, unblock it.  Ok to broadcast
 
3439
          after unlock, because the mutex is only destroyed in
 
3440
          ~Relay_log_info(), i.e. when rli is destroyed, and rli will
 
3441
          not be destroyed before we exit the present function.
 
3442
        */
 
3443
        pthread_mutex_unlock(&rli->log_space_lock);
 
3444
        pthread_cond_broadcast(&rli->log_space_cond);
 
3445
        // Note that wait_for_update_relay_log unlocks lock_log !
 
3446
        rli->relay_log.wait_for_update_relay_log(rli->sql_session);
 
3447
        // re-acquire data lock since we released it earlier
 
3448
        pthread_mutex_lock(&rli->data_lock);
 
3449
        rli->last_master_timestamp= save_timestamp;
 
3450
        continue;
 
3451
      }
 
3452
      /*
 
3453
        If the log was not hot, we need to move to the next log in
 
3454
        sequence. The next log could be hot or cold, we deal with both
 
3455
        cases separately after doing some common initialization
 
3456
      */
 
3457
      end_io_cache(cur_log);
 
3458
      assert(rli->cur_log_fd >= 0);
 
3459
      my_close(rli->cur_log_fd, MYF(MY_WME));
 
3460
      rli->cur_log_fd = -1;
 
3461
 
 
3462
      if (relay_log_purge)
 
3463
      {
 
3464
        /*
 
3465
          purge_first_log will properly set up relay log coordinates in rli.
 
3466
          If the group's coordinates are equal to the event's coordinates
 
3467
          (i.e. the relay log was not rotated in the middle of a group),
 
3468
          we can purge this relay log too.
 
3469
          We do uint64_t and string comparisons, this may be slow but
 
3470
          - purging the last relay log is nice (it can save 1GB of disk), so we
 
3471
          like to detect the case where we can do it, and given this,
 
3472
          - I see no better detection method
 
3473
          - purge_first_log is not called that often
 
3474
        */
 
3475
        if (rli->relay_log.purge_first_log
 
3476
            (rli,
 
3477
             rli->group_relay_log_pos == rli->event_relay_log_pos
 
3478
             && !strcmp(rli->group_relay_log_name.c_str(), rli->event_relay_log_name.c_str())))
 
3479
        {
 
3480
          errmsg = "Error purging processed logs";
 
3481
          goto err;
 
3482
        }
 
3483
      }
 
3484
      else
 
3485
      {
 
3486
        /*
 
3487
          If hot_log is set, then we already have a lock on
 
3488
          LOCK_log.  If not, we have to get the lock.
 
3489
 
 
3490
          According to Sasha, the only time this code will ever be executed
 
3491
          is if we are recovering from a bug.
 
3492
        */
 
3493
        if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
 
3494
        {
 
3495
          errmsg = "error switching to the next log";
 
3496
          goto err;
 
3497
        }
 
3498
        rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
 
3499
        rli->event_relay_log_name.assign(rli->linfo.log_file_name);
 
3500
        flush_relay_log_info(rli);
 
3501
      }
 
3502
 
 
3503
      /*
 
3504
        Now we want to open this next log. To know if it's a hot log (the one
 
3505
        being written by the I/O thread now) or a cold log, we can use
 
3506
        is_active(); if it is hot, we use the I/O cache; if it's cold we open
 
3507
        the file normally. But if is_active() reports that the log is hot, this
 
3508
        may change between the test and the consequence of the test. So we may
 
3509
        open the I/O cache whereas the log is now cold, which is nonsense.
 
3510
        To guard against this, we need to have LOCK_log.
 
3511
      */
 
3512
 
 
3513
      if (!hot_log) /* if hot_log, we already have this mutex */
 
3514
        pthread_mutex_lock(log_lock);
 
3515
      if (rli->relay_log.is_active(rli->linfo.log_file_name))
 
3516
      {
 
3517
#ifdef EXTRA_DEBUG
 
3518
        if (global_system_variables.log_warnings)
 
3519
          sql_print_information(_("next log '%s' is currently active"),
 
3520
                                rli->linfo.log_file_name);
 
3521
#endif
 
3522
        rli->cur_log= cur_log= rli->relay_log.get_log_file();
 
3523
        rli->cur_log_old_open_count= rli->relay_log.get_open_count();
 
3524
        assert(rli->cur_log_fd == -1);
 
3525
 
 
3526
        /*
 
3527
          Read pointer has to be at the start since we are the only
 
3528
          reader.
 
3529
          We must keep the LOCK_log to read the 4 first bytes, as this is a hot
 
3530
          log (same as when we call read_log_event() above: for a hot log we
 
3531
          take the mutex).
 
3532
        */
 
3533
        if (check_binlog_magic(cur_log,&errmsg))
 
3534
        {
 
3535
          if (!hot_log) pthread_mutex_unlock(log_lock);
 
3536
          goto err;
 
3537
        }
 
3538
        if (!hot_log) pthread_mutex_unlock(log_lock);
 
3539
        continue;
 
3540
      }
 
3541
      if (!hot_log) pthread_mutex_unlock(log_lock);
 
3542
      /*
 
3543
        if we get here, the log was not hot, so we will have to open it
 
3544
        ourselves. We are sure that the log is still not hot now (a log can get
 
3545
        from hot to cold, but not from cold to hot). No need for LOCK_log.
 
3546
      */
 
3547
#ifdef EXTRA_DEBUG
 
3548
      if (global_system_variables.log_warnings)
 
3549
        sql_print_information(_("next log '%s' is not active"),
 
3550
                              rli->linfo.log_file_name);
 
3551
#endif
 
3552
      // open_binlog() will check the magic header
 
3553
      if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
 
3554
                                       &errmsg)) <0)
 
3555
        goto err;
 
3556
    }
 
3557
    else
 
3558
    {
 
3559
      /*
 
3560
        Read failed with a non-EOF error.
 
3561
        TODO: come up with something better to handle this error
 
3562
      */
 
3563
      if (hot_log)
 
3564
        pthread_mutex_unlock(log_lock);
 
3565
      sql_print_error(_("Slave SQL thread: I/O error reading "
 
3566
                        "event(errno: %d  cur_log->error: %d)"),
 
3567
                      my_errno,cur_log->error);
 
3568
      // set read position to the beginning of the event
 
3569
      my_b_seek(cur_log,rli->event_relay_log_pos);
 
3570
      /* otherwise, we have had a partial read */
 
3571
      errmsg = _("Aborting slave SQL thread because of partial event read");
 
3572
      break;                                    // To end of function
 
3573
    }
 
3574
  }
 
3575
  if (!errmsg && global_system_variables.log_warnings)
 
3576
  {
 
3577
    sql_print_information(_("Error reading relay log event: %s"),
 
3578
                          _("slave SQL thread was killed"));
 
3579
    return(0);
 
3580
  }
 
3581
 
 
3582
err:
 
3583
  if (errmsg)
 
3584
    sql_print_error(_("Error reading relay log event: %s"), errmsg);
 
3585
  return(0);
 
3586
}
 
3587
 
 
3588
/*
 
3589
  Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
 
3590
  because of size is simpler because when we do it we already have all relevant
 
3591
  locks; here we don't, so this function is mainly taking locks).
 
3592
  Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
 
3593
  is void).
 
3594
*/
 
3595
 
 
3596
void rotate_relay_log(Master_info* mi)
 
3597
{
 
3598
  Relay_log_info* rli= &mi->rli;
 
3599
 
 
3600
  /* We don't lock rli->run_lock. This would lead to deadlocks. */
 
3601
  pthread_mutex_lock(&mi->run_lock);
 
3602
 
 
3603
  /*
 
3604
     We need to test inited because otherwise, new_file() will attempt to lock
 
3605
     LOCK_log, which may not be inited (if we're not a slave).
 
3606
  */
 
3607
  if (!rli->inited)
 
3608
  {
 
3609
    goto end;
 
3610
  }
 
3611
 
 
3612
  /* If the relay log is closed, new_file() will do nothing. */
 
3613
  rli->relay_log.new_file();
 
3614
 
 
3615
  /*
 
3616
    We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
 
3617
    be counted, so imagine a succession of FLUSH LOGS  and assume the slave
 
3618
    threads are started:
 
3619
    relay_log_space decreases by the size of the deleted relay log, but does
 
3620
    not increase, so flush-after-flush we may become negative, which is wrong.
 
3621
    Even if this will be corrected as soon as a query is replicated on the
 
3622
    slave (because the I/O thread will then call harvest_bytes_written() which
 
3623
    will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
 
3624
    output in SHOW SLAVE STATUS meanwhile. So we harvest now.
 
3625
    If the log is closed, then this will just harvest the last writes, probably
 
3626
    0 as they probably have been harvested.
 
3627
  */
 
3628
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
 
3629
end:
 
3630
  pthread_mutex_unlock(&mi->run_lock);
 
3631
  return;
 
3632
}
 
3633
 
 
3634
 
 
3635
/**
 
3636
   Detects, based on master's version (as found in the relay log), if master
 
3637
   has a certain bug.
 
3638
   @param rli Relay_log_info which tells the master's version
 
3639
   @param bug_id Number of the bug as found in bugs.mysql.com
 
3640
   @param report bool report error message, default TRUE
 
3641
   @return true if master has the bug, FALSE if it does not.
 
3642
*/
 
3643
bool rpl_master_has_bug(Relay_log_info *rli, uint32_t bug_id, bool report)
 
3644
{
 
3645
  struct st_version_range_for_one_bug {
 
3646
    uint32_t        bug_id;
 
3647
    const unsigned char introduced_in[3]; // first version with bug
 
3648
    const unsigned char fixed_in[3];      // first version with fix
 
3649
  };
 
3650
  static struct st_version_range_for_one_bug versions_for_all_bugs[]=
 
3651
  {
 
3652
    {24432, { 5, 0, 24 }, { 5, 0, 38 } },
 
3653
    {24432, { 5, 1, 12 }, { 5, 1, 17 } },
 
3654
    {33029, { 5, 0,  0 }, { 5, 0, 58 } },
 
3655
    {33029, { 5, 1,  0 }, { 5, 1, 12 } },
 
3656
  };
 
3657
  const unsigned char *master_ver=
 
3658
    rli->relay_log.description_event_for_exec->server_version_split;
 
3659
 
 
3660
  assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
 
3661
 
 
3662
  for (uint32_t i= 0;
 
3663
       i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
 
3664
  {
 
3665
    const unsigned char *introduced_in= versions_for_all_bugs[i].introduced_in,
 
3666
      *fixed_in= versions_for_all_bugs[i].fixed_in;
 
3667
    if ((versions_for_all_bugs[i].bug_id == bug_id) &&
 
3668
        (memcmp(introduced_in, master_ver, 3) <= 0) &&
 
3669
        (memcmp(fixed_in,      master_ver, 3) >  0))
 
3670
    {
 
3671
      if (!report)
 
3672
        return true;
 
3673
 
 
3674
      // a short message for SHOW SLAVE STATUS (message length constraints)
 
3675
      my_printf_error(ER_UNKNOWN_ERROR,
 
3676
                      _("master may suffer from"
 
3677
                        " http://bugs.mysql.com/bug.php?id=%u"
 
3678
                        " so slave stops; check error log on slave"
 
3679
                        " for more info"), MYF(0), bug_id);
 
3680
      // a verbose message for the error log
 
3681
      rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
 
3682
                  _("According to the master's version ('%s'),"
 
3683
                    " it is probable that master suffers from this bug:"
 
3684
                    " http://bugs.mysql.com/bug.php?id=%u"
 
3685
                    " and thus replicating the current binary log event"
 
3686
                    " may make the slave's data become different from the"
 
3687
                    " master's data."
 
3688
                    " To take no risk, slave refuses to replicate"
 
3689
                    " this event and stops."
 
3690
                    " We recommend that all updates be stopped on the"
 
3691
                    " master and slave, that the data of both be"
 
3692
                    " manually synchronized,"
 
3693
                    " that master's binary logs be deleted,"
 
3694
                    " that master be upgraded to a version at least"
 
3695
                    " equal to '%d.%d.%d'. Then replication can be"
 
3696
                    " restarted."),
 
3697
                  rli->relay_log.description_event_for_exec->server_version,
 
3698
                  bug_id,
 
3699
                  fixed_in[0], fixed_in[1], fixed_in[2]);
 
3700
      return true;
 
3701
    }
 
3702
  }
 
3703
  return false;
 
3704
}
 
3705
 
 
3706
/**
 
3707
   BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
 
3708
   exclusive, if one statement in a SP generated AUTO_INCREMENT value
 
3709
   by the top statement, all statements after it would be considered
 
3710
   generated AUTO_INCREMENT value by the top statement, and a
 
3711
   erroneous INSERT_ID value might be associated with these statement,
 
3712
   which could cause duplicate entry error and stop the slave.
 
3713
 
 
3714
   Detect buggy master to work around.
 
3715
 */
 
3716
bool rpl_master_erroneous_autoinc(Session *session)
 
3717
{
 
3718
  if (active_mi && active_mi->rli.sql_session == session)
 
3719
  {
 
3720
    Relay_log_info *rli= &active_mi->rli;
 
3721
    return rpl_master_has_bug(rli, 33029, false);
 
3722
  }
 
3723
  return false;
 
3724
}
 
3725
 
 
3726
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
3727
template class I_List_iterator<i_string>;
 
3728
template class I_List_iterator<i_string_pair>;
 
3729
#endif
 
3730
 
 
3731
/**
 
3732
  @} (end of group Replication)
 
3733
*/