~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/rpl_rli.cc

  • Committer: Brian Aker
  • Date: 2008-10-06 06:47:29 UTC
  • Revision ID: brian@tangent.org-20081006064729-2i9mhjkzyvow9xsm
Remove uint.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <drizzled/server_includes.h>
 
17
 
 
18
#include "rpl_mi.h"
 
19
#include "rpl_rli.h"
 
20
#include "sql_repl.h"  // For check_binlog_magic
 
21
#include "rpl_utility.h"
 
22
 
 
23
#include <libdrizzle/gettext.h>
 
24
 
 
25
static int32_t count_relay_log_space(Relay_log_info* rli);
 
26
 
 
27
// Defined in slave.cc
 
28
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val);
 
29
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
 
30
                          const char *default_val);
 
31
 
 
32
 
 
33
Relay_log_info::Relay_log_info()
 
34
  :Slave_reporting_capability("SQL"),
 
35
   no_storage(false), replicate_same_server_id(::replicate_same_server_id),
 
36
   info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
 
37
#if defined(HAVE_purify) && HAVE_purify
 
38
   is_fake(false),
 
39
#endif
 
40
   cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
 
41
   group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
 
42
   last_master_timestamp(0), slave_skip_counter(0),
 
43
   abort_pos_wait(0), slave_run_id(0), sql_thd(0),
 
44
   inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
 
45
   until_log_pos(0), retried_trans(0),
 
46
   tables_to_lock(0), tables_to_lock_count(0),
 
47
   last_event_start_time(0), m_flags(0)
 
48
{
 
49
  group_relay_log_name[0]= event_relay_log_name[0]=
 
50
    group_master_log_name[0]= 0;
 
51
  until_log_name[0]= ign_master_log_name_end[0]= 0;
 
52
  memset(&info_file, 0, sizeof(info_file));
 
53
  memset(&cache_buf, 0, sizeof(cache_buf));
 
54
  cached_charset_invalidate();
 
55
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
 
56
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
 
57
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
 
58
  pthread_cond_init(&data_cond, NULL);
 
59
  pthread_cond_init(&start_cond, NULL);
 
60
  pthread_cond_init(&stop_cond, NULL);
 
61
  pthread_cond_init(&log_space_cond, NULL);
 
62
  relay_log.init_pthread_objects();
 
63
  return;
 
64
}
 
65
 
 
66
 
 
67
Relay_log_info::~Relay_log_info()
 
68
{
 
69
  pthread_mutex_destroy(&run_lock);
 
70
  pthread_mutex_destroy(&data_lock);
 
71
  pthread_mutex_destroy(&log_space_lock);
 
72
  pthread_cond_destroy(&data_cond);
 
73
  pthread_cond_destroy(&start_cond);
 
74
  pthread_cond_destroy(&stop_cond);
 
75
  pthread_cond_destroy(&log_space_cond);
 
76
  relay_log.cleanup();
 
77
  return;
 
78
}
 
79
 
 
80
 
 
81
int32_t init_relay_log_info(Relay_log_info* rli,
 
82
                        const char* info_fname)
 
83
{
 
84
  char fname[FN_REFLEN+128];
 
85
  int32_t info_fd;
 
86
  const char* msg = 0;
 
87
  int32_t error = 0;
 
88
  assert(!rli->no_storage);         // Don't init if there is no storage
 
89
 
 
90
  if (rli->inited)                       // Set if this function called
 
91
    return(0);
 
92
  fn_format(fname, info_fname, mysql_data_home, "", 4+32);
 
93
  pthread_mutex_lock(&rli->data_lock);
 
94
  info_fd = rli->info_fd;
 
95
  rli->cur_log_fd = -1;
 
96
  rli->slave_skip_counter=0;
 
97
  rli->abort_pos_wait=0;
 
98
  rli->log_space_limit= relay_log_space_limit;
 
99
  rli->log_space_total= 0;
 
100
  rli->tables_to_lock= 0;
 
101
  rli->tables_to_lock_count= 0;
 
102
 
 
103
  /*
 
104
    The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
 
105
    Note that the I/O thread flushes it to disk after writing every
 
106
    event, in flush_master_info(mi, 1).
 
107
  */
 
108
 
 
109
  /*
 
110
    For the maximum log size, we choose max_relay_log_size if it is
 
111
    non-zero, max_binlog_size otherwise. If later the user does SET
 
112
    GLOBAL on one of these variables, fix_max_binlog_size and
 
113
    fix_max_relay_log_size will reconsider the choice (for example
 
114
    if the user changes max_relay_log_size to zero, we have to
 
115
    switch to using max_binlog_size for the relay log) and update
 
116
    rli->relay_log.max_size (and mysql_bin_log.max_size).
 
117
  */
 
118
  {
 
119
    char buf[FN_REFLEN];
 
120
    const char *ln;
 
121
    static bool name_warning_sent= 0;
 
122
    ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
 
123
                                     1, buf);
 
124
    /* We send the warning only at startup, not after every RESET SLAVE */
 
125
    if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
 
126
    {
 
127
      /*
 
128
        User didn't give us info to name the relay log index file.
 
129
        Picking `hostname`-relay-bin.index like we do, causes replication to
 
130
        fail if this slave's hostname is changed later. So, we would like to
 
131
        instead require a name. But as we don't want to break many existing
 
132
        setups, we only give warning, not error.
 
133
      */
 
134
      sql_print_warning(_("Neither --relay-log nor --relay-log-index were used;"
 
135
                        " so replication "
 
136
                        "may break when this MySQL server acts as a "
 
137
                        "slave and has his hostname changed!! Please "
 
138
                        "use '--relay-log=%s' to avoid this problem."), ln);
 
139
      name_warning_sent= 1;
 
140
    }
 
141
    /*
 
142
      note, that if open() fails, we'll still have index file open
 
143
      but a destructor will take care of that
 
144
    */
 
145
    if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln) ||
 
146
        rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
 
147
                            (max_relay_log_size ? max_relay_log_size :
 
148
                            max_binlog_size), 1))
 
149
    {
 
150
      pthread_mutex_unlock(&rli->data_lock);
 
151
      sql_print_error(_("Failed in open_log() called from "
 
152
                        "init_relay_log_info()"));
 
153
      return(1);
 
154
    }
 
155
  }
 
156
 
 
157
  /* if file does not exist */
 
158
  if (access(fname,F_OK))
 
159
  {
 
160
    /*
 
161
      If someone removed the file from underneath our feet, just close
 
162
      the old descriptor and re-create the old file
 
163
    */
 
164
    if (info_fd >= 0)
 
165
      my_close(info_fd, MYF(MY_WME));
 
166
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
 
167
    {
 
168
      sql_print_error(_("Failed to create a new relay log info file "
 
169
                        "( file '%s', errno %d)"), fname, my_errno);
 
170
      msg= current_thd->main_da.message();
 
171
      goto err;
 
172
    }
 
173
    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
 
174
                      MYF(MY_WME)))
 
175
    {
 
176
      sql_print_error(_("Failed to create a cache on relay log info file '%s'"),
 
177
                      fname);
 
178
      msg= current_thd->main_da.message();
 
179
      goto err;
 
180
    }
 
181
 
 
182
    /* Init relay log with first entry in the relay index file */
 
183
    if (init_relay_log_pos(rli,NULL,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
 
184
                           &msg, 0))
 
185
    {
 
186
      sql_print_error(_("Failed to open the relay log 'FIRST' (relay_log_pos 4)"));
 
187
      goto err;
 
188
    }
 
189
    rli->group_master_log_name[0]= 0;
 
190
    rli->group_master_log_pos= 0;
 
191
    rli->info_fd= info_fd;
 
192
  }
 
193
  else // file exists
 
194
  {
 
195
    if (info_fd >= 0)
 
196
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
 
197
    else
 
198
    {
 
199
      int32_t error=0;
 
200
      if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
 
201
      {
 
202
        sql_print_error(_("Failed to open the existing relay log info "
 
203
                          "file '%s' (errno %d)"),
 
204
                        fname, my_errno);
 
205
        error= 1;
 
206
      }
 
207
      else if (init_io_cache(&rli->info_file, info_fd,
 
208
                             IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
 
209
      {
 
210
        sql_print_error(_("Failed to create a cache on relay log info "
 
211
                          "file '%s'"),
 
212
                        fname);
 
213
        error= 1;
 
214
      }
 
215
      if (error)
 
216
      {
 
217
        if (info_fd >= 0)
 
218
          my_close(info_fd, MYF(0));
 
219
        rli->info_fd= -1;
 
220
        rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
221
        pthread_mutex_unlock(&rli->data_lock);
 
222
        return(1);
 
223
      }
 
224
    }
 
225
 
 
226
    rli->info_fd = info_fd;
 
227
    int32_t relay_log_pos, master_log_pos;
 
228
    if (init_strvar_from_file(rli->group_relay_log_name,
 
229
                              sizeof(rli->group_relay_log_name),
 
230
                              &rli->info_file, "") ||
 
231
       init_intvar_from_file(&relay_log_pos,
 
232
                             &rli->info_file, BIN_LOG_HEADER_SIZE) ||
 
233
       init_strvar_from_file(rli->group_master_log_name,
 
234
                             sizeof(rli->group_master_log_name),
 
235
                             &rli->info_file, "") ||
 
236
       init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
 
237
    {
 
238
      msg="Error reading slave log configuration";
 
239
      goto err;
 
240
    }
 
241
    strmake(rli->event_relay_log_name,rli->group_relay_log_name,
 
242
            sizeof(rli->event_relay_log_name)-1);
 
243
    rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
 
244
    rli->group_master_log_pos= master_log_pos;
 
245
 
 
246
    if (init_relay_log_pos(rli,
 
247
                           rli->group_relay_log_name,
 
248
                           rli->group_relay_log_pos,
 
249
                           0 /* no data lock*/,
 
250
                           &msg, 0))
 
251
    {
 
252
      char llbuf[22];
 
253
      sql_print_error(_("Failed to open the relay log '%s' (relay_log_pos %s)"),
 
254
                      rli->group_relay_log_name,
 
255
                      llstr(rli->group_relay_log_pos, llbuf));
 
256
      goto err;
 
257
    }
 
258
  }
 
259
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
 
260
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
 
261
 
 
262
  /*
 
263
    Now change the cache from READ to WRITE - must do this
 
264
    before flush_relay_log_info
 
265
  */
 
266
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
 
267
  if ((error= flush_relay_log_info(rli)))
 
268
    sql_print_error(_("Failed to flush relay log info file"));
 
269
  if (count_relay_log_space(rli))
 
270
  {
 
271
    msg=_("Error counting relay log space");
 
272
    goto err;
 
273
  }
 
274
  rli->inited= 1;
 
275
  pthread_mutex_unlock(&rli->data_lock);
 
276
  return(error);
 
277
 
 
278
err:
 
279
  sql_print_error(msg);
 
280
  end_io_cache(&rli->info_file);
 
281
  if (info_fd >= 0)
 
282
    my_close(info_fd, MYF(0));
 
283
  rli->info_fd= -1;
 
284
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
285
  pthread_mutex_unlock(&rli->data_lock);
 
286
  return(1);
 
287
}
 
288
 
 
289
 
 
290
static inline int32_t add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
 
291
{
 
292
  struct stat s;
 
293
  if (stat(linfo->log_file_name,&s))
 
294
  {
 
295
    sql_print_error(_("log %s listed in the index, but failed to stat"),
 
296
                    linfo->log_file_name);
 
297
    return(1);
 
298
  }
 
299
  rli->log_space_total += s.st_size;
 
300
  return(0);
 
301
}
 
302
 
 
303
 
 
304
static int32_t count_relay_log_space(Relay_log_info* rli)
 
305
{
 
306
  LOG_INFO linfo;
 
307
  rli->log_space_total= 0;
 
308
  if (rli->relay_log.find_log_pos(&linfo, NULL, 1))
 
309
  {
 
310
    sql_print_error(_("Could not find first log while counting relay "
 
311
                      "log space"));
 
312
    return(1);
 
313
  }
 
314
  do
 
315
  {
 
316
    if (add_relay_log(rli,&linfo))
 
317
      return(1);
 
318
  } while (!rli->relay_log.find_next_log(&linfo, 1));
 
319
  /*
 
320
     As we have counted everything, including what may have written in a
 
321
     preceding write, we must reset bytes_written, or we may count some space
 
322
     twice.
 
323
  */
 
324
  rli->relay_log.reset_bytes_written();
 
325
  return(0);
 
326
}
 
327
 
 
328
 
 
329
/*
 
330
   Reset UNTIL condition for Relay_log_info
 
331
 
 
332
   SYNOPSYS
 
333
    clear_until_condition()
 
334
      rli - Relay_log_info structure where UNTIL condition should be reset
 
335
 */
 
336
 
 
337
void Relay_log_info::clear_until_condition()
 
338
{
 
339
  until_condition= Relay_log_info::UNTIL_NONE;
 
340
  until_log_name[0]= 0;
 
341
  until_log_pos= 0;
 
342
  return;
 
343
}
 
344
 
 
345
 
 
346
/*
 
347
  Open the given relay log
 
348
 
 
349
  SYNOPSIS
 
350
    init_relay_log_pos()
 
351
    rli                 Relay information (will be initialized)
 
352
    log                 Name of relay log file to read from. NULL = First log
 
353
    pos                 Position in relay log file
 
354
    need_data_lock      Set to 1 if this functions should do mutex locks
 
355
    errmsg              Store pointer to error message here
 
356
    look_for_description_event
 
357
                        1 if we should look for such an event. We only need
 
358
                        this when the SQL thread starts and opens an existing
 
359
                        relay log and has to execute it (possibly from an
 
360
                        offset >4); then we need to read the first event of
 
361
                        the relay log to be able to parse the events we have
 
362
                        to execute.
 
363
 
 
364
  DESCRIPTION
 
365
  - Close old open relay log files.
 
366
  - If we are using the same relay log as the running IO-thread, then set
 
367
    rli->cur_log to point to the same IO_CACHE entry.
 
368
  - If not, open the 'log' binary file.
 
369
 
 
370
  TODO
 
371
    - check proper initialization of group_master_log_name/group_master_log_pos
 
372
 
 
373
  RETURN VALUES
 
374
    0   ok
 
375
    1   error.  errmsg is set to point to the error message
 
376
*/
 
377
 
 
378
int32_t init_relay_log_pos(Relay_log_info* rli,const char* log,
 
379
                       uint64_t pos, bool need_data_lock,
 
380
                       const char** errmsg,
 
381
                       bool look_for_description_event)
 
382
{
 
383
  *errmsg=0;
 
384
  pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
 
385
 
 
386
  if (need_data_lock)
 
387
    pthread_mutex_lock(&rli->data_lock);
 
388
 
 
389
  /*
 
390
    Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
 
391
    is, too, and init_slave() too; these 2 functions allocate a description
 
392
    event in init_relay_log_pos, which is not freed by the terminating SQL slave
 
393
    thread as that thread is not started by these functions. So we have to free
 
394
    the description_event here, in case, so that there is no memory leak in
 
395
    running, say, CHANGE MASTER.
 
396
  */
 
397
  delete rli->relay_log.description_event_for_exec;
 
398
  /*
 
399
    By default the relay log is in binlog format 3 (4.0).
 
400
    Even if format is 4, this will work enough to read the first event
 
401
    (Format_desc) (remember that format 4 is just lenghtened compared to format
 
402
    3; format 3 is a prefix of format 4).
 
403
  */
 
404
  rli->relay_log.description_event_for_exec= new
 
405
    Format_description_log_event(3);
 
406
 
 
407
  pthread_mutex_lock(log_lock);
 
408
 
 
409
  /* Close log file and free buffers if it's already open */
 
410
  if (rli->cur_log_fd >= 0)
 
411
  {
 
412
    end_io_cache(&rli->cache_buf);
 
413
    my_close(rli->cur_log_fd, MYF(MY_WME));
 
414
    rli->cur_log_fd = -1;
 
415
  }
 
416
 
 
417
  rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
 
418
 
 
419
  /*
 
420
    Test to see if the previous run was with the skip of purging
 
421
    If yes, we do not purge when we restart
 
422
  */
 
423
  if (rli->relay_log.find_log_pos(&rli->linfo, NULL, 1))
 
424
  {
 
425
    *errmsg="Could not find first log during relay log initialization";
 
426
    goto err;
 
427
  }
 
428
 
 
429
  if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
 
430
  {
 
431
    *errmsg="Could not find target log during relay log initialization";
 
432
    goto err;
 
433
  }
 
434
  strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
 
435
          sizeof(rli->group_relay_log_name)-1);
 
436
  strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
 
437
          sizeof(rli->event_relay_log_name)-1);
 
438
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
 
439
  {
 
440
    /*
 
441
      The IO thread is using this log file.
 
442
      In this case, we will use the same IO_CACHE pointer to
 
443
      read data as the IO thread is using to write data.
 
444
    */
 
445
    my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
 
446
    if (check_binlog_magic(rli->cur_log,errmsg))
 
447
      goto err;
 
448
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
 
449
  }
 
450
  else
 
451
  {
 
452
    /*
 
453
      Open the relay log and set rli->cur_log to point at this one
 
454
    */
 
455
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
 
456
                                     rli->linfo.log_file_name,errmsg)) < 0)
 
457
      goto err;
 
458
    rli->cur_log = &rli->cache_buf;
 
459
  }
 
460
  /*
 
461
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
 
462
    sure.
 
463
  */
 
464
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
 
465
  {
 
466
    Log_event* ev;
 
467
    while (look_for_description_event)
 
468
    {
 
469
      /*
 
470
        Read the possible Format_description_log_event; if position
 
471
        was 4, no need, it will be read naturally.
 
472
      */
 
473
      if (my_b_tell(rli->cur_log) >= pos)
 
474
        break;
 
475
 
 
476
      /*
 
477
        Because of we have rli->data_lock and log_lock, we can safely read an
 
478
        event
 
479
      */
 
480
      if (!(ev=Log_event::read_log_event(rli->cur_log,0,
 
481
                                         rli->relay_log.description_event_for_exec)))
 
482
      {
 
483
        if (rli->cur_log->error) /* not EOF */
 
484
        {
 
485
          *errmsg= "I/O error reading event at position 4";
 
486
          goto err;
 
487
        }
 
488
        break;
 
489
      }
 
490
      else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
 
491
      {
 
492
        delete rli->relay_log.description_event_for_exec;
 
493
        rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
 
494
        /*
 
495
          As ev was returned by read_log_event, it has passed is_valid(), so
 
496
          my_malloc() in ctor worked, no need to check again.
 
497
        */
 
498
        /*
 
499
          Ok, we found a Format_description event. But it is not sure that this
 
500
          describes the whole relay log; indeed, one can have this sequence
 
501
          (starting from position 4):
 
502
          Format_desc (of slave)
 
503
          Rotate (of master)
 
504
          Format_desc (of master)
 
505
          So the Format_desc which really describes the rest of the relay log
 
506
          is the 3rd event (it can't be further than that, because we rotate
 
507
          the relay log when we queue a Rotate event from the master).
 
508
          But what describes the Rotate is the first Format_desc.
 
509
          So what we do is:
 
510
          go on searching for Format_description events, until you exceed the
 
511
          position (argument 'pos') or until you find another event than Rotate
 
512
          or Format_desc.
 
513
        */
 
514
      }
 
515
      else
 
516
      {
 
517
        look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
 
518
        delete ev;
 
519
      }
 
520
    }
 
521
    my_b_seek(rli->cur_log,(off_t)pos);
 
522
 
 
523
  }
 
524
 
 
525
err:
 
526
  /*
 
527
    If we don't purge, we can't honour relay_log_space_limit ;
 
528
    silently discard it
 
529
  */
 
530
  if (!relay_log_purge)
 
531
    rli->log_space_limit= 0;
 
532
  pthread_cond_broadcast(&rli->data_cond);
 
533
 
 
534
  pthread_mutex_unlock(log_lock);
 
535
 
 
536
  if (need_data_lock)
 
537
    pthread_mutex_unlock(&rli->data_lock);
 
538
  if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
 
539
    *errmsg= "Invalid Format_description log event; could be out of memory";
 
540
 
 
541
  return ((*errmsg) ? 1 : 0);
 
542
}
 
543
 
 
544
 
 
545
/*
 
546
  Waits until the SQL thread reaches (has executed up to) the
 
547
  log/position or timed out.
 
548
 
 
549
  SYNOPSIS
 
550
    wait_for_pos()
 
551
    thd             client thread that sent SELECT MASTER_POS_WAIT
 
552
    log_name        log name to wait for
 
553
    log_pos         position to wait for
 
554
    timeout         timeout in seconds before giving up waiting
 
555
 
 
556
  NOTES
 
557
    timeout is int64_t whereas it should be uint32_t ; but this is
 
558
    to catch if the user submitted a negative timeout.
 
559
 
 
560
  RETURN VALUES
 
561
    -2          improper arguments (log_pos<0)
 
562
                or slave not running, or master info changed
 
563
                during the function's execution,
 
564
                or client thread killed. -2 is translated to NULL by caller
 
565
    -1          timed out
 
566
    >=0         number of log events the function had to wait
 
567
                before reaching the desired log/position
 
568
 */
 
569
 
 
570
int32_t Relay_log_info::wait_for_pos(THD* thd, String* log_name,
 
571
                                    int64_t log_pos,
 
572
                                    int64_t timeout)
 
573
{
 
574
  int32_t event_count = 0;
 
575
  uint32_t init_abort_pos_wait;
 
576
  int32_t error=0;
 
577
  struct timespec abstime; // for timeout checking
 
578
  const char *msg;
 
579
 
 
580
  if (!inited)
 
581
    return(-2);
 
582
 
 
583
  set_timespec(abstime,timeout);
 
584
  pthread_mutex_lock(&data_lock);
 
585
  msg= thd->enter_cond(&data_cond, &data_lock,
 
586
                       "Waiting for the slave SQL thread to "
 
587
                       "advance position");
 
588
  /*
 
589
     This function will abort when it notices that some CHANGE MASTER or
 
590
     RESET MASTER has changed the master info.
 
591
     To catch this, these commands modify abort_pos_wait ; We just monitor
 
592
     abort_pos_wait and see if it has changed.
 
593
     Why do we have this mechanism instead of simply monitoring slave_running
 
594
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
 
595
     the SQL thread be stopped?
 
596
     This is becasue if someones does:
 
597
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
 
598
     the change may happen very quickly and we may not notice that
 
599
     slave_running briefly switches between 1/0/1.
 
600
  */
 
601
  init_abort_pos_wait= abort_pos_wait;
 
602
 
 
603
  /*
 
604
    We'll need to
 
605
    handle all possible log names comparisons (e.g. 999 vs 1000).
 
606
    We use uint32_t for string->number conversion ; this is no
 
607
    stronger limitation than in find_uniq_filename in sql/log.cc
 
608
  */
 
609
  uint32_t log_name_extension;
 
610
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
 
611
 
 
612
  strmake(log_name_tmp, log_name->ptr(), cmin(log_name->length(), (uint32_t)FN_REFLEN-1));
 
613
 
 
614
  char *p= fn_ext(log_name_tmp);
 
615
  char *p_end;
 
616
  if (!*p || log_pos<0)
 
617
  {
 
618
    error= -2; //means improper arguments
 
619
    goto err;
 
620
  }
 
621
  // Convert 0-3 to 4
 
622
  log_pos= cmax(log_pos, (int64_t)BIN_LOG_HEADER_SIZE);
 
623
  /* p points to '.' */
 
624
  log_name_extension= strtoul(++p, &p_end, 10);
 
625
  /*
 
626
    p_end points to the first invalid character.
 
627
    If it equals to p, no digits were found, error.
 
628
    If it contains '\0' it means conversion went ok.
 
629
  */
 
630
  if (p_end==p || *p_end)
 
631
  {
 
632
    error= -2;
 
633
    goto err;
 
634
  }
 
635
 
 
636
  /* The "compare and wait" main loop */
 
637
  while (!thd->killed &&
 
638
         init_abort_pos_wait == abort_pos_wait &&
 
639
         slave_running)
 
640
  {
 
641
    bool pos_reached;
 
642
    int32_t cmp_result= 0;
 
643
 
 
644
    /*
 
645
      group_master_log_name can be "", if we are just after a fresh
 
646
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
 
647
      (before we have executed one Rotate event from the master) or
 
648
      (rare) if the user is doing a weird slave setup (see next
 
649
      paragraph).  If group_master_log_name is "", we assume we don't
 
650
      have enough info to do the comparison yet, so we just wait until
 
651
      more data. In this case master_log_pos is always 0 except if
 
652
      somebody (wrongly) sets this slave to be a slave of itself
 
653
      without using --replicate-same-server-id (an unsupported
 
654
      configuration which does nothing), then group_master_log_pos
 
655
      will grow and group_master_log_name will stay "".
 
656
    */
 
657
    if (*group_master_log_name)
 
658
    {
 
659
      char *basename= (group_master_log_name +
 
660
                       dirname_length(group_master_log_name));
 
661
      /*
 
662
        First compare the parts before the extension.
 
663
        Find the dot in the master's log basename,
 
664
        and protect against user's input error :
 
665
        if the names do not match up to '.' included, return error
 
666
      */
 
667
      char *q= (char*)(fn_ext(basename)+1);
 
668
      if (strncmp(basename, log_name_tmp, (int32_t)(q-basename)))
 
669
      {
 
670
        error= -2;
 
671
        break;
 
672
      }
 
673
      // Now compare extensions.
 
674
      char *q_end;
 
675
      uint32_t group_master_log_name_extension= strtoul(q, &q_end, 10);
 
676
      if (group_master_log_name_extension < log_name_extension)
 
677
        cmp_result= -1 ;
 
678
      else
 
679
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
 
680
 
 
681
      pos_reached= ((!cmp_result && group_master_log_pos >= (uint64_t)log_pos) ||
 
682
                    cmp_result > 0);
 
683
      if (pos_reached || thd->killed)
 
684
        break;
 
685
    }
 
686
 
 
687
    //wait for master update, with optional timeout.
 
688
 
 
689
    /*
 
690
      We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
 
691
      will wake us up.
 
692
    */
 
693
    if (timeout > 0)
 
694
    {
 
695
      /*
 
696
        Note that pthread_cond_timedwait checks for the timeout
 
697
        before for the condition ; i.e. it returns ETIMEDOUT
 
698
        if the system time equals or exceeds the time specified by abstime
 
699
        before the condition variable is signaled or broadcast, _or_ if
 
700
        the absolute time specified by abstime has already passed at the time
 
701
        of the call.
 
702
        For that reason, pthread_cond_timedwait will do the "timeoutting" job
 
703
        even if its condition is always immediately signaled (case of a loaded
 
704
        master).
 
705
      */
 
706
      error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
 
707
    }
 
708
    else
 
709
      pthread_cond_wait(&data_cond, &data_lock);
 
710
    if (error == ETIMEDOUT || error == ETIME)
 
711
    {
 
712
      error= -1;
 
713
      break;
 
714
    }
 
715
    error=0;
 
716
    event_count++;
 
717
  }
 
718
 
 
719
err:
 
720
  thd->exit_cond(msg);
 
721
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
 
722
      !slave_running)
 
723
  {
 
724
    error= -2;
 
725
  }
 
726
  return( error ? error : event_count );
 
727
}
 
728
 
 
729
 
 
730
void Relay_log_info::inc_group_relay_log_pos(uint64_t log_pos,
 
731
                                                bool skip_lock)
 
732
{
 
733
  if (!skip_lock)
 
734
    pthread_mutex_lock(&data_lock);
 
735
  inc_event_relay_log_pos();
 
736
  group_relay_log_pos= event_relay_log_pos;
 
737
  strmake(group_relay_log_name,event_relay_log_name,
 
738
          sizeof(group_relay_log_name)-1);
 
739
 
 
740
  notify_group_relay_log_name_update();
 
741
 
 
742
  /*
 
743
    If the slave does not support transactions and replicates a transaction,
 
744
    users should not trust group_master_log_pos (which they can display with
 
745
    SHOW SLAVE STATUS or read from relay-log.info), because to compute
 
746
    group_master_log_pos the slave relies on log_pos stored in the master's
 
747
    binlog, but if we are in a master's transaction these positions are always
 
748
    the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
 
749
    not advance as it should on the non-transactional slave (it advances by
 
750
    big leaps, whereas it should advance by small leaps).
 
751
  */
 
752
  /*
 
753
    In 4.x we used the event's len to compute the positions here. This is
 
754
    wrong if the event was 3.23/4.0 and has been converted to 5.0, because
 
755
    then the event's len is not what is was in the master's binlog, so this
 
756
    will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
 
757
    replication: Exec_master_log_pos is wrong). Only way to solve this is to
 
758
    have the original offset of the end of the event the relay log. This is
 
759
    what we do in 5.0: log_pos has become "end_log_pos" (because the real use
 
760
    of log_pos in 4.0 was to compute the end_log_pos; so better to store
 
761
    end_log_pos instead of begin_log_pos.
 
762
    If we had not done this fix here, the problem would also have appeared
 
763
    when the slave and master are 5.0 but with different event length (for
 
764
    example the slave is more recent than the master and features the event
 
765
    UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
 
766
    SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
 
767
    value which would lead to badly broken replication.
 
768
    Even the relay_log_pos will be corrupted in this case, because the len is
 
769
    the relay log is not "val".
 
770
    With the end_log_pos solution, we avoid computations involving lengthes.
 
771
  */
 
772
  if (log_pos) // 3.23 binlogs don't have log_posx
 
773
  {
 
774
    group_master_log_pos= log_pos;
 
775
  }
 
776
  pthread_cond_broadcast(&data_cond);
 
777
  if (!skip_lock)
 
778
    pthread_mutex_unlock(&data_lock);
 
779
  return;
 
780
}
 
781
 
 
782
 
 
783
void Relay_log_info::close_temporary_tables()
 
784
{
 
785
  Table *table,*next;
 
786
 
 
787
  for (table=save_temporary_tables ; table ; table=next)
 
788
  {
 
789
    next=table->next;
 
790
    /*
 
791
      Don't ask for disk deletion. For now, anyway they will be deleted when
 
792
      slave restarts, but it is a better intention to not delete them.
 
793
    */
 
794
    close_temporary(table, 1, 0);
 
795
  }
 
796
  save_temporary_tables= 0;
 
797
  slave_open_temp_tables= 0;
 
798
  return;
 
799
}
 
800
 
 
801
/*
 
802
  purge_relay_logs()
 
803
 
 
804
  NOTES
 
805
    Assumes to have a run lock on rli and that no slave thread are running.
 
806
*/
 
807
 
 
808
int32_t purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
 
809
                     const char** errmsg)
 
810
{
 
811
  int32_t error=0;
 
812
 
 
813
  /*
 
814
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
 
815
    Indeed, rli->inited==0 does not imply that they already are empty.
 
816
    It could be that slave's info initialization partly succeeded :
 
817
    for example if relay-log.info existed but *relay-bin*.*
 
818
    have been manually removed, init_relay_log_info reads the old
 
819
    relay-log.info and fills rli->master_log_*, then init_relay_log_info
 
820
    checks for the existence of the relay log, this fails and
 
821
    init_relay_log_info leaves rli->inited to 0.
 
822
    In that pathological case, rli->master_log_pos* will be properly reinited
 
823
    at the next START SLAVE (as RESET SLAVE or CHANGE
 
824
    MASTER, the callers of purge_relay_logs, will delete bogus *.info files
 
825
    or replace them with correct files), however if the user does SHOW SLAVE
 
826
    STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
 
827
    In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
 
828
    to display fine in any case.
 
829
  */
 
830
 
 
831
  rli->group_master_log_name[0]= 0;
 
832
  rli->group_master_log_pos= 0;
 
833
 
 
834
  if (!rli->inited)
 
835
  {
 
836
    return(0);
 
837
  }
 
838
 
 
839
  assert(rli->slave_running == 0);
 
840
  assert(rli->mi->slave_running == 0);
 
841
 
 
842
  rli->slave_skip_counter=0;
 
843
  pthread_mutex_lock(&rli->data_lock);
 
844
 
 
845
  /*
 
846
    we close the relay log fd possibly left open by the slave SQL thread,
 
847
    to be able to delete it; the relay log fd possibly left open by the slave
 
848
    I/O thread will be closed naturally in reset_logs() by the
 
849
    close(LOG_CLOSE_TO_BE_OPENED) call
 
850
  */
 
851
  if (rli->cur_log_fd >= 0)
 
852
  {
 
853
    end_io_cache(&rli->cache_buf);
 
854
    my_close(rli->cur_log_fd, MYF(MY_WME));
 
855
    rli->cur_log_fd= -1;
 
856
  }
 
857
 
 
858
  if (rli->relay_log.reset_logs(thd))
 
859
  {
 
860
    *errmsg = "Failed during log reset";
 
861
    error=1;
 
862
    goto err;
 
863
  }
 
864
  /* Save name of used relay log file */
 
865
  strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
 
866
          sizeof(rli->group_relay_log_name)-1);
 
867
  strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
 
868
          sizeof(rli->event_relay_log_name)-1);
 
869
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
 
870
  if (count_relay_log_space(rli))
 
871
  {
 
872
    *errmsg= "Error counting relay log space";
 
873
    goto err;
 
874
  }
 
875
  if (!just_reset)
 
876
    error= init_relay_log_pos(rli, rli->group_relay_log_name,
 
877
                              rli->group_relay_log_pos,
 
878
                              0 /* do not need data lock */, errmsg, 0);
 
879
 
 
880
err:
 
881
  pthread_mutex_unlock(&rli->data_lock);
 
882
  return(error);
 
883
}
 
884
 
 
885
 
 
886
/*
 
887
     Check if condition stated in UNTIL clause of START SLAVE is reached.
 
888
   SYNOPSYS
 
889
     Relay_log_info::is_until_satisfied()
 
890
     master_beg_pos    position of the beginning of to be executed event
 
891
                       (not log_pos member of the event that points to the
 
892
                        beginning of the following event)
 
893
 
 
894
 
 
895
   DESCRIPTION
 
896
     Checks if UNTIL condition is reached. Uses caching result of last
 
897
     comparison of current log file name and target log file name. So cached
 
898
     value should be invalidated if current log file name changes
 
899
     (see Relay_log_info::notify_... functions).
 
900
 
 
901
     This caching is needed to avoid of expensive string comparisons and
 
902
     strtol() conversions needed for log names comparison. We don't need to
 
903
     compare them each time this function is called, we only need to do this
 
904
     when current log name changes. If we have UNTIL_MASTER_POS condition we
 
905
     need to do this only after Rotate_log_event::do_apply_event() (which is
 
906
     rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
 
907
     condition then we should invalidate cached comarison value after
 
908
     inc_group_relay_log_pos() which called for each group of events (so we
 
909
     have some benefit if we have something like queries that use
 
910
     autoincrement or if we have transactions).
 
911
 
 
912
     Should be called ONLY if until_condition != UNTIL_NONE !
 
913
   RETURN VALUE
 
914
     true - condition met or error happened (condition seems to have
 
915
            bad log file name)
 
916
     false - condition not met
 
917
*/
 
918
 
 
919
bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
 
920
{
 
921
  const char *log_name;
 
922
  uint64_t log_pos;
 
923
 
 
924
  assert(until_condition != UNTIL_NONE);
 
925
 
 
926
  if (until_condition == UNTIL_MASTER_POS)
 
927
  {
 
928
    log_name= group_master_log_name;
 
929
    log_pos= master_beg_pos;
 
930
  }
 
931
  else
 
932
  { /* until_condition == UNTIL_RELAY_POS */
 
933
    log_name= group_relay_log_name;
 
934
    log_pos= group_relay_log_pos;
 
935
  }
 
936
 
 
937
  if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
 
938
  {
 
939
    /*
 
940
      We have no cached comparison results so we should compare log names
 
941
      and cache result.
 
942
      If we are after RESET SLAVE, and the SQL slave thread has not processed
 
943
      any event yet, it could be that group_master_log_name is "". In that case,
 
944
      just wait for more events (as there is no sensible comparison to do).
 
945
    */
 
946
 
 
947
    if (*log_name)
 
948
    {
 
949
      const char *basename= log_name + dirname_length(log_name);
 
950
 
 
951
      const char *q= (const char*)(fn_ext(basename)+1);
 
952
      if (strncmp(basename, until_log_name, (int32_t)(q-basename)) == 0)
 
953
      {
 
954
        /* Now compare extensions. */
 
955
        char *q_end;
 
956
        uint32_t log_name_extension= strtoul(q, &q_end, 10);
 
957
        if (log_name_extension < until_log_name_extension)
 
958
          until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
 
959
        else
 
960
          until_log_names_cmp_result=
 
961
            (log_name_extension > until_log_name_extension) ?
 
962
            UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
 
963
      }
 
964
      else
 
965
      {
 
966
        /* Probably error so we aborting */
 
967
        sql_print_error(_("Slave SQL thread is stopped because UNTIL "
 
968
                          "condition is bad."));
 
969
        return(true);
 
970
      }
 
971
    }
 
972
    else
 
973
      return(until_log_pos == 0);
 
974
  }
 
975
 
 
976
  return(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
 
977
           log_pos >= until_log_pos) ||
 
978
          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
 
979
}
 
980
 
 
981
 
 
982
void Relay_log_info::cached_charset_invalidate()
 
983
{
 
984
  /* Full of zeroes means uninitialized. */
 
985
  memset(cached_charset, 0, sizeof(cached_charset));
 
986
  return;
 
987
}
 
988
 
 
989
 
 
990
bool Relay_log_info::cached_charset_compare(char *charset) const
 
991
{
 
992
  if (memcmp(cached_charset, charset, sizeof(cached_charset)))
 
993
  {
 
994
    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
 
995
    return(1);
 
996
  }
 
997
  return(0);
 
998
}
 
999
 
 
1000
 
 
1001
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
 
1002
                                  time_t event_creation_time)
 
1003
{
 
1004
  extern uint32_t debug_not_change_ts_if_art_event;
 
1005
  clear_flag(IN_STMT);
 
1006
 
 
1007
  /*
 
1008
    If in a transaction, and if the slave supports transactions, just
 
1009
    inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
 
1010
    (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
 
1011
    BEGIN/COMMIT, not with SET AUTOCOMMIT= .
 
1012
 
 
1013
    CAUTION: opt_using_transactions means innodb || bdb ; suppose the
 
1014
    master supports InnoDB and BDB, but the slave supports only BDB,
 
1015
    problems will arise: - suppose an InnoDB table is created on the
 
1016
    master, - then it will be MyISAM on the slave - but as
 
1017
    opt_using_transactions is true, the slave will believe he is
 
1018
    transactional with the MyISAM table. And problems will come when
 
1019
    one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
 
1020
    resume at BEGIN whereas there has not been any rollback).  This is
 
1021
    the problem of using opt_using_transactions instead of a finer
 
1022
    "does the slave support _transactional handler used on the
 
1023
    master_".
 
1024
 
 
1025
    More generally, we'll have problems when a query mixes a
 
1026
    transactional handler and MyISAM and STOP SLAVE is issued in the
 
1027
    middle of the "transaction". START SLAVE will resume at BEGIN
 
1028
    while the MyISAM table has already been updated.
 
1029
  */
 
1030
  if ((sql_thd->options & OPTION_BEGIN) && opt_using_transactions)
 
1031
    inc_event_relay_log_pos();
 
1032
  else
 
1033
  {
 
1034
    inc_group_relay_log_pos(event_master_log_pos);
 
1035
    flush_relay_log_info(this);
 
1036
    /*
 
1037
      Note that Rotate_log_event::do_apply_event() does not call this
 
1038
      function, so there is no chance that a fake rotate event resets
 
1039
      last_master_timestamp.  Note that we update without mutex
 
1040
      (probably ok - except in some very rare cases, only consequence
 
1041
      is that value may take some time to display in
 
1042
      Seconds_Behind_Master - not critical).
 
1043
    */
 
1044
    if (!(event_creation_time == 0 && debug_not_change_ts_if_art_event > 0))
 
1045
      last_master_timestamp= event_creation_time;
 
1046
  }
 
1047
}
 
1048
 
 
1049
void Relay_log_info::cleanup_context(THD *thd, bool error)
 
1050
{
 
1051
  assert(sql_thd == thd);
 
1052
  /*
 
1053
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
 
1054
    may have opened tables, which we cannot be sure have been closed (because
 
1055
    maybe the Rows_log_event have not been found or will not be, because slave
 
1056
    SQL thread is stopping, or relay log has a missing tail etc). So we close
 
1057
    all thread's tables. And so the table mappings have to be cancelled.
 
1058
    2) Rows_log_event::do_apply_event() may even have started statements or
 
1059
    transactions on them, which we need to rollback in case of error.
 
1060
    3) If finding a Format_description_log_event after a BEGIN, we also need
 
1061
    to rollback before continuing with the next events.
 
1062
    4) so we need this "context cleanup" function.
 
1063
  */
 
1064
  if (error)
 
1065
  {
 
1066
    ha_autocommit_or_rollback(thd, 1); // if a "statement transaction"
 
1067
    end_trans(thd, ROLLBACK); // if a "real transaction"
 
1068
  }
 
1069
  m_table_map.clear_tables();
 
1070
  close_thread_tables(thd);
 
1071
  clear_tables_to_lock();
 
1072
  clear_flag(IN_STMT);
 
1073
  /*
 
1074
    Cleanup for the flags that have been set at do_apply_event.
 
1075
  */
 
1076
  thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
 
1077
  thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
 
1078
  last_event_start_time= 0;
 
1079
  return;
 
1080
}
 
1081
 
 
1082
void Relay_log_info::clear_tables_to_lock()
 
1083
{
 
1084
  while (tables_to_lock)
 
1085
  {
 
1086
    unsigned char* to_free= reinterpret_cast<unsigned char*>(tables_to_lock);
 
1087
    if (tables_to_lock->m_tabledef_valid)
 
1088
    {
 
1089
      tables_to_lock->m_tabledef.table_def::~table_def();
 
1090
      tables_to_lock->m_tabledef_valid= false;
 
1091
    }
 
1092
    tables_to_lock=
 
1093
      static_cast<RPL_TableList*>(tables_to_lock->next_global);
 
1094
    tables_to_lock_count--;
 
1095
    free(to_free);
 
1096
  }
 
1097
  assert(tables_to_lock == NULL && tables_to_lock_count == 0);
 
1098
}