~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/rpl_rli.cc

  • Committer: Monty Taylor
  • Date: 2009-03-25 21:06:47 UTC
  • mto: This revision was merged to the branch mainline in revision 964.
  • Revision ID: mordred@inaugust.com-20090325210647-7j1tm98gvct3jxsu
Removed legacy_db_type.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2000-2003 MySQL AB
2
 
 
3
 
   This program is free software; you can redistribute it and/or modify
4
 
   it under the terms of the GNU General Public License as published by
5
 
   the Free Software Foundation; version 2 of the License.
6
 
 
7
 
   This program is distributed in the hope that it will be useful,
8
 
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 
   GNU General Public License for more details.
11
 
 
12
 
   You should have received a copy of the GNU General Public License
13
 
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
 
 
16
 
#include <drizzled/server_includes.h>
17
 
 
18
 
#include "rpl_mi.h"
19
 
#include "rpl_rli.h"
20
 
#include "sql_repl.h"  // For check_binlog_magic
21
 
#include "rpl_utility.h"
22
 
 
23
 
#include <libdrizzle/gettext.h>
24
 
 
25
 
static int32_t count_relay_log_space(Relay_log_info* rli);
26
 
 
27
 
// Defined in slave.cc
28
 
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val);
29
 
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
30
 
                          const char *default_val);
31
 
 
32
 
 
33
 
Relay_log_info::Relay_log_info()
34
 
  :Slave_reporting_capability("SQL"),
35
 
   no_storage(false), replicate_same_server_id(::replicate_same_server_id),
36
 
   info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
37
 
#if defined(HAVE_purify) && HAVE_purify
38
 
   is_fake(false),
39
 
#endif
40
 
   cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
41
 
   group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
42
 
   last_master_timestamp(0), slave_skip_counter(0),
43
 
   abort_pos_wait(0), slave_run_id(0), sql_thd(0),
44
 
   inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
45
 
   until_log_pos(0), retried_trans(0),
46
 
   tables_to_lock(0), tables_to_lock_count(0),
47
 
   last_event_start_time(0), m_flags(0)
48
 
{
49
 
  group_relay_log_name[0]= event_relay_log_name[0]=
50
 
    group_master_log_name[0]= 0;
51
 
  until_log_name[0]= ign_master_log_name_end[0]= 0;
52
 
  memset(&info_file, 0, sizeof(info_file));
53
 
  memset(&cache_buf, 0, sizeof(cache_buf));
54
 
  cached_charset_invalidate();
55
 
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
56
 
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
57
 
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
58
 
  pthread_cond_init(&data_cond, NULL);
59
 
  pthread_cond_init(&start_cond, NULL);
60
 
  pthread_cond_init(&stop_cond, NULL);
61
 
  pthread_cond_init(&log_space_cond, NULL);
62
 
  relay_log.init_pthread_objects();
63
 
  return;
64
 
}
65
 
 
66
 
 
67
 
Relay_log_info::~Relay_log_info()
68
 
{
69
 
  pthread_mutex_destroy(&run_lock);
70
 
  pthread_mutex_destroy(&data_lock);
71
 
  pthread_mutex_destroy(&log_space_lock);
72
 
  pthread_cond_destroy(&data_cond);
73
 
  pthread_cond_destroy(&start_cond);
74
 
  pthread_cond_destroy(&stop_cond);
75
 
  pthread_cond_destroy(&log_space_cond);
76
 
  relay_log.cleanup();
77
 
  return;
78
 
}
79
 
 
80
 
 
81
 
int32_t init_relay_log_info(Relay_log_info* rli,
82
 
                        const char* info_fname)
83
 
{
84
 
  char fname[FN_REFLEN+128];
85
 
  int32_t info_fd;
86
 
  const char* msg = 0;
87
 
  int32_t error = 0;
88
 
  assert(!rli->no_storage);         // Don't init if there is no storage
89
 
 
90
 
  if (rli->inited)                       // Set if this function called
91
 
    return(0);
92
 
  fn_format(fname, info_fname, mysql_data_home, "", 4+32);
93
 
  pthread_mutex_lock(&rli->data_lock);
94
 
  info_fd = rli->info_fd;
95
 
  rli->cur_log_fd = -1;
96
 
  rli->slave_skip_counter=0;
97
 
  rli->abort_pos_wait=0;
98
 
  rli->log_space_limit= relay_log_space_limit;
99
 
  rli->log_space_total= 0;
100
 
  rli->tables_to_lock= 0;
101
 
  rli->tables_to_lock_count= 0;
102
 
 
103
 
  /*
104
 
    The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
105
 
    Note that the I/O thread flushes it to disk after writing every
106
 
    event, in flush_master_info(mi, 1).
107
 
  */
108
 
 
109
 
  /*
110
 
    For the maximum log size, we choose max_relay_log_size if it is
111
 
    non-zero, max_binlog_size otherwise. If later the user does SET
112
 
    GLOBAL on one of these variables, fix_max_binlog_size and
113
 
    fix_max_relay_log_size will reconsider the choice (for example
114
 
    if the user changes max_relay_log_size to zero, we have to
115
 
    switch to using max_binlog_size for the relay log) and update
116
 
    rli->relay_log.max_size (and mysql_bin_log.max_size).
117
 
  */
118
 
  {
119
 
    char buf[FN_REFLEN];
120
 
    const char *ln;
121
 
    static bool name_warning_sent= 0;
122
 
    ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
123
 
                                     1, buf);
124
 
    /* We send the warning only at startup, not after every RESET SLAVE */
125
 
    if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
126
 
    {
127
 
      /*
128
 
        User didn't give us info to name the relay log index file.
129
 
        Picking `hostname`-relay-bin.index like we do, causes replication to
130
 
        fail if this slave's hostname is changed later. So, we would like to
131
 
        instead require a name. But as we don't want to break many existing
132
 
        setups, we only give warning, not error.
133
 
      */
134
 
      sql_print_warning(_("Neither --relay-log nor --relay-log-index were used;"
135
 
                        " so replication "
136
 
                        "may break when this MySQL server acts as a "
137
 
                        "slave and has his hostname changed!! Please "
138
 
                        "use '--relay-log=%s' to avoid this problem."), ln);
139
 
      name_warning_sent= 1;
140
 
    }
141
 
    /*
142
 
      note, that if open() fails, we'll still have index file open
143
 
      but a destructor will take care of that
144
 
    */
145
 
    if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln) ||
146
 
        rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
147
 
                            (max_relay_log_size ? max_relay_log_size :
148
 
                            max_binlog_size), 1))
149
 
    {
150
 
      pthread_mutex_unlock(&rli->data_lock);
151
 
      sql_print_error(_("Failed in open_log() called from "
152
 
                        "init_relay_log_info()"));
153
 
      return(1);
154
 
    }
155
 
  }
156
 
 
157
 
  /* if file does not exist */
158
 
  if (access(fname,F_OK))
159
 
  {
160
 
    /*
161
 
      If someone removed the file from underneath our feet, just close
162
 
      the old descriptor and re-create the old file
163
 
    */
164
 
    if (info_fd >= 0)
165
 
      my_close(info_fd, MYF(MY_WME));
166
 
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
167
 
    {
168
 
      sql_print_error(_("Failed to create a new relay log info file "
169
 
                        "( file '%s', errno %d)"), fname, my_errno);
170
 
      msg= current_thd->main_da.message();
171
 
      goto err;
172
 
    }
173
 
    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
174
 
                      MYF(MY_WME)))
175
 
    {
176
 
      sql_print_error(_("Failed to create a cache on relay log info file '%s'"),
177
 
                      fname);
178
 
      msg= current_thd->main_da.message();
179
 
      goto err;
180
 
    }
181
 
 
182
 
    /* Init relay log with first entry in the relay index file */
183
 
    if (init_relay_log_pos(rli,NULL,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
184
 
                           &msg, 0))
185
 
    {
186
 
      sql_print_error(_("Failed to open the relay log 'FIRST' (relay_log_pos 4)"));
187
 
      goto err;
188
 
    }
189
 
    rli->group_master_log_name[0]= 0;
190
 
    rli->group_master_log_pos= 0;
191
 
    rli->info_fd= info_fd;
192
 
  }
193
 
  else // file exists
194
 
  {
195
 
    if (info_fd >= 0)
196
 
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
197
 
    else
198
 
    {
199
 
      int32_t error=0;
200
 
      if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
201
 
      {
202
 
        sql_print_error(_("Failed to open the existing relay log info "
203
 
                          "file '%s' (errno %d)"),
204
 
                        fname, my_errno);
205
 
        error= 1;
206
 
      }
207
 
      else if (init_io_cache(&rli->info_file, info_fd,
208
 
                             IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
209
 
      {
210
 
        sql_print_error(_("Failed to create a cache on relay log info "
211
 
                          "file '%s'"),
212
 
                        fname);
213
 
        error= 1;
214
 
      }
215
 
      if (error)
216
 
      {
217
 
        if (info_fd >= 0)
218
 
          my_close(info_fd, MYF(0));
219
 
        rli->info_fd= -1;
220
 
        rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
221
 
        pthread_mutex_unlock(&rli->data_lock);
222
 
        return(1);
223
 
      }
224
 
    }
225
 
 
226
 
    rli->info_fd = info_fd;
227
 
    int32_t relay_log_pos, master_log_pos;
228
 
    if (init_strvar_from_file(rli->group_relay_log_name,
229
 
                              sizeof(rli->group_relay_log_name),
230
 
                              &rli->info_file, "") ||
231
 
       init_intvar_from_file(&relay_log_pos,
232
 
                             &rli->info_file, BIN_LOG_HEADER_SIZE) ||
233
 
       init_strvar_from_file(rli->group_master_log_name,
234
 
                             sizeof(rli->group_master_log_name),
235
 
                             &rli->info_file, "") ||
236
 
       init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
237
 
    {
238
 
      msg="Error reading slave log configuration";
239
 
      goto err;
240
 
    }
241
 
    strmake(rli->event_relay_log_name,rli->group_relay_log_name,
242
 
            sizeof(rli->event_relay_log_name)-1);
243
 
    rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
244
 
    rli->group_master_log_pos= master_log_pos;
245
 
 
246
 
    if (init_relay_log_pos(rli,
247
 
                           rli->group_relay_log_name,
248
 
                           rli->group_relay_log_pos,
249
 
                           0 /* no data lock*/,
250
 
                           &msg, 0))
251
 
    {
252
 
      char llbuf[22];
253
 
      sql_print_error(_("Failed to open the relay log '%s' (relay_log_pos %s)"),
254
 
                      rli->group_relay_log_name,
255
 
                      llstr(rli->group_relay_log_pos, llbuf));
256
 
      goto err;
257
 
    }
258
 
  }
259
 
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
260
 
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
261
 
 
262
 
  /*
263
 
    Now change the cache from READ to WRITE - must do this
264
 
    before flush_relay_log_info
265
 
  */
266
 
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
267
 
  if ((error= flush_relay_log_info(rli)))
268
 
    sql_print_error(_("Failed to flush relay log info file"));
269
 
  if (count_relay_log_space(rli))
270
 
  {
271
 
    msg=_("Error counting relay log space");
272
 
    goto err;
273
 
  }
274
 
  rli->inited= 1;
275
 
  pthread_mutex_unlock(&rli->data_lock);
276
 
  return(error);
277
 
 
278
 
err:
279
 
  sql_print_error(msg);
280
 
  end_io_cache(&rli->info_file);
281
 
  if (info_fd >= 0)
282
 
    my_close(info_fd, MYF(0));
283
 
  rli->info_fd= -1;
284
 
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
285
 
  pthread_mutex_unlock(&rli->data_lock);
286
 
  return(1);
287
 
}
288
 
 
289
 
 
290
 
static inline int32_t add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
291
 
{
292
 
  struct stat s;
293
 
  if (stat(linfo->log_file_name,&s))
294
 
  {
295
 
    sql_print_error(_("log %s listed in the index, but failed to stat"),
296
 
                    linfo->log_file_name);
297
 
    return(1);
298
 
  }
299
 
  rli->log_space_total += s.st_size;
300
 
  return(0);
301
 
}
302
 
 
303
 
 
304
 
static int32_t count_relay_log_space(Relay_log_info* rli)
305
 
{
306
 
  LOG_INFO linfo;
307
 
  rli->log_space_total= 0;
308
 
  if (rli->relay_log.find_log_pos(&linfo, NULL, 1))
309
 
  {
310
 
    sql_print_error(_("Could not find first log while counting relay "
311
 
                      "log space"));
312
 
    return(1);
313
 
  }
314
 
  do
315
 
  {
316
 
    if (add_relay_log(rli,&linfo))
317
 
      return(1);
318
 
  } while (!rli->relay_log.find_next_log(&linfo, 1));
319
 
  /*
320
 
     As we have counted everything, including what may have written in a
321
 
     preceding write, we must reset bytes_written, or we may count some space
322
 
     twice.
323
 
  */
324
 
  rli->relay_log.reset_bytes_written();
325
 
  return(0);
326
 
}
327
 
 
328
 
 
329
 
/*
330
 
   Reset UNTIL condition for Relay_log_info
331
 
 
332
 
   SYNOPSYS
333
 
    clear_until_condition()
334
 
      rli - Relay_log_info structure where UNTIL condition should be reset
335
 
 */
336
 
 
337
 
void Relay_log_info::clear_until_condition()
338
 
{
339
 
  until_condition= Relay_log_info::UNTIL_NONE;
340
 
  until_log_name[0]= 0;
341
 
  until_log_pos= 0;
342
 
  return;
343
 
}
344
 
 
345
 
 
346
 
/*
347
 
  Open the given relay log
348
 
 
349
 
  SYNOPSIS
350
 
    init_relay_log_pos()
351
 
    rli                 Relay information (will be initialized)
352
 
    log                 Name of relay log file to read from. NULL = First log
353
 
    pos                 Position in relay log file
354
 
    need_data_lock      Set to 1 if this functions should do mutex locks
355
 
    errmsg              Store pointer to error message here
356
 
    look_for_description_event
357
 
                        1 if we should look for such an event. We only need
358
 
                        this when the SQL thread starts and opens an existing
359
 
                        relay log and has to execute it (possibly from an
360
 
                        offset >4); then we need to read the first event of
361
 
                        the relay log to be able to parse the events we have
362
 
                        to execute.
363
 
 
364
 
  DESCRIPTION
365
 
  - Close old open relay log files.
366
 
  - If we are using the same relay log as the running IO-thread, then set
367
 
    rli->cur_log to point to the same IO_CACHE entry.
368
 
  - If not, open the 'log' binary file.
369
 
 
370
 
  TODO
371
 
    - check proper initialization of group_master_log_name/group_master_log_pos
372
 
 
373
 
  RETURN VALUES
374
 
    0   ok
375
 
    1   error.  errmsg is set to point to the error message
376
 
*/
377
 
 
378
 
int32_t init_relay_log_pos(Relay_log_info* rli,const char* log,
379
 
                       uint64_t pos, bool need_data_lock,
380
 
                       const char** errmsg,
381
 
                       bool look_for_description_event)
382
 
{
383
 
  *errmsg=0;
384
 
  pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
385
 
 
386
 
  if (need_data_lock)
387
 
    pthread_mutex_lock(&rli->data_lock);
388
 
 
389
 
  /*
390
 
    Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
391
 
    is, too, and init_slave() too; these 2 functions allocate a description
392
 
    event in init_relay_log_pos, which is not freed by the terminating SQL slave
393
 
    thread as that thread is not started by these functions. So we have to free
394
 
    the description_event here, in case, so that there is no memory leak in
395
 
    running, say, CHANGE MASTER.
396
 
  */
397
 
  delete rli->relay_log.description_event_for_exec;
398
 
  /*
399
 
    By default the relay log is in binlog format 3 (4.0).
400
 
    Even if format is 4, this will work enough to read the first event
401
 
    (Format_desc) (remember that format 4 is just lenghtened compared to format
402
 
    3; format 3 is a prefix of format 4).
403
 
  */
404
 
  rli->relay_log.description_event_for_exec= new
405
 
    Format_description_log_event(3);
406
 
 
407
 
  pthread_mutex_lock(log_lock);
408
 
 
409
 
  /* Close log file and free buffers if it's already open */
410
 
  if (rli->cur_log_fd >= 0)
411
 
  {
412
 
    end_io_cache(&rli->cache_buf);
413
 
    my_close(rli->cur_log_fd, MYF(MY_WME));
414
 
    rli->cur_log_fd = -1;
415
 
  }
416
 
 
417
 
  rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
418
 
 
419
 
  /*
420
 
    Test to see if the previous run was with the skip of purging
421
 
    If yes, we do not purge when we restart
422
 
  */
423
 
  if (rli->relay_log.find_log_pos(&rli->linfo, NULL, 1))
424
 
  {
425
 
    *errmsg="Could not find first log during relay log initialization";
426
 
    goto err;
427
 
  }
428
 
 
429
 
  if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
430
 
  {
431
 
    *errmsg="Could not find target log during relay log initialization";
432
 
    goto err;
433
 
  }
434
 
  strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
435
 
          sizeof(rli->group_relay_log_name)-1);
436
 
  strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
437
 
          sizeof(rli->event_relay_log_name)-1);
438
 
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
439
 
  {
440
 
    /*
441
 
      The IO thread is using this log file.
442
 
      In this case, we will use the same IO_CACHE pointer to
443
 
      read data as the IO thread is using to write data.
444
 
    */
445
 
    my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
446
 
    if (check_binlog_magic(rli->cur_log,errmsg))
447
 
      goto err;
448
 
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
449
 
  }
450
 
  else
451
 
  {
452
 
    /*
453
 
      Open the relay log and set rli->cur_log to point at this one
454
 
    */
455
 
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
456
 
                                     rli->linfo.log_file_name,errmsg)) < 0)
457
 
      goto err;
458
 
    rli->cur_log = &rli->cache_buf;
459
 
  }
460
 
  /*
461
 
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
462
 
    sure.
463
 
  */
464
 
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
465
 
  {
466
 
    Log_event* ev;
467
 
    while (look_for_description_event)
468
 
    {
469
 
      /*
470
 
        Read the possible Format_description_log_event; if position
471
 
        was 4, no need, it will be read naturally.
472
 
      */
473
 
      if (my_b_tell(rli->cur_log) >= pos)
474
 
        break;
475
 
 
476
 
      /*
477
 
        Because of we have rli->data_lock and log_lock, we can safely read an
478
 
        event
479
 
      */
480
 
      if (!(ev=Log_event::read_log_event(rli->cur_log,0,
481
 
                                         rli->relay_log.description_event_for_exec)))
482
 
      {
483
 
        if (rli->cur_log->error) /* not EOF */
484
 
        {
485
 
          *errmsg= "I/O error reading event at position 4";
486
 
          goto err;
487
 
        }
488
 
        break;
489
 
      }
490
 
      else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
491
 
      {
492
 
        delete rli->relay_log.description_event_for_exec;
493
 
        rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
494
 
        /*
495
 
          As ev was returned by read_log_event, it has passed is_valid(), so
496
 
          my_malloc() in ctor worked, no need to check again.
497
 
        */
498
 
        /*
499
 
          Ok, we found a Format_description event. But it is not sure that this
500
 
          describes the whole relay log; indeed, one can have this sequence
501
 
          (starting from position 4):
502
 
          Format_desc (of slave)
503
 
          Rotate (of master)
504
 
          Format_desc (of master)
505
 
          So the Format_desc which really describes the rest of the relay log
506
 
          is the 3rd event (it can't be further than that, because we rotate
507
 
          the relay log when we queue a Rotate event from the master).
508
 
          But what describes the Rotate is the first Format_desc.
509
 
          So what we do is:
510
 
          go on searching for Format_description events, until you exceed the
511
 
          position (argument 'pos') or until you find another event than Rotate
512
 
          or Format_desc.
513
 
        */
514
 
      }
515
 
      else
516
 
      {
517
 
        look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
518
 
        delete ev;
519
 
      }
520
 
    }
521
 
    my_b_seek(rli->cur_log,(off_t)pos);
522
 
 
523
 
  }
524
 
 
525
 
err:
526
 
  /*
527
 
    If we don't purge, we can't honour relay_log_space_limit ;
528
 
    silently discard it
529
 
  */
530
 
  if (!relay_log_purge)
531
 
    rli->log_space_limit= 0;
532
 
  pthread_cond_broadcast(&rli->data_cond);
533
 
 
534
 
  pthread_mutex_unlock(log_lock);
535
 
 
536
 
  if (need_data_lock)
537
 
    pthread_mutex_unlock(&rli->data_lock);
538
 
  if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
539
 
    *errmsg= "Invalid Format_description log event; could be out of memory";
540
 
 
541
 
  return ((*errmsg) ? 1 : 0);
542
 
}
543
 
 
544
 
 
545
 
/*
546
 
  Waits until the SQL thread reaches (has executed up to) the
547
 
  log/position or timed out.
548
 
 
549
 
  SYNOPSIS
550
 
    wait_for_pos()
551
 
    thd             client thread that sent SELECT MASTER_POS_WAIT
552
 
    log_name        log name to wait for
553
 
    log_pos         position to wait for
554
 
    timeout         timeout in seconds before giving up waiting
555
 
 
556
 
  NOTES
557
 
    timeout is int64_t whereas it should be uint32_t ; but this is
558
 
    to catch if the user submitted a negative timeout.
559
 
 
560
 
  RETURN VALUES
561
 
    -2          improper arguments (log_pos<0)
562
 
                or slave not running, or master info changed
563
 
                during the function's execution,
564
 
                or client thread killed. -2 is translated to NULL by caller
565
 
    -1          timed out
566
 
    >=0         number of log events the function had to wait
567
 
                before reaching the desired log/position
568
 
 */
569
 
 
570
 
int32_t Relay_log_info::wait_for_pos(THD* thd, String* log_name,
571
 
                                    int64_t log_pos,
572
 
                                    int64_t timeout)
573
 
{
574
 
  int32_t event_count = 0;
575
 
  uint32_t init_abort_pos_wait;
576
 
  int32_t error=0;
577
 
  struct timespec abstime; // for timeout checking
578
 
  const char *msg;
579
 
 
580
 
  if (!inited)
581
 
    return(-2);
582
 
 
583
 
  set_timespec(abstime,timeout);
584
 
  pthread_mutex_lock(&data_lock);
585
 
  msg= thd->enter_cond(&data_cond, &data_lock,
586
 
                       "Waiting for the slave SQL thread to "
587
 
                       "advance position");
588
 
  /*
589
 
     This function will abort when it notices that some CHANGE MASTER or
590
 
     RESET MASTER has changed the master info.
591
 
     To catch this, these commands modify abort_pos_wait ; We just monitor
592
 
     abort_pos_wait and see if it has changed.
593
 
     Why do we have this mechanism instead of simply monitoring slave_running
594
 
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
595
 
     the SQL thread be stopped?
596
 
     This is becasue if someones does:
597
 
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
598
 
     the change may happen very quickly and we may not notice that
599
 
     slave_running briefly switches between 1/0/1.
600
 
  */
601
 
  init_abort_pos_wait= abort_pos_wait;
602
 
 
603
 
  /*
604
 
    We'll need to
605
 
    handle all possible log names comparisons (e.g. 999 vs 1000).
606
 
    We use uint32_t for string->number conversion ; this is no
607
 
    stronger limitation than in find_uniq_filename in sql/log.cc
608
 
  */
609
 
  uint32_t log_name_extension;
610
 
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
611
 
 
612
 
  strmake(log_name_tmp, log_name->ptr(), cmin(log_name->length(), (uint32_t)FN_REFLEN-1));
613
 
 
614
 
  char *p= fn_ext(log_name_tmp);
615
 
  char *p_end;
616
 
  if (!*p || log_pos<0)
617
 
  {
618
 
    error= -2; //means improper arguments
619
 
    goto err;
620
 
  }
621
 
  // Convert 0-3 to 4
622
 
  log_pos= cmax(log_pos, (int64_t)BIN_LOG_HEADER_SIZE);
623
 
  /* p points to '.' */
624
 
  log_name_extension= strtoul(++p, &p_end, 10);
625
 
  /*
626
 
    p_end points to the first invalid character.
627
 
    If it equals to p, no digits were found, error.
628
 
    If it contains '\0' it means conversion went ok.
629
 
  */
630
 
  if (p_end==p || *p_end)
631
 
  {
632
 
    error= -2;
633
 
    goto err;
634
 
  }
635
 
 
636
 
  /* The "compare and wait" main loop */
637
 
  while (!thd->killed &&
638
 
         init_abort_pos_wait == abort_pos_wait &&
639
 
         slave_running)
640
 
  {
641
 
    bool pos_reached;
642
 
    int32_t cmp_result= 0;
643
 
 
644
 
    /*
645
 
      group_master_log_name can be "", if we are just after a fresh
646
 
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
647
 
      (before we have executed one Rotate event from the master) or
648
 
      (rare) if the user is doing a weird slave setup (see next
649
 
      paragraph).  If group_master_log_name is "", we assume we don't
650
 
      have enough info to do the comparison yet, so we just wait until
651
 
      more data. In this case master_log_pos is always 0 except if
652
 
      somebody (wrongly) sets this slave to be a slave of itself
653
 
      without using --replicate-same-server-id (an unsupported
654
 
      configuration which does nothing), then group_master_log_pos
655
 
      will grow and group_master_log_name will stay "".
656
 
    */
657
 
    if (*group_master_log_name)
658
 
    {
659
 
      char *basename= (group_master_log_name +
660
 
                       dirname_length(group_master_log_name));
661
 
      /*
662
 
        First compare the parts before the extension.
663
 
        Find the dot in the master's log basename,
664
 
        and protect against user's input error :
665
 
        if the names do not match up to '.' included, return error
666
 
      */
667
 
      char *q= (char*)(fn_ext(basename)+1);
668
 
      if (strncmp(basename, log_name_tmp, (int32_t)(q-basename)))
669
 
      {
670
 
        error= -2;
671
 
        break;
672
 
      }
673
 
      // Now compare extensions.
674
 
      char *q_end;
675
 
      uint32_t group_master_log_name_extension= strtoul(q, &q_end, 10);
676
 
      if (group_master_log_name_extension < log_name_extension)
677
 
        cmp_result= -1 ;
678
 
      else
679
 
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
680
 
 
681
 
      pos_reached= ((!cmp_result && group_master_log_pos >= (uint64_t)log_pos) ||
682
 
                    cmp_result > 0);
683
 
      if (pos_reached || thd->killed)
684
 
        break;
685
 
    }
686
 
 
687
 
    //wait for master update, with optional timeout.
688
 
 
689
 
    /*
690
 
      We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
691
 
      will wake us up.
692
 
    */
693
 
    if (timeout > 0)
694
 
    {
695
 
      /*
696
 
        Note that pthread_cond_timedwait checks for the timeout
697
 
        before for the condition ; i.e. it returns ETIMEDOUT
698
 
        if the system time equals or exceeds the time specified by abstime
699
 
        before the condition variable is signaled or broadcast, _or_ if
700
 
        the absolute time specified by abstime has already passed at the time
701
 
        of the call.
702
 
        For that reason, pthread_cond_timedwait will do the "timeoutting" job
703
 
        even if its condition is always immediately signaled (case of a loaded
704
 
        master).
705
 
      */
706
 
      error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
707
 
    }
708
 
    else
709
 
      pthread_cond_wait(&data_cond, &data_lock);
710
 
    if (error == ETIMEDOUT || error == ETIME)
711
 
    {
712
 
      error= -1;
713
 
      break;
714
 
    }
715
 
    error=0;
716
 
    event_count++;
717
 
  }
718
 
 
719
 
err:
720
 
  thd->exit_cond(msg);
721
 
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
722
 
      !slave_running)
723
 
  {
724
 
    error= -2;
725
 
  }
726
 
  return( error ? error : event_count );
727
 
}
728
 
 
729
 
 
730
 
void Relay_log_info::inc_group_relay_log_pos(uint64_t log_pos,
731
 
                                                bool skip_lock)
732
 
{
733
 
  if (!skip_lock)
734
 
    pthread_mutex_lock(&data_lock);
735
 
  inc_event_relay_log_pos();
736
 
  group_relay_log_pos= event_relay_log_pos;
737
 
  strmake(group_relay_log_name,event_relay_log_name,
738
 
          sizeof(group_relay_log_name)-1);
739
 
 
740
 
  notify_group_relay_log_name_update();
741
 
 
742
 
  /*
743
 
    If the slave does not support transactions and replicates a transaction,
744
 
    users should not trust group_master_log_pos (which they can display with
745
 
    SHOW SLAVE STATUS or read from relay-log.info), because to compute
746
 
    group_master_log_pos the slave relies on log_pos stored in the master's
747
 
    binlog, but if we are in a master's transaction these positions are always
748
 
    the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
749
 
    not advance as it should on the non-transactional slave (it advances by
750
 
    big leaps, whereas it should advance by small leaps).
751
 
  */
752
 
  /*
753
 
    In 4.x we used the event's len to compute the positions here. This is
754
 
    wrong if the event was 3.23/4.0 and has been converted to 5.0, because
755
 
    then the event's len is not what is was in the master's binlog, so this
756
 
    will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
757
 
    replication: Exec_master_log_pos is wrong). Only way to solve this is to
758
 
    have the original offset of the end of the event the relay log. This is
759
 
    what we do in 5.0: log_pos has become "end_log_pos" (because the real use
760
 
    of log_pos in 4.0 was to compute the end_log_pos; so better to store
761
 
    end_log_pos instead of begin_log_pos.
762
 
    If we had not done this fix here, the problem would also have appeared
763
 
    when the slave and master are 5.0 but with different event length (for
764
 
    example the slave is more recent than the master and features the event
765
 
    UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
766
 
    SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
767
 
    value which would lead to badly broken replication.
768
 
    Even the relay_log_pos will be corrupted in this case, because the len is
769
 
    the relay log is not "val".
770
 
    With the end_log_pos solution, we avoid computations involving lengthes.
771
 
  */
772
 
  if (log_pos) // 3.23 binlogs don't have log_posx
773
 
  {
774
 
    group_master_log_pos= log_pos;
775
 
  }
776
 
  pthread_cond_broadcast(&data_cond);
777
 
  if (!skip_lock)
778
 
    pthread_mutex_unlock(&data_lock);
779
 
  return;
780
 
}
781
 
 
782
 
 
783
 
void Relay_log_info::close_temporary_tables()
784
 
{
785
 
  Table *table,*next;
786
 
 
787
 
  for (table=save_temporary_tables ; table ; table=next)
788
 
  {
789
 
    next=table->next;
790
 
    /*
791
 
      Don't ask for disk deletion. For now, anyway they will be deleted when
792
 
      slave restarts, but it is a better intention to not delete them.
793
 
    */
794
 
    close_temporary(table, 1, 0);
795
 
  }
796
 
  save_temporary_tables= 0;
797
 
  slave_open_temp_tables= 0;
798
 
  return;
799
 
}
800
 
 
801
 
/*
802
 
  purge_relay_logs()
803
 
 
804
 
  NOTES
805
 
    Assumes to have a run lock on rli and that no slave thread are running.
806
 
*/
807
 
 
808
 
int32_t purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
809
 
                     const char** errmsg)
810
 
{
811
 
  int32_t error=0;
812
 
 
813
 
  /*
814
 
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
815
 
    Indeed, rli->inited==0 does not imply that they already are empty.
816
 
    It could be that slave's info initialization partly succeeded :
817
 
    for example if relay-log.info existed but *relay-bin*.*
818
 
    have been manually removed, init_relay_log_info reads the old
819
 
    relay-log.info and fills rli->master_log_*, then init_relay_log_info
820
 
    checks for the existence of the relay log, this fails and
821
 
    init_relay_log_info leaves rli->inited to 0.
822
 
    In that pathological case, rli->master_log_pos* will be properly reinited
823
 
    at the next START SLAVE (as RESET SLAVE or CHANGE
824
 
    MASTER, the callers of purge_relay_logs, will delete bogus *.info files
825
 
    or replace them with correct files), however if the user does SHOW SLAVE
826
 
    STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
827
 
    In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
828
 
    to display fine in any case.
829
 
  */
830
 
 
831
 
  rli->group_master_log_name[0]= 0;
832
 
  rli->group_master_log_pos= 0;
833
 
 
834
 
  if (!rli->inited)
835
 
  {
836
 
    return(0);
837
 
  }
838
 
 
839
 
  assert(rli->slave_running == 0);
840
 
  assert(rli->mi->slave_running == 0);
841
 
 
842
 
  rli->slave_skip_counter=0;
843
 
  pthread_mutex_lock(&rli->data_lock);
844
 
 
845
 
  /*
846
 
    we close the relay log fd possibly left open by the slave SQL thread,
847
 
    to be able to delete it; the relay log fd possibly left open by the slave
848
 
    I/O thread will be closed naturally in reset_logs() by the
849
 
    close(LOG_CLOSE_TO_BE_OPENED) call
850
 
  */
851
 
  if (rli->cur_log_fd >= 0)
852
 
  {
853
 
    end_io_cache(&rli->cache_buf);
854
 
    my_close(rli->cur_log_fd, MYF(MY_WME));
855
 
    rli->cur_log_fd= -1;
856
 
  }
857
 
 
858
 
  if (rli->relay_log.reset_logs(thd))
859
 
  {
860
 
    *errmsg = "Failed during log reset";
861
 
    error=1;
862
 
    goto err;
863
 
  }
864
 
  /* Save name of used relay log file */
865
 
  strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
866
 
          sizeof(rli->group_relay_log_name)-1);
867
 
  strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
868
 
          sizeof(rli->event_relay_log_name)-1);
869
 
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
870
 
  if (count_relay_log_space(rli))
871
 
  {
872
 
    *errmsg= "Error counting relay log space";
873
 
    goto err;
874
 
  }
875
 
  if (!just_reset)
876
 
    error= init_relay_log_pos(rli, rli->group_relay_log_name,
877
 
                              rli->group_relay_log_pos,
878
 
                              0 /* do not need data lock */, errmsg, 0);
879
 
 
880
 
err:
881
 
  pthread_mutex_unlock(&rli->data_lock);
882
 
  return(error);
883
 
}
884
 
 
885
 
 
886
 
/*
887
 
     Check if condition stated in UNTIL clause of START SLAVE is reached.
888
 
   SYNOPSYS
889
 
     Relay_log_info::is_until_satisfied()
890
 
     master_beg_pos    position of the beginning of to be executed event
891
 
                       (not log_pos member of the event that points to the
892
 
                        beginning of the following event)
893
 
 
894
 
 
895
 
   DESCRIPTION
896
 
     Checks if UNTIL condition is reached. Uses caching result of last
897
 
     comparison of current log file name and target log file name. So cached
898
 
     value should be invalidated if current log file name changes
899
 
     (see Relay_log_info::notify_... functions).
900
 
 
901
 
     This caching is needed to avoid of expensive string comparisons and
902
 
     strtol() conversions needed for log names comparison. We don't need to
903
 
     compare them each time this function is called, we only need to do this
904
 
     when current log name changes. If we have UNTIL_MASTER_POS condition we
905
 
     need to do this only after Rotate_log_event::do_apply_event() (which is
906
 
     rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
907
 
     condition then we should invalidate cached comarison value after
908
 
     inc_group_relay_log_pos() which called for each group of events (so we
909
 
     have some benefit if we have something like queries that use
910
 
     autoincrement or if we have transactions).
911
 
 
912
 
     Should be called ONLY if until_condition != UNTIL_NONE !
913
 
   RETURN VALUE
914
 
     true - condition met or error happened (condition seems to have
915
 
            bad log file name)
916
 
     false - condition not met
917
 
*/
918
 
 
919
 
bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
920
 
{
921
 
  const char *log_name;
922
 
  uint64_t log_pos;
923
 
 
924
 
  assert(until_condition != UNTIL_NONE);
925
 
 
926
 
  if (until_condition == UNTIL_MASTER_POS)
927
 
  {
928
 
    log_name= group_master_log_name;
929
 
    log_pos= master_beg_pos;
930
 
  }
931
 
  else
932
 
  { /* until_condition == UNTIL_RELAY_POS */
933
 
    log_name= group_relay_log_name;
934
 
    log_pos= group_relay_log_pos;
935
 
  }
936
 
 
937
 
  if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
938
 
  {
939
 
    /*
940
 
      We have no cached comparison results so we should compare log names
941
 
      and cache result.
942
 
      If we are after RESET SLAVE, and the SQL slave thread has not processed
943
 
      any event yet, it could be that group_master_log_name is "". In that case,
944
 
      just wait for more events (as there is no sensible comparison to do).
945
 
    */
946
 
 
947
 
    if (*log_name)
948
 
    {
949
 
      const char *basename= log_name + dirname_length(log_name);
950
 
 
951
 
      const char *q= (const char*)(fn_ext(basename)+1);
952
 
      if (strncmp(basename, until_log_name, (int32_t)(q-basename)) == 0)
953
 
      {
954
 
        /* Now compare extensions. */
955
 
        char *q_end;
956
 
        uint32_t log_name_extension= strtoul(q, &q_end, 10);
957
 
        if (log_name_extension < until_log_name_extension)
958
 
          until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
959
 
        else
960
 
          until_log_names_cmp_result=
961
 
            (log_name_extension > until_log_name_extension) ?
962
 
            UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
963
 
      }
964
 
      else
965
 
      {
966
 
        /* Probably error so we aborting */
967
 
        sql_print_error(_("Slave SQL thread is stopped because UNTIL "
968
 
                          "condition is bad."));
969
 
        return(true);
970
 
      }
971
 
    }
972
 
    else
973
 
      return(until_log_pos == 0);
974
 
  }
975
 
 
976
 
  return(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
977
 
           log_pos >= until_log_pos) ||
978
 
          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
979
 
}
980
 
 
981
 
 
982
 
void Relay_log_info::cached_charset_invalidate()
983
 
{
984
 
  /* Full of zeroes means uninitialized. */
985
 
  memset(cached_charset, 0, sizeof(cached_charset));
986
 
  return;
987
 
}
988
 
 
989
 
 
990
 
bool Relay_log_info::cached_charset_compare(char *charset) const
991
 
{
992
 
  if (memcmp(cached_charset, charset, sizeof(cached_charset)))
993
 
  {
994
 
    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
995
 
    return(1);
996
 
  }
997
 
  return(0);
998
 
}
999
 
 
1000
 
 
1001
 
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
1002
 
                                  time_t event_creation_time)
1003
 
{
1004
 
  extern uint32_t debug_not_change_ts_if_art_event;
1005
 
  clear_flag(IN_STMT);
1006
 
 
1007
 
  /*
1008
 
    If in a transaction, and if the slave supports transactions, just
1009
 
    inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
1010
 
    (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
1011
 
    BEGIN/COMMIT, not with SET AUTOCOMMIT= .
1012
 
 
1013
 
    CAUTION: opt_using_transactions means innodb || bdb ; suppose the
1014
 
    master supports InnoDB and BDB, but the slave supports only BDB,
1015
 
    problems will arise: - suppose an InnoDB table is created on the
1016
 
    master, - then it will be MyISAM on the slave - but as
1017
 
    opt_using_transactions is true, the slave will believe he is
1018
 
    transactional with the MyISAM table. And problems will come when
1019
 
    one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
1020
 
    resume at BEGIN whereas there has not been any rollback).  This is
1021
 
    the problem of using opt_using_transactions instead of a finer
1022
 
    "does the slave support _transactional handler used on the
1023
 
    master_".
1024
 
 
1025
 
    More generally, we'll have problems when a query mixes a
1026
 
    transactional handler and MyISAM and STOP SLAVE is issued in the
1027
 
    middle of the "transaction". START SLAVE will resume at BEGIN
1028
 
    while the MyISAM table has already been updated.
1029
 
  */
1030
 
  if ((sql_thd->options & OPTION_BEGIN) && opt_using_transactions)
1031
 
    inc_event_relay_log_pos();
1032
 
  else
1033
 
  {
1034
 
    inc_group_relay_log_pos(event_master_log_pos);
1035
 
    flush_relay_log_info(this);
1036
 
    /*
1037
 
      Note that Rotate_log_event::do_apply_event() does not call this
1038
 
      function, so there is no chance that a fake rotate event resets
1039
 
      last_master_timestamp.  Note that we update without mutex
1040
 
      (probably ok - except in some very rare cases, only consequence
1041
 
      is that value may take some time to display in
1042
 
      Seconds_Behind_Master - not critical).
1043
 
    */
1044
 
    if (!(event_creation_time == 0 && debug_not_change_ts_if_art_event > 0))
1045
 
      last_master_timestamp= event_creation_time;
1046
 
  }
1047
 
}
1048
 
 
1049
 
void Relay_log_info::cleanup_context(THD *thd, bool error)
1050
 
{
1051
 
  assert(sql_thd == thd);
1052
 
  /*
1053
 
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
1054
 
    may have opened tables, which we cannot be sure have been closed (because
1055
 
    maybe the Rows_log_event have not been found or will not be, because slave
1056
 
    SQL thread is stopping, or relay log has a missing tail etc). So we close
1057
 
    all thread's tables. And so the table mappings have to be cancelled.
1058
 
    2) Rows_log_event::do_apply_event() may even have started statements or
1059
 
    transactions on them, which we need to rollback in case of error.
1060
 
    3) If finding a Format_description_log_event after a BEGIN, we also need
1061
 
    to rollback before continuing with the next events.
1062
 
    4) so we need this "context cleanup" function.
1063
 
  */
1064
 
  if (error)
1065
 
  {
1066
 
    ha_autocommit_or_rollback(thd, 1); // if a "statement transaction"
1067
 
    end_trans(thd, ROLLBACK); // if a "real transaction"
1068
 
  }
1069
 
  m_table_map.clear_tables();
1070
 
  close_thread_tables(thd);
1071
 
  clear_tables_to_lock();
1072
 
  clear_flag(IN_STMT);
1073
 
  /*
1074
 
    Cleanup for the flags that have been set at do_apply_event.
1075
 
  */
1076
 
  thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1077
 
  thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1078
 
  last_event_start_time= 0;
1079
 
  return;
1080
 
}
1081
 
 
1082
 
void Relay_log_info::clear_tables_to_lock()
1083
 
{
1084
 
  while (tables_to_lock)
1085
 
  {
1086
 
    unsigned char* to_free= reinterpret_cast<unsigned char*>(tables_to_lock);
1087
 
    if (tables_to_lock->m_tabledef_valid)
1088
 
    {
1089
 
      tables_to_lock->m_tabledef.table_def::~table_def();
1090
 
      tables_to_lock->m_tabledef_valid= false;
1091
 
    }
1092
 
    tables_to_lock=
1093
 
      static_cast<RPL_TableList*>(tables_to_lock->next_global);
1094
 
    tables_to_lock_count--;
1095
 
    free(to_free);
1096
 
  }
1097
 
  assert(tables_to_lock == NULL && tables_to_lock_count == 0);
1098
 
}