~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to server/rpl_rli.cc

  • Committer: Jay Pipes
  • Date: 2008-07-17 17:54:00 UTC
  • mto: This revision was merged to the branch mainline in revision 182.
  • Revision ID: jay@mysql.com-20080717175400-xm2aazihjra8mdzq
Removal of DBUG from libdrizzle/ - Round 2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include "mysql_priv.h"
 
17
 
 
18
#include "rpl_mi.h"
 
19
#include "rpl_rli.h"
 
20
#include <my_dir.h>
 
21
#include "sql_repl.h"  // For check_binlog_magic
 
22
#include "rpl_utility.h"
 
23
 
 
24
static int32_t count_relay_log_space(Relay_log_info* rli);
 
25
 
 
26
// Defined in slave.cc
 
27
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val);
 
28
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
 
29
                          const char *default_val);
 
30
 
 
31
 
 
32
Relay_log_info::Relay_log_info()
 
33
  :Slave_reporting_capability("SQL"),
 
34
   no_storage(false), replicate_same_server_id(::replicate_same_server_id),
 
35
   info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
 
36
#if defined(HAVE_purify) && HAVE_purify
 
37
   is_fake(false),
 
38
#endif
 
39
   cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
 
40
   group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
 
41
   last_master_timestamp(0), slave_skip_counter(0),
 
42
   abort_pos_wait(0), slave_run_id(0), sql_thd(0),
 
43
   inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
 
44
   until_log_pos(0), retried_trans(0),
 
45
   tables_to_lock(0), tables_to_lock_count(0),
 
46
   last_event_start_time(0), m_flags(0)
 
47
{
 
48
  group_relay_log_name[0]= event_relay_log_name[0]=
 
49
    group_master_log_name[0]= 0;
 
50
  until_log_name[0]= ign_master_log_name_end[0]= 0;
 
51
  bzero((char*) &info_file, sizeof(info_file));
 
52
  bzero((char*) &cache_buf, sizeof(cache_buf));
 
53
  cached_charset_invalidate();
 
54
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
 
55
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
 
56
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
 
57
  pthread_cond_init(&data_cond, NULL);
 
58
  pthread_cond_init(&start_cond, NULL);
 
59
  pthread_cond_init(&stop_cond, NULL);
 
60
  pthread_cond_init(&log_space_cond, NULL);
 
61
  relay_log.init_pthread_objects();
 
62
  return;
 
63
}
 
64
 
 
65
 
 
66
Relay_log_info::~Relay_log_info()
 
67
{
 
68
  pthread_mutex_destroy(&run_lock);
 
69
  pthread_mutex_destroy(&data_lock);
 
70
  pthread_mutex_destroy(&log_space_lock);
 
71
  pthread_cond_destroy(&data_cond);
 
72
  pthread_cond_destroy(&start_cond);
 
73
  pthread_cond_destroy(&stop_cond);
 
74
  pthread_cond_destroy(&log_space_cond);
 
75
  relay_log.cleanup();
 
76
  return;
 
77
}
 
78
 
 
79
 
 
80
int32_t init_relay_log_info(Relay_log_info* rli,
 
81
                        const char* info_fname)
 
82
{
 
83
  char fname[FN_REFLEN+128];
 
84
  int32_t info_fd;
 
85
  const char* msg = 0;
 
86
  int32_t error = 0;
 
87
  assert(!rli->no_storage);         // Don't init if there is no storage
 
88
 
 
89
  if (rli->inited)                       // Set if this function called
 
90
    return(0);
 
91
  fn_format(fname, info_fname, mysql_data_home, "", 4+32);
 
92
  pthread_mutex_lock(&rli->data_lock);
 
93
  info_fd = rli->info_fd;
 
94
  rli->cur_log_fd = -1;
 
95
  rli->slave_skip_counter=0;
 
96
  rli->abort_pos_wait=0;
 
97
  rli->log_space_limit= relay_log_space_limit;
 
98
  rli->log_space_total= 0;
 
99
  rli->tables_to_lock= 0;
 
100
  rli->tables_to_lock_count= 0;
 
101
 
 
102
  /*
 
103
    The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
 
104
    Note that the I/O thread flushes it to disk after writing every
 
105
    event, in flush_master_info(mi, 1).
 
106
  */
 
107
 
 
108
  /*
 
109
    For the maximum log size, we choose max_relay_log_size if it is
 
110
    non-zero, max_binlog_size otherwise. If later the user does SET
 
111
    GLOBAL on one of these variables, fix_max_binlog_size and
 
112
    fix_max_relay_log_size will reconsider the choice (for example
 
113
    if the user changes max_relay_log_size to zero, we have to
 
114
    switch to using max_binlog_size for the relay log) and update
 
115
    rli->relay_log.max_size (and mysql_bin_log.max_size).
 
116
  */
 
117
  {
 
118
    char buf[FN_REFLEN];
 
119
    const char *ln;
 
120
    static bool name_warning_sent= 0;
 
121
    ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
 
122
                                     1, buf);
 
123
    /* We send the warning only at startup, not after every RESET SLAVE */
 
124
    if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
 
125
    {
 
126
      /*
 
127
        User didn't give us info to name the relay log index file.
 
128
        Picking `hostname`-relay-bin.index like we do, causes replication to
 
129
        fail if this slave's hostname is changed later. So, we would like to
 
130
        instead require a name. But as we don't want to break many existing
 
131
        setups, we only give warning, not error.
 
132
      */
 
133
      sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
 
134
                        " so replication "
 
135
                        "may break when this MySQL server acts as a "
 
136
                        "slave and has his hostname changed!! Please "
 
137
                        "use '--relay-log=%s' to avoid this problem.", ln);
 
138
      name_warning_sent= 1;
 
139
    }
 
140
    /*
 
141
      note, that if open() fails, we'll still have index file open
 
142
      but a destructor will take care of that
 
143
    */
 
144
    if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln) ||
 
145
        rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
 
146
                            (max_relay_log_size ? max_relay_log_size :
 
147
                            max_binlog_size), 1))
 
148
    {
 
149
      pthread_mutex_unlock(&rli->data_lock);
 
150
      sql_print_error("Failed in open_log() called from init_relay_log_info()");
 
151
      return(1);
 
152
    }
 
153
  }
 
154
 
 
155
  /* if file does not exist */
 
156
  if (access(fname,F_OK))
 
157
  {
 
158
    /*
 
159
      If someone removed the file from underneath our feet, just close
 
160
      the old descriptor and re-create the old file
 
161
    */
 
162
    if (info_fd >= 0)
 
163
      my_close(info_fd, MYF(MY_WME));
 
164
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
 
165
    {
 
166
      sql_print_error("Failed to create a new relay log info file (\
 
167
file '%s', errno %d)", fname, my_errno);
 
168
      msg= current_thd->main_da.message();
 
169
      goto err;
 
170
    }
 
171
    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
 
172
                      MYF(MY_WME)))
 
173
    {
 
174
      sql_print_error("Failed to create a cache on relay log info file '%s'",
 
175
                      fname);
 
176
      msg= current_thd->main_da.message();
 
177
      goto err;
 
178
    }
 
179
 
 
180
    /* Init relay log with first entry in the relay index file */
 
181
    if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
 
182
                           &msg, 0))
 
183
    {
 
184
      sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
 
185
      goto err;
 
186
    }
 
187
    rli->group_master_log_name[0]= 0;
 
188
    rli->group_master_log_pos= 0;
 
189
    rli->info_fd= info_fd;
 
190
  }
 
191
  else // file exists
 
192
  {
 
193
    if (info_fd >= 0)
 
194
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
 
195
    else
 
196
    {
 
197
      int32_t error=0;
 
198
      if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
 
199
      {
 
200
        sql_print_error("\
 
201
Failed to open the existing relay log info file '%s' (errno %d)",
 
202
                        fname, my_errno);
 
203
        error= 1;
 
204
      }
 
205
      else if (init_io_cache(&rli->info_file, info_fd,
 
206
                             IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
 
207
      {
 
208
        sql_print_error("Failed to create a cache on relay log info file '%s'",
 
209
                        fname);
 
210
        error= 1;
 
211
      }
 
212
      if (error)
 
213
      {
 
214
        if (info_fd >= 0)
 
215
          my_close(info_fd, MYF(0));
 
216
        rli->info_fd= -1;
 
217
        rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
218
        pthread_mutex_unlock(&rli->data_lock);
 
219
        return(1);
 
220
      }
 
221
    }
 
222
 
 
223
    rli->info_fd = info_fd;
 
224
    int32_t relay_log_pos, master_log_pos;
 
225
    if (init_strvar_from_file(rli->group_relay_log_name,
 
226
                              sizeof(rli->group_relay_log_name),
 
227
                              &rli->info_file, "") ||
 
228
       init_intvar_from_file(&relay_log_pos,
 
229
                             &rli->info_file, BIN_LOG_HEADER_SIZE) ||
 
230
       init_strvar_from_file(rli->group_master_log_name,
 
231
                             sizeof(rli->group_master_log_name),
 
232
                             &rli->info_file, "") ||
 
233
       init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
 
234
    {
 
235
      msg="Error reading slave log configuration";
 
236
      goto err;
 
237
    }
 
238
    strmake(rli->event_relay_log_name,rli->group_relay_log_name,
 
239
            sizeof(rli->event_relay_log_name)-1);
 
240
    rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
 
241
    rli->group_master_log_pos= master_log_pos;
 
242
 
 
243
    if (init_relay_log_pos(rli,
 
244
                           rli->group_relay_log_name,
 
245
                           rli->group_relay_log_pos,
 
246
                           0 /* no data lock*/,
 
247
                           &msg, 0))
 
248
    {
 
249
      char llbuf[22];
 
250
      sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
 
251
                      rli->group_relay_log_name,
 
252
                      llstr(rli->group_relay_log_pos, llbuf));
 
253
      goto err;
 
254
    }
 
255
  }
 
256
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
 
257
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
 
258
 
 
259
  /*
 
260
    Now change the cache from READ to WRITE - must do this
 
261
    before flush_relay_log_info
 
262
  */
 
263
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
 
264
  if ((error= flush_relay_log_info(rli)))
 
265
    sql_print_error("Failed to flush relay log info file");
 
266
  if (count_relay_log_space(rli))
 
267
  {
 
268
    msg="Error counting relay log space";
 
269
    goto err;
 
270
  }
 
271
  rli->inited= 1;
 
272
  pthread_mutex_unlock(&rli->data_lock);
 
273
  return(error);
 
274
 
 
275
err:
 
276
  sql_print_error(msg);
 
277
  end_io_cache(&rli->info_file);
 
278
  if (info_fd >= 0)
 
279
    my_close(info_fd, MYF(0));
 
280
  rli->info_fd= -1;
 
281
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
282
  pthread_mutex_unlock(&rli->data_lock);
 
283
  return(1);
 
284
}
 
285
 
 
286
 
 
287
static inline int32_t add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
 
288
{
 
289
  struct stat s;
 
290
  if (stat(linfo->log_file_name,&s))
 
291
  {
 
292
    sql_print_error("log %s listed in the index, but failed to stat",
 
293
                    linfo->log_file_name);
 
294
    return(1);
 
295
  }
 
296
  rli->log_space_total += s.st_size;
 
297
  return(0);
 
298
}
 
299
 
 
300
 
 
301
static int32_t count_relay_log_space(Relay_log_info* rli)
 
302
{
 
303
  LOG_INFO linfo;
 
304
  rli->log_space_total= 0;
 
305
  if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
 
306
  {
 
307
    sql_print_error("Could not find first log while counting relay log space");
 
308
    return(1);
 
309
  }
 
310
  do
 
311
  {
 
312
    if (add_relay_log(rli,&linfo))
 
313
      return(1);
 
314
  } while (!rli->relay_log.find_next_log(&linfo, 1));
 
315
  /*
 
316
     As we have counted everything, including what may have written in a
 
317
     preceding write, we must reset bytes_written, or we may count some space
 
318
     twice.
 
319
  */
 
320
  rli->relay_log.reset_bytes_written();
 
321
  return(0);
 
322
}
 
323
 
 
324
 
 
325
/*
 
326
   Reset UNTIL condition for Relay_log_info
 
327
 
 
328
   SYNOPSYS
 
329
    clear_until_condition()
 
330
      rli - Relay_log_info structure where UNTIL condition should be reset
 
331
 */
 
332
 
 
333
void Relay_log_info::clear_until_condition()
 
334
{
 
335
  until_condition= Relay_log_info::UNTIL_NONE;
 
336
  until_log_name[0]= 0;
 
337
  until_log_pos= 0;
 
338
  return;
 
339
}
 
340
 
 
341
 
 
342
/*
 
343
  Open the given relay log
 
344
 
 
345
  SYNOPSIS
 
346
    init_relay_log_pos()
 
347
    rli                 Relay information (will be initialized)
 
348
    log                 Name of relay log file to read from. NULL = First log
 
349
    pos                 Position in relay log file
 
350
    need_data_lock      Set to 1 if this functions should do mutex locks
 
351
    errmsg              Store pointer to error message here
 
352
    look_for_description_event
 
353
                        1 if we should look for such an event. We only need
 
354
                        this when the SQL thread starts and opens an existing
 
355
                        relay log and has to execute it (possibly from an
 
356
                        offset >4); then we need to read the first event of
 
357
                        the relay log to be able to parse the events we have
 
358
                        to execute.
 
359
 
 
360
  DESCRIPTION
 
361
  - Close old open relay log files.
 
362
  - If we are using the same relay log as the running IO-thread, then set
 
363
    rli->cur_log to point to the same IO_CACHE entry.
 
364
  - If not, open the 'log' binary file.
 
365
 
 
366
  TODO
 
367
    - check proper initialization of group_master_log_name/group_master_log_pos
 
368
 
 
369
  RETURN VALUES
 
370
    0   ok
 
371
    1   error.  errmsg is set to point to the error message
 
372
*/
 
373
 
 
374
int32_t init_relay_log_pos(Relay_log_info* rli,const char* log,
 
375
                       uint64_t pos, bool need_data_lock,
 
376
                       const char** errmsg,
 
377
                       bool look_for_description_event)
 
378
{
 
379
  *errmsg=0;
 
380
  pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
 
381
 
 
382
  if (need_data_lock)
 
383
    pthread_mutex_lock(&rli->data_lock);
 
384
 
 
385
  /*
 
386
    Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
 
387
    is, too, and init_slave() too; these 2 functions allocate a description
 
388
    event in init_relay_log_pos, which is not freed by the terminating SQL slave
 
389
    thread as that thread is not started by these functions. So we have to free
 
390
    the description_event here, in case, so that there is no memory leak in
 
391
    running, say, CHANGE MASTER.
 
392
  */
 
393
  delete rli->relay_log.description_event_for_exec;
 
394
  /*
 
395
    By default the relay log is in binlog format 3 (4.0).
 
396
    Even if format is 4, this will work enough to read the first event
 
397
    (Format_desc) (remember that format 4 is just lenghtened compared to format
 
398
    3; format 3 is a prefix of format 4).
 
399
  */
 
400
  rli->relay_log.description_event_for_exec= new
 
401
    Format_description_log_event(3);
 
402
 
 
403
  pthread_mutex_lock(log_lock);
 
404
 
 
405
  /* Close log file and free buffers if it's already open */
 
406
  if (rli->cur_log_fd >= 0)
 
407
  {
 
408
    end_io_cache(&rli->cache_buf);
 
409
    my_close(rli->cur_log_fd, MYF(MY_WME));
 
410
    rli->cur_log_fd = -1;
 
411
  }
 
412
 
 
413
  rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
 
414
 
 
415
  /*
 
416
    Test to see if the previous run was with the skip of purging
 
417
    If yes, we do not purge when we restart
 
418
  */
 
419
  if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
 
420
  {
 
421
    *errmsg="Could not find first log during relay log initialization";
 
422
    goto err;
 
423
  }
 
424
 
 
425
  if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
 
426
  {
 
427
    *errmsg="Could not find target log during relay log initialization";
 
428
    goto err;
 
429
  }
 
430
  strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
 
431
          sizeof(rli->group_relay_log_name)-1);
 
432
  strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
 
433
          sizeof(rli->event_relay_log_name)-1);
 
434
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
 
435
  {
 
436
    /*
 
437
      The IO thread is using this log file.
 
438
      In this case, we will use the same IO_CACHE pointer to
 
439
      read data as the IO thread is using to write data.
 
440
    */
 
441
    my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
 
442
    if (check_binlog_magic(rli->cur_log,errmsg))
 
443
      goto err;
 
444
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
 
445
  }
 
446
  else
 
447
  {
 
448
    /*
 
449
      Open the relay log and set rli->cur_log to point at this one
 
450
    */
 
451
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
 
452
                                     rli->linfo.log_file_name,errmsg)) < 0)
 
453
      goto err;
 
454
    rli->cur_log = &rli->cache_buf;
 
455
  }
 
456
  /*
 
457
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
 
458
    sure.
 
459
  */
 
460
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
 
461
  {
 
462
    Log_event* ev;
 
463
    while (look_for_description_event)
 
464
    {
 
465
      /*
 
466
        Read the possible Format_description_log_event; if position
 
467
        was 4, no need, it will be read naturally.
 
468
      */
 
469
      if (my_b_tell(rli->cur_log) >= pos)
 
470
        break;
 
471
 
 
472
      /*
 
473
        Because of we have rli->data_lock and log_lock, we can safely read an
 
474
        event
 
475
      */
 
476
      if (!(ev=Log_event::read_log_event(rli->cur_log,0,
 
477
                                         rli->relay_log.description_event_for_exec)))
 
478
      {
 
479
        if (rli->cur_log->error) /* not EOF */
 
480
        {
 
481
          *errmsg= "I/O error reading event at position 4";
 
482
          goto err;
 
483
        }
 
484
        break;
 
485
      }
 
486
      else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
 
487
      {
 
488
        delete rli->relay_log.description_event_for_exec;
 
489
        rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
 
490
        /*
 
491
          As ev was returned by read_log_event, it has passed is_valid(), so
 
492
          my_malloc() in ctor worked, no need to check again.
 
493
        */
 
494
        /*
 
495
          Ok, we found a Format_description event. But it is not sure that this
 
496
          describes the whole relay log; indeed, one can have this sequence
 
497
          (starting from position 4):
 
498
          Format_desc (of slave)
 
499
          Rotate (of master)
 
500
          Format_desc (of master)
 
501
          So the Format_desc which really describes the rest of the relay log
 
502
          is the 3rd event (it can't be further than that, because we rotate
 
503
          the relay log when we queue a Rotate event from the master).
 
504
          But what describes the Rotate is the first Format_desc.
 
505
          So what we do is:
 
506
          go on searching for Format_description events, until you exceed the
 
507
          position (argument 'pos') or until you find another event than Rotate
 
508
          or Format_desc.
 
509
        */
 
510
      }
 
511
      else
 
512
      {
 
513
        look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
 
514
        delete ev;
 
515
      }
 
516
    }
 
517
    my_b_seek(rli->cur_log,(off_t)pos);
 
518
 
 
519
  }
 
520
 
 
521
err:
 
522
  /*
 
523
    If we don't purge, we can't honour relay_log_space_limit ;
 
524
    silently discard it
 
525
  */
 
526
  if (!relay_log_purge)
 
527
    rli->log_space_limit= 0;
 
528
  pthread_cond_broadcast(&rli->data_cond);
 
529
 
 
530
  pthread_mutex_unlock(log_lock);
 
531
 
 
532
  if (need_data_lock)
 
533
    pthread_mutex_unlock(&rli->data_lock);
 
534
  if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
 
535
    *errmsg= "Invalid Format_description log event; could be out of memory";
 
536
 
 
537
  return ((*errmsg) ? 1 : 0);
 
538
}
 
539
 
 
540
 
 
541
/*
 
542
  Waits until the SQL thread reaches (has executed up to) the
 
543
  log/position or timed out.
 
544
 
 
545
  SYNOPSIS
 
546
    wait_for_pos()
 
547
    thd             client thread that sent SELECT MASTER_POS_WAIT
 
548
    log_name        log name to wait for
 
549
    log_pos         position to wait for
 
550
    timeout         timeout in seconds before giving up waiting
 
551
 
 
552
  NOTES
 
553
    timeout is int64_t whereas it should be uint32_t ; but this is
 
554
    to catch if the user submitted a negative timeout.
 
555
 
 
556
  RETURN VALUES
 
557
    -2          improper arguments (log_pos<0)
 
558
                or slave not running, or master info changed
 
559
                during the function's execution,
 
560
                or client thread killed. -2 is translated to NULL by caller
 
561
    -1          timed out
 
562
    >=0         number of log events the function had to wait
 
563
                before reaching the desired log/position
 
564
 */
 
565
 
 
566
int32_t Relay_log_info::wait_for_pos(THD* thd, String* log_name,
 
567
                                    int64_t log_pos,
 
568
                                    int64_t timeout)
 
569
{
 
570
  int32_t event_count = 0;
 
571
  uint32_t init_abort_pos_wait;
 
572
  int32_t error=0;
 
573
  struct timespec abstime; // for timeout checking
 
574
  const char *msg;
 
575
 
 
576
  if (!inited)
 
577
    return(-2);
 
578
 
 
579
  set_timespec(abstime,timeout);
 
580
  pthread_mutex_lock(&data_lock);
 
581
  msg= thd->enter_cond(&data_cond, &data_lock,
 
582
                       "Waiting for the slave SQL thread to "
 
583
                       "advance position");
 
584
  /*
 
585
     This function will abort when it notices that some CHANGE MASTER or
 
586
     RESET MASTER has changed the master info.
 
587
     To catch this, these commands modify abort_pos_wait ; We just monitor
 
588
     abort_pos_wait and see if it has changed.
 
589
     Why do we have this mechanism instead of simply monitoring slave_running
 
590
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
 
591
     the SQL thread be stopped?
 
592
     This is becasue if someones does:
 
593
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
 
594
     the change may happen very quickly and we may not notice that
 
595
     slave_running briefly switches between 1/0/1.
 
596
  */
 
597
  init_abort_pos_wait= abort_pos_wait;
 
598
 
 
599
  /*
 
600
    We'll need to
 
601
    handle all possible log names comparisons (e.g. 999 vs 1000).
 
602
    We use uint32_t for string->number conversion ; this is no
 
603
    stronger limitation than in find_uniq_filename in sql/log.cc
 
604
  */
 
605
  uint32_t log_name_extension;
 
606
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
 
607
 
 
608
  strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), FN_REFLEN-1));
 
609
 
 
610
  char *p= fn_ext(log_name_tmp);
 
611
  char *p_end;
 
612
  if (!*p || log_pos<0)
 
613
  {
 
614
    error= -2; //means improper arguments
 
615
    goto err;
 
616
  }
 
617
  // Convert 0-3 to 4
 
618
  log_pos= max(log_pos, BIN_LOG_HEADER_SIZE);
 
619
  /* p points to '.' */
 
620
  log_name_extension= strtoul(++p, &p_end, 10);
 
621
  /*
 
622
    p_end points to the first invalid character.
 
623
    If it equals to p, no digits were found, error.
 
624
    If it contains '\0' it means conversion went ok.
 
625
  */
 
626
  if (p_end==p || *p_end)
 
627
  {
 
628
    error= -2;
 
629
    goto err;
 
630
  }
 
631
 
 
632
  /* The "compare and wait" main loop */
 
633
  while (!thd->killed &&
 
634
         init_abort_pos_wait == abort_pos_wait &&
 
635
         slave_running)
 
636
  {
 
637
    bool pos_reached;
 
638
    int32_t cmp_result= 0;
 
639
 
 
640
    /*
 
641
      group_master_log_name can be "", if we are just after a fresh
 
642
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
 
643
      (before we have executed one Rotate event from the master) or
 
644
      (rare) if the user is doing a weird slave setup (see next
 
645
      paragraph).  If group_master_log_name is "", we assume we don't
 
646
      have enough info to do the comparison yet, so we just wait until
 
647
      more data. In this case master_log_pos is always 0 except if
 
648
      somebody (wrongly) sets this slave to be a slave of itself
 
649
      without using --replicate-same-server-id (an unsupported
 
650
      configuration which does nothing), then group_master_log_pos
 
651
      will grow and group_master_log_name will stay "".
 
652
    */
 
653
    if (*group_master_log_name)
 
654
    {
 
655
      char *basename= (group_master_log_name +
 
656
                       dirname_length(group_master_log_name));
 
657
      /*
 
658
        First compare the parts before the extension.
 
659
        Find the dot in the master's log basename,
 
660
        and protect against user's input error :
 
661
        if the names do not match up to '.' included, return error
 
662
      */
 
663
      char *q= (char*)(fn_ext(basename)+1);
 
664
      if (strncmp(basename, log_name_tmp, (int32_t)(q-basename)))
 
665
      {
 
666
        error= -2;
 
667
        break;
 
668
      }
 
669
      // Now compare extensions.
 
670
      char *q_end;
 
671
      uint32_t group_master_log_name_extension= strtoul(q, &q_end, 10);
 
672
      if (group_master_log_name_extension < log_name_extension)
 
673
        cmp_result= -1 ;
 
674
      else
 
675
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
 
676
 
 
677
      pos_reached= ((!cmp_result && group_master_log_pos >= (uint64_t)log_pos) ||
 
678
                    cmp_result > 0);
 
679
      if (pos_reached || thd->killed)
 
680
        break;
 
681
    }
 
682
 
 
683
    //wait for master update, with optional timeout.
 
684
 
 
685
    /*
 
686
      We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
 
687
      will wake us up.
 
688
    */
 
689
    if (timeout > 0)
 
690
    {
 
691
      /*
 
692
        Note that pthread_cond_timedwait checks for the timeout
 
693
        before for the condition ; i.e. it returns ETIMEDOUT
 
694
        if the system time equals or exceeds the time specified by abstime
 
695
        before the condition variable is signaled or broadcast, _or_ if
 
696
        the absolute time specified by abstime has already passed at the time
 
697
        of the call.
 
698
        For that reason, pthread_cond_timedwait will do the "timeoutting" job
 
699
        even if its condition is always immediately signaled (case of a loaded
 
700
        master).
 
701
      */
 
702
      error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
 
703
    }
 
704
    else
 
705
      pthread_cond_wait(&data_cond, &data_lock);
 
706
    if (error == ETIMEDOUT || error == ETIME)
 
707
    {
 
708
      error= -1;
 
709
      break;
 
710
    }
 
711
    error=0;
 
712
    event_count++;
 
713
  }
 
714
 
 
715
err:
 
716
  thd->exit_cond(msg);
 
717
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
 
718
      !slave_running)
 
719
  {
 
720
    error= -2;
 
721
  }
 
722
  return( error ? error : event_count );
 
723
}
 
724
 
 
725
 
 
726
void Relay_log_info::inc_group_relay_log_pos(uint64_t log_pos,
 
727
                                                bool skip_lock)
 
728
{
 
729
  if (!skip_lock)
 
730
    pthread_mutex_lock(&data_lock);
 
731
  inc_event_relay_log_pos();
 
732
  group_relay_log_pos= event_relay_log_pos;
 
733
  strmake(group_relay_log_name,event_relay_log_name,
 
734
          sizeof(group_relay_log_name)-1);
 
735
 
 
736
  notify_group_relay_log_name_update();
 
737
 
 
738
  /*
 
739
    If the slave does not support transactions and replicates a transaction,
 
740
    users should not trust group_master_log_pos (which they can display with
 
741
    SHOW SLAVE STATUS or read from relay-log.info), because to compute
 
742
    group_master_log_pos the slave relies on log_pos stored in the master's
 
743
    binlog, but if we are in a master's transaction these positions are always
 
744
    the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
 
745
    not advance as it should on the non-transactional slave (it advances by
 
746
    big leaps, whereas it should advance by small leaps).
 
747
  */
 
748
  /*
 
749
    In 4.x we used the event's len to compute the positions here. This is
 
750
    wrong if the event was 3.23/4.0 and has been converted to 5.0, because
 
751
    then the event's len is not what is was in the master's binlog, so this
 
752
    will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
 
753
    replication: Exec_master_log_pos is wrong). Only way to solve this is to
 
754
    have the original offset of the end of the event the relay log. This is
 
755
    what we do in 5.0: log_pos has become "end_log_pos" (because the real use
 
756
    of log_pos in 4.0 was to compute the end_log_pos; so better to store
 
757
    end_log_pos instead of begin_log_pos.
 
758
    If we had not done this fix here, the problem would also have appeared
 
759
    when the slave and master are 5.0 but with different event length (for
 
760
    example the slave is more recent than the master and features the event
 
761
    UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
 
762
    SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
 
763
    value which would lead to badly broken replication.
 
764
    Even the relay_log_pos will be corrupted in this case, because the len is
 
765
    the relay log is not "val".
 
766
    With the end_log_pos solution, we avoid computations involving lengthes.
 
767
  */
 
768
  if (log_pos) // 3.23 binlogs don't have log_posx
 
769
  {
 
770
    group_master_log_pos= log_pos;
 
771
  }
 
772
  pthread_cond_broadcast(&data_cond);
 
773
  if (!skip_lock)
 
774
    pthread_mutex_unlock(&data_lock);
 
775
  return;
 
776
}
 
777
 
 
778
 
 
779
void Relay_log_info::close_temporary_tables()
 
780
{
 
781
  TABLE *table,*next;
 
782
 
 
783
  for (table=save_temporary_tables ; table ; table=next)
 
784
  {
 
785
    next=table->next;
 
786
    /*
 
787
      Don't ask for disk deletion. For now, anyway they will be deleted when
 
788
      slave restarts, but it is a better intention to not delete them.
 
789
    */
 
790
    close_temporary(table, 1, 0);
 
791
  }
 
792
  save_temporary_tables= 0;
 
793
  slave_open_temp_tables= 0;
 
794
  return;
 
795
}
 
796
 
 
797
/*
 
798
  purge_relay_logs()
 
799
 
 
800
  NOTES
 
801
    Assumes to have a run lock on rli and that no slave thread are running.
 
802
*/
 
803
 
 
804
int32_t purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
 
805
                     const char** errmsg)
 
806
{
 
807
  int32_t error=0;
 
808
 
 
809
  /*
 
810
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
 
811
    Indeed, rli->inited==0 does not imply that they already are empty.
 
812
    It could be that slave's info initialization partly succeeded :
 
813
    for example if relay-log.info existed but *relay-bin*.*
 
814
    have been manually removed, init_relay_log_info reads the old
 
815
    relay-log.info and fills rli->master_log_*, then init_relay_log_info
 
816
    checks for the existence of the relay log, this fails and
 
817
    init_relay_log_info leaves rli->inited to 0.
 
818
    In that pathological case, rli->master_log_pos* will be properly reinited
 
819
    at the next START SLAVE (as RESET SLAVE or CHANGE
 
820
    MASTER, the callers of purge_relay_logs, will delete bogus *.info files
 
821
    or replace them with correct files), however if the user does SHOW SLAVE
 
822
    STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
 
823
    In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
 
824
    to display fine in any case.
 
825
  */
 
826
 
 
827
  rli->group_master_log_name[0]= 0;
 
828
  rli->group_master_log_pos= 0;
 
829
 
 
830
  if (!rli->inited)
 
831
  {
 
832
    return(0);
 
833
  }
 
834
 
 
835
  assert(rli->slave_running == 0);
 
836
  assert(rli->mi->slave_running == 0);
 
837
 
 
838
  rli->slave_skip_counter=0;
 
839
  pthread_mutex_lock(&rli->data_lock);
 
840
 
 
841
  /*
 
842
    we close the relay log fd possibly left open by the slave SQL thread,
 
843
    to be able to delete it; the relay log fd possibly left open by the slave
 
844
    I/O thread will be closed naturally in reset_logs() by the
 
845
    close(LOG_CLOSE_TO_BE_OPENED) call
 
846
  */
 
847
  if (rli->cur_log_fd >= 0)
 
848
  {
 
849
    end_io_cache(&rli->cache_buf);
 
850
    my_close(rli->cur_log_fd, MYF(MY_WME));
 
851
    rli->cur_log_fd= -1;
 
852
  }
 
853
 
 
854
  if (rli->relay_log.reset_logs(thd))
 
855
  {
 
856
    *errmsg = "Failed during log reset";
 
857
    error=1;
 
858
    goto err;
 
859
  }
 
860
  /* Save name of used relay log file */
 
861
  strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
 
862
          sizeof(rli->group_relay_log_name)-1);
 
863
  strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
 
864
          sizeof(rli->event_relay_log_name)-1);
 
865
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
 
866
  if (count_relay_log_space(rli))
 
867
  {
 
868
    *errmsg= "Error counting relay log space";
 
869
    goto err;
 
870
  }
 
871
  if (!just_reset)
 
872
    error= init_relay_log_pos(rli, rli->group_relay_log_name,
 
873
                              rli->group_relay_log_pos,
 
874
                              0 /* do not need data lock */, errmsg, 0);
 
875
 
 
876
err:
 
877
  pthread_mutex_unlock(&rli->data_lock);
 
878
  return(error);
 
879
}
 
880
 
 
881
 
 
882
/*
 
883
     Check if condition stated in UNTIL clause of START SLAVE is reached.
 
884
   SYNOPSYS
 
885
     Relay_log_info::is_until_satisfied()
 
886
     master_beg_pos    position of the beginning of to be executed event
 
887
                       (not log_pos member of the event that points to the
 
888
                        beginning of the following event)
 
889
 
 
890
 
 
891
   DESCRIPTION
 
892
     Checks if UNTIL condition is reached. Uses caching result of last
 
893
     comparison of current log file name and target log file name. So cached
 
894
     value should be invalidated if current log file name changes
 
895
     (see Relay_log_info::notify_... functions).
 
896
 
 
897
     This caching is needed to avoid of expensive string comparisons and
 
898
     strtol() conversions needed for log names comparison. We don't need to
 
899
     compare them each time this function is called, we only need to do this
 
900
     when current log name changes. If we have UNTIL_MASTER_POS condition we
 
901
     need to do this only after Rotate_log_event::do_apply_event() (which is
 
902
     rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
 
903
     condition then we should invalidate cached comarison value after
 
904
     inc_group_relay_log_pos() which called for each group of events (so we
 
905
     have some benefit if we have something like queries that use
 
906
     autoincrement or if we have transactions).
 
907
 
 
908
     Should be called ONLY if until_condition != UNTIL_NONE !
 
909
   RETURN VALUE
 
910
     true - condition met or error happened (condition seems to have
 
911
            bad log file name)
 
912
     false - condition not met
 
913
*/
 
914
 
 
915
bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
 
916
{
 
917
  const char *log_name;
 
918
  uint64_t log_pos;
 
919
 
 
920
  assert(until_condition != UNTIL_NONE);
 
921
 
 
922
  if (until_condition == UNTIL_MASTER_POS)
 
923
  {
 
924
    log_name= group_master_log_name;
 
925
    log_pos= master_beg_pos;
 
926
  }
 
927
  else
 
928
  { /* until_condition == UNTIL_RELAY_POS */
 
929
    log_name= group_relay_log_name;
 
930
    log_pos= group_relay_log_pos;
 
931
  }
 
932
 
 
933
  if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
 
934
  {
 
935
    /*
 
936
      We have no cached comparison results so we should compare log names
 
937
      and cache result.
 
938
      If we are after RESET SLAVE, and the SQL slave thread has not processed
 
939
      any event yet, it could be that group_master_log_name is "". In that case,
 
940
      just wait for more events (as there is no sensible comparison to do).
 
941
    */
 
942
 
 
943
    if (*log_name)
 
944
    {
 
945
      const char *basename= log_name + dirname_length(log_name);
 
946
 
 
947
      const char *q= (const char*)(fn_ext(basename)+1);
 
948
      if (strncmp(basename, until_log_name, (int32_t)(q-basename)) == 0)
 
949
      {
 
950
        /* Now compare extensions. */
 
951
        char *q_end;
 
952
        uint32_t log_name_extension= strtoul(q, &q_end, 10);
 
953
        if (log_name_extension < until_log_name_extension)
 
954
          until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
 
955
        else
 
956
          until_log_names_cmp_result=
 
957
            (log_name_extension > until_log_name_extension) ?
 
958
            UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
 
959
      }
 
960
      else
 
961
      {
 
962
        /* Probably error so we aborting */
 
963
        sql_print_error("Slave SQL thread is stopped because UNTIL "
 
964
                        "condition is bad.");
 
965
        return(true);
 
966
      }
 
967
    }
 
968
    else
 
969
      return(until_log_pos == 0);
 
970
  }
 
971
 
 
972
  return(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
 
973
           log_pos >= until_log_pos) ||
 
974
          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
 
975
}
 
976
 
 
977
 
 
978
void Relay_log_info::cached_charset_invalidate()
 
979
{
 
980
  /* Full of zeroes means uninitialized. */
 
981
  bzero(cached_charset, sizeof(cached_charset));
 
982
  return;
 
983
}
 
984
 
 
985
 
 
986
bool Relay_log_info::cached_charset_compare(char *charset) const
 
987
{
 
988
  if (bcmp((uchar*) cached_charset, (uchar*) charset,
 
989
           sizeof(cached_charset)))
 
990
  {
 
991
    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
 
992
    return(1);
 
993
  }
 
994
  return(0);
 
995
}
 
996
 
 
997
 
 
998
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
 
999
                                  time_t event_creation_time)
 
1000
{
 
1001
  extern uint32_t debug_not_change_ts_if_art_event;
 
1002
  clear_flag(IN_STMT);
 
1003
 
 
1004
  /*
 
1005
    If in a transaction, and if the slave supports transactions, just
 
1006
    inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
 
1007
    (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
 
1008
    BEGIN/COMMIT, not with SET AUTOCOMMIT= .
 
1009
 
 
1010
    CAUTION: opt_using_transactions means innodb || bdb ; suppose the
 
1011
    master supports InnoDB and BDB, but the slave supports only BDB,
 
1012
    problems will arise: - suppose an InnoDB table is created on the
 
1013
    master, - then it will be MyISAM on the slave - but as
 
1014
    opt_using_transactions is true, the slave will believe he is
 
1015
    transactional with the MyISAM table. And problems will come when
 
1016
    one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
 
1017
    resume at BEGIN whereas there has not been any rollback).  This is
 
1018
    the problem of using opt_using_transactions instead of a finer
 
1019
    "does the slave support _transactional handler used on the
 
1020
    master_".
 
1021
 
 
1022
    More generally, we'll have problems when a query mixes a
 
1023
    transactional handler and MyISAM and STOP SLAVE is issued in the
 
1024
    middle of the "transaction". START SLAVE will resume at BEGIN
 
1025
    while the MyISAM table has already been updated.
 
1026
  */
 
1027
  if ((sql_thd->options & OPTION_BEGIN) && opt_using_transactions)
 
1028
    inc_event_relay_log_pos();
 
1029
  else
 
1030
  {
 
1031
    inc_group_relay_log_pos(event_master_log_pos);
 
1032
    flush_relay_log_info(this);
 
1033
    /*
 
1034
      Note that Rotate_log_event::do_apply_event() does not call this
 
1035
      function, so there is no chance that a fake rotate event resets
 
1036
      last_master_timestamp.  Note that we update without mutex
 
1037
      (probably ok - except in some very rare cases, only consequence
 
1038
      is that value may take some time to display in
 
1039
      Seconds_Behind_Master - not critical).
 
1040
    */
 
1041
    if (!(event_creation_time == 0 && debug_not_change_ts_if_art_event > 0))
 
1042
      last_master_timestamp= event_creation_time;
 
1043
  }
 
1044
}
 
1045
 
 
1046
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 
1047
void Relay_log_info::cleanup_context(THD *thd, bool error)
 
1048
{
 
1049
  assert(sql_thd == thd);
 
1050
  /*
 
1051
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
 
1052
    may have opened tables, which we cannot be sure have been closed (because
 
1053
    maybe the Rows_log_event have not been found or will not be, because slave
 
1054
    SQL thread is stopping, or relay log has a missing tail etc). So we close
 
1055
    all thread's tables. And so the table mappings have to be cancelled.
 
1056
    2) Rows_log_event::do_apply_event() may even have started statements or
 
1057
    transactions on them, which we need to rollback in case of error.
 
1058
    3) If finding a Format_description_log_event after a BEGIN, we also need
 
1059
    to rollback before continuing with the next events.
 
1060
    4) so we need this "context cleanup" function.
 
1061
  */
 
1062
  if (error)
 
1063
  {
 
1064
    ha_autocommit_or_rollback(thd, 1); // if a "statement transaction"
 
1065
    end_trans(thd, ROLLBACK); // if a "real transaction"
 
1066
  }
 
1067
  m_table_map.clear_tables();
 
1068
  close_thread_tables(thd);
 
1069
  clear_tables_to_lock();
 
1070
  clear_flag(IN_STMT);
 
1071
  /*
 
1072
    Cleanup for the flags that have been set at do_apply_event.
 
1073
  */
 
1074
  thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
 
1075
  thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
 
1076
  last_event_start_time= 0;
 
1077
  return;
 
1078
}
 
1079
 
 
1080
void Relay_log_info::clear_tables_to_lock()
 
1081
{
 
1082
  while (tables_to_lock)
 
1083
  {
 
1084
    uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
 
1085
    if (tables_to_lock->m_tabledef_valid)
 
1086
    {
 
1087
      tables_to_lock->m_tabledef.table_def::~table_def();
 
1088
      tables_to_lock->m_tabledef_valid= false;
 
1089
    }
 
1090
    tables_to_lock=
 
1091
      static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
 
1092
    tables_to_lock_count--;
 
1093
    my_free(to_free, MYF(MY_WME));
 
1094
  }
 
1095
  assert(tables_to_lock == NULL && tables_to_lock_count == 0);
 
1096
}
 
1097
 
 
1098
#endif