~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication/rli.cc

  • Committer: Brian Aker
  • Date: 2008-12-23 07:19:26 UTC
  • Revision ID: brian@tangent.org-20081223071926-69z2ugpftfz1lfnm
Remove dead variables.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <drizzled/server_includes.h>
 
17
#include <drizzled/replication/mi.h>
 
18
#include <drizzled/replication/rli.h>
 
19
// For check_binlog_magic
 
20
#include <drizzled/replication/replication.h>
 
21
#include <drizzled/replication/utility.h>
 
22
#include <drizzled/data_home.h>
 
23
#include <drizzled/sql_parse.h>
 
24
#include <drizzled/gettext.h>
 
25
#include <drizzled/log_event.h>
 
26
#include <drizzled/sql_base.h>
 
27
 
 
28
#if TIME_WITH_SYS_TIME
 
29
# include <sys/time.h>
 
30
# include <time.h>
 
31
#else
 
32
# if HAVE_SYS_TIME_H
 
33
#  include <sys/time.h>
 
34
# else
 
35
#  include <time.h>
 
36
# endif
 
37
#endif
 
38
 
 
39
 
 
40
 
 
41
static int32_t count_relay_log_space(Relay_log_info* rli);
 
42
 
 
43
// Defined in slave.cc
 
44
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val);
 
45
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
 
46
                              const char *default_val);
 
47
 
 
48
 
 
49
Relay_log_info::Relay_log_info()
 
50
  :Slave_reporting_capability("SQL"),
 
51
   no_storage(false), replicate_same_server_id(::replicate_same_server_id),
 
52
   info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
 
53
#if defined(HAVE_purify) && HAVE_purify
 
54
   is_fake(false),
 
55
#endif
 
56
   cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
 
57
   group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
 
58
   last_master_timestamp(0), slave_skip_counter(0),
 
59
   abort_pos_wait(0), slave_run_id(0), sql_session(0),
 
60
   inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
 
61
   until_log_pos(0), retried_trans(0),
 
62
   tables_to_lock(0), tables_to_lock_count(0),
 
63
   last_event_start_time(0), m_flags(0)
 
64
{
 
65
  group_relay_log_name[0]= event_relay_log_name[0]=
 
66
    group_master_log_name[0]= 0;
 
67
  until_log_name[0]= ign_master_log_name_end[0]= 0;
 
68
  memset(&info_file, 0, sizeof(info_file));
 
69
  memset(&cache_buf, 0, sizeof(cache_buf));
 
70
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
 
71
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
 
72
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
 
73
  pthread_cond_init(&data_cond, NULL);
 
74
  pthread_cond_init(&start_cond, NULL);
 
75
  pthread_cond_init(&stop_cond, NULL);
 
76
  pthread_cond_init(&log_space_cond, NULL);
 
77
  relay_log.init_pthread_objects();
 
78
  return;
 
79
}
 
80
 
 
81
 
 
82
Relay_log_info::~Relay_log_info()
 
83
{
 
84
  pthread_mutex_destroy(&run_lock);
 
85
  pthread_mutex_destroy(&data_lock);
 
86
  pthread_mutex_destroy(&log_space_lock);
 
87
  pthread_cond_destroy(&data_cond);
 
88
  pthread_cond_destroy(&start_cond);
 
89
  pthread_cond_destroy(&stop_cond);
 
90
  pthread_cond_destroy(&log_space_cond);
 
91
  relay_log.cleanup();
 
92
  return;
 
93
}
 
94
 
 
95
 
 
96
int32_t init_relay_log_info(Relay_log_info* rli,
 
97
                            const char* info_fname)
 
98
{
 
99
  char fname[FN_REFLEN+128];
 
100
  int32_t info_fd;
 
101
  const char* msg = 0;
 
102
  int32_t error = 0;
 
103
  assert(!rli->no_storage);         // Don't init if there is no storage
 
104
 
 
105
  if (rli->inited)                       // Set if this function called
 
106
    return(0);
 
107
  fn_format(fname, info_fname, drizzle_data_home, "", 4+32);
 
108
  pthread_mutex_lock(&rli->data_lock);
 
109
  info_fd = rli->info_fd;
 
110
  rli->cur_log_fd = -1;
 
111
  rli->slave_skip_counter=0;
 
112
  rli->abort_pos_wait=0;
 
113
  rli->log_space_limit= relay_log_space_limit;
 
114
  rli->log_space_total= 0;
 
115
  rli->tables_to_lock= 0;
 
116
  rli->tables_to_lock_count= 0;
 
117
 
 
118
  /*
 
119
    The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
 
120
    Note that the I/O thread flushes it to disk after writing every
 
121
    event, in flush_master_info(mi, 1).
 
122
  */
 
123
 
 
124
  /*
 
125
    For the maximum log size, we choose max_relay_log_size if it is
 
126
    non-zero, max_binlog_size otherwise. If later the user does SET
 
127
    GLOBAL on one of these variables, fix_max_binlog_size and
 
128
    fix_max_relay_log_size will reconsider the choice (for example
 
129
    if the user changes max_relay_log_size to zero, we have to
 
130
    switch to using max_binlog_size for the relay log) and update
 
131
    rli->relay_log.max_size (and drizzle_bin_log.max_size).
 
132
  */
 
133
  {
 
134
    char buf[FN_REFLEN];
 
135
    const char *ln;
 
136
    static bool name_warning_sent= 0;
 
137
    ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
 
138
                                     1, buf);
 
139
    /* We send the warning only at startup, not after every RESET SLAVE */
 
140
    if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
 
141
    {
 
142
      /*
 
143
        User didn't give us info to name the relay log index file.
 
144
        Picking `hostname`-relay-bin.index like we do, causes replication to
 
145
        fail if this slave's hostname is changed later. So, we would like to
 
146
        instead require a name. But as we don't want to break many existing
 
147
        setups, we only give warning, not error.
 
148
      */
 
149
      sql_print_warning(_("Neither --relay-log nor --relay-log-index were used;"
 
150
                        " so replication "
 
151
                        "may break when this Drizzle server acts as a "
 
152
                        "slave and has his hostname changed!! Please "
 
153
                        "use '--relay-log=%s' to avoid this problem."), ln);
 
154
      name_warning_sent= 1;
 
155
    }
 
156
    /*
 
157
      note, that if open() fails, we'll still have index file open
 
158
      but a destructor will take care of that
 
159
    */
 
160
    if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln) ||
 
161
        rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
 
162
                            (max_relay_log_size ? max_relay_log_size :
 
163
                            max_binlog_size), 1))
 
164
    {
 
165
      pthread_mutex_unlock(&rli->data_lock);
 
166
      sql_print_error(_("Failed in open_log() called from "
 
167
                        "init_relay_log_info()"));
 
168
      return(1);
 
169
    }
 
170
  }
 
171
 
 
172
  /* if file does not exist */
 
173
  if (access(fname,F_OK))
 
174
  {
 
175
    /* Create a new file */
 
176
  }
 
177
  else // file exists
 
178
  {
 
179
    /* Open up fname here and pull out the relay.info data */
 
180
  }
 
181
 
 
182
  /*
 
183
    Now change the cache from READ to WRITE - must do this
 
184
    before flush_relay_log_info
 
185
  */
 
186
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
 
187
  if ((error= flush_relay_log_info(rli)))
 
188
    sql_print_error(_("Failed to flush relay log info file"));
 
189
  if (count_relay_log_space(rli))
 
190
  {
 
191
    msg=_("Error counting relay log space");
 
192
    goto err;
 
193
  }
 
194
  rli->inited= 1;
 
195
  pthread_mutex_unlock(&rli->data_lock);
 
196
  return(error);
 
197
 
 
198
err:
 
199
  sql_print_error("%s",msg);
 
200
  end_io_cache(&rli->info_file);
 
201
  if (info_fd >= 0)
 
202
    my_close(info_fd, MYF(0));
 
203
  rli->info_fd= -1;
 
204
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
205
  pthread_mutex_unlock(&rli->data_lock);
 
206
  return(1);
 
207
}
 
208
 
 
209
 
 
210
static inline int32_t add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
 
211
{
 
212
  struct stat s;
 
213
  if (stat(linfo->log_file_name,&s))
 
214
  {
 
215
    sql_print_error(_("log %s listed in the index, but failed to stat"),
 
216
                    linfo->log_file_name);
 
217
    return(1);
 
218
  }
 
219
  rli->log_space_total += s.st_size;
 
220
  return(0);
 
221
}
 
222
 
 
223
 
 
224
static int32_t count_relay_log_space(Relay_log_info* rli)
 
225
{
 
226
  LOG_INFO linfo;
 
227
  rli->log_space_total= 0;
 
228
  if (rli->relay_log.find_log_pos(&linfo, NULL, 1))
 
229
  {
 
230
    sql_print_error(_("Could not find first log while counting relay "
 
231
                      "log space"));
 
232
    return(1);
 
233
  }
 
234
  do
 
235
  {
 
236
    if (add_relay_log(rli,&linfo))
 
237
      return(1);
 
238
  } while (!rli->relay_log.find_next_log(&linfo, 1));
 
239
  /*
 
240
     As we have counted everything, including what may have written in a
 
241
     preceding write, we must reset bytes_written, or we may count some space
 
242
     twice.
 
243
  */
 
244
  rli->relay_log.reset_bytes_written();
 
245
  return(0);
 
246
}
 
247
 
 
248
 
 
249
/*
 
250
   Reset UNTIL condition for Relay_log_info
 
251
 
 
252
   SYNOPSYS
 
253
    clear_until_condition()
 
254
      rli - Relay_log_info structure where UNTIL condition should be reset
 
255
 */
 
256
 
 
257
void Relay_log_info::clear_until_condition()
 
258
{
 
259
  until_condition= Relay_log_info::UNTIL_NONE;
 
260
  until_log_name[0]= 0;
 
261
  until_log_pos= 0;
 
262
  return;
 
263
}
 
264
 
 
265
 
 
266
/*
 
267
  Open the given relay log
 
268
 
 
269
  SYNOPSIS
 
270
    init_relay_log_pos()
 
271
    rli                 Relay information (will be initialized)
 
272
    log                 Name of relay log file to read from. NULL = First log
 
273
    pos                 Position in relay log file
 
274
    need_data_lock      Set to 1 if this functions should do mutex locks
 
275
    errmsg              Store pointer to error message here
 
276
    look_for_description_event
 
277
                        1 if we should look for such an event. We only need
 
278
                        this when the SQL thread starts and opens an existing
 
279
                        relay log and has to execute it (possibly from an
 
280
                        offset >4); then we need to read the first event of
 
281
                        the relay log to be able to parse the events we have
 
282
                        to execute.
 
283
 
 
284
  DESCRIPTION
 
285
  - Close old open relay log files.
 
286
  - If we are using the same relay log as the running IO-thread, then set
 
287
    rli->cur_log to point to the same IO_CACHE entry.
 
288
  - If not, open the 'log' binary file.
 
289
 
 
290
  TODO
 
291
    - check proper initialization of group_master_log_name/group_master_log_pos
 
292
 
 
293
  RETURN VALUES
 
294
    0   ok
 
295
    1   error.  errmsg is set to point to the error message
 
296
*/
 
297
 
 
298
int32_t init_relay_log_pos(Relay_log_info* rli,const char* log,
 
299
                       uint64_t pos, bool need_data_lock,
 
300
                       const char** errmsg,
 
301
                       bool look_for_description_event)
 
302
{
 
303
  *errmsg=0;
 
304
  pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
 
305
 
 
306
  if (need_data_lock)
 
307
    pthread_mutex_lock(&rli->data_lock);
 
308
 
 
309
  /*
 
310
    Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
 
311
    is, too, and init_slave() too; these 2 functions allocate a description
 
312
    event in init_relay_log_pos, which is not freed by the terminating SQL slave
 
313
    thread as that thread is not started by these functions. So we have to free
 
314
    the description_event here, in case, so that there is no memory leak in
 
315
    running, say, CHANGE MASTER.
 
316
  */
 
317
  delete rli->relay_log.description_event_for_exec;
 
318
  /*
 
319
    By default the relay log is in binlog format 3 (4.0).
 
320
    Even if format is 4, this will work enough to read the first event
 
321
    (Format_desc) (remember that format 4 is just lenghtened compared to format
 
322
    3; format 3 is a prefix of format 4).
 
323
  */
 
324
  rli->relay_log.description_event_for_exec= new
 
325
    Format_description_log_event(3);
 
326
 
 
327
  pthread_mutex_lock(log_lock);
 
328
 
 
329
  /* Close log file and free buffers if it's already open */
 
330
  if (rli->cur_log_fd >= 0)
 
331
  {
 
332
    end_io_cache(&rli->cache_buf);
 
333
    my_close(rli->cur_log_fd, MYF(MY_WME));
 
334
    rli->cur_log_fd = -1;
 
335
  }
 
336
 
 
337
  rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
 
338
 
 
339
  /*
 
340
    Test to see if the previous run was with the skip of purging
 
341
    If yes, we do not purge when we restart
 
342
  */
 
343
  if (rli->relay_log.find_log_pos(&rli->linfo, NULL, 1))
 
344
  {
 
345
    *errmsg="Could not find first log during relay log initialization";
 
346
    goto err;
 
347
  }
 
348
 
 
349
  if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
 
350
  {
 
351
    *errmsg="Could not find target log during relay log initialization";
 
352
    goto err;
 
353
  }
 
354
 
 
355
  rli->group_relay_log_name.assign(rli->linfo.log_file_name);
 
356
  rli->event_relay_log_name.assign(rli->linfo.log_file_name);
 
357
 
 
358
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
 
359
  {
 
360
    /*
 
361
      The IO thread is using this log file.
 
362
      In this case, we will use the same IO_CACHE pointer to
 
363
      read data as the IO thread is using to write data.
 
364
    */
 
365
    my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
 
366
    if (check_binlog_magic(rli->cur_log,errmsg))
 
367
      goto err;
 
368
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
 
369
  }
 
370
  else
 
371
  {
 
372
    /*
 
373
      Open the relay log and set rli->cur_log to point at this one
 
374
    */
 
375
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
 
376
                                     rli->linfo.log_file_name,errmsg)) < 0)
 
377
      goto err;
 
378
    rli->cur_log = &rli->cache_buf;
 
379
  }
 
380
  /*
 
381
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
 
382
    sure.
 
383
  */
 
384
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
 
385
  {
 
386
    Log_event* ev;
 
387
    while (look_for_description_event)
 
388
    {
 
389
      /*
 
390
        Read the possible Format_description_log_event; if position
 
391
        was 4, no need, it will be read naturally.
 
392
      */
 
393
      if (my_b_tell(rli->cur_log) >= pos)
 
394
        break;
 
395
 
 
396
      /*
 
397
        Because of we have rli->data_lock and log_lock, we can safely read an
 
398
        event
 
399
      */
 
400
      if (!(ev=Log_event::read_log_event(rli->cur_log,0,
 
401
                                         rli->relay_log.description_event_for_exec)))
 
402
      {
 
403
        if (rli->cur_log->error) /* not EOF */
 
404
        {
 
405
          *errmsg= "I/O error reading event at position 4";
 
406
          goto err;
 
407
        }
 
408
        break;
 
409
      }
 
410
      else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
 
411
      {
 
412
        delete rli->relay_log.description_event_for_exec;
 
413
        rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
 
414
        /*
 
415
          As ev was returned by read_log_event, it has passed is_valid(), so
 
416
          malloc() in ctor worked, no need to check again.
 
417
        */
 
418
        /*
 
419
          Ok, we found a Format_description event. But it is not sure that this
 
420
          describes the whole relay log; indeed, one can have this sequence
 
421
          (starting from position 4):
 
422
          Format_desc (of slave)
 
423
          Rotate (of master)
 
424
          Format_desc (of master)
 
425
          So the Format_desc which really describes the rest of the relay log
 
426
          is the 3rd event (it can't be further than that, because we rotate
 
427
          the relay log when we queue a Rotate event from the master).
 
428
          But what describes the Rotate is the first Format_desc.
 
429
          So what we do is:
 
430
          go on searching for Format_description events, until you exceed the
 
431
          position (argument 'pos') or until you find another event than Rotate
 
432
          or Format_desc.
 
433
        */
 
434
      }
 
435
      else
 
436
      {
 
437
        look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
 
438
        delete ev;
 
439
      }
 
440
    }
 
441
    my_b_seek(rli->cur_log,(off_t)pos);
 
442
 
 
443
  }
 
444
 
 
445
err:
 
446
  /*
 
447
    If we don't purge, we can't honour relay_log_space_limit ;
 
448
    silently discard it
 
449
  */
 
450
  if (!relay_log_purge)
 
451
    rli->log_space_limit= 0;
 
452
  pthread_cond_broadcast(&rli->data_cond);
 
453
 
 
454
  pthread_mutex_unlock(log_lock);
 
455
 
 
456
  if (need_data_lock)
 
457
    pthread_mutex_unlock(&rli->data_lock);
 
458
  if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
 
459
    *errmsg= "Invalid Format_description log event; could be out of memory";
 
460
 
 
461
  return ((*errmsg) ? 1 : 0);
 
462
}
 
463
 
 
464
 
 
465
/*
 
466
  Waits until the SQL thread reaches (has executed up to) the
 
467
  log/position or timed out.
 
468
 
 
469
  SYNOPSIS
 
470
    wait_for_pos()
 
471
    session             client thread that sent SELECT MASTER_POS_WAIT
 
472
    log_name        log name to wait for
 
473
    log_pos         position to wait for
 
474
    timeout         timeout in seconds before giving up waiting
 
475
 
 
476
  NOTES
 
477
    timeout is int64_t whereas it should be uint32_t ; but this is
 
478
    to catch if the user submitted a negative timeout.
 
479
 
 
480
  RETURN VALUES
 
481
    -2          improper arguments (log_pos<0)
 
482
                or slave not running, or master info changed
 
483
                during the function's execution,
 
484
                or client thread killed. -2 is translated to NULL by caller
 
485
    -1          timed out
 
486
    >=0         number of log events the function had to wait
 
487
                before reaching the desired log/position
 
488
 */
 
489
 
 
490
int32_t Relay_log_info::wait_for_pos(Session* session, String* log_name,
 
491
                                    int64_t log_pos,
 
492
                                    int64_t timeout)
 
493
{
 
494
  int32_t event_count = 0;
 
495
  uint32_t init_abort_pos_wait;
 
496
  int32_t error=0;
 
497
  struct timespec abstime; // for timeout checking
 
498
  const char *msg;
 
499
 
 
500
  if (!inited)
 
501
    return(-2);
 
502
 
 
503
  set_timespec(abstime,timeout);
 
504
  pthread_mutex_lock(&data_lock);
 
505
  msg= session->enter_cond(&data_cond, &data_lock,
 
506
                       "Waiting for the slave SQL thread to "
 
507
                       "advance position");
 
508
  /*
 
509
     This function will abort when it notices that some CHANGE MASTER or
 
510
     RESET MASTER has changed the master info.
 
511
     To catch this, these commands modify abort_pos_wait ; We just monitor
 
512
     abort_pos_wait and see if it has changed.
 
513
     Why do we have this mechanism instead of simply monitoring slave_running
 
514
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
 
515
     the SQL thread be stopped?
 
516
     This is becasue if someones does:
 
517
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
 
518
     the change may happen very quickly and we may not notice that
 
519
     slave_running briefly switches between 1/0/1.
 
520
  */
 
521
  init_abort_pos_wait= abort_pos_wait;
 
522
 
 
523
  /*
 
524
    We'll need to
 
525
    handle all possible log names comparisons (e.g. 999 vs 1000).
 
526
    We use uint32_t for string->number conversion ; this is no
 
527
    stronger limitation than in find_uniq_filename in sql/log.cc
 
528
  */
 
529
  uint32_t log_name_extension;
 
530
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
 
531
  size_t len= cmin(log_name->length(), (uint32_t)FN_REFLEN-1);
 
532
  strncpy(log_name_tmp, log_name->ptr(), len);
 
533
  log_name_tmp[len]= '\0';
 
534
 
 
535
  char *p= fn_ext(log_name_tmp);
 
536
  char *p_end;
 
537
  if (!*p || log_pos<0)
 
538
  {
 
539
    error= -2; //means improper arguments
 
540
    goto err;
 
541
  }
 
542
  // Convert 0-3 to 4
 
543
  log_pos= cmax(log_pos, (int64_t)BIN_LOG_HEADER_SIZE);
 
544
  /* p points to '.' */
 
545
  log_name_extension= strtoul(++p, &p_end, 10);
 
546
  /*
 
547
    p_end points to the first invalid character.
 
548
    If it equals to p, no digits were found, error.
 
549
    If it contains '\0' it means conversion went ok.
 
550
  */
 
551
  if (p_end==p || *p_end)
 
552
  {
 
553
    error= -2;
 
554
    goto err;
 
555
  }
 
556
 
 
557
  /* The "compare and wait" main loop */
 
558
  while (!session->killed &&
 
559
         init_abort_pos_wait == abort_pos_wait &&
 
560
         slave_running)
 
561
  {
 
562
    bool pos_reached;
 
563
    int32_t cmp_result= 0;
 
564
 
 
565
    /*
 
566
      group_master_log_name can be "", if we are just after a fresh
 
567
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
 
568
      (before we have executed one Rotate event from the master) or
 
569
      (rare) if the user is doing a weird slave setup (see next
 
570
      paragraph).  If group_master_log_name is "", we assume we don't
 
571
      have enough info to do the comparison yet, so we just wait until
 
572
      more data. In this case master_log_pos is always 0 except if
 
573
      somebody (wrongly) sets this slave to be a slave of itself
 
574
      without using --replicate-same-server-id (an unsupported
 
575
      configuration which does nothing), then group_master_log_pos
 
576
      will grow and group_master_log_name will stay "".
 
577
    */
 
578
    if (group_master_log_name.length())
 
579
    {
 
580
      const char *basename= (group_master_log_name.c_str() +
 
581
                             dirname_length(group_master_log_name.c_str()));
 
582
      /*
 
583
        First compare the parts before the extension.
 
584
        Find the dot in the master's log basename,
 
585
        and protect against user's input error :
 
586
        if the names do not match up to '.' included, return error
 
587
      */
 
588
      char *q= (char*)(fn_ext(basename)+1);
 
589
      if (strncmp(basename, log_name_tmp, (int32_t)(q-basename)))
 
590
      {
 
591
        error= -2;
 
592
        break;
 
593
      }
 
594
      // Now compare extensions.
 
595
      char *q_end;
 
596
      uint32_t group_master_log_name_extension= strtoul(q, &q_end, 10);
 
597
      if (group_master_log_name_extension < log_name_extension)
 
598
        cmp_result= -1 ;
 
599
      else
 
600
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
 
601
 
 
602
      pos_reached= ((!cmp_result && group_master_log_pos >= (uint64_t)log_pos) ||
 
603
                    cmp_result > 0);
 
604
      if (pos_reached || session->killed)
 
605
        break;
 
606
    }
 
607
 
 
608
    //wait for master update, with optional timeout.
 
609
 
 
610
    /*
 
611
      We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
 
612
      will wake us up.
 
613
    */
 
614
    if (timeout > 0)
 
615
    {
 
616
      /*
 
617
        Note that pthread_cond_timedwait checks for the timeout
 
618
        before for the condition ; i.e. it returns ETIMEDOUT
 
619
        if the system time equals or exceeds the time specified by abstime
 
620
        before the condition variable is signaled or broadcast, _or_ if
 
621
        the absolute time specified by abstime has already passed at the time
 
622
        of the call.
 
623
        For that reason, pthread_cond_timedwait will do the "timeoutting" job
 
624
        even if its condition is always immediately signaled (case of a loaded
 
625
        master).
 
626
      */
 
627
      error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
 
628
    }
 
629
    else
 
630
      pthread_cond_wait(&data_cond, &data_lock);
 
631
    if (error == ETIMEDOUT || error == ETIME)
 
632
    {
 
633
      error= -1;
 
634
      break;
 
635
    }
 
636
    error=0;
 
637
    event_count++;
 
638
  }
 
639
 
 
640
err:
 
641
  session->exit_cond(msg);
 
642
  if (session->killed || init_abort_pos_wait != abort_pos_wait ||
 
643
      !slave_running)
 
644
  {
 
645
    error= -2;
 
646
  }
 
647
  return( error ? error : event_count );
 
648
}
 
649
 
 
650
 
 
651
void Relay_log_info::inc_group_relay_log_pos(uint64_t log_pos,
 
652
                                                bool skip_lock)
 
653
{
 
654
  if (!skip_lock)
 
655
    pthread_mutex_lock(&data_lock);
 
656
  inc_event_relay_log_pos();
 
657
  group_relay_log_pos= event_relay_log_pos;
 
658
  group_relay_log_name.assign(event_relay_log_name);
 
659
 
 
660
  notify_group_relay_log_name_update();
 
661
 
 
662
  /*
 
663
    If the slave does not support transactions and replicates a transaction,
 
664
    users should not trust group_master_log_pos (which they can display with
 
665
    SHOW SLAVE STATUS or read from relay-log.info), because to compute
 
666
    group_master_log_pos the slave relies on log_pos stored in the master's
 
667
    binlog, but if we are in a master's transaction these positions are always
 
668
    the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
 
669
    not advance as it should on the non-transactional slave (it advances by
 
670
    big leaps, whereas it should advance by small leaps).
 
671
  */
 
672
  /*
 
673
    In 4.x we used the event's len to compute the positions here. This is
 
674
    wrong if the event was 3.23/4.0 and has been converted to 5.0, because
 
675
    then the event's len is not what is was in the master's binlog, so this
 
676
    will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
 
677
    replication: Exec_master_log_pos is wrong). Only way to solve this is to
 
678
    have the original offset of the end of the event the relay log. This is
 
679
    what we do in 5.0: log_pos has become "end_log_pos" (because the real use
 
680
    of log_pos in 4.0 was to compute the end_log_pos; so better to store
 
681
    end_log_pos instead of begin_log_pos.
 
682
    If we had not done this fix here, the problem would also have appeared
 
683
    when the slave and master are 5.0 but with different event length (for
 
684
    example the slave is more recent than the master and features the event
 
685
    UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
 
686
    SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
 
687
    value which would lead to badly broken replication.
 
688
    Even the relay_log_pos will be corrupted in this case, because the len is
 
689
    the relay log is not "val".
 
690
    With the end_log_pos solution, we avoid computations involving lengthes.
 
691
  */
 
692
  if (log_pos) // 3.23 binlogs don't have log_posx
 
693
  {
 
694
    group_master_log_pos= log_pos;
 
695
  }
 
696
  pthread_cond_broadcast(&data_cond);
 
697
  if (!skip_lock)
 
698
    pthread_mutex_unlock(&data_lock);
 
699
  return;
 
700
}
 
701
 
 
702
 
 
703
void Relay_log_info::close_temporary_tables()
 
704
{
 
705
  Table *table,*next;
 
706
 
 
707
  for (table=save_temporary_tables ; table ; table=next)
 
708
  {
 
709
    next=table->next;
 
710
    /*
 
711
      Don't ask for disk deletion. For now, anyway they will be deleted when
 
712
      slave restarts, but it is a better intention to not delete them.
 
713
    */
 
714
    close_temporary(table, 1, 0);
 
715
  }
 
716
  save_temporary_tables= 0;
 
717
  slave_open_temp_tables= 0;
 
718
  return;
 
719
}
 
720
 
 
721
/*
 
722
  purge_relay_logs()
 
723
 
 
724
  NOTES
 
725
    Assumes to have a run lock on rli and that no slave thread are running.
 
726
*/
 
727
 
 
728
int32_t purge_relay_logs(Relay_log_info* rli, Session *session, bool just_reset,
 
729
                     const char** errmsg)
 
730
{
 
731
  int32_t error=0;
 
732
 
 
733
  /*
 
734
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
 
735
    Indeed, rli->inited==0 does not imply that they already are empty.
 
736
    It could be that slave's info initialization partly succeeded :
 
737
    for example if relay-log.info existed but *relay-bin*.*
 
738
    have been manually removed, init_relay_log_info reads the old
 
739
    relay-log.info and fills rli->master_log_*, then init_relay_log_info
 
740
    checks for the existence of the relay log, this fails and
 
741
    init_relay_log_info leaves rli->inited to 0.
 
742
    In that pathological case, rli->master_log_pos* will be properly reinited
 
743
    at the next START SLAVE (as RESET SLAVE or CHANGE
 
744
    MASTER, the callers of purge_relay_logs, will delete bogus *.info files
 
745
    or replace them with correct files), however if the user does SHOW SLAVE
 
746
    STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
 
747
    In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
 
748
    to display fine in any case.
 
749
  */
 
750
 
 
751
  rli->group_master_log_name[0]= 0;
 
752
  rli->group_master_log_pos= 0;
 
753
 
 
754
  if (!rli->inited)
 
755
  {
 
756
    return(0);
 
757
  }
 
758
 
 
759
  assert(rli->slave_running == 0);
 
760
  assert(rli->mi->slave_running == 0);
 
761
 
 
762
  rli->slave_skip_counter=0;
 
763
  pthread_mutex_lock(&rli->data_lock);
 
764
 
 
765
  /*
 
766
    we close the relay log fd possibly left open by the slave SQL thread,
 
767
    to be able to delete it; the relay log fd possibly left open by the slave
 
768
    I/O thread will be closed naturally in reset_logs() by the
 
769
    close(LOG_CLOSE_TO_BE_OPENED) call
 
770
  */
 
771
  if (rli->cur_log_fd >= 0)
 
772
  {
 
773
    end_io_cache(&rli->cache_buf);
 
774
    my_close(rli->cur_log_fd, MYF(MY_WME));
 
775
    rli->cur_log_fd= -1;
 
776
  }
 
777
 
 
778
  if (rli->relay_log.reset_logs(session))
 
779
  {
 
780
    *errmsg = "Failed during log reset";
 
781
    error=1;
 
782
    goto err;
 
783
  }
 
784
  /* Save name of used relay log file */
 
785
  rli->group_relay_log_name.assign(rli->relay_log.get_log_fname());
 
786
  rli->event_relay_log_name.assign(rli->relay_log.get_log_fname());
 
787
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
 
788
  if (count_relay_log_space(rli))
 
789
  {
 
790
    *errmsg= "Error counting relay log space";
 
791
    goto err;
 
792
  }
 
793
  if (!just_reset)
 
794
    error= init_relay_log_pos(rli, rli->group_relay_log_name.c_str(),
 
795
                              rli->group_relay_log_pos,
 
796
                              0 /* do not need data lock */, errmsg, 0);
 
797
 
 
798
err:
 
799
  pthread_mutex_unlock(&rli->data_lock);
 
800
  return(error);
 
801
}
 
802
 
 
803
 
 
804
/*
 
805
     Check if condition stated in UNTIL clause of START SLAVE is reached.
 
806
   SYNOPSYS
 
807
     Relay_log_info::is_until_satisfied()
 
808
     master_beg_pos    position of the beginning of to be executed event
 
809
                       (not log_pos member of the event that points to the
 
810
                        beginning of the following event)
 
811
 
 
812
 
 
813
   DESCRIPTION
 
814
     Checks if UNTIL condition is reached. Uses caching result of last
 
815
     comparison of current log file name and target log file name. So cached
 
816
     value should be invalidated if current log file name changes
 
817
     (see Relay_log_info::notify_... functions).
 
818
 
 
819
     This caching is needed to avoid of expensive string comparisons and
 
820
     strtol() conversions needed for log names comparison. We don't need to
 
821
     compare them each time this function is called, we only need to do this
 
822
     when current log name changes. If we have UNTIL_MASTER_POS condition we
 
823
     need to do this only after Rotate_log_event::do_apply_event() (which is
 
824
     rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
 
825
     condition then we should invalidate cached comarison value after
 
826
     inc_group_relay_log_pos() which called for each group of events (so we
 
827
     have some benefit if we have something like queries that use
 
828
     autoincrement or if we have transactions).
 
829
 
 
830
     Should be called ONLY if until_condition != UNTIL_NONE !
 
831
   RETURN VALUE
 
832
     true - condition met or error happened (condition seems to have
 
833
            bad log file name)
 
834
     false - condition not met
 
835
*/
 
836
 
 
837
bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
 
838
{
 
839
  const char *log_name;
 
840
  uint64_t log_pos;
 
841
 
 
842
  assert(until_condition != UNTIL_NONE);
 
843
 
 
844
  if (until_condition == UNTIL_MASTER_POS)
 
845
  {
 
846
    log_name= group_master_log_name.c_str();
 
847
    log_pos= master_beg_pos;
 
848
  }
 
849
  else
 
850
  { /* until_condition == UNTIL_RELAY_POS */
 
851
    log_name= group_relay_log_name.c_str();
 
852
    log_pos= group_relay_log_pos;
 
853
  }
 
854
 
 
855
  if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
 
856
  {
 
857
    /*
 
858
      We have no cached comparison results so we should compare log names
 
859
      and cache result.
 
860
      If we are after RESET SLAVE, and the SQL slave thread has not processed
 
861
      any event yet, it could be that group_master_log_name is "". In that case,
 
862
      just wait for more events (as there is no sensible comparison to do).
 
863
    */
 
864
 
 
865
    if (*log_name)
 
866
    {
 
867
      const char *basename= log_name + dirname_length(log_name);
 
868
 
 
869
      const char *q= (const char*)(fn_ext(basename)+1);
 
870
      if (strncmp(basename, until_log_name, (int32_t)(q-basename)) == 0)
 
871
      {
 
872
        /* Now compare extensions. */
 
873
        char *q_end;
 
874
        uint32_t log_name_extension= strtoul(q, &q_end, 10);
 
875
        if (log_name_extension < until_log_name_extension)
 
876
          until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
 
877
        else
 
878
          until_log_names_cmp_result=
 
879
            (log_name_extension > until_log_name_extension) ?
 
880
            UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
 
881
      }
 
882
      else
 
883
      {
 
884
        /* Probably error so we aborting */
 
885
        sql_print_error(_("Slave SQL thread is stopped because UNTIL "
 
886
                          "condition is bad."));
 
887
        return(true);
 
888
      }
 
889
    }
 
890
    else
 
891
      return(until_log_pos == 0);
 
892
  }
 
893
 
 
894
  return(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
 
895
           log_pos >= until_log_pos) ||
 
896
          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
 
897
}
 
898
 
 
899
 
 
900
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
 
901
                                  time_t event_creation_time)
 
902
{
 
903
  extern uint32_t debug_not_change_ts_if_art_event;
 
904
  clear_flag(IN_STMT);
 
905
 
 
906
  /*
 
907
    If in a transaction, and if the slave supports transactions, just
 
908
    inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
 
909
    (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
 
910
    BEGIN/COMMIT, not with SET AUTOCOMMIT= .
 
911
 
 
912
    CAUTION: opt_using_transactions means innodb || bdb ; suppose the
 
913
    master supports InnoDB and BDB, but the slave supports only BDB,
 
914
    problems will arise: - suppose an InnoDB table is created on the
 
915
    master, - then it will be MyISAM on the slave - but as
 
916
    opt_using_transactions is true, the slave will believe he is
 
917
    transactional with the MyISAM table. And problems will come when
 
918
    one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
 
919
    resume at BEGIN whereas there has not been any rollback).  This is
 
920
    the problem of using opt_using_transactions instead of a finer
 
921
    "does the slave support _transactional handler used on the
 
922
    master_".
 
923
 
 
924
    More generally, we'll have problems when a query mixes a
 
925
    transactional handler and MyISAM and STOP SLAVE is issued in the
 
926
    middle of the "transaction". START SLAVE will resume at BEGIN
 
927
    while the MyISAM table has already been updated.
 
928
  */
 
929
  if ((sql_session->options & OPTION_BEGIN) && opt_using_transactions)
 
930
    inc_event_relay_log_pos();
 
931
  else
 
932
  {
 
933
    inc_group_relay_log_pos(event_master_log_pos);
 
934
    flush_relay_log_info(this);
 
935
    /*
 
936
      Note that Rotate_log_event::do_apply_event() does not call this
 
937
      function, so there is no chance that a fake rotate event resets
 
938
      last_master_timestamp.  Note that we update without mutex
 
939
      (probably ok - except in some very rare cases, only consequence
 
940
      is that value may take some time to display in
 
941
      Seconds_Behind_Master - not critical).
 
942
    */
 
943
    if (!(event_creation_time == 0 && debug_not_change_ts_if_art_event > 0))
 
944
      last_master_timestamp= event_creation_time;
 
945
  }
 
946
}
 
947
 
 
948
void Relay_log_info::cleanup_context(Session *session, bool error)
 
949
{
 
950
  assert(sql_session == session);
 
951
  /*
 
952
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
 
953
    may have opened tables, which we cannot be sure have been closed (because
 
954
    maybe the Rows_log_event have not been found or will not be, because slave
 
955
    SQL thread is stopping, or relay log has a missing tail etc). So we close
 
956
    all thread's tables. And so the table mappings have to be cancelled.
 
957
    2) Rows_log_event::do_apply_event() may even have started statements or
 
958
    transactions on them, which we need to rollback in case of error.
 
959
    3) If finding a Format_description_log_event after a BEGIN, we also need
 
960
    to rollback before continuing with the next events.
 
961
    4) so we need this "context cleanup" function.
 
962
  */
 
963
  if (error)
 
964
  {
 
965
    ha_autocommit_or_rollback(session, 1); // if a "statement transaction"
 
966
    end_trans(session, ROLLBACK); // if a "real transaction"
 
967
  }
 
968
  m_table_map.clear_tables();
 
969
  close_thread_tables(session);
 
970
  clear_tables_to_lock();
 
971
  clear_flag(IN_STMT);
 
972
  /*
 
973
    Cleanup for the flags that have been set at do_apply_event.
 
974
  */
 
975
  session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
 
976
  session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
 
977
  last_event_start_time= 0;
 
978
  return;
 
979
}
 
980
 
 
981
 
 
982
bool Relay_log_info::is_in_group() const {
 
983
  return (sql_session->options & OPTION_BEGIN) ||
 
984
      (m_flags & (1UL << IN_STMT));
 
985
}
 
986
 
 
987
 
 
988
void Relay_log_info::clear_tables_to_lock()
 
989
{
 
990
  while (tables_to_lock)
 
991
  {
 
992
    unsigned char* to_free= reinterpret_cast<unsigned char*>(tables_to_lock);
 
993
    if (tables_to_lock->m_tabledef_valid)
 
994
    {
 
995
      tables_to_lock->m_tabledef.table_def::~table_def();
 
996
      tables_to_lock->m_tabledef_valid= false;
 
997
    }
 
998
    tables_to_lock=
 
999
      static_cast<RPL_TableList*>(tables_to_lock->next_global);
 
1000
    tables_to_lock_count--;
 
1001
    free(to_free);
 
1002
  }
 
1003
  assert(tables_to_lock == NULL && tables_to_lock_count == 0);
 
1004
}