~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication/rli.cc

  • Committer: Monty Taylor
  • Date: 2008-11-16 06:29:53 UTC
  • mto: (584.1.9 devel)
  • mto: This revision was merged to the branch mainline in revision 589.
  • Revision ID: monty@inaugust.com-20081116062953-ivdltjmfe009b5fr
Moved stuff into item/

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <drizzled/server_includes.h>
 
17
#include <drizzled/replication/mi.h>
 
18
#include <drizzled/replication/rli.h>
 
19
// For check_binlog_magic
 
20
#include <drizzled/replication/replication.h>
 
21
#include <drizzled/replication/utility.h>
 
22
#include <drizzled/data_home.h>
 
23
#include <drizzled/sql_parse.h>
 
24
#include <drizzled/gettext.h>
 
25
 
 
26
#if TIME_WITH_SYS_TIME
 
27
# include <sys/time.h>
 
28
# include <time.h>
 
29
#else
 
30
# if HAVE_SYS_TIME_H
 
31
#  include <sys/time.h>
 
32
# else
 
33
#  include <time.h>
 
34
# endif
 
35
#endif
 
36
 
 
37
 
 
38
 
 
39
static int32_t count_relay_log_space(Relay_log_info* rli);
 
40
 
 
41
// Defined in slave.cc
 
42
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val);
 
43
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
 
44
                          const char *default_val);
 
45
 
 
46
 
 
47
Relay_log_info::Relay_log_info()
 
48
  :Slave_reporting_capability("SQL"),
 
49
   no_storage(false), replicate_same_server_id(::replicate_same_server_id),
 
50
   info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
 
51
#if defined(HAVE_purify) && HAVE_purify
 
52
   is_fake(false),
 
53
#endif
 
54
   cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
 
55
   group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
 
56
   last_master_timestamp(0), slave_skip_counter(0),
 
57
   abort_pos_wait(0), slave_run_id(0), sql_session(0),
 
58
   inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
 
59
   until_log_pos(0), retried_trans(0),
 
60
   tables_to_lock(0), tables_to_lock_count(0),
 
61
   last_event_start_time(0), m_flags(0)
 
62
{
 
63
  group_relay_log_name[0]= event_relay_log_name[0]=
 
64
    group_master_log_name[0]= 0;
 
65
  until_log_name[0]= ign_master_log_name_end[0]= 0;
 
66
  memset(&info_file, 0, sizeof(info_file));
 
67
  memset(&cache_buf, 0, sizeof(cache_buf));
 
68
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
 
69
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
 
70
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
 
71
  pthread_cond_init(&data_cond, NULL);
 
72
  pthread_cond_init(&start_cond, NULL);
 
73
  pthread_cond_init(&stop_cond, NULL);
 
74
  pthread_cond_init(&log_space_cond, NULL);
 
75
  relay_log.init_pthread_objects();
 
76
  return;
 
77
}
 
78
 
 
79
 
 
80
Relay_log_info::~Relay_log_info()
 
81
{
 
82
  pthread_mutex_destroy(&run_lock);
 
83
  pthread_mutex_destroy(&data_lock);
 
84
  pthread_mutex_destroy(&log_space_lock);
 
85
  pthread_cond_destroy(&data_cond);
 
86
  pthread_cond_destroy(&start_cond);
 
87
  pthread_cond_destroy(&stop_cond);
 
88
  pthread_cond_destroy(&log_space_cond);
 
89
  relay_log.cleanup();
 
90
  return;
 
91
}
 
92
 
 
93
 
 
94
int32_t init_relay_log_info(Relay_log_info* rli,
 
95
                            const char* info_fname)
 
96
{
 
97
  char fname[FN_REFLEN+128];
 
98
  int32_t info_fd;
 
99
  const char* msg = 0;
 
100
  int32_t error = 0;
 
101
  assert(!rli->no_storage);         // Don't init if there is no storage
 
102
 
 
103
  if (rli->inited)                       // Set if this function called
 
104
    return(0);
 
105
  fn_format(fname, info_fname, drizzle_data_home, "", 4+32);
 
106
  pthread_mutex_lock(&rli->data_lock);
 
107
  info_fd = rli->info_fd;
 
108
  rli->cur_log_fd = -1;
 
109
  rli->slave_skip_counter=0;
 
110
  rli->abort_pos_wait=0;
 
111
  rli->log_space_limit= relay_log_space_limit;
 
112
  rli->log_space_total= 0;
 
113
  rli->tables_to_lock= 0;
 
114
  rli->tables_to_lock_count= 0;
 
115
 
 
116
  /*
 
117
    The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
 
118
    Note that the I/O thread flushes it to disk after writing every
 
119
    event, in flush_master_info(mi, 1).
 
120
  */
 
121
 
 
122
  /*
 
123
    For the maximum log size, we choose max_relay_log_size if it is
 
124
    non-zero, max_binlog_size otherwise. If later the user does SET
 
125
    GLOBAL on one of these variables, fix_max_binlog_size and
 
126
    fix_max_relay_log_size will reconsider the choice (for example
 
127
    if the user changes max_relay_log_size to zero, we have to
 
128
    switch to using max_binlog_size for the relay log) and update
 
129
    rli->relay_log.max_size (and mysql_bin_log.max_size).
 
130
  */
 
131
  {
 
132
    char buf[FN_REFLEN];
 
133
    const char *ln;
 
134
    static bool name_warning_sent= 0;
 
135
    ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
 
136
                                     1, buf);
 
137
    /* We send the warning only at startup, not after every RESET SLAVE */
 
138
    if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
 
139
    {
 
140
      /*
 
141
        User didn't give us info to name the relay log index file.
 
142
        Picking `hostname`-relay-bin.index like we do, causes replication to
 
143
        fail if this slave's hostname is changed later. So, we would like to
 
144
        instead require a name. But as we don't want to break many existing
 
145
        setups, we only give warning, not error.
 
146
      */
 
147
      sql_print_warning(_("Neither --relay-log nor --relay-log-index were used;"
 
148
                        " so replication "
 
149
                        "may break when this MySQL server acts as a "
 
150
                        "slave and has his hostname changed!! Please "
 
151
                        "use '--relay-log=%s' to avoid this problem."), ln);
 
152
      name_warning_sent= 1;
 
153
    }
 
154
    /*
 
155
      note, that if open() fails, we'll still have index file open
 
156
      but a destructor will take care of that
 
157
    */
 
158
    if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln) ||
 
159
        rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
 
160
                            (max_relay_log_size ? max_relay_log_size :
 
161
                            max_binlog_size), 1))
 
162
    {
 
163
      pthread_mutex_unlock(&rli->data_lock);
 
164
      sql_print_error(_("Failed in open_log() called from "
 
165
                        "init_relay_log_info()"));
 
166
      return(1);
 
167
    }
 
168
  }
 
169
 
 
170
  /* if file does not exist */
 
171
  if (access(fname,F_OK))
 
172
  {
 
173
    /* Create a new file */
 
174
  }
 
175
  else // file exists
 
176
  {
 
177
    /* Open up fname here and pull out the relay.info data */
 
178
  }
 
179
 
 
180
  /*
 
181
    Now change the cache from READ to WRITE - must do this
 
182
    before flush_relay_log_info
 
183
  */
 
184
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
 
185
  if ((error= flush_relay_log_info(rli)))
 
186
    sql_print_error(_("Failed to flush relay log info file"));
 
187
  if (count_relay_log_space(rli))
 
188
  {
 
189
    msg=_("Error counting relay log space");
 
190
    goto err;
 
191
  }
 
192
  rli->inited= 1;
 
193
  pthread_mutex_unlock(&rli->data_lock);
 
194
  return(error);
 
195
 
 
196
err:
 
197
  sql_print_error("%s",msg);
 
198
  end_io_cache(&rli->info_file);
 
199
  if (info_fd >= 0)
 
200
    my_close(info_fd, MYF(0));
 
201
  rli->info_fd= -1;
 
202
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
203
  pthread_mutex_unlock(&rli->data_lock);
 
204
  return(1);
 
205
}
 
206
 
 
207
 
 
208
static inline int32_t add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
 
209
{
 
210
  struct stat s;
 
211
  if (stat(linfo->log_file_name,&s))
 
212
  {
 
213
    sql_print_error(_("log %s listed in the index, but failed to stat"),
 
214
                    linfo->log_file_name);
 
215
    return(1);
 
216
  }
 
217
  rli->log_space_total += s.st_size;
 
218
  return(0);
 
219
}
 
220
 
 
221
 
 
222
static int32_t count_relay_log_space(Relay_log_info* rli)
 
223
{
 
224
  LOG_INFO linfo;
 
225
  rli->log_space_total= 0;
 
226
  if (rli->relay_log.find_log_pos(&linfo, NULL, 1))
 
227
  {
 
228
    sql_print_error(_("Could not find first log while counting relay "
 
229
                      "log space"));
 
230
    return(1);
 
231
  }
 
232
  do
 
233
  {
 
234
    if (add_relay_log(rli,&linfo))
 
235
      return(1);
 
236
  } while (!rli->relay_log.find_next_log(&linfo, 1));
 
237
  /*
 
238
     As we have counted everything, including what may have written in a
 
239
     preceding write, we must reset bytes_written, or we may count some space
 
240
     twice.
 
241
  */
 
242
  rli->relay_log.reset_bytes_written();
 
243
  return(0);
 
244
}
 
245
 
 
246
 
 
247
/*
 
248
   Reset UNTIL condition for Relay_log_info
 
249
 
 
250
   SYNOPSYS
 
251
    clear_until_condition()
 
252
      rli - Relay_log_info structure where UNTIL condition should be reset
 
253
 */
 
254
 
 
255
void Relay_log_info::clear_until_condition()
 
256
{
 
257
  until_condition= Relay_log_info::UNTIL_NONE;
 
258
  until_log_name[0]= 0;
 
259
  until_log_pos= 0;
 
260
  return;
 
261
}
 
262
 
 
263
 
 
264
/*
 
265
  Open the given relay log
 
266
 
 
267
  SYNOPSIS
 
268
    init_relay_log_pos()
 
269
    rli                 Relay information (will be initialized)
 
270
    log                 Name of relay log file to read from. NULL = First log
 
271
    pos                 Position in relay log file
 
272
    need_data_lock      Set to 1 if this functions should do mutex locks
 
273
    errmsg              Store pointer to error message here
 
274
    look_for_description_event
 
275
                        1 if we should look for such an event. We only need
 
276
                        this when the SQL thread starts and opens an existing
 
277
                        relay log and has to execute it (possibly from an
 
278
                        offset >4); then we need to read the first event of
 
279
                        the relay log to be able to parse the events we have
 
280
                        to execute.
 
281
 
 
282
  DESCRIPTION
 
283
  - Close old open relay log files.
 
284
  - If we are using the same relay log as the running IO-thread, then set
 
285
    rli->cur_log to point to the same IO_CACHE entry.
 
286
  - If not, open the 'log' binary file.
 
287
 
 
288
  TODO
 
289
    - check proper initialization of group_master_log_name/group_master_log_pos
 
290
 
 
291
  RETURN VALUES
 
292
    0   ok
 
293
    1   error.  errmsg is set to point to the error message
 
294
*/
 
295
 
 
296
int32_t init_relay_log_pos(Relay_log_info* rli,const char* log,
 
297
                       uint64_t pos, bool need_data_lock,
 
298
                       const char** errmsg,
 
299
                       bool look_for_description_event)
 
300
{
 
301
  *errmsg=0;
 
302
  pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
 
303
 
 
304
  if (need_data_lock)
 
305
    pthread_mutex_lock(&rli->data_lock);
 
306
 
 
307
  /*
 
308
    Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
 
309
    is, too, and init_slave() too; these 2 functions allocate a description
 
310
    event in init_relay_log_pos, which is not freed by the terminating SQL slave
 
311
    thread as that thread is not started by these functions. So we have to free
 
312
    the description_event here, in case, so that there is no memory leak in
 
313
    running, say, CHANGE MASTER.
 
314
  */
 
315
  delete rli->relay_log.description_event_for_exec;
 
316
  /*
 
317
    By default the relay log is in binlog format 3 (4.0).
 
318
    Even if format is 4, this will work enough to read the first event
 
319
    (Format_desc) (remember that format 4 is just lenghtened compared to format
 
320
    3; format 3 is a prefix of format 4).
 
321
  */
 
322
  rli->relay_log.description_event_for_exec= new
 
323
    Format_description_log_event(3);
 
324
 
 
325
  pthread_mutex_lock(log_lock);
 
326
 
 
327
  /* Close log file and free buffers if it's already open */
 
328
  if (rli->cur_log_fd >= 0)
 
329
  {
 
330
    end_io_cache(&rli->cache_buf);
 
331
    my_close(rli->cur_log_fd, MYF(MY_WME));
 
332
    rli->cur_log_fd = -1;
 
333
  }
 
334
 
 
335
  rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
 
336
 
 
337
  /*
 
338
    Test to see if the previous run was with the skip of purging
 
339
    If yes, we do not purge when we restart
 
340
  */
 
341
  if (rli->relay_log.find_log_pos(&rli->linfo, NULL, 1))
 
342
  {
 
343
    *errmsg="Could not find first log during relay log initialization";
 
344
    goto err;
 
345
  }
 
346
 
 
347
  if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
 
348
  {
 
349
    *errmsg="Could not find target log during relay log initialization";
 
350
    goto err;
 
351
  }
 
352
 
 
353
  rli->group_relay_log_name.assign(rli->linfo.log_file_name);
 
354
  rli->event_relay_log_name.assign(rli->linfo.log_file_name);
 
355
 
 
356
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
 
357
  {
 
358
    /*
 
359
      The IO thread is using this log file.
 
360
      In this case, we will use the same IO_CACHE pointer to
 
361
      read data as the IO thread is using to write data.
 
362
    */
 
363
    my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
 
364
    if (check_binlog_magic(rli->cur_log,errmsg))
 
365
      goto err;
 
366
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
 
367
  }
 
368
  else
 
369
  {
 
370
    /*
 
371
      Open the relay log and set rli->cur_log to point at this one
 
372
    */
 
373
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
 
374
                                     rli->linfo.log_file_name,errmsg)) < 0)
 
375
      goto err;
 
376
    rli->cur_log = &rli->cache_buf;
 
377
  }
 
378
  /*
 
379
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
 
380
    sure.
 
381
  */
 
382
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
 
383
  {
 
384
    Log_event* ev;
 
385
    while (look_for_description_event)
 
386
    {
 
387
      /*
 
388
        Read the possible Format_description_log_event; if position
 
389
        was 4, no need, it will be read naturally.
 
390
      */
 
391
      if (my_b_tell(rli->cur_log) >= pos)
 
392
        break;
 
393
 
 
394
      /*
 
395
        Because of we have rli->data_lock and log_lock, we can safely read an
 
396
        event
 
397
      */
 
398
      if (!(ev=Log_event::read_log_event(rli->cur_log,0,
 
399
                                         rli->relay_log.description_event_for_exec)))
 
400
      {
 
401
        if (rli->cur_log->error) /* not EOF */
 
402
        {
 
403
          *errmsg= "I/O error reading event at position 4";
 
404
          goto err;
 
405
        }
 
406
        break;
 
407
      }
 
408
      else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
 
409
      {
 
410
        delete rli->relay_log.description_event_for_exec;
 
411
        rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
 
412
        /*
 
413
          As ev was returned by read_log_event, it has passed is_valid(), so
 
414
          my_malloc() in ctor worked, no need to check again.
 
415
        */
 
416
        /*
 
417
          Ok, we found a Format_description event. But it is not sure that this
 
418
          describes the whole relay log; indeed, one can have this sequence
 
419
          (starting from position 4):
 
420
          Format_desc (of slave)
 
421
          Rotate (of master)
 
422
          Format_desc (of master)
 
423
          So the Format_desc which really describes the rest of the relay log
 
424
          is the 3rd event (it can't be further than that, because we rotate
 
425
          the relay log when we queue a Rotate event from the master).
 
426
          But what describes the Rotate is the first Format_desc.
 
427
          So what we do is:
 
428
          go on searching for Format_description events, until you exceed the
 
429
          position (argument 'pos') or until you find another event than Rotate
 
430
          or Format_desc.
 
431
        */
 
432
      }
 
433
      else
 
434
      {
 
435
        look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
 
436
        delete ev;
 
437
      }
 
438
    }
 
439
    my_b_seek(rli->cur_log,(off_t)pos);
 
440
 
 
441
  }
 
442
 
 
443
err:
 
444
  /*
 
445
    If we don't purge, we can't honour relay_log_space_limit ;
 
446
    silently discard it
 
447
  */
 
448
  if (!relay_log_purge)
 
449
    rli->log_space_limit= 0;
 
450
  pthread_cond_broadcast(&rli->data_cond);
 
451
 
 
452
  pthread_mutex_unlock(log_lock);
 
453
 
 
454
  if (need_data_lock)
 
455
    pthread_mutex_unlock(&rli->data_lock);
 
456
  if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
 
457
    *errmsg= "Invalid Format_description log event; could be out of memory";
 
458
 
 
459
  return ((*errmsg) ? 1 : 0);
 
460
}
 
461
 
 
462
 
 
463
/*
 
464
  Waits until the SQL thread reaches (has executed up to) the
 
465
  log/position or timed out.
 
466
 
 
467
  SYNOPSIS
 
468
    wait_for_pos()
 
469
    session             client thread that sent SELECT MASTER_POS_WAIT
 
470
    log_name        log name to wait for
 
471
    log_pos         position to wait for
 
472
    timeout         timeout in seconds before giving up waiting
 
473
 
 
474
  NOTES
 
475
    timeout is int64_t whereas it should be uint32_t ; but this is
 
476
    to catch if the user submitted a negative timeout.
 
477
 
 
478
  RETURN VALUES
 
479
    -2          improper arguments (log_pos<0)
 
480
                or slave not running, or master info changed
 
481
                during the function's execution,
 
482
                or client thread killed. -2 is translated to NULL by caller
 
483
    -1          timed out
 
484
    >=0         number of log events the function had to wait
 
485
                before reaching the desired log/position
 
486
 */
 
487
 
 
488
int32_t Relay_log_info::wait_for_pos(Session* session, String* log_name,
 
489
                                    int64_t log_pos,
 
490
                                    int64_t timeout)
 
491
{
 
492
  int32_t event_count = 0;
 
493
  uint32_t init_abort_pos_wait;
 
494
  int32_t error=0;
 
495
  struct timespec abstime; // for timeout checking
 
496
  const char *msg;
 
497
 
 
498
  if (!inited)
 
499
    return(-2);
 
500
 
 
501
  set_timespec(abstime,timeout);
 
502
  pthread_mutex_lock(&data_lock);
 
503
  msg= session->enter_cond(&data_cond, &data_lock,
 
504
                       "Waiting for the slave SQL thread to "
 
505
                       "advance position");
 
506
  /*
 
507
     This function will abort when it notices that some CHANGE MASTER or
 
508
     RESET MASTER has changed the master info.
 
509
     To catch this, these commands modify abort_pos_wait ; We just monitor
 
510
     abort_pos_wait and see if it has changed.
 
511
     Why do we have this mechanism instead of simply monitoring slave_running
 
512
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
 
513
     the SQL thread be stopped?
 
514
     This is becasue if someones does:
 
515
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
 
516
     the change may happen very quickly and we may not notice that
 
517
     slave_running briefly switches between 1/0/1.
 
518
  */
 
519
  init_abort_pos_wait= abort_pos_wait;
 
520
 
 
521
  /*
 
522
    We'll need to
 
523
    handle all possible log names comparisons (e.g. 999 vs 1000).
 
524
    We use uint32_t for string->number conversion ; this is no
 
525
    stronger limitation than in find_uniq_filename in sql/log.cc
 
526
  */
 
527
  uint32_t log_name_extension;
 
528
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
 
529
 
 
530
  strmake(log_name_tmp, log_name->ptr(), cmin(log_name->length(), (uint32_t)FN_REFLEN-1));
 
531
 
 
532
  char *p= fn_ext(log_name_tmp);
 
533
  char *p_end;
 
534
  if (!*p || log_pos<0)
 
535
  {
 
536
    error= -2; //means improper arguments
 
537
    goto err;
 
538
  }
 
539
  // Convert 0-3 to 4
 
540
  log_pos= cmax(log_pos, (int64_t)BIN_LOG_HEADER_SIZE);
 
541
  /* p points to '.' */
 
542
  log_name_extension= strtoul(++p, &p_end, 10);
 
543
  /*
 
544
    p_end points to the first invalid character.
 
545
    If it equals to p, no digits were found, error.
 
546
    If it contains '\0' it means conversion went ok.
 
547
  */
 
548
  if (p_end==p || *p_end)
 
549
  {
 
550
    error= -2;
 
551
    goto err;
 
552
  }
 
553
 
 
554
  /* The "compare and wait" main loop */
 
555
  while (!session->killed &&
 
556
         init_abort_pos_wait == abort_pos_wait &&
 
557
         slave_running)
 
558
  {
 
559
    bool pos_reached;
 
560
    int32_t cmp_result= 0;
 
561
 
 
562
    /*
 
563
      group_master_log_name can be "", if we are just after a fresh
 
564
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
 
565
      (before we have executed one Rotate event from the master) or
 
566
      (rare) if the user is doing a weird slave setup (see next
 
567
      paragraph).  If group_master_log_name is "", we assume we don't
 
568
      have enough info to do the comparison yet, so we just wait until
 
569
      more data. In this case master_log_pos is always 0 except if
 
570
      somebody (wrongly) sets this slave to be a slave of itself
 
571
      without using --replicate-same-server-id (an unsupported
 
572
      configuration which does nothing), then group_master_log_pos
 
573
      will grow and group_master_log_name will stay "".
 
574
    */
 
575
    if (group_master_log_name.length())
 
576
    {
 
577
      const char *basename= (group_master_log_name.c_str() +
 
578
                             dirname_length(group_master_log_name.c_str()));
 
579
      /*
 
580
        First compare the parts before the extension.
 
581
        Find the dot in the master's log basename,
 
582
        and protect against user's input error :
 
583
        if the names do not match up to '.' included, return error
 
584
      */
 
585
      char *q= (char*)(fn_ext(basename)+1);
 
586
      if (strncmp(basename, log_name_tmp, (int32_t)(q-basename)))
 
587
      {
 
588
        error= -2;
 
589
        break;
 
590
      }
 
591
      // Now compare extensions.
 
592
      char *q_end;
 
593
      uint32_t group_master_log_name_extension= strtoul(q, &q_end, 10);
 
594
      if (group_master_log_name_extension < log_name_extension)
 
595
        cmp_result= -1 ;
 
596
      else
 
597
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
 
598
 
 
599
      pos_reached= ((!cmp_result && group_master_log_pos >= (uint64_t)log_pos) ||
 
600
                    cmp_result > 0);
 
601
      if (pos_reached || session->killed)
 
602
        break;
 
603
    }
 
604
 
 
605
    //wait for master update, with optional timeout.
 
606
 
 
607
    /*
 
608
      We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
 
609
      will wake us up.
 
610
    */
 
611
    if (timeout > 0)
 
612
    {
 
613
      /*
 
614
        Note that pthread_cond_timedwait checks for the timeout
 
615
        before for the condition ; i.e. it returns ETIMEDOUT
 
616
        if the system time equals or exceeds the time specified by abstime
 
617
        before the condition variable is signaled or broadcast, _or_ if
 
618
        the absolute time specified by abstime has already passed at the time
 
619
        of the call.
 
620
        For that reason, pthread_cond_timedwait will do the "timeoutting" job
 
621
        even if its condition is always immediately signaled (case of a loaded
 
622
        master).
 
623
      */
 
624
      error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
 
625
    }
 
626
    else
 
627
      pthread_cond_wait(&data_cond, &data_lock);
 
628
    if (error == ETIMEDOUT || error == ETIME)
 
629
    {
 
630
      error= -1;
 
631
      break;
 
632
    }
 
633
    error=0;
 
634
    event_count++;
 
635
  }
 
636
 
 
637
err:
 
638
  session->exit_cond(msg);
 
639
  if (session->killed || init_abort_pos_wait != abort_pos_wait ||
 
640
      !slave_running)
 
641
  {
 
642
    error= -2;
 
643
  }
 
644
  return( error ? error : event_count );
 
645
}
 
646
 
 
647
 
 
648
void Relay_log_info::inc_group_relay_log_pos(uint64_t log_pos,
 
649
                                                bool skip_lock)
 
650
{
 
651
  if (!skip_lock)
 
652
    pthread_mutex_lock(&data_lock);
 
653
  inc_event_relay_log_pos();
 
654
  group_relay_log_pos= event_relay_log_pos;
 
655
  group_relay_log_name.assign(event_relay_log_name);
 
656
 
 
657
  notify_group_relay_log_name_update();
 
658
 
 
659
  /*
 
660
    If the slave does not support transactions and replicates a transaction,
 
661
    users should not trust group_master_log_pos (which they can display with
 
662
    SHOW SLAVE STATUS or read from relay-log.info), because to compute
 
663
    group_master_log_pos the slave relies on log_pos stored in the master's
 
664
    binlog, but if we are in a master's transaction these positions are always
 
665
    the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
 
666
    not advance as it should on the non-transactional slave (it advances by
 
667
    big leaps, whereas it should advance by small leaps).
 
668
  */
 
669
  /*
 
670
    In 4.x we used the event's len to compute the positions here. This is
 
671
    wrong if the event was 3.23/4.0 and has been converted to 5.0, because
 
672
    then the event's len is not what is was in the master's binlog, so this
 
673
    will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
 
674
    replication: Exec_master_log_pos is wrong). Only way to solve this is to
 
675
    have the original offset of the end of the event the relay log. This is
 
676
    what we do in 5.0: log_pos has become "end_log_pos" (because the real use
 
677
    of log_pos in 4.0 was to compute the end_log_pos; so better to store
 
678
    end_log_pos instead of begin_log_pos.
 
679
    If we had not done this fix here, the problem would also have appeared
 
680
    when the slave and master are 5.0 but with different event length (for
 
681
    example the slave is more recent than the master and features the event
 
682
    UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
 
683
    SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
 
684
    value which would lead to badly broken replication.
 
685
    Even the relay_log_pos will be corrupted in this case, because the len is
 
686
    the relay log is not "val".
 
687
    With the end_log_pos solution, we avoid computations involving lengthes.
 
688
  */
 
689
  if (log_pos) // 3.23 binlogs don't have log_posx
 
690
  {
 
691
    group_master_log_pos= log_pos;
 
692
  }
 
693
  pthread_cond_broadcast(&data_cond);
 
694
  if (!skip_lock)
 
695
    pthread_mutex_unlock(&data_lock);
 
696
  return;
 
697
}
 
698
 
 
699
 
 
700
void Relay_log_info::close_temporary_tables()
 
701
{
 
702
  Table *table,*next;
 
703
 
 
704
  for (table=save_temporary_tables ; table ; table=next)
 
705
  {
 
706
    next=table->next;
 
707
    /*
 
708
      Don't ask for disk deletion. For now, anyway they will be deleted when
 
709
      slave restarts, but it is a better intention to not delete them.
 
710
    */
 
711
    close_temporary(table, 1, 0);
 
712
  }
 
713
  save_temporary_tables= 0;
 
714
  slave_open_temp_tables= 0;
 
715
  return;
 
716
}
 
717
 
 
718
/*
 
719
  purge_relay_logs()
 
720
 
 
721
  NOTES
 
722
    Assumes to have a run lock on rli and that no slave thread are running.
 
723
*/
 
724
 
 
725
int32_t purge_relay_logs(Relay_log_info* rli, Session *session, bool just_reset,
 
726
                     const char** errmsg)
 
727
{
 
728
  int32_t error=0;
 
729
 
 
730
  /*
 
731
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
 
732
    Indeed, rli->inited==0 does not imply that they already are empty.
 
733
    It could be that slave's info initialization partly succeeded :
 
734
    for example if relay-log.info existed but *relay-bin*.*
 
735
    have been manually removed, init_relay_log_info reads the old
 
736
    relay-log.info and fills rli->master_log_*, then init_relay_log_info
 
737
    checks for the existence of the relay log, this fails and
 
738
    init_relay_log_info leaves rli->inited to 0.
 
739
    In that pathological case, rli->master_log_pos* will be properly reinited
 
740
    at the next START SLAVE (as RESET SLAVE or CHANGE
 
741
    MASTER, the callers of purge_relay_logs, will delete bogus *.info files
 
742
    or replace them with correct files), however if the user does SHOW SLAVE
 
743
    STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
 
744
    In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
 
745
    to display fine in any case.
 
746
  */
 
747
 
 
748
  rli->group_master_log_name[0]= 0;
 
749
  rli->group_master_log_pos= 0;
 
750
 
 
751
  if (!rli->inited)
 
752
  {
 
753
    return(0);
 
754
  }
 
755
 
 
756
  assert(rli->slave_running == 0);
 
757
  assert(rli->mi->slave_running == 0);
 
758
 
 
759
  rli->slave_skip_counter=0;
 
760
  pthread_mutex_lock(&rli->data_lock);
 
761
 
 
762
  /*
 
763
    we close the relay log fd possibly left open by the slave SQL thread,
 
764
    to be able to delete it; the relay log fd possibly left open by the slave
 
765
    I/O thread will be closed naturally in reset_logs() by the
 
766
    close(LOG_CLOSE_TO_BE_OPENED) call
 
767
  */
 
768
  if (rli->cur_log_fd >= 0)
 
769
  {
 
770
    end_io_cache(&rli->cache_buf);
 
771
    my_close(rli->cur_log_fd, MYF(MY_WME));
 
772
    rli->cur_log_fd= -1;
 
773
  }
 
774
 
 
775
  if (rli->relay_log.reset_logs(session))
 
776
  {
 
777
    *errmsg = "Failed during log reset";
 
778
    error=1;
 
779
    goto err;
 
780
  }
 
781
  /* Save name of used relay log file */
 
782
  rli->group_relay_log_name.assign(rli->relay_log.get_log_fname());
 
783
  rli->event_relay_log_name.assign(rli->relay_log.get_log_fname());
 
784
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
 
785
  if (count_relay_log_space(rli))
 
786
  {
 
787
    *errmsg= "Error counting relay log space";
 
788
    goto err;
 
789
  }
 
790
  if (!just_reset)
 
791
    error= init_relay_log_pos(rli, rli->group_relay_log_name.c_str(),
 
792
                              rli->group_relay_log_pos,
 
793
                              0 /* do not need data lock */, errmsg, 0);
 
794
 
 
795
err:
 
796
  pthread_mutex_unlock(&rli->data_lock);
 
797
  return(error);
 
798
}
 
799
 
 
800
 
 
801
/*
 
802
     Check if condition stated in UNTIL clause of START SLAVE is reached.
 
803
   SYNOPSYS
 
804
     Relay_log_info::is_until_satisfied()
 
805
     master_beg_pos    position of the beginning of to be executed event
 
806
                       (not log_pos member of the event that points to the
 
807
                        beginning of the following event)
 
808
 
 
809
 
 
810
   DESCRIPTION
 
811
     Checks if UNTIL condition is reached. Uses caching result of last
 
812
     comparison of current log file name and target log file name. So cached
 
813
     value should be invalidated if current log file name changes
 
814
     (see Relay_log_info::notify_... functions).
 
815
 
 
816
     This caching is needed to avoid of expensive string comparisons and
 
817
     strtol() conversions needed for log names comparison. We don't need to
 
818
     compare them each time this function is called, we only need to do this
 
819
     when current log name changes. If we have UNTIL_MASTER_POS condition we
 
820
     need to do this only after Rotate_log_event::do_apply_event() (which is
 
821
     rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
 
822
     condition then we should invalidate cached comarison value after
 
823
     inc_group_relay_log_pos() which called for each group of events (so we
 
824
     have some benefit if we have something like queries that use
 
825
     autoincrement or if we have transactions).
 
826
 
 
827
     Should be called ONLY if until_condition != UNTIL_NONE !
 
828
   RETURN VALUE
 
829
     true - condition met or error happened (condition seems to have
 
830
            bad log file name)
 
831
     false - condition not met
 
832
*/
 
833
 
 
834
bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
 
835
{
 
836
  const char *log_name;
 
837
  uint64_t log_pos;
 
838
 
 
839
  assert(until_condition != UNTIL_NONE);
 
840
 
 
841
  if (until_condition == UNTIL_MASTER_POS)
 
842
  {
 
843
    log_name= group_master_log_name.c_str();
 
844
    log_pos= master_beg_pos;
 
845
  }
 
846
  else
 
847
  { /* until_condition == UNTIL_RELAY_POS */
 
848
    log_name= group_relay_log_name.c_str();
 
849
    log_pos= group_relay_log_pos;
 
850
  }
 
851
 
 
852
  if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
 
853
  {
 
854
    /*
 
855
      We have no cached comparison results so we should compare log names
 
856
      and cache result.
 
857
      If we are after RESET SLAVE, and the SQL slave thread has not processed
 
858
      any event yet, it could be that group_master_log_name is "". In that case,
 
859
      just wait for more events (as there is no sensible comparison to do).
 
860
    */
 
861
 
 
862
    if (*log_name)
 
863
    {
 
864
      const char *basename= log_name + dirname_length(log_name);
 
865
 
 
866
      const char *q= (const char*)(fn_ext(basename)+1);
 
867
      if (strncmp(basename, until_log_name, (int32_t)(q-basename)) == 0)
 
868
      {
 
869
        /* Now compare extensions. */
 
870
        char *q_end;
 
871
        uint32_t log_name_extension= strtoul(q, &q_end, 10);
 
872
        if (log_name_extension < until_log_name_extension)
 
873
          until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
 
874
        else
 
875
          until_log_names_cmp_result=
 
876
            (log_name_extension > until_log_name_extension) ?
 
877
            UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
 
878
      }
 
879
      else
 
880
      {
 
881
        /* Probably error so we aborting */
 
882
        sql_print_error(_("Slave SQL thread is stopped because UNTIL "
 
883
                          "condition is bad."));
 
884
        return(true);
 
885
      }
 
886
    }
 
887
    else
 
888
      return(until_log_pos == 0);
 
889
  }
 
890
 
 
891
  return(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
 
892
           log_pos >= until_log_pos) ||
 
893
          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
 
894
}
 
895
 
 
896
 
 
897
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
 
898
                                  time_t event_creation_time)
 
899
{
 
900
  extern uint32_t debug_not_change_ts_if_art_event;
 
901
  clear_flag(IN_STMT);
 
902
 
 
903
  /*
 
904
    If in a transaction, and if the slave supports transactions, just
 
905
    inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
 
906
    (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
 
907
    BEGIN/COMMIT, not with SET AUTOCOMMIT= .
 
908
 
 
909
    CAUTION: opt_using_transactions means innodb || bdb ; suppose the
 
910
    master supports InnoDB and BDB, but the slave supports only BDB,
 
911
    problems will arise: - suppose an InnoDB table is created on the
 
912
    master, - then it will be MyISAM on the slave - but as
 
913
    opt_using_transactions is true, the slave will believe he is
 
914
    transactional with the MyISAM table. And problems will come when
 
915
    one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
 
916
    resume at BEGIN whereas there has not been any rollback).  This is
 
917
    the problem of using opt_using_transactions instead of a finer
 
918
    "does the slave support _transactional handler used on the
 
919
    master_".
 
920
 
 
921
    More generally, we'll have problems when a query mixes a
 
922
    transactional handler and MyISAM and STOP SLAVE is issued in the
 
923
    middle of the "transaction". START SLAVE will resume at BEGIN
 
924
    while the MyISAM table has already been updated.
 
925
  */
 
926
  if ((sql_session->options & OPTION_BEGIN) && opt_using_transactions)
 
927
    inc_event_relay_log_pos();
 
928
  else
 
929
  {
 
930
    inc_group_relay_log_pos(event_master_log_pos);
 
931
    flush_relay_log_info(this);
 
932
    /*
 
933
      Note that Rotate_log_event::do_apply_event() does not call this
 
934
      function, so there is no chance that a fake rotate event resets
 
935
      last_master_timestamp.  Note that we update without mutex
 
936
      (probably ok - except in some very rare cases, only consequence
 
937
      is that value may take some time to display in
 
938
      Seconds_Behind_Master - not critical).
 
939
    */
 
940
    if (!(event_creation_time == 0 && debug_not_change_ts_if_art_event > 0))
 
941
      last_master_timestamp= event_creation_time;
 
942
  }
 
943
}
 
944
 
 
945
void Relay_log_info::cleanup_context(Session *session, bool error)
 
946
{
 
947
  assert(sql_session == session);
 
948
  /*
 
949
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
 
950
    may have opened tables, which we cannot be sure have been closed (because
 
951
    maybe the Rows_log_event have not been found or will not be, because slave
 
952
    SQL thread is stopping, or relay log has a missing tail etc). So we close
 
953
    all thread's tables. And so the table mappings have to be cancelled.
 
954
    2) Rows_log_event::do_apply_event() may even have started statements or
 
955
    transactions on them, which we need to rollback in case of error.
 
956
    3) If finding a Format_description_log_event after a BEGIN, we also need
 
957
    to rollback before continuing with the next events.
 
958
    4) so we need this "context cleanup" function.
 
959
  */
 
960
  if (error)
 
961
  {
 
962
    ha_autocommit_or_rollback(session, 1); // if a "statement transaction"
 
963
    end_trans(session, ROLLBACK); // if a "real transaction"
 
964
  }
 
965
  m_table_map.clear_tables();
 
966
  close_thread_tables(session);
 
967
  clear_tables_to_lock();
 
968
  clear_flag(IN_STMT);
 
969
  /*
 
970
    Cleanup for the flags that have been set at do_apply_event.
 
971
  */
 
972
  session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
 
973
  session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
 
974
  last_event_start_time= 0;
 
975
  return;
 
976
}
 
977
 
 
978
void Relay_log_info::clear_tables_to_lock()
 
979
{
 
980
  while (tables_to_lock)
 
981
  {
 
982
    unsigned char* to_free= reinterpret_cast<unsigned char*>(tables_to_lock);
 
983
    if (tables_to_lock->m_tabledef_valid)
 
984
    {
 
985
      tables_to_lock->m_tabledef.table_def::~table_def();
 
986
      tables_to_lock->m_tabledef_valid= false;
 
987
    }
 
988
    tables_to_lock=
 
989
      static_cast<RPL_TableList*>(tables_to_lock->next_global);
 
990
    tables_to_lock_count--;
 
991
    free(to_free);
 
992
  }
 
993
  assert(tables_to_lock == NULL && tables_to_lock_count == 0);
 
994
}