~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/replication/rli.cc

  • Committer: Brian Aker
  • Date: 2009-01-23 02:15:04 UTC
  • mfrom: (798.2.32 drizzle)
  • Revision ID: brian@tangent.org-20090123021504-2j99e6hxab1ew601
Merge for replication removal.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2000-2003 MySQL AB
2
 
 
3
 
   This program is free software; you can redistribute it and/or modify
4
 
   it under the terms of the GNU General Public License as published by
5
 
   the Free Software Foundation; version 2 of the License.
6
 
 
7
 
   This program is distributed in the hope that it will be useful,
8
 
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 
   GNU General Public License for more details.
11
 
 
12
 
   You should have received a copy of the GNU General Public License
13
 
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
 
 
16
 
#include <drizzled/server_includes.h>
17
 
#include <drizzled/replication/mi.h>
18
 
#include <drizzled/replication/rli.h>
19
 
// For check_binlog_magic
20
 
#include <drizzled/replication/replication.h>
21
 
#include <drizzled/replication/utility.h>
22
 
#include <drizzled/data_home.h>
23
 
#include <drizzled/sql_parse.h>
24
 
#include <drizzled/gettext.h>
25
 
#include <drizzled/log_event.h>
26
 
#include <drizzled/sql_base.h>
27
 
 
28
 
#if TIME_WITH_SYS_TIME
29
 
# include <sys/time.h>
30
 
# include <time.h>
31
 
#else
32
 
# if HAVE_SYS_TIME_H
33
 
#  include <sys/time.h>
34
 
# else
35
 
#  include <time.h>
36
 
# endif
37
 
#endif
38
 
 
39
 
 
40
 
 
41
 
static int32_t count_relay_log_space(Relay_log_info* rli);
42
 
 
43
 
// Defined in slave.cc
44
 
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val);
45
 
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
46
 
                              const char *default_val);
47
 
 
48
 
 
49
 
Relay_log_info::Relay_log_info()
50
 
  :Slave_reporting_capability("SQL"),
51
 
   no_storage(false), replicate_same_server_id(::replicate_same_server_id),
52
 
   info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
53
 
   cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
54
 
#if defined(HAVE_purify) && HAVE_purify
55
 
   is_fake(false),
56
 
#endif
57
 
   group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
58
 
   last_master_timestamp(0), slave_skip_counter(0),
59
 
   abort_pos_wait(0), slave_run_id(0), sql_session(0),
60
 
   inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
61
 
   until_log_pos(0), retried_trans(0),
62
 
   tables_to_lock(0), tables_to_lock_count(0),
63
 
   last_event_start_time(0), m_flags(0)
64
 
{
65
 
  group_relay_log_name[0]= event_relay_log_name[0]=
66
 
    group_master_log_name[0]= 0;
67
 
  until_log_name[0]= ign_master_log_name_end[0]= 0;
68
 
  memset(&info_file, 0, sizeof(info_file));
69
 
  memset(&cache_buf, 0, sizeof(cache_buf));
70
 
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
71
 
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
72
 
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
73
 
  pthread_cond_init(&data_cond, NULL);
74
 
  pthread_cond_init(&start_cond, NULL);
75
 
  pthread_cond_init(&stop_cond, NULL);
76
 
  pthread_cond_init(&log_space_cond, NULL);
77
 
  relay_log.init_pthread_objects();
78
 
  return;
79
 
}
80
 
 
81
 
 
82
 
Relay_log_info::~Relay_log_info()
83
 
{
84
 
  pthread_mutex_destroy(&run_lock);
85
 
  pthread_mutex_destroy(&data_lock);
86
 
  pthread_mutex_destroy(&log_space_lock);
87
 
  pthread_cond_destroy(&data_cond);
88
 
  pthread_cond_destroy(&start_cond);
89
 
  pthread_cond_destroy(&stop_cond);
90
 
  pthread_cond_destroy(&log_space_cond);
91
 
  relay_log.cleanup();
92
 
  return;
93
 
}
94
 
 
95
 
 
96
 
int32_t init_relay_log_info(Relay_log_info* rli,
97
 
                            const char* info_fname)
98
 
{
99
 
  char fname[FN_REFLEN+128];
100
 
  int32_t info_fd;
101
 
  const char* msg = 0;
102
 
  int32_t error = 0;
103
 
  assert(!rli->no_storage);         // Don't init if there is no storage
104
 
 
105
 
  if (rli->inited)                       // Set if this function called
106
 
    return(0);
107
 
  fn_format(fname, info_fname, drizzle_data_home, "", 4+32);
108
 
  pthread_mutex_lock(&rli->data_lock);
109
 
  info_fd = rli->info_fd;
110
 
  rli->cur_log_fd = -1;
111
 
  rli->slave_skip_counter=0;
112
 
  rli->abort_pos_wait=0;
113
 
  rli->log_space_limit= relay_log_space_limit;
114
 
  rli->log_space_total= 0;
115
 
  rli->tables_to_lock= 0;
116
 
  rli->tables_to_lock_count= 0;
117
 
 
118
 
  /*
119
 
    The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
120
 
    Note that the I/O thread flushes it to disk after writing every
121
 
    event, in flush_master_info(mi, 1).
122
 
  */
123
 
 
124
 
  /*
125
 
    For the maximum log size, we choose max_relay_log_size if it is
126
 
    non-zero, max_binlog_size otherwise. If later the user does SET
127
 
    GLOBAL on one of these variables, fix_max_binlog_size and
128
 
    fix_max_relay_log_size will reconsider the choice (for example
129
 
    if the user changes max_relay_log_size to zero, we have to
130
 
    switch to using max_binlog_size for the relay log) and update
131
 
    rli->relay_log.max_size (and drizzle_bin_log.max_size).
132
 
  */
133
 
  {
134
 
    char buf[FN_REFLEN];
135
 
    const char *ln;
136
 
    static bool name_warning_sent= 0;
137
 
    ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
138
 
                                     1, buf);
139
 
    /* We send the warning only at startup, not after every RESET SLAVE */
140
 
    if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
141
 
    {
142
 
      /*
143
 
        User didn't give us info to name the relay log index file.
144
 
        Picking `hostname`-relay-bin.index like we do, causes replication to
145
 
        fail if this slave's hostname is changed later. So, we would like to
146
 
        instead require a name. But as we don't want to break many existing
147
 
        setups, we only give warning, not error.
148
 
      */
149
 
      errmsg_printf(ERRMSG_LVL_WARN, _("Neither --relay-log nor --relay-log-index were used;"
150
 
                        " so replication "
151
 
                        "may break when this Drizzle server acts as a "
152
 
                        "slave and has his hostname changed!! Please "
153
 
                        "use '--relay-log=%s' to avoid this problem."), ln);
154
 
      name_warning_sent= 1;
155
 
    }
156
 
    /*
157
 
      note, that if open() fails, we'll still have index file open
158
 
      but a destructor will take care of that
159
 
    */
160
 
    if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln) ||
161
 
        rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
162
 
                            (max_relay_log_size ? max_relay_log_size :
163
 
                            max_binlog_size), 1))
164
 
    {
165
 
      pthread_mutex_unlock(&rli->data_lock);
166
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed in open_log() called from "
167
 
                        "init_relay_log_info()"));
168
 
      return(1);
169
 
    }
170
 
  }
171
 
 
172
 
  /* if file does not exist */
173
 
  if (access(fname,F_OK))
174
 
  {
175
 
    /* Create a new file */
176
 
  }
177
 
  else // file exists
178
 
  {
179
 
    /* Open up fname here and pull out the relay.info data */
180
 
  }
181
 
 
182
 
  /*
183
 
    Now change the cache from READ to WRITE - must do this
184
 
    before flush_relay_log_info
185
 
  */
186
 
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
187
 
  if ((error= flush_relay_log_info(rli)))
188
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to flush relay log info file"));
189
 
  if (count_relay_log_space(rli))
190
 
  {
191
 
    msg=_("Error counting relay log space");
192
 
    goto err;
193
 
  }
194
 
  rli->inited= 1;
195
 
  pthread_mutex_unlock(&rli->data_lock);
196
 
  return(error);
197
 
 
198
 
err:
199
 
  errmsg_printf(ERRMSG_LVL_ERROR, "%s",msg);
200
 
  end_io_cache(&rli->info_file);
201
 
  if (info_fd >= 0)
202
 
    my_close(info_fd, MYF(0));
203
 
  rli->info_fd= -1;
204
 
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
205
 
  pthread_mutex_unlock(&rli->data_lock);
206
 
  return(1);
207
 
}
208
 
 
209
 
 
210
 
static inline int32_t add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
211
 
{
212
 
  struct stat s;
213
 
  if (stat(linfo->log_file_name,&s))
214
 
  {
215
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("log %s listed in the index, but failed to stat"),
216
 
                    linfo->log_file_name);
217
 
    return(1);
218
 
  }
219
 
  rli->log_space_total += s.st_size;
220
 
  return(0);
221
 
}
222
 
 
223
 
 
224
 
static int32_t count_relay_log_space(Relay_log_info* rli)
225
 
{
226
 
  LOG_INFO linfo;
227
 
  rli->log_space_total= 0;
228
 
  if (rli->relay_log.find_log_pos(&linfo, NULL, 1))
229
 
  {
230
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Could not find first log while counting relay "
231
 
                      "log space"));
232
 
    return(1);
233
 
  }
234
 
  do
235
 
  {
236
 
    if (add_relay_log(rli,&linfo))
237
 
      return(1);
238
 
  } while (!rli->relay_log.find_next_log(&linfo, 1));
239
 
  /*
240
 
     As we have counted everything, including what may have written in a
241
 
     preceding write, we must reset bytes_written, or we may count some space
242
 
     twice.
243
 
  */
244
 
  rli->relay_log.reset_bytes_written();
245
 
  return(0);
246
 
}
247
 
 
248
 
 
249
 
/*
250
 
   Reset UNTIL condition for Relay_log_info
251
 
 
252
 
   SYNOPSYS
253
 
    clear_until_condition()
254
 
      rli - Relay_log_info structure where UNTIL condition should be reset
255
 
 */
256
 
 
257
 
void Relay_log_info::clear_until_condition()
258
 
{
259
 
  until_condition= Relay_log_info::UNTIL_NONE;
260
 
  until_log_name[0]= 0;
261
 
  until_log_pos= 0;
262
 
  return;
263
 
}
264
 
 
265
 
 
266
 
/*
267
 
  Open the given relay log
268
 
 
269
 
  SYNOPSIS
270
 
    init_relay_log_pos()
271
 
    rli                 Relay information (will be initialized)
272
 
    log                 Name of relay log file to read from. NULL = First log
273
 
    pos                 Position in relay log file
274
 
    need_data_lock      Set to 1 if this functions should do mutex locks
275
 
    errmsg              Store pointer to error message here
276
 
    look_for_description_event
277
 
                        1 if we should look for such an event. We only need
278
 
                        this when the SQL thread starts and opens an existing
279
 
                        relay log and has to execute it (possibly from an
280
 
                        offset >4); then we need to read the first event of
281
 
                        the relay log to be able to parse the events we have
282
 
                        to execute.
283
 
 
284
 
  DESCRIPTION
285
 
  - Close old open relay log files.
286
 
  - If we are using the same relay log as the running IO-thread, then set
287
 
    rli->cur_log to point to the same IO_CACHE entry.
288
 
  - If not, open the 'log' binary file.
289
 
 
290
 
  TODO
291
 
    - check proper initialization of group_master_log_name/group_master_log_pos
292
 
 
293
 
  RETURN VALUES
294
 
    0   ok
295
 
    1   error.  errmsg is set to point to the error message
296
 
*/
297
 
 
298
 
int32_t init_relay_log_pos(Relay_log_info* rli,const char* log,
299
 
                       uint64_t pos, bool need_data_lock,
300
 
                       const char** errmsg,
301
 
                       bool look_for_description_event)
302
 
{
303
 
  *errmsg=0;
304
 
  pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
305
 
 
306
 
  if (need_data_lock)
307
 
    pthread_mutex_lock(&rli->data_lock);
308
 
 
309
 
  /*
310
 
    Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
311
 
    is, too, and init_slave() too; these 2 functions allocate a description
312
 
    event in init_relay_log_pos, which is not freed by the terminating SQL slave
313
 
    thread as that thread is not started by these functions. So we have to free
314
 
    the description_event here, in case, so that there is no memory leak in
315
 
    running, say, CHANGE MASTER.
316
 
  */
317
 
  delete rli->relay_log.description_event_for_exec;
318
 
  /*
319
 
    By default the relay log is in binlog format 3 (4.0).
320
 
    Even if format is 4, this will work enough to read the first event
321
 
    (Format_desc) (remember that format 4 is just lenghtened compared to format
322
 
    3; format 3 is a prefix of format 4).
323
 
  */
324
 
  rli->relay_log.description_event_for_exec= new
325
 
    Format_description_log_event(3);
326
 
 
327
 
  pthread_mutex_lock(log_lock);
328
 
 
329
 
  /* Close log file and free buffers if it's already open */
330
 
  if (rli->cur_log_fd >= 0)
331
 
  {
332
 
    end_io_cache(&rli->cache_buf);
333
 
    my_close(rli->cur_log_fd, MYF(MY_WME));
334
 
    rli->cur_log_fd = -1;
335
 
  }
336
 
 
337
 
  rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
338
 
 
339
 
  /*
340
 
    Test to see if the previous run was with the skip of purging
341
 
    If yes, we do not purge when we restart
342
 
  */
343
 
  if (rli->relay_log.find_log_pos(&rli->linfo, NULL, 1))
344
 
  {
345
 
    *errmsg="Could not find first log during relay log initialization";
346
 
    goto err;
347
 
  }
348
 
 
349
 
  if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
350
 
  {
351
 
    *errmsg="Could not find target log during relay log initialization";
352
 
    goto err;
353
 
  }
354
 
 
355
 
  rli->group_relay_log_name.assign(rli->linfo.log_file_name);
356
 
  rli->event_relay_log_name.assign(rli->linfo.log_file_name);
357
 
 
358
 
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
359
 
  {
360
 
    /*
361
 
      The IO thread is using this log file.
362
 
      In this case, we will use the same IO_CACHE pointer to
363
 
      read data as the IO thread is using to write data.
364
 
    */
365
 
    my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
366
 
    if (check_binlog_magic(rli->cur_log,errmsg))
367
 
      goto err;
368
 
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
369
 
  }
370
 
  else
371
 
  {
372
 
    /*
373
 
      Open the relay log and set rli->cur_log to point at this one
374
 
    */
375
 
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
376
 
                                     rli->linfo.log_file_name,errmsg)) < 0)
377
 
      goto err;
378
 
    rli->cur_log = &rli->cache_buf;
379
 
  }
380
 
  /*
381
 
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
382
 
    sure.
383
 
  */
384
 
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
385
 
  {
386
 
    Log_event* ev;
387
 
    while (look_for_description_event)
388
 
    {
389
 
      /*
390
 
        Read the possible Format_description_log_event; if position
391
 
        was 4, no need, it will be read naturally.
392
 
      */
393
 
      if (my_b_tell(rli->cur_log) >= pos)
394
 
        break;
395
 
 
396
 
      /*
397
 
        Because of we have rli->data_lock and log_lock, we can safely read an
398
 
        event
399
 
      */
400
 
      if (!(ev=Log_event::read_log_event(rli->cur_log,0,
401
 
                                         rli->relay_log.description_event_for_exec)))
402
 
      {
403
 
        if (rli->cur_log->error) /* not EOF */
404
 
        {
405
 
          *errmsg= "I/O error reading event at position 4";
406
 
          goto err;
407
 
        }
408
 
        break;
409
 
      }
410
 
      else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
411
 
      {
412
 
        delete rli->relay_log.description_event_for_exec;
413
 
        rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
414
 
        /*
415
 
          As ev was returned by read_log_event, it has passed is_valid(), so
416
 
          malloc() in ctor worked, no need to check again.
417
 
        */
418
 
        /*
419
 
          Ok, we found a Format_description event. But it is not sure that this
420
 
          describes the whole relay log; indeed, one can have this sequence
421
 
          (starting from position 4):
422
 
          Format_desc (of slave)
423
 
          Rotate (of master)
424
 
          Format_desc (of master)
425
 
          So the Format_desc which really describes the rest of the relay log
426
 
          is the 3rd event (it can't be further than that, because we rotate
427
 
          the relay log when we queue a Rotate event from the master).
428
 
          But what describes the Rotate is the first Format_desc.
429
 
          So what we do is:
430
 
          go on searching for Format_description events, until you exceed the
431
 
          position (argument 'pos') or until you find another event than Rotate
432
 
          or Format_desc.
433
 
        */
434
 
      }
435
 
      else
436
 
      {
437
 
        look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
438
 
        delete ev;
439
 
      }
440
 
    }
441
 
    my_b_seek(rli->cur_log,(off_t)pos);
442
 
 
443
 
  }
444
 
 
445
 
err:
446
 
  /*
447
 
    If we don't purge, we can't honour relay_log_space_limit ;
448
 
    silently discard it
449
 
  */
450
 
  if (!relay_log_purge)
451
 
    rli->log_space_limit= 0;
452
 
  pthread_cond_broadcast(&rli->data_cond);
453
 
 
454
 
  pthread_mutex_unlock(log_lock);
455
 
 
456
 
  if (need_data_lock)
457
 
    pthread_mutex_unlock(&rli->data_lock);
458
 
  if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
459
 
    *errmsg= "Invalid Format_description log event; could be out of memory";
460
 
 
461
 
  return ((*errmsg) ? 1 : 0);
462
 
}
463
 
 
464
 
 
465
 
/*
466
 
  Waits until the SQL thread reaches (has executed up to) the
467
 
  log/position or timed out.
468
 
 
469
 
  SYNOPSIS
470
 
    wait_for_pos()
471
 
    session             client thread that sent SELECT MASTER_POS_WAIT
472
 
    log_name        log name to wait for
473
 
    log_pos         position to wait for
474
 
    timeout         timeout in seconds before giving up waiting
475
 
 
476
 
  NOTES
477
 
    timeout is int64_t whereas it should be uint32_t ; but this is
478
 
    to catch if the user submitted a negative timeout.
479
 
 
480
 
  RETURN VALUES
481
 
    -2          improper arguments (log_pos<0)
482
 
                or slave not running, or master info changed
483
 
                during the function's execution,
484
 
                or client thread killed. -2 is translated to NULL by caller
485
 
    -1          timed out
486
 
    >=0         number of log events the function had to wait
487
 
                before reaching the desired log/position
488
 
 */
489
 
 
490
 
int32_t Relay_log_info::wait_for_pos(Session* session, String* log_name,
491
 
                                    int64_t log_pos,
492
 
                                    int64_t timeout)
493
 
{
494
 
  int32_t event_count = 0;
495
 
  uint32_t init_abort_pos_wait;
496
 
  int32_t error=0;
497
 
  struct timespec abstime; // for timeout checking
498
 
  const char *msg;
499
 
 
500
 
  if (!inited)
501
 
    return(-2);
502
 
 
503
 
  set_timespec(abstime,timeout);
504
 
  pthread_mutex_lock(&data_lock);
505
 
  msg= session->enter_cond(&data_cond, &data_lock,
506
 
                       "Waiting for the slave SQL thread to "
507
 
                       "advance position");
508
 
  /*
509
 
     This function will abort when it notices that some CHANGE MASTER or
510
 
     RESET MASTER has changed the master info.
511
 
     To catch this, these commands modify abort_pos_wait ; We just monitor
512
 
     abort_pos_wait and see if it has changed.
513
 
     Why do we have this mechanism instead of simply monitoring slave_running
514
 
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
515
 
     the SQL thread be stopped?
516
 
     This is becasue if someones does:
517
 
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
518
 
     the change may happen very quickly and we may not notice that
519
 
     slave_running briefly switches between 1/0/1.
520
 
  */
521
 
  init_abort_pos_wait= abort_pos_wait;
522
 
 
523
 
  /*
524
 
    We'll need to
525
 
    handle all possible log names comparisons (e.g. 999 vs 1000).
526
 
    We use uint32_t for string->number conversion ; this is no
527
 
    stronger limitation than in find_uniq_filename in sql/log.cc
528
 
  */
529
 
  uint32_t log_name_extension;
530
 
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
531
 
  size_t len= cmin(log_name->length(), (uint32_t)FN_REFLEN-1);
532
 
  strncpy(log_name_tmp, log_name->ptr(), len);
533
 
  log_name_tmp[len]= '\0';
534
 
 
535
 
  char *p= fn_ext(log_name_tmp);
536
 
  char *p_end;
537
 
  if (!*p || log_pos<0)
538
 
  {
539
 
    error= -2; //means improper arguments
540
 
    goto err;
541
 
  }
542
 
  // Convert 0-3 to 4
543
 
  log_pos= cmax(log_pos, (int64_t)BIN_LOG_HEADER_SIZE);
544
 
  /* p points to '.' */
545
 
  log_name_extension= strtoul(++p, &p_end, 10);
546
 
  /*
547
 
    p_end points to the first invalid character.
548
 
    If it equals to p, no digits were found, error.
549
 
    If it contains '\0' it means conversion went ok.
550
 
  */
551
 
  if (p_end==p || *p_end)
552
 
  {
553
 
    error= -2;
554
 
    goto err;
555
 
  }
556
 
 
557
 
  /* The "compare and wait" main loop */
558
 
  while (!session->killed &&
559
 
         init_abort_pos_wait == abort_pos_wait &&
560
 
         slave_running)
561
 
  {
562
 
    bool pos_reached;
563
 
    int32_t cmp_result= 0;
564
 
 
565
 
    /*
566
 
      group_master_log_name can be "", if we are just after a fresh
567
 
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
568
 
      (before we have executed one Rotate event from the master) or
569
 
      (rare) if the user is doing a weird slave setup (see next
570
 
      paragraph).  If group_master_log_name is "", we assume we don't
571
 
      have enough info to do the comparison yet, so we just wait until
572
 
      more data. In this case master_log_pos is always 0 except if
573
 
      somebody (wrongly) sets this slave to be a slave of itself
574
 
      without using --replicate-same-server-id (an unsupported
575
 
      configuration which does nothing), then group_master_log_pos
576
 
      will grow and group_master_log_name will stay "".
577
 
    */
578
 
    if (group_master_log_name.length())
579
 
    {
580
 
      const char *basename= (group_master_log_name.c_str() +
581
 
                             dirname_length(group_master_log_name.c_str()));
582
 
      /*
583
 
        First compare the parts before the extension.
584
 
        Find the dot in the master's log basename,
585
 
        and protect against user's input error :
586
 
        if the names do not match up to '.' included, return error
587
 
      */
588
 
      char *q= (char*)(fn_ext(basename)+1);
589
 
      if (strncmp(basename, log_name_tmp, (int32_t)(q-basename)))
590
 
      {
591
 
        error= -2;
592
 
        break;
593
 
      }
594
 
      // Now compare extensions.
595
 
      char *q_end;
596
 
      uint32_t group_master_log_name_extension= strtoul(q, &q_end, 10);
597
 
      if (group_master_log_name_extension < log_name_extension)
598
 
        cmp_result= -1 ;
599
 
      else
600
 
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
601
 
 
602
 
      pos_reached= ((!cmp_result && group_master_log_pos >= (uint64_t)log_pos) ||
603
 
                    cmp_result > 0);
604
 
      if (pos_reached || session->killed)
605
 
        break;
606
 
    }
607
 
 
608
 
    //wait for master update, with optional timeout.
609
 
 
610
 
    /*
611
 
      We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
612
 
      will wake us up.
613
 
    */
614
 
    if (timeout > 0)
615
 
    {
616
 
      /*
617
 
        Note that pthread_cond_timedwait checks for the timeout
618
 
        before for the condition ; i.e. it returns ETIMEDOUT
619
 
        if the system time equals or exceeds the time specified by abstime
620
 
        before the condition variable is signaled or broadcast, _or_ if
621
 
        the absolute time specified by abstime has already passed at the time
622
 
        of the call.
623
 
        For that reason, pthread_cond_timedwait will do the "timeoutting" job
624
 
        even if its condition is always immediately signaled (case of a loaded
625
 
        master).
626
 
      */
627
 
      error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
628
 
    }
629
 
    else
630
 
      pthread_cond_wait(&data_cond, &data_lock);
631
 
    if (error == ETIMEDOUT || error == ETIME)
632
 
    {
633
 
      error= -1;
634
 
      break;
635
 
    }
636
 
    error=0;
637
 
    event_count++;
638
 
  }
639
 
 
640
 
err:
641
 
  session->exit_cond(msg);
642
 
  if (session->killed || init_abort_pos_wait != abort_pos_wait ||
643
 
      !slave_running)
644
 
  {
645
 
    error= -2;
646
 
  }
647
 
  return( error ? error : event_count );
648
 
}
649
 
 
650
 
 
651
 
void Relay_log_info::inc_group_relay_log_pos(uint64_t log_pos,
652
 
                                                bool skip_lock)
653
 
{
654
 
  if (!skip_lock)
655
 
    pthread_mutex_lock(&data_lock);
656
 
  inc_event_relay_log_pos();
657
 
  group_relay_log_pos= event_relay_log_pos;
658
 
  group_relay_log_name.assign(event_relay_log_name);
659
 
 
660
 
  notify_group_relay_log_name_update();
661
 
 
662
 
  /*
663
 
    If the slave does not support transactions and replicates a transaction,
664
 
    users should not trust group_master_log_pos (which they can display with
665
 
    SHOW SLAVE STATUS or read from relay-log.info), because to compute
666
 
    group_master_log_pos the slave relies on log_pos stored in the master's
667
 
    binlog, but if we are in a master's transaction these positions are always
668
 
    the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
669
 
    not advance as it should on the non-transactional slave (it advances by
670
 
    big leaps, whereas it should advance by small leaps).
671
 
  */
672
 
  /*
673
 
    In 4.x we used the event's len to compute the positions here. This is
674
 
    wrong if the event was 3.23/4.0 and has been converted to 5.0, because
675
 
    then the event's len is not what is was in the master's binlog, so this
676
 
    will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
677
 
    replication: Exec_master_log_pos is wrong). Only way to solve this is to
678
 
    have the original offset of the end of the event the relay log. This is
679
 
    what we do in 5.0: log_pos has become "end_log_pos" (because the real use
680
 
    of log_pos in 4.0 was to compute the end_log_pos; so better to store
681
 
    end_log_pos instead of begin_log_pos.
682
 
    If we had not done this fix here, the problem would also have appeared
683
 
    when the slave and master are 5.0 but with different event length (for
684
 
    example the slave is more recent than the master and features the event
685
 
    UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
686
 
    SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
687
 
    value which would lead to badly broken replication.
688
 
    Even the relay_log_pos will be corrupted in this case, because the len is
689
 
    the relay log is not "val".
690
 
    With the end_log_pos solution, we avoid computations involving lengthes.
691
 
  */
692
 
  if (log_pos) // 3.23 binlogs don't have log_posx
693
 
  {
694
 
    group_master_log_pos= log_pos;
695
 
  }
696
 
  pthread_cond_broadcast(&data_cond);
697
 
  if (!skip_lock)
698
 
    pthread_mutex_unlock(&data_lock);
699
 
  return;
700
 
}
701
 
 
702
 
 
703
 
void Relay_log_info::close_temporary_tables()
704
 
{
705
 
  Table *table,*next;
706
 
 
707
 
  for (table=save_temporary_tables ; table ; table=next)
708
 
  {
709
 
    next=table->next;
710
 
    /*
711
 
      Don't ask for disk deletion. For now, anyway they will be deleted when
712
 
      slave restarts, but it is a better intention to not delete them.
713
 
    */
714
 
    close_temporary(table, 1, 0);
715
 
  }
716
 
  save_temporary_tables= 0;
717
 
  slave_open_temp_tables= 0;
718
 
  return;
719
 
}
720
 
 
721
 
/*
722
 
  purge_relay_logs()
723
 
 
724
 
  NOTES
725
 
    Assumes to have a run lock on rli and that no slave thread are running.
726
 
*/
727
 
 
728
 
int32_t purge_relay_logs(Relay_log_info* rli, Session *session, bool just_reset,
729
 
                     const char** errmsg)
730
 
{
731
 
  int32_t error=0;
732
 
 
733
 
  /*
734
 
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
735
 
    Indeed, rli->inited==0 does not imply that they already are empty.
736
 
    It could be that slave's info initialization partly succeeded :
737
 
    for example if relay-log.info existed but *relay-bin*.*
738
 
    have been manually removed, init_relay_log_info reads the old
739
 
    relay-log.info and fills rli->master_log_*, then init_relay_log_info
740
 
    checks for the existence of the relay log, this fails and
741
 
    init_relay_log_info leaves rli->inited to 0.
742
 
    In that pathological case, rli->master_log_pos* will be properly reinited
743
 
    at the next START SLAVE (as RESET SLAVE or CHANGE
744
 
    MASTER, the callers of purge_relay_logs, will delete bogus *.info files
745
 
    or replace them with correct files), however if the user does SHOW SLAVE
746
 
    STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
747
 
    In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
748
 
    to display fine in any case.
749
 
  */
750
 
 
751
 
  rli->group_master_log_name[0]= 0;
752
 
  rli->group_master_log_pos= 0;
753
 
 
754
 
  if (!rli->inited)
755
 
  {
756
 
    return(0);
757
 
  }
758
 
 
759
 
  assert(rli->slave_running == 0);
760
 
  assert(rli->mi->slave_running == 0);
761
 
 
762
 
  rli->slave_skip_counter=0;
763
 
  pthread_mutex_lock(&rli->data_lock);
764
 
 
765
 
  /*
766
 
    we close the relay log fd possibly left open by the slave SQL thread,
767
 
    to be able to delete it; the relay log fd possibly left open by the slave
768
 
    I/O thread will be closed naturally in reset_logs() by the
769
 
    close(LOG_CLOSE_TO_BE_OPENED) call
770
 
  */
771
 
  if (rli->cur_log_fd >= 0)
772
 
  {
773
 
    end_io_cache(&rli->cache_buf);
774
 
    my_close(rli->cur_log_fd, MYF(MY_WME));
775
 
    rli->cur_log_fd= -1;
776
 
  }
777
 
 
778
 
  if (rli->relay_log.reset_logs(session))
779
 
  {
780
 
    *errmsg = "Failed during log reset";
781
 
    error=1;
782
 
    goto err;
783
 
  }
784
 
  /* Save name of used relay log file */
785
 
  rli->group_relay_log_name.assign(rli->relay_log.get_log_fname());
786
 
  rli->event_relay_log_name.assign(rli->relay_log.get_log_fname());
787
 
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
788
 
  if (count_relay_log_space(rli))
789
 
  {
790
 
    *errmsg= "Error counting relay log space";
791
 
    goto err;
792
 
  }
793
 
  if (!just_reset)
794
 
    error= init_relay_log_pos(rli, rli->group_relay_log_name.c_str(),
795
 
                              rli->group_relay_log_pos,
796
 
                              0 /* do not need data lock */, errmsg, 0);
797
 
 
798
 
err:
799
 
  pthread_mutex_unlock(&rli->data_lock);
800
 
  return(error);
801
 
}
802
 
 
803
 
 
804
 
/*
805
 
     Check if condition stated in UNTIL clause of START SLAVE is reached.
806
 
   SYNOPSYS
807
 
     Relay_log_info::is_until_satisfied()
808
 
     master_beg_pos    position of the beginning of to be executed event
809
 
                       (not log_pos member of the event that points to the
810
 
                        beginning of the following event)
811
 
 
812
 
 
813
 
   DESCRIPTION
814
 
     Checks if UNTIL condition is reached. Uses caching result of last
815
 
     comparison of current log file name and target log file name. So cached
816
 
     value should be invalidated if current log file name changes
817
 
     (see Relay_log_info::notify_... functions).
818
 
 
819
 
     This caching is needed to avoid of expensive string comparisons and
820
 
     strtol() conversions needed for log names comparison. We don't need to
821
 
     compare them each time this function is called, we only need to do this
822
 
     when current log name changes. If we have UNTIL_MASTER_POS condition we
823
 
     need to do this only after Rotate_log_event::do_apply_event() (which is
824
 
     rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
825
 
     condition then we should invalidate cached comarison value after
826
 
     inc_group_relay_log_pos() which called for each group of events (so we
827
 
     have some benefit if we have something like queries that use
828
 
     autoincrement or if we have transactions).
829
 
 
830
 
     Should be called ONLY if until_condition != UNTIL_NONE !
831
 
   RETURN VALUE
832
 
     true - condition met or error happened (condition seems to have
833
 
            bad log file name)
834
 
     false - condition not met
835
 
*/
836
 
 
837
 
bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
838
 
{
839
 
  const char *log_name;
840
 
  uint64_t log_pos;
841
 
 
842
 
  assert(until_condition != UNTIL_NONE);
843
 
 
844
 
  if (until_condition == UNTIL_MASTER_POS)
845
 
  {
846
 
    log_name= group_master_log_name.c_str();
847
 
    log_pos= master_beg_pos;
848
 
  }
849
 
  else
850
 
  { /* until_condition == UNTIL_RELAY_POS */
851
 
    log_name= group_relay_log_name.c_str();
852
 
    log_pos= group_relay_log_pos;
853
 
  }
854
 
 
855
 
  if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
856
 
  {
857
 
    /*
858
 
      We have no cached comparison results so we should compare log names
859
 
      and cache result.
860
 
      If we are after RESET SLAVE, and the SQL slave thread has not processed
861
 
      any event yet, it could be that group_master_log_name is "". In that case,
862
 
      just wait for more events (as there is no sensible comparison to do).
863
 
    */
864
 
 
865
 
    if (*log_name)
866
 
    {
867
 
      const char *basename= log_name + dirname_length(log_name);
868
 
 
869
 
      const char *q= (const char*)(fn_ext(basename)+1);
870
 
      if (strncmp(basename, until_log_name, (int32_t)(q-basename)) == 0)
871
 
      {
872
 
        /* Now compare extensions. */
873
 
        char *q_end;
874
 
        uint32_t log_name_extension= strtoul(q, &q_end, 10);
875
 
        if (log_name_extension < until_log_name_extension)
876
 
          until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
877
 
        else
878
 
          until_log_names_cmp_result=
879
 
            (log_name_extension > until_log_name_extension) ?
880
 
            UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
881
 
      }
882
 
      else
883
 
      {
884
 
        /* Probably error so we aborting */
885
 
        errmsg_printf(ERRMSG_LVL_ERROR, _("Slave SQL thread is stopped because UNTIL "
886
 
                          "condition is bad."));
887
 
        return(true);
888
 
      }
889
 
    }
890
 
    else
891
 
      return(until_log_pos == 0);
892
 
  }
893
 
 
894
 
  return(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
895
 
           log_pos >= until_log_pos) ||
896
 
          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
897
 
}
898
 
 
899
 
 
900
 
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
901
 
                                  time_t event_creation_time)
902
 
{
903
 
  extern uint32_t debug_not_change_ts_if_art_event;
904
 
  clear_flag(IN_STMT);
905
 
 
906
 
  /*
907
 
    If in a transaction, and if the slave supports transactions, just
908
 
    inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
909
 
    (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
910
 
    BEGIN/COMMIT, not with SET AUTOCOMMIT= .
911
 
 
912
 
    CAUTION: opt_using_transactions means innodb || bdb ; suppose the
913
 
    master supports InnoDB and BDB, but the slave supports only BDB,
914
 
    problems will arise: - suppose an InnoDB table is created on the
915
 
    master, - then it will be MyISAM on the slave - but as
916
 
    opt_using_transactions is true, the slave will believe he is
917
 
    transactional with the MyISAM table. And problems will come when
918
 
    one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
919
 
    resume at BEGIN whereas there has not been any rollback).  This is
920
 
    the problem of using opt_using_transactions instead of a finer
921
 
    "does the slave support _transactional handler used on the
922
 
    master_".
923
 
 
924
 
    More generally, we'll have problems when a query mixes a
925
 
    transactional handler and MyISAM and STOP SLAVE is issued in the
926
 
    middle of the "transaction". START SLAVE will resume at BEGIN
927
 
    while the MyISAM table has already been updated.
928
 
  */
929
 
  if ((sql_session->options & OPTION_BEGIN) && opt_using_transactions)
930
 
    inc_event_relay_log_pos();
931
 
  else
932
 
  {
933
 
    inc_group_relay_log_pos(event_master_log_pos);
934
 
    flush_relay_log_info(this);
935
 
    /*
936
 
      Note that Rotate_log_event::do_apply_event() does not call this
937
 
      function, so there is no chance that a fake rotate event resets
938
 
      last_master_timestamp.  Note that we update without mutex
939
 
      (probably ok - except in some very rare cases, only consequence
940
 
      is that value may take some time to display in
941
 
      Seconds_Behind_Master - not critical).
942
 
    */
943
 
    if (!(event_creation_time == 0 && debug_not_change_ts_if_art_event > 0))
944
 
      last_master_timestamp= event_creation_time;
945
 
  }
946
 
}
947
 
 
948
 
void Relay_log_info::cleanup_context(Session *session, bool error)
949
 
{
950
 
  assert(sql_session == session);
951
 
  /*
952
 
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
953
 
    may have opened tables, which we cannot be sure have been closed (because
954
 
    maybe the Rows_log_event have not been found or will not be, because slave
955
 
    SQL thread is stopping, or relay log has a missing tail etc). So we close
956
 
    all thread's tables. And so the table mappings have to be cancelled.
957
 
    2) Rows_log_event::do_apply_event() may even have started statements or
958
 
    transactions on them, which we need to rollback in case of error.
959
 
    3) If finding a Format_description_log_event after a BEGIN, we also need
960
 
    to rollback before continuing with the next events.
961
 
    4) so we need this "context cleanup" function.
962
 
  */
963
 
  if (error)
964
 
  {
965
 
    ha_autocommit_or_rollback(session, 1); // if a "statement transaction"
966
 
    end_trans(session, ROLLBACK); // if a "real transaction"
967
 
  }
968
 
  m_table_map.clear_tables();
969
 
  close_thread_tables(session);
970
 
  clear_tables_to_lock();
971
 
  clear_flag(IN_STMT);
972
 
  /*
973
 
    Cleanup for the flags that have been set at do_apply_event.
974
 
  */
975
 
  session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
976
 
  session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
977
 
  last_event_start_time= 0;
978
 
  return;
979
 
}
980
 
 
981
 
 
982
 
bool Relay_log_info::is_in_group() const {
983
 
  return (sql_session->options & OPTION_BEGIN) ||
984
 
      (m_flags & (1UL << IN_STMT));
985
 
}
986
 
 
987
 
 
988
 
void Relay_log_info::clear_tables_to_lock()
989
 
{
990
 
  while (tables_to_lock)
991
 
  {
992
 
    unsigned char* to_free= reinterpret_cast<unsigned char*>(tables_to_lock);
993
 
    if (tables_to_lock->m_tabledef_valid)
994
 
    {
995
 
      tables_to_lock->m_tabledef.table_def::~table_def();
996
 
      tables_to_lock->m_tabledef_valid= false;
997
 
    }
998
 
    tables_to_lock=
999
 
      static_cast<RPL_TableList*>(tables_to_lock->next_global);
1000
 
    tables_to_lock_count--;
1001
 
    free(to_free);
1002
 
  }
1003
 
  assert(tables_to_lock == NULL && tables_to_lock_count == 0);
1004
 
}