~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to server/rpl_rli.cc

Removed/replaced DBUG symbols and TRUE/FALSE

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include "mysql_priv.h"
 
17
 
 
18
#include "rpl_mi.h"
 
19
#include "rpl_rli.h"
 
20
#include <my_dir.h>
 
21
#include "sql_repl.h"  // For check_binlog_magic
 
22
#include "rpl_utility.h"
 
23
 
 
24
static int count_relay_log_space(Relay_log_info* rli);
 
25
 
 
26
// Defined in slave.cc
 
27
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
 
28
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
 
29
                          const char *default_val);
 
30
 
 
31
 
 
32
Relay_log_info::Relay_log_info()
 
33
  :Slave_reporting_capability("SQL"),
 
34
   no_storage(false), replicate_same_server_id(::replicate_same_server_id),
 
35
   info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
 
36
#if defined(HAVE_purify) && HAVE_purify
 
37
   is_fake(false),
 
38
#endif
 
39
   cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
 
40
   group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
 
41
   last_master_timestamp(0), slave_skip_counter(0),
 
42
   abort_pos_wait(0), slave_run_id(0), sql_thd(0),
 
43
   inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
 
44
   until_log_pos(0), retried_trans(0),
 
45
   tables_to_lock(0), tables_to_lock_count(0),
 
46
   last_event_start_time(0), m_flags(0)
 
47
{
 
48
  group_relay_log_name[0]= event_relay_log_name[0]=
 
49
    group_master_log_name[0]= 0;
 
50
  until_log_name[0]= ign_master_log_name_end[0]= 0;
 
51
  bzero((char*) &info_file, sizeof(info_file));
 
52
  bzero((char*) &cache_buf, sizeof(cache_buf));
 
53
  cached_charset_invalidate();
 
54
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
 
55
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
 
56
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
 
57
  pthread_cond_init(&data_cond, NULL);
 
58
  pthread_cond_init(&start_cond, NULL);
 
59
  pthread_cond_init(&stop_cond, NULL);
 
60
  pthread_cond_init(&log_space_cond, NULL);
 
61
  relay_log.init_pthread_objects();
 
62
  return;
 
63
}
 
64
 
 
65
 
 
66
Relay_log_info::~Relay_log_info()
 
67
{
 
68
  pthread_mutex_destroy(&run_lock);
 
69
  pthread_mutex_destroy(&data_lock);
 
70
  pthread_mutex_destroy(&log_space_lock);
 
71
  pthread_cond_destroy(&data_cond);
 
72
  pthread_cond_destroy(&start_cond);
 
73
  pthread_cond_destroy(&stop_cond);
 
74
  pthread_cond_destroy(&log_space_cond);
 
75
  relay_log.cleanup();
 
76
  return;
 
77
}
 
78
 
 
79
 
 
80
int init_relay_log_info(Relay_log_info* rli,
 
81
                        const char* info_fname)
 
82
{
 
83
  char fname[FN_REFLEN+128];
 
84
  int info_fd;
 
85
  const char* msg = 0;
 
86
  int error = 0;
 
87
  assert(!rli->no_storage);         // Don't init if there is no storage
 
88
 
 
89
  if (rli->inited)                       // Set if this function called
 
90
    return(0);
 
91
  fn_format(fname, info_fname, mysql_data_home, "", 4+32);
 
92
  pthread_mutex_lock(&rli->data_lock);
 
93
  info_fd = rli->info_fd;
 
94
  rli->cur_log_fd = -1;
 
95
  rli->slave_skip_counter=0;
 
96
  rli->abort_pos_wait=0;
 
97
  rli->log_space_limit= relay_log_space_limit;
 
98
  rli->log_space_total= 0;
 
99
  rli->tables_to_lock= 0;
 
100
  rli->tables_to_lock_count= 0;
 
101
 
 
102
  /*
 
103
    The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
 
104
    Note that the I/O thread flushes it to disk after writing every
 
105
    event, in flush_master_info(mi, 1).
 
106
  */
 
107
 
 
108
  /*
 
109
    For the maximum log size, we choose max_relay_log_size if it is
 
110
    non-zero, max_binlog_size otherwise. If later the user does SET
 
111
    GLOBAL on one of these variables, fix_max_binlog_size and
 
112
    fix_max_relay_log_size will reconsider the choice (for example
 
113
    if the user changes max_relay_log_size to zero, we have to
 
114
    switch to using max_binlog_size for the relay log) and update
 
115
    rli->relay_log.max_size (and mysql_bin_log.max_size).
 
116
  */
 
117
  {
 
118
    char buf[FN_REFLEN];
 
119
    const char *ln;
 
120
    static bool name_warning_sent= 0;
 
121
    ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
 
122
                                     1, buf);
 
123
    /* We send the warning only at startup, not after every RESET SLAVE */
 
124
    if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
 
125
    {
 
126
      /*
 
127
        User didn't give us info to name the relay log index file.
 
128
        Picking `hostname`-relay-bin.index like we do, causes replication to
 
129
        fail if this slave's hostname is changed later. So, we would like to
 
130
        instead require a name. But as we don't want to break many existing
 
131
        setups, we only give warning, not error.
 
132
      */
 
133
      sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
 
134
                        " so replication "
 
135
                        "may break when this MySQL server acts as a "
 
136
                        "slave and has his hostname changed!! Please "
 
137
                        "use '--relay-log=%s' to avoid this problem.", ln);
 
138
      name_warning_sent= 1;
 
139
    }
 
140
    /*
 
141
      note, that if open() fails, we'll still have index file open
 
142
      but a destructor will take care of that
 
143
    */
 
144
    if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln) ||
 
145
        rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
 
146
                            (max_relay_log_size ? max_relay_log_size :
 
147
                            max_binlog_size), 1))
 
148
    {
 
149
      pthread_mutex_unlock(&rli->data_lock);
 
150
      sql_print_error("Failed in open_log() called from init_relay_log_info()");
 
151
      return(1);
 
152
    }
 
153
  }
 
154
 
 
155
  /* if file does not exist */
 
156
  if (access(fname,F_OK))
 
157
  {
 
158
    /*
 
159
      If someone removed the file from underneath our feet, just close
 
160
      the old descriptor and re-create the old file
 
161
    */
 
162
    if (info_fd >= 0)
 
163
      my_close(info_fd, MYF(MY_WME));
 
164
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
 
165
    {
 
166
      sql_print_error("Failed to create a new relay log info file (\
 
167
file '%s', errno %d)", fname, my_errno);
 
168
      msg= current_thd->main_da.message();
 
169
      goto err;
 
170
    }
 
171
    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
 
172
                      MYF(MY_WME)))
 
173
    {
 
174
      sql_print_error("Failed to create a cache on relay log info file '%s'",
 
175
                      fname);
 
176
      msg= current_thd->main_da.message();
 
177
      goto err;
 
178
    }
 
179
 
 
180
    /* Init relay log with first entry in the relay index file */
 
181
    if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
 
182
                           &msg, 0))
 
183
    {
 
184
      sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
 
185
      goto err;
 
186
    }
 
187
    rli->group_master_log_name[0]= 0;
 
188
    rli->group_master_log_pos= 0;
 
189
    rli->info_fd= info_fd;
 
190
  }
 
191
  else // file exists
 
192
  {
 
193
    if (info_fd >= 0)
 
194
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
 
195
    else
 
196
    {
 
197
      int error=0;
 
198
      if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
 
199
      {
 
200
        sql_print_error("\
 
201
Failed to open the existing relay log info file '%s' (errno %d)",
 
202
                        fname, my_errno);
 
203
        error= 1;
 
204
      }
 
205
      else if (init_io_cache(&rli->info_file, info_fd,
 
206
                             IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
 
207
      {
 
208
        sql_print_error("Failed to create a cache on relay log info file '%s'",
 
209
                        fname);
 
210
        error= 1;
 
211
      }
 
212
      if (error)
 
213
      {
 
214
        if (info_fd >= 0)
 
215
          my_close(info_fd, MYF(0));
 
216
        rli->info_fd= -1;
 
217
        rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
218
        pthread_mutex_unlock(&rli->data_lock);
 
219
        return(1);
 
220
      }
 
221
    }
 
222
 
 
223
    rli->info_fd = info_fd;
 
224
    int relay_log_pos, master_log_pos;
 
225
    if (init_strvar_from_file(rli->group_relay_log_name,
 
226
                              sizeof(rli->group_relay_log_name),
 
227
                              &rli->info_file, "") ||
 
228
       init_intvar_from_file(&relay_log_pos,
 
229
                             &rli->info_file, BIN_LOG_HEADER_SIZE) ||
 
230
       init_strvar_from_file(rli->group_master_log_name,
 
231
                             sizeof(rli->group_master_log_name),
 
232
                             &rli->info_file, "") ||
 
233
       init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
 
234
    {
 
235
      msg="Error reading slave log configuration";
 
236
      goto err;
 
237
    }
 
238
    strmake(rli->event_relay_log_name,rli->group_relay_log_name,
 
239
            sizeof(rli->event_relay_log_name)-1);
 
240
    rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
 
241
    rli->group_master_log_pos= master_log_pos;
 
242
 
 
243
    if (init_relay_log_pos(rli,
 
244
                           rli->group_relay_log_name,
 
245
                           rli->group_relay_log_pos,
 
246
                           0 /* no data lock*/,
 
247
                           &msg, 0))
 
248
    {
 
249
      char llbuf[22];
 
250
      sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
 
251
                      rli->group_relay_log_name,
 
252
                      llstr(rli->group_relay_log_pos, llbuf));
 
253
      goto err;
 
254
    }
 
255
  }
 
256
  char llbuf1[22], llbuf2[22];
 
257
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
 
258
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
 
259
 
 
260
  /*
 
261
    Now change the cache from READ to WRITE - must do this
 
262
    before flush_relay_log_info
 
263
  */
 
264
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
 
265
  if ((error= flush_relay_log_info(rli)))
 
266
    sql_print_error("Failed to flush relay log info file");
 
267
  if (count_relay_log_space(rli))
 
268
  {
 
269
    msg="Error counting relay log space";
 
270
    goto err;
 
271
  }
 
272
  rli->inited= 1;
 
273
  pthread_mutex_unlock(&rli->data_lock);
 
274
  return(error);
 
275
 
 
276
err:
 
277
  sql_print_error(msg);
 
278
  end_io_cache(&rli->info_file);
 
279
  if (info_fd >= 0)
 
280
    my_close(info_fd, MYF(0));
 
281
  rli->info_fd= -1;
 
282
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
283
  pthread_mutex_unlock(&rli->data_lock);
 
284
  return(1);
 
285
}
 
286
 
 
287
 
 
288
static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
 
289
{
 
290
  struct stat s;
 
291
  if (stat(linfo->log_file_name,&s))
 
292
  {
 
293
    sql_print_error("log %s listed in the index, but failed to stat",
 
294
                    linfo->log_file_name);
 
295
    return(1);
 
296
  }
 
297
  rli->log_space_total += s.st_size;
 
298
  return(0);
 
299
}
 
300
 
 
301
 
 
302
static int count_relay_log_space(Relay_log_info* rli)
 
303
{
 
304
  LOG_INFO linfo;
 
305
  rli->log_space_total= 0;
 
306
  if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
 
307
  {
 
308
    sql_print_error("Could not find first log while counting relay log space");
 
309
    return(1);
 
310
  }
 
311
  do
 
312
  {
 
313
    if (add_relay_log(rli,&linfo))
 
314
      return(1);
 
315
  } while (!rli->relay_log.find_next_log(&linfo, 1));
 
316
  /*
 
317
     As we have counted everything, including what may have written in a
 
318
     preceding write, we must reset bytes_written, or we may count some space
 
319
     twice.
 
320
  */
 
321
  rli->relay_log.reset_bytes_written();
 
322
  return(0);
 
323
}
 
324
 
 
325
 
 
326
/*
 
327
   Reset UNTIL condition for Relay_log_info
 
328
 
 
329
   SYNOPSYS
 
330
    clear_until_condition()
 
331
      rli - Relay_log_info structure where UNTIL condition should be reset
 
332
 */
 
333
 
 
334
void Relay_log_info::clear_until_condition()
 
335
{
 
336
  until_condition= Relay_log_info::UNTIL_NONE;
 
337
  until_log_name[0]= 0;
 
338
  until_log_pos= 0;
 
339
  return;
 
340
}
 
341
 
 
342
 
 
343
/*
 
344
  Open the given relay log
 
345
 
 
346
  SYNOPSIS
 
347
    init_relay_log_pos()
 
348
    rli                 Relay information (will be initialized)
 
349
    log                 Name of relay log file to read from. NULL = First log
 
350
    pos                 Position in relay log file
 
351
    need_data_lock      Set to 1 if this functions should do mutex locks
 
352
    errmsg              Store pointer to error message here
 
353
    look_for_description_event
 
354
                        1 if we should look for such an event. We only need
 
355
                        this when the SQL thread starts and opens an existing
 
356
                        relay log and has to execute it (possibly from an
 
357
                        offset >4); then we need to read the first event of
 
358
                        the relay log to be able to parse the events we have
 
359
                        to execute.
 
360
 
 
361
  DESCRIPTION
 
362
  - Close old open relay log files.
 
363
  - If we are using the same relay log as the running IO-thread, then set
 
364
    rli->cur_log to point to the same IO_CACHE entry.
 
365
  - If not, open the 'log' binary file.
 
366
 
 
367
  TODO
 
368
    - check proper initialization of group_master_log_name/group_master_log_pos
 
369
 
 
370
  RETURN VALUES
 
371
    0   ok
 
372
    1   error.  errmsg is set to point to the error message
 
373
*/
 
374
 
 
375
int init_relay_log_pos(Relay_log_info* rli,const char* log,
 
376
                       ulonglong pos, bool need_data_lock,
 
377
                       const char** errmsg,
 
378
                       bool look_for_description_event)
 
379
{
 
380
  *errmsg=0;
 
381
  pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
 
382
 
 
383
  if (need_data_lock)
 
384
    pthread_mutex_lock(&rli->data_lock);
 
385
 
 
386
  /*
 
387
    Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
 
388
    is, too, and init_slave() too; these 2 functions allocate a description
 
389
    event in init_relay_log_pos, which is not freed by the terminating SQL slave
 
390
    thread as that thread is not started by these functions. So we have to free
 
391
    the description_event here, in case, so that there is no memory leak in
 
392
    running, say, CHANGE MASTER.
 
393
  */
 
394
  delete rli->relay_log.description_event_for_exec;
 
395
  /*
 
396
    By default the relay log is in binlog format 3 (4.0).
 
397
    Even if format is 4, this will work enough to read the first event
 
398
    (Format_desc) (remember that format 4 is just lenghtened compared to format
 
399
    3; format 3 is a prefix of format 4).
 
400
  */
 
401
  rli->relay_log.description_event_for_exec= new
 
402
    Format_description_log_event(3);
 
403
 
 
404
  pthread_mutex_lock(log_lock);
 
405
 
 
406
  /* Close log file and free buffers if it's already open */
 
407
  if (rli->cur_log_fd >= 0)
 
408
  {
 
409
    end_io_cache(&rli->cache_buf);
 
410
    my_close(rli->cur_log_fd, MYF(MY_WME));
 
411
    rli->cur_log_fd = -1;
 
412
  }
 
413
 
 
414
  rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
 
415
 
 
416
  /*
 
417
    Test to see if the previous run was with the skip of purging
 
418
    If yes, we do not purge when we restart
 
419
  */
 
420
  if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
 
421
  {
 
422
    *errmsg="Could not find first log during relay log initialization";
 
423
    goto err;
 
424
  }
 
425
 
 
426
  if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
 
427
  {
 
428
    *errmsg="Could not find target log during relay log initialization";
 
429
    goto err;
 
430
  }
 
431
  strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
 
432
          sizeof(rli->group_relay_log_name)-1);
 
433
  strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
 
434
          sizeof(rli->event_relay_log_name)-1);
 
435
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
 
436
  {
 
437
    /*
 
438
      The IO thread is using this log file.
 
439
      In this case, we will use the same IO_CACHE pointer to
 
440
      read data as the IO thread is using to write data.
 
441
    */
 
442
    my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
 
443
    if (check_binlog_magic(rli->cur_log,errmsg))
 
444
      goto err;
 
445
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
 
446
  }
 
447
  else
 
448
  {
 
449
    /*
 
450
      Open the relay log and set rli->cur_log to point at this one
 
451
    */
 
452
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
 
453
                                     rli->linfo.log_file_name,errmsg)) < 0)
 
454
      goto err;
 
455
    rli->cur_log = &rli->cache_buf;
 
456
  }
 
457
  /*
 
458
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
 
459
    sure.
 
460
  */
 
461
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
 
462
  {
 
463
    Log_event* ev;
 
464
    while (look_for_description_event)
 
465
    {
 
466
      /*
 
467
        Read the possible Format_description_log_event; if position
 
468
        was 4, no need, it will be read naturally.
 
469
      */
 
470
      if (my_b_tell(rli->cur_log) >= pos)
 
471
        break;
 
472
 
 
473
      /*
 
474
        Because of we have rli->data_lock and log_lock, we can safely read an
 
475
        event
 
476
      */
 
477
      if (!(ev=Log_event::read_log_event(rli->cur_log,0,
 
478
                                         rli->relay_log.description_event_for_exec)))
 
479
      {
 
480
        if (rli->cur_log->error) /* not EOF */
 
481
        {
 
482
          *errmsg= "I/O error reading event at position 4";
 
483
          goto err;
 
484
        }
 
485
        break;
 
486
      }
 
487
      else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
 
488
      {
 
489
        delete rli->relay_log.description_event_for_exec;
 
490
        rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
 
491
        /*
 
492
          As ev was returned by read_log_event, it has passed is_valid(), so
 
493
          my_malloc() in ctor worked, no need to check again.
 
494
        */
 
495
        /*
 
496
          Ok, we found a Format_description event. But it is not sure that this
 
497
          describes the whole relay log; indeed, one can have this sequence
 
498
          (starting from position 4):
 
499
          Format_desc (of slave)
 
500
          Rotate (of master)
 
501
          Format_desc (of master)
 
502
          So the Format_desc which really describes the rest of the relay log
 
503
          is the 3rd event (it can't be further than that, because we rotate
 
504
          the relay log when we queue a Rotate event from the master).
 
505
          But what describes the Rotate is the first Format_desc.
 
506
          So what we do is:
 
507
          go on searching for Format_description events, until you exceed the
 
508
          position (argument 'pos') or until you find another event than Rotate
 
509
          or Format_desc.
 
510
        */
 
511
      }
 
512
      else
 
513
      {
 
514
        look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
 
515
        delete ev;
 
516
      }
 
517
    }
 
518
    my_b_seek(rli->cur_log,(off_t)pos);
 
519
 
 
520
  }
 
521
 
 
522
err:
 
523
  /*
 
524
    If we don't purge, we can't honour relay_log_space_limit ;
 
525
    silently discard it
 
526
  */
 
527
  if (!relay_log_purge)
 
528
    rli->log_space_limit= 0;
 
529
  pthread_cond_broadcast(&rli->data_cond);
 
530
 
 
531
  pthread_mutex_unlock(log_lock);
 
532
 
 
533
  if (need_data_lock)
 
534
    pthread_mutex_unlock(&rli->data_lock);
 
535
  if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
 
536
    *errmsg= "Invalid Format_description log event; could be out of memory";
 
537
 
 
538
  return ((*errmsg) ? 1 : 0);
 
539
}
 
540
 
 
541
 
 
542
/*
 
543
  Waits until the SQL thread reaches (has executed up to) the
 
544
  log/position or timed out.
 
545
 
 
546
  SYNOPSIS
 
547
    wait_for_pos()
 
548
    thd             client thread that sent SELECT MASTER_POS_WAIT
 
549
    log_name        log name to wait for
 
550
    log_pos         position to wait for
 
551
    timeout         timeout in seconds before giving up waiting
 
552
 
 
553
  NOTES
 
554
    timeout is longlong whereas it should be ulong ; but this is
 
555
    to catch if the user submitted a negative timeout.
 
556
 
 
557
  RETURN VALUES
 
558
    -2          improper arguments (log_pos<0)
 
559
                or slave not running, or master info changed
 
560
                during the function's execution,
 
561
                or client thread killed. -2 is translated to NULL by caller
 
562
    -1          timed out
 
563
    >=0         number of log events the function had to wait
 
564
                before reaching the desired log/position
 
565
 */
 
566
 
 
567
int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
 
568
                                    longlong log_pos,
 
569
                                    longlong timeout)
 
570
{
 
571
  int event_count = 0;
 
572
  ulong init_abort_pos_wait;
 
573
  int error=0;
 
574
  struct timespec abstime; // for timeout checking
 
575
  const char *msg;
 
576
 
 
577
  if (!inited)
 
578
    return(-2);
 
579
 
 
580
  set_timespec(abstime,timeout);
 
581
  pthread_mutex_lock(&data_lock);
 
582
  msg= thd->enter_cond(&data_cond, &data_lock,
 
583
                       "Waiting for the slave SQL thread to "
 
584
                       "advance position");
 
585
  /*
 
586
     This function will abort when it notices that some CHANGE MASTER or
 
587
     RESET MASTER has changed the master info.
 
588
     To catch this, these commands modify abort_pos_wait ; We just monitor
 
589
     abort_pos_wait and see if it has changed.
 
590
     Why do we have this mechanism instead of simply monitoring slave_running
 
591
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
 
592
     the SQL thread be stopped?
 
593
     This is becasue if someones does:
 
594
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
 
595
     the change may happen very quickly and we may not notice that
 
596
     slave_running briefly switches between 1/0/1.
 
597
  */
 
598
  init_abort_pos_wait= abort_pos_wait;
 
599
 
 
600
  /*
 
601
    We'll need to
 
602
    handle all possible log names comparisons (e.g. 999 vs 1000).
 
603
    We use ulong for string->number conversion ; this is no
 
604
    stronger limitation than in find_uniq_filename in sql/log.cc
 
605
  */
 
606
  ulong log_name_extension;
 
607
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
 
608
 
 
609
  strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), FN_REFLEN-1));
 
610
 
 
611
  char *p= fn_ext(log_name_tmp);
 
612
  char *p_end;
 
613
  if (!*p || log_pos<0)
 
614
  {
 
615
    error= -2; //means improper arguments
 
616
    goto err;
 
617
  }
 
618
  // Convert 0-3 to 4
 
619
  log_pos= max(log_pos, BIN_LOG_HEADER_SIZE);
 
620
  /* p points to '.' */
 
621
  log_name_extension= strtoul(++p, &p_end, 10);
 
622
  /*
 
623
    p_end points to the first invalid character.
 
624
    If it equals to p, no digits were found, error.
 
625
    If it contains '\0' it means conversion went ok.
 
626
  */
 
627
  if (p_end==p || *p_end)
 
628
  {
 
629
    error= -2;
 
630
    goto err;
 
631
  }
 
632
 
 
633
  /* The "compare and wait" main loop */
 
634
  while (!thd->killed &&
 
635
         init_abort_pos_wait == abort_pos_wait &&
 
636
         slave_running)
 
637
  {
 
638
    bool pos_reached;
 
639
    int cmp_result= 0;
 
640
 
 
641
    /*
 
642
      group_master_log_name can be "", if we are just after a fresh
 
643
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
 
644
      (before we have executed one Rotate event from the master) or
 
645
      (rare) if the user is doing a weird slave setup (see next
 
646
      paragraph).  If group_master_log_name is "", we assume we don't
 
647
      have enough info to do the comparison yet, so we just wait until
 
648
      more data. In this case master_log_pos is always 0 except if
 
649
      somebody (wrongly) sets this slave to be a slave of itself
 
650
      without using --replicate-same-server-id (an unsupported
 
651
      configuration which does nothing), then group_master_log_pos
 
652
      will grow and group_master_log_name will stay "".
 
653
    */
 
654
    if (*group_master_log_name)
 
655
    {
 
656
      char *basename= (group_master_log_name +
 
657
                       dirname_length(group_master_log_name));
 
658
      /*
 
659
        First compare the parts before the extension.
 
660
        Find the dot in the master's log basename,
 
661
        and protect against user's input error :
 
662
        if the names do not match up to '.' included, return error
 
663
      */
 
664
      char *q= (char*)(fn_ext(basename)+1);
 
665
      if (strncmp(basename, log_name_tmp, (int)(q-basename)))
 
666
      {
 
667
        error= -2;
 
668
        break;
 
669
      }
 
670
      // Now compare extensions.
 
671
      char *q_end;
 
672
      ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
 
673
      if (group_master_log_name_extension < log_name_extension)
 
674
        cmp_result= -1 ;
 
675
      else
 
676
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
 
677
 
 
678
      pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
 
679
                    cmp_result > 0);
 
680
      if (pos_reached || thd->killed)
 
681
        break;
 
682
    }
 
683
 
 
684
    //wait for master update, with optional timeout.
 
685
 
 
686
    /*
 
687
      We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
 
688
      will wake us up.
 
689
    */
 
690
    if (timeout > 0)
 
691
    {
 
692
      /*
 
693
        Note that pthread_cond_timedwait checks for the timeout
 
694
        before for the condition ; i.e. it returns ETIMEDOUT
 
695
        if the system time equals or exceeds the time specified by abstime
 
696
        before the condition variable is signaled or broadcast, _or_ if
 
697
        the absolute time specified by abstime has already passed at the time
 
698
        of the call.
 
699
        For that reason, pthread_cond_timedwait will do the "timeoutting" job
 
700
        even if its condition is always immediately signaled (case of a loaded
 
701
        master).
 
702
      */
 
703
      error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
 
704
    }
 
705
    else
 
706
      pthread_cond_wait(&data_cond, &data_lock);
 
707
    if (error == ETIMEDOUT || error == ETIME)
 
708
    {
 
709
      error= -1;
 
710
      break;
 
711
    }
 
712
    error=0;
 
713
    event_count++;
 
714
  }
 
715
 
 
716
err:
 
717
  thd->exit_cond(msg);
 
718
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
 
719
      !slave_running)
 
720
  {
 
721
    error= -2;
 
722
  }
 
723
  return( error ? error : event_count );
 
724
}
 
725
 
 
726
 
 
727
void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
 
728
                                                bool skip_lock)
 
729
{
 
730
  if (!skip_lock)
 
731
    pthread_mutex_lock(&data_lock);
 
732
  inc_event_relay_log_pos();
 
733
  group_relay_log_pos= event_relay_log_pos;
 
734
  strmake(group_relay_log_name,event_relay_log_name,
 
735
          sizeof(group_relay_log_name)-1);
 
736
 
 
737
  notify_group_relay_log_name_update();
 
738
 
 
739
  /*
 
740
    If the slave does not support transactions and replicates a transaction,
 
741
    users should not trust group_master_log_pos (which they can display with
 
742
    SHOW SLAVE STATUS or read from relay-log.info), because to compute
 
743
    group_master_log_pos the slave relies on log_pos stored in the master's
 
744
    binlog, but if we are in a master's transaction these positions are always
 
745
    the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
 
746
    not advance as it should on the non-transactional slave (it advances by
 
747
    big leaps, whereas it should advance by small leaps).
 
748
  */
 
749
  /*
 
750
    In 4.x we used the event's len to compute the positions here. This is
 
751
    wrong if the event was 3.23/4.0 and has been converted to 5.0, because
 
752
    then the event's len is not what is was in the master's binlog, so this
 
753
    will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
 
754
    replication: Exec_master_log_pos is wrong). Only way to solve this is to
 
755
    have the original offset of the end of the event the relay log. This is
 
756
    what we do in 5.0: log_pos has become "end_log_pos" (because the real use
 
757
    of log_pos in 4.0 was to compute the end_log_pos; so better to store
 
758
    end_log_pos instead of begin_log_pos.
 
759
    If we had not done this fix here, the problem would also have appeared
 
760
    when the slave and master are 5.0 but with different event length (for
 
761
    example the slave is more recent than the master and features the event
 
762
    UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
 
763
    SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
 
764
    value which would lead to badly broken replication.
 
765
    Even the relay_log_pos will be corrupted in this case, because the len is
 
766
    the relay log is not "val".
 
767
    With the end_log_pos solution, we avoid computations involving lengthes.
 
768
  */
 
769
  if (log_pos) // 3.23 binlogs don't have log_posx
 
770
  {
 
771
    group_master_log_pos= log_pos;
 
772
  }
 
773
  pthread_cond_broadcast(&data_cond);
 
774
  if (!skip_lock)
 
775
    pthread_mutex_unlock(&data_lock);
 
776
  return;
 
777
}
 
778
 
 
779
 
 
780
void Relay_log_info::close_temporary_tables()
 
781
{
 
782
  TABLE *table,*next;
 
783
 
 
784
  for (table=save_temporary_tables ; table ; table=next)
 
785
  {
 
786
    next=table->next;
 
787
    /*
 
788
      Don't ask for disk deletion. For now, anyway they will be deleted when
 
789
      slave restarts, but it is a better intention to not delete them.
 
790
    */
 
791
    close_temporary(table, 1, 0);
 
792
  }
 
793
  save_temporary_tables= 0;
 
794
  slave_open_temp_tables= 0;
 
795
  return;
 
796
}
 
797
 
 
798
/*
 
799
  purge_relay_logs()
 
800
 
 
801
  NOTES
 
802
    Assumes to have a run lock on rli and that no slave thread are running.
 
803
*/
 
804
 
 
805
int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
 
806
                     const char** errmsg)
 
807
{
 
808
  int error=0;
 
809
 
 
810
  /*
 
811
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
 
812
    Indeed, rli->inited==0 does not imply that they already are empty.
 
813
    It could be that slave's info initialization partly succeeded :
 
814
    for example if relay-log.info existed but *relay-bin*.*
 
815
    have been manually removed, init_relay_log_info reads the old
 
816
    relay-log.info and fills rli->master_log_*, then init_relay_log_info
 
817
    checks for the existence of the relay log, this fails and
 
818
    init_relay_log_info leaves rli->inited to 0.
 
819
    In that pathological case, rli->master_log_pos* will be properly reinited
 
820
    at the next START SLAVE (as RESET SLAVE or CHANGE
 
821
    MASTER, the callers of purge_relay_logs, will delete bogus *.info files
 
822
    or replace them with correct files), however if the user does SHOW SLAVE
 
823
    STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
 
824
    In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
 
825
    to display fine in any case.
 
826
  */
 
827
 
 
828
  rli->group_master_log_name[0]= 0;
 
829
  rli->group_master_log_pos= 0;
 
830
 
 
831
  if (!rli->inited)
 
832
  {
 
833
    return(0);
 
834
  }
 
835
 
 
836
  assert(rli->slave_running == 0);
 
837
  assert(rli->mi->slave_running == 0);
 
838
 
 
839
  rli->slave_skip_counter=0;
 
840
  pthread_mutex_lock(&rli->data_lock);
 
841
 
 
842
  /*
 
843
    we close the relay log fd possibly left open by the slave SQL thread,
 
844
    to be able to delete it; the relay log fd possibly left open by the slave
 
845
    I/O thread will be closed naturally in reset_logs() by the
 
846
    close(LOG_CLOSE_TO_BE_OPENED) call
 
847
  */
 
848
  if (rli->cur_log_fd >= 0)
 
849
  {
 
850
    end_io_cache(&rli->cache_buf);
 
851
    my_close(rli->cur_log_fd, MYF(MY_WME));
 
852
    rli->cur_log_fd= -1;
 
853
  }
 
854
 
 
855
  if (rli->relay_log.reset_logs(thd))
 
856
  {
 
857
    *errmsg = "Failed during log reset";
 
858
    error=1;
 
859
    goto err;
 
860
  }
 
861
  /* Save name of used relay log file */
 
862
  strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
 
863
          sizeof(rli->group_relay_log_name)-1);
 
864
  strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
 
865
          sizeof(rli->event_relay_log_name)-1);
 
866
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
 
867
  if (count_relay_log_space(rli))
 
868
  {
 
869
    *errmsg= "Error counting relay log space";
 
870
    goto err;
 
871
  }
 
872
  if (!just_reset)
 
873
    error= init_relay_log_pos(rli, rli->group_relay_log_name,
 
874
                              rli->group_relay_log_pos,
 
875
                              0 /* do not need data lock */, errmsg, 0);
 
876
 
 
877
err:
 
878
  pthread_mutex_unlock(&rli->data_lock);
 
879
  return(error);
 
880
}
 
881
 
 
882
 
 
883
/*
 
884
     Check if condition stated in UNTIL clause of START SLAVE is reached.
 
885
   SYNOPSYS
 
886
     Relay_log_info::is_until_satisfied()
 
887
     master_beg_pos    position of the beginning of to be executed event
 
888
                       (not log_pos member of the event that points to the
 
889
                        beginning of the following event)
 
890
 
 
891
 
 
892
   DESCRIPTION
 
893
     Checks if UNTIL condition is reached. Uses caching result of last
 
894
     comparison of current log file name and target log file name. So cached
 
895
     value should be invalidated if current log file name changes
 
896
     (see Relay_log_info::notify_... functions).
 
897
 
 
898
     This caching is needed to avoid of expensive string comparisons and
 
899
     strtol() conversions needed for log names comparison. We don't need to
 
900
     compare them each time this function is called, we only need to do this
 
901
     when current log name changes. If we have UNTIL_MASTER_POS condition we
 
902
     need to do this only after Rotate_log_event::do_apply_event() (which is
 
903
     rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
 
904
     condition then we should invalidate cached comarison value after
 
905
     inc_group_relay_log_pos() which called for each group of events (so we
 
906
     have some benefit if we have something like queries that use
 
907
     autoincrement or if we have transactions).
 
908
 
 
909
     Should be called ONLY if until_condition != UNTIL_NONE !
 
910
   RETURN VALUE
 
911
     true - condition met or error happened (condition seems to have
 
912
            bad log file name)
 
913
     false - condition not met
 
914
*/
 
915
 
 
916
bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
 
917
{
 
918
  const char *log_name;
 
919
  ulonglong log_pos;
 
920
 
 
921
  assert(until_condition != UNTIL_NONE);
 
922
 
 
923
  if (until_condition == UNTIL_MASTER_POS)
 
924
  {
 
925
    log_name= group_master_log_name;
 
926
    log_pos= master_beg_pos;
 
927
  }
 
928
  else
 
929
  { /* until_condition == UNTIL_RELAY_POS */
 
930
    log_name= group_relay_log_name;
 
931
    log_pos= group_relay_log_pos;
 
932
  }
 
933
 
 
934
  if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
 
935
  {
 
936
    /*
 
937
      We have no cached comparison results so we should compare log names
 
938
      and cache result.
 
939
      If we are after RESET SLAVE, and the SQL slave thread has not processed
 
940
      any event yet, it could be that group_master_log_name is "". In that case,
 
941
      just wait for more events (as there is no sensible comparison to do).
 
942
    */
 
943
 
 
944
    if (*log_name)
 
945
    {
 
946
      const char *basename= log_name + dirname_length(log_name);
 
947
 
 
948
      const char *q= (const char*)(fn_ext(basename)+1);
 
949
      if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
 
950
      {
 
951
        /* Now compare extensions. */
 
952
        char *q_end;
 
953
        ulong log_name_extension= strtoul(q, &q_end, 10);
 
954
        if (log_name_extension < until_log_name_extension)
 
955
          until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
 
956
        else
 
957
          until_log_names_cmp_result=
 
958
            (log_name_extension > until_log_name_extension) ?
 
959
            UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
 
960
      }
 
961
      else
 
962
      {
 
963
        /* Probably error so we aborting */
 
964
        sql_print_error("Slave SQL thread is stopped because UNTIL "
 
965
                        "condition is bad.");
 
966
        return(true);
 
967
      }
 
968
    }
 
969
    else
 
970
      return(until_log_pos == 0);
 
971
  }
 
972
 
 
973
  return(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
 
974
           log_pos >= until_log_pos) ||
 
975
          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
 
976
}
 
977
 
 
978
 
 
979
void Relay_log_info::cached_charset_invalidate()
 
980
{
 
981
  /* Full of zeroes means uninitialized. */
 
982
  bzero(cached_charset, sizeof(cached_charset));
 
983
  return;
 
984
}
 
985
 
 
986
 
 
987
bool Relay_log_info::cached_charset_compare(char *charset) const
 
988
{
 
989
  if (bcmp((uchar*) cached_charset, (uchar*) charset,
 
990
           sizeof(cached_charset)))
 
991
  {
 
992
    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
 
993
    return(1);
 
994
  }
 
995
  return(0);
 
996
}
 
997
 
 
998
 
 
999
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
 
1000
                                  time_t event_creation_time)
 
1001
{
 
1002
  extern uint debug_not_change_ts_if_art_event;
 
1003
  clear_flag(IN_STMT);
 
1004
 
 
1005
  /*
 
1006
    If in a transaction, and if the slave supports transactions, just
 
1007
    inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
 
1008
    (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
 
1009
    BEGIN/COMMIT, not with SET AUTOCOMMIT= .
 
1010
 
 
1011
    CAUTION: opt_using_transactions means innodb || bdb ; suppose the
 
1012
    master supports InnoDB and BDB, but the slave supports only BDB,
 
1013
    problems will arise: - suppose an InnoDB table is created on the
 
1014
    master, - then it will be MyISAM on the slave - but as
 
1015
    opt_using_transactions is true, the slave will believe he is
 
1016
    transactional with the MyISAM table. And problems will come when
 
1017
    one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
 
1018
    resume at BEGIN whereas there has not been any rollback).  This is
 
1019
    the problem of using opt_using_transactions instead of a finer
 
1020
    "does the slave support _transactional handler used on the
 
1021
    master_".
 
1022
 
 
1023
    More generally, we'll have problems when a query mixes a
 
1024
    transactional handler and MyISAM and STOP SLAVE is issued in the
 
1025
    middle of the "transaction". START SLAVE will resume at BEGIN
 
1026
    while the MyISAM table has already been updated.
 
1027
  */
 
1028
  if ((sql_thd->options & OPTION_BEGIN) && opt_using_transactions)
 
1029
    inc_event_relay_log_pos();
 
1030
  else
 
1031
  {
 
1032
    inc_group_relay_log_pos(event_master_log_pos);
 
1033
    flush_relay_log_info(this);
 
1034
    /*
 
1035
      Note that Rotate_log_event::do_apply_event() does not call this
 
1036
      function, so there is no chance that a fake rotate event resets
 
1037
      last_master_timestamp.  Note that we update without mutex
 
1038
      (probably ok - except in some very rare cases, only consequence
 
1039
      is that value may take some time to display in
 
1040
      Seconds_Behind_Master - not critical).
 
1041
    */
 
1042
    if (!(event_creation_time == 0 && debug_not_change_ts_if_art_event > 0))
 
1043
      last_master_timestamp= event_creation_time;
 
1044
  }
 
1045
}
 
1046
 
 
1047
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 
1048
void Relay_log_info::cleanup_context(THD *thd, bool error)
 
1049
{
 
1050
  assert(sql_thd == thd);
 
1051
  /*
 
1052
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
 
1053
    may have opened tables, which we cannot be sure have been closed (because
 
1054
    maybe the Rows_log_event have not been found or will not be, because slave
 
1055
    SQL thread is stopping, or relay log has a missing tail etc). So we close
 
1056
    all thread's tables. And so the table mappings have to be cancelled.
 
1057
    2) Rows_log_event::do_apply_event() may even have started statements or
 
1058
    transactions on them, which we need to rollback in case of error.
 
1059
    3) If finding a Format_description_log_event after a BEGIN, we also need
 
1060
    to rollback before continuing with the next events.
 
1061
    4) so we need this "context cleanup" function.
 
1062
  */
 
1063
  if (error)
 
1064
  {
 
1065
    ha_autocommit_or_rollback(thd, 1); // if a "statement transaction"
 
1066
    end_trans(thd, ROLLBACK); // if a "real transaction"
 
1067
  }
 
1068
  m_table_map.clear_tables();
 
1069
  close_thread_tables(thd);
 
1070
  clear_tables_to_lock();
 
1071
  clear_flag(IN_STMT);
 
1072
  /*
 
1073
    Cleanup for the flags that have been set at do_apply_event.
 
1074
  */
 
1075
  thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
 
1076
  thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
 
1077
  last_event_start_time= 0;
 
1078
  return;
 
1079
}
 
1080
 
 
1081
void Relay_log_info::clear_tables_to_lock()
 
1082
{
 
1083
  while (tables_to_lock)
 
1084
  {
 
1085
    uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
 
1086
    if (tables_to_lock->m_tabledef_valid)
 
1087
    {
 
1088
      tables_to_lock->m_tabledef.table_def::~table_def();
 
1089
      tables_to_lock->m_tabledef_valid= false;
 
1090
    }
 
1091
    tables_to_lock=
 
1092
      static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
 
1093
    tables_to_lock_count--;
 
1094
    my_free(to_free, MYF(MY_WME));
 
1095
  }
 
1096
  assert(tables_to_lock == NULL && tables_to_lock_count == 0);
 
1097
}
 
1098
 
 
1099
#endif