~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/rpl_rli.cc

  • Committer: Brian Aker
  • Date: 2009-04-17 01:45:33 UTC
  • Revision ID: brian@gaz-20090417014533-exdrtriab9zecqs2
Refactor get_variable to session

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2000-2003 MySQL AB
2
 
 
3
 
   This program is free software; you can redistribute it and/or modify
4
 
   it under the terms of the GNU General Public License as published by
5
 
   the Free Software Foundation; version 2 of the License.
6
 
 
7
 
   This program is distributed in the hope that it will be useful,
8
 
   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 
   GNU General Public License for more details.
11
 
 
12
 
   You should have received a copy of the GNU General Public License
13
 
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
 
 
16
 
#include <drizzled/server_includes.h>
17
 
 
18
 
#include "rpl_mi.h"
19
 
#include "rpl_rli.h"
20
 
#include "sql_repl.h"  // For check_binlog_magic
21
 
#include "rpl_utility.h"
22
 
 
23
 
static int32_t count_relay_log_space(Relay_log_info* rli);
24
 
 
25
 
// Defined in slave.cc
26
 
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val);
27
 
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
28
 
                          const char *default_val);
29
 
 
30
 
 
31
 
Relay_log_info::Relay_log_info()
32
 
  :Slave_reporting_capability("SQL"),
33
 
   no_storage(false), replicate_same_server_id(::replicate_same_server_id),
34
 
   info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
35
 
#if defined(HAVE_purify) && HAVE_purify
36
 
   is_fake(false),
37
 
#endif
38
 
   cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
39
 
   group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
40
 
   last_master_timestamp(0), slave_skip_counter(0),
41
 
   abort_pos_wait(0), slave_run_id(0), sql_thd(0),
42
 
   inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
43
 
   until_log_pos(0), retried_trans(0),
44
 
   tables_to_lock(0), tables_to_lock_count(0),
45
 
   last_event_start_time(0), m_flags(0)
46
 
{
47
 
  group_relay_log_name[0]= event_relay_log_name[0]=
48
 
    group_master_log_name[0]= 0;
49
 
  until_log_name[0]= ign_master_log_name_end[0]= 0;
50
 
  memset(&info_file, 0, sizeof(info_file));
51
 
  memset(&cache_buf, 0, sizeof(cache_buf));
52
 
  cached_charset_invalidate();
53
 
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
54
 
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
55
 
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
56
 
  pthread_cond_init(&data_cond, NULL);
57
 
  pthread_cond_init(&start_cond, NULL);
58
 
  pthread_cond_init(&stop_cond, NULL);
59
 
  pthread_cond_init(&log_space_cond, NULL);
60
 
  relay_log.init_pthread_objects();
61
 
  return;
62
 
}
63
 
 
64
 
 
65
 
Relay_log_info::~Relay_log_info()
66
 
{
67
 
  pthread_mutex_destroy(&run_lock);
68
 
  pthread_mutex_destroy(&data_lock);
69
 
  pthread_mutex_destroy(&log_space_lock);
70
 
  pthread_cond_destroy(&data_cond);
71
 
  pthread_cond_destroy(&start_cond);
72
 
  pthread_cond_destroy(&stop_cond);
73
 
  pthread_cond_destroy(&log_space_cond);
74
 
  relay_log.cleanup();
75
 
  return;
76
 
}
77
 
 
78
 
 
79
 
int32_t init_relay_log_info(Relay_log_info* rli,
80
 
                        const char* info_fname)
81
 
{
82
 
  char fname[FN_REFLEN+128];
83
 
  int32_t info_fd;
84
 
  const char* msg = 0;
85
 
  int32_t error = 0;
86
 
  assert(!rli->no_storage);         // Don't init if there is no storage
87
 
 
88
 
  if (rli->inited)                       // Set if this function called
89
 
    return(0);
90
 
  fn_format(fname, info_fname, mysql_data_home, "", 4+32);
91
 
  pthread_mutex_lock(&rli->data_lock);
92
 
  info_fd = rli->info_fd;
93
 
  rli->cur_log_fd = -1;
94
 
  rli->slave_skip_counter=0;
95
 
  rli->abort_pos_wait=0;
96
 
  rli->log_space_limit= relay_log_space_limit;
97
 
  rli->log_space_total= 0;
98
 
  rli->tables_to_lock= 0;
99
 
  rli->tables_to_lock_count= 0;
100
 
 
101
 
  /*
102
 
    The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
103
 
    Note that the I/O thread flushes it to disk after writing every
104
 
    event, in flush_master_info(mi, 1).
105
 
  */
106
 
 
107
 
  /*
108
 
    For the maximum log size, we choose max_relay_log_size if it is
109
 
    non-zero, max_binlog_size otherwise. If later the user does SET
110
 
    GLOBAL on one of these variables, fix_max_binlog_size and
111
 
    fix_max_relay_log_size will reconsider the choice (for example
112
 
    if the user changes max_relay_log_size to zero, we have to
113
 
    switch to using max_binlog_size for the relay log) and update
114
 
    rli->relay_log.max_size (and mysql_bin_log.max_size).
115
 
  */
116
 
  {
117
 
    char buf[FN_REFLEN];
118
 
    const char *ln;
119
 
    static bool name_warning_sent= 0;
120
 
    ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
121
 
                                     1, buf);
122
 
    /* We send the warning only at startup, not after every RESET SLAVE */
123
 
    if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
124
 
    {
125
 
      /*
126
 
        User didn't give us info to name the relay log index file.
127
 
        Picking `hostname`-relay-bin.index like we do, causes replication to
128
 
        fail if this slave's hostname is changed later. So, we would like to
129
 
        instead require a name. But as we don't want to break many existing
130
 
        setups, we only give warning, not error.
131
 
      */
132
 
      sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
133
 
                        " so replication "
134
 
                        "may break when this MySQL server acts as a "
135
 
                        "slave and has his hostname changed!! Please "
136
 
                        "use '--relay-log=%s' to avoid this problem.", ln);
137
 
      name_warning_sent= 1;
138
 
    }
139
 
    /*
140
 
      note, that if open() fails, we'll still have index file open
141
 
      but a destructor will take care of that
142
 
    */
143
 
    if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln) ||
144
 
        rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
145
 
                            (max_relay_log_size ? max_relay_log_size :
146
 
                            max_binlog_size), 1))
147
 
    {
148
 
      pthread_mutex_unlock(&rli->data_lock);
149
 
      sql_print_error("Failed in open_log() called from init_relay_log_info()");
150
 
      return(1);
151
 
    }
152
 
  }
153
 
 
154
 
  /* if file does not exist */
155
 
  if (access(fname,F_OK))
156
 
  {
157
 
    /*
158
 
      If someone removed the file from underneath our feet, just close
159
 
      the old descriptor and re-create the old file
160
 
    */
161
 
    if (info_fd >= 0)
162
 
      my_close(info_fd, MYF(MY_WME));
163
 
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
164
 
    {
165
 
      sql_print_error("Failed to create a new relay log info file (\
166
 
file '%s', errno %d)", fname, my_errno);
167
 
      msg= current_thd->main_da.message();
168
 
      goto err;
169
 
    }
170
 
    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
171
 
                      MYF(MY_WME)))
172
 
    {
173
 
      sql_print_error("Failed to create a cache on relay log info file '%s'",
174
 
                      fname);
175
 
      msg= current_thd->main_da.message();
176
 
      goto err;
177
 
    }
178
 
 
179
 
    /* Init relay log with first entry in the relay index file */
180
 
    if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
181
 
                           &msg, 0))
182
 
    {
183
 
      sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
184
 
      goto err;
185
 
    }
186
 
    rli->group_master_log_name[0]= 0;
187
 
    rli->group_master_log_pos= 0;
188
 
    rli->info_fd= info_fd;
189
 
  }
190
 
  else // file exists
191
 
  {
192
 
    if (info_fd >= 0)
193
 
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
194
 
    else
195
 
    {
196
 
      int32_t error=0;
197
 
      if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
198
 
      {
199
 
        sql_print_error("\
200
 
Failed to open the existing relay log info file '%s' (errno %d)",
201
 
                        fname, my_errno);
202
 
        error= 1;
203
 
      }
204
 
      else if (init_io_cache(&rli->info_file, info_fd,
205
 
                             IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
206
 
      {
207
 
        sql_print_error("Failed to create a cache on relay log info file '%s'",
208
 
                        fname);
209
 
        error= 1;
210
 
      }
211
 
      if (error)
212
 
      {
213
 
        if (info_fd >= 0)
214
 
          my_close(info_fd, MYF(0));
215
 
        rli->info_fd= -1;
216
 
        rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
217
 
        pthread_mutex_unlock(&rli->data_lock);
218
 
        return(1);
219
 
      }
220
 
    }
221
 
 
222
 
    rli->info_fd = info_fd;
223
 
    int32_t relay_log_pos, master_log_pos;
224
 
    if (init_strvar_from_file(rli->group_relay_log_name,
225
 
                              sizeof(rli->group_relay_log_name),
226
 
                              &rli->info_file, "") ||
227
 
       init_intvar_from_file(&relay_log_pos,
228
 
                             &rli->info_file, BIN_LOG_HEADER_SIZE) ||
229
 
       init_strvar_from_file(rli->group_master_log_name,
230
 
                             sizeof(rli->group_master_log_name),
231
 
                             &rli->info_file, "") ||
232
 
       init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
233
 
    {
234
 
      msg="Error reading slave log configuration";
235
 
      goto err;
236
 
    }
237
 
    strmake(rli->event_relay_log_name,rli->group_relay_log_name,
238
 
            sizeof(rli->event_relay_log_name)-1);
239
 
    rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
240
 
    rli->group_master_log_pos= master_log_pos;
241
 
 
242
 
    if (init_relay_log_pos(rli,
243
 
                           rli->group_relay_log_name,
244
 
                           rli->group_relay_log_pos,
245
 
                           0 /* no data lock*/,
246
 
                           &msg, 0))
247
 
    {
248
 
      char llbuf[22];
249
 
      sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
250
 
                      rli->group_relay_log_name,
251
 
                      llstr(rli->group_relay_log_pos, llbuf));
252
 
      goto err;
253
 
    }
254
 
  }
255
 
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
256
 
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
257
 
 
258
 
  /*
259
 
    Now change the cache from READ to WRITE - must do this
260
 
    before flush_relay_log_info
261
 
  */
262
 
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
263
 
  if ((error= flush_relay_log_info(rli)))
264
 
    sql_print_error("Failed to flush relay log info file");
265
 
  if (count_relay_log_space(rli))
266
 
  {
267
 
    msg="Error counting relay log space";
268
 
    goto err;
269
 
  }
270
 
  rli->inited= 1;
271
 
  pthread_mutex_unlock(&rli->data_lock);
272
 
  return(error);
273
 
 
274
 
err:
275
 
  sql_print_error(msg);
276
 
  end_io_cache(&rli->info_file);
277
 
  if (info_fd >= 0)
278
 
    my_close(info_fd, MYF(0));
279
 
  rli->info_fd= -1;
280
 
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
281
 
  pthread_mutex_unlock(&rli->data_lock);
282
 
  return(1);
283
 
}
284
 
 
285
 
 
286
 
static inline int32_t add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
287
 
{
288
 
  struct stat s;
289
 
  if (stat(linfo->log_file_name,&s))
290
 
  {
291
 
    sql_print_error("log %s listed in the index, but failed to stat",
292
 
                    linfo->log_file_name);
293
 
    return(1);
294
 
  }
295
 
  rli->log_space_total += s.st_size;
296
 
  return(0);
297
 
}
298
 
 
299
 
 
300
 
static int32_t count_relay_log_space(Relay_log_info* rli)
301
 
{
302
 
  LOG_INFO linfo;
303
 
  rli->log_space_total= 0;
304
 
  if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
305
 
  {
306
 
    sql_print_error("Could not find first log while counting relay log space");
307
 
    return(1);
308
 
  }
309
 
  do
310
 
  {
311
 
    if (add_relay_log(rli,&linfo))
312
 
      return(1);
313
 
  } while (!rli->relay_log.find_next_log(&linfo, 1));
314
 
  /*
315
 
     As we have counted everything, including what may have written in a
316
 
     preceding write, we must reset bytes_written, or we may count some space
317
 
     twice.
318
 
  */
319
 
  rli->relay_log.reset_bytes_written();
320
 
  return(0);
321
 
}
322
 
 
323
 
 
324
 
/*
325
 
   Reset UNTIL condition for Relay_log_info
326
 
 
327
 
   SYNOPSYS
328
 
    clear_until_condition()
329
 
      rli - Relay_log_info structure where UNTIL condition should be reset
330
 
 */
331
 
 
332
 
void Relay_log_info::clear_until_condition()
333
 
{
334
 
  until_condition= Relay_log_info::UNTIL_NONE;
335
 
  until_log_name[0]= 0;
336
 
  until_log_pos= 0;
337
 
  return;
338
 
}
339
 
 
340
 
 
341
 
/*
342
 
  Open the given relay log
343
 
 
344
 
  SYNOPSIS
345
 
    init_relay_log_pos()
346
 
    rli                 Relay information (will be initialized)
347
 
    log                 Name of relay log file to read from. NULL = First log
348
 
    pos                 Position in relay log file
349
 
    need_data_lock      Set to 1 if this functions should do mutex locks
350
 
    errmsg              Store pointer to error message here
351
 
    look_for_description_event
352
 
                        1 if we should look for such an event. We only need
353
 
                        this when the SQL thread starts and opens an existing
354
 
                        relay log and has to execute it (possibly from an
355
 
                        offset >4); then we need to read the first event of
356
 
                        the relay log to be able to parse the events we have
357
 
                        to execute.
358
 
 
359
 
  DESCRIPTION
360
 
  - Close old open relay log files.
361
 
  - If we are using the same relay log as the running IO-thread, then set
362
 
    rli->cur_log to point to the same IO_CACHE entry.
363
 
  - If not, open the 'log' binary file.
364
 
 
365
 
  TODO
366
 
    - check proper initialization of group_master_log_name/group_master_log_pos
367
 
 
368
 
  RETURN VALUES
369
 
    0   ok
370
 
    1   error.  errmsg is set to point to the error message
371
 
*/
372
 
 
373
 
int32_t init_relay_log_pos(Relay_log_info* rli,const char* log,
374
 
                       uint64_t pos, bool need_data_lock,
375
 
                       const char** errmsg,
376
 
                       bool look_for_description_event)
377
 
{
378
 
  *errmsg=0;
379
 
  pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
380
 
 
381
 
  if (need_data_lock)
382
 
    pthread_mutex_lock(&rli->data_lock);
383
 
 
384
 
  /*
385
 
    Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
386
 
    is, too, and init_slave() too; these 2 functions allocate a description
387
 
    event in init_relay_log_pos, which is not freed by the terminating SQL slave
388
 
    thread as that thread is not started by these functions. So we have to free
389
 
    the description_event here, in case, so that there is no memory leak in
390
 
    running, say, CHANGE MASTER.
391
 
  */
392
 
  delete rli->relay_log.description_event_for_exec;
393
 
  /*
394
 
    By default the relay log is in binlog format 3 (4.0).
395
 
    Even if format is 4, this will work enough to read the first event
396
 
    (Format_desc) (remember that format 4 is just lenghtened compared to format
397
 
    3; format 3 is a prefix of format 4).
398
 
  */
399
 
  rli->relay_log.description_event_for_exec= new
400
 
    Format_description_log_event(3);
401
 
 
402
 
  pthread_mutex_lock(log_lock);
403
 
 
404
 
  /* Close log file and free buffers if it's already open */
405
 
  if (rli->cur_log_fd >= 0)
406
 
  {
407
 
    end_io_cache(&rli->cache_buf);
408
 
    my_close(rli->cur_log_fd, MYF(MY_WME));
409
 
    rli->cur_log_fd = -1;
410
 
  }
411
 
 
412
 
  rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
413
 
 
414
 
  /*
415
 
    Test to see if the previous run was with the skip of purging
416
 
    If yes, we do not purge when we restart
417
 
  */
418
 
  if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
419
 
  {
420
 
    *errmsg="Could not find first log during relay log initialization";
421
 
    goto err;
422
 
  }
423
 
 
424
 
  if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
425
 
  {
426
 
    *errmsg="Could not find target log during relay log initialization";
427
 
    goto err;
428
 
  }
429
 
  strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
430
 
          sizeof(rli->group_relay_log_name)-1);
431
 
  strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
432
 
          sizeof(rli->event_relay_log_name)-1);
433
 
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
434
 
  {
435
 
    /*
436
 
      The IO thread is using this log file.
437
 
      In this case, we will use the same IO_CACHE pointer to
438
 
      read data as the IO thread is using to write data.
439
 
    */
440
 
    my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
441
 
    if (check_binlog_magic(rli->cur_log,errmsg))
442
 
      goto err;
443
 
    rli->cur_log_old_open_count=rli->relay_log.get_open_count();
444
 
  }
445
 
  else
446
 
  {
447
 
    /*
448
 
      Open the relay log and set rli->cur_log to point at this one
449
 
    */
450
 
    if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
451
 
                                     rli->linfo.log_file_name,errmsg)) < 0)
452
 
      goto err;
453
 
    rli->cur_log = &rli->cache_buf;
454
 
  }
455
 
  /*
456
 
    In all cases, check_binlog_magic() has been called so we're at offset 4 for
457
 
    sure.
458
 
  */
459
 
  if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
460
 
  {
461
 
    Log_event* ev;
462
 
    while (look_for_description_event)
463
 
    {
464
 
      /*
465
 
        Read the possible Format_description_log_event; if position
466
 
        was 4, no need, it will be read naturally.
467
 
      */
468
 
      if (my_b_tell(rli->cur_log) >= pos)
469
 
        break;
470
 
 
471
 
      /*
472
 
        Because of we have rli->data_lock and log_lock, we can safely read an
473
 
        event
474
 
      */
475
 
      if (!(ev=Log_event::read_log_event(rli->cur_log,0,
476
 
                                         rli->relay_log.description_event_for_exec)))
477
 
      {
478
 
        if (rli->cur_log->error) /* not EOF */
479
 
        {
480
 
          *errmsg= "I/O error reading event at position 4";
481
 
          goto err;
482
 
        }
483
 
        break;
484
 
      }
485
 
      else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
486
 
      {
487
 
        delete rli->relay_log.description_event_for_exec;
488
 
        rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
489
 
        /*
490
 
          As ev was returned by read_log_event, it has passed is_valid(), so
491
 
          my_malloc() in ctor worked, no need to check again.
492
 
        */
493
 
        /*
494
 
          Ok, we found a Format_description event. But it is not sure that this
495
 
          describes the whole relay log; indeed, one can have this sequence
496
 
          (starting from position 4):
497
 
          Format_desc (of slave)
498
 
          Rotate (of master)
499
 
          Format_desc (of master)
500
 
          So the Format_desc which really describes the rest of the relay log
501
 
          is the 3rd event (it can't be further than that, because we rotate
502
 
          the relay log when we queue a Rotate event from the master).
503
 
          But what describes the Rotate is the first Format_desc.
504
 
          So what we do is:
505
 
          go on searching for Format_description events, until you exceed the
506
 
          position (argument 'pos') or until you find another event than Rotate
507
 
          or Format_desc.
508
 
        */
509
 
      }
510
 
      else
511
 
      {
512
 
        look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
513
 
        delete ev;
514
 
      }
515
 
    }
516
 
    my_b_seek(rli->cur_log,(off_t)pos);
517
 
 
518
 
  }
519
 
 
520
 
err:
521
 
  /*
522
 
    If we don't purge, we can't honour relay_log_space_limit ;
523
 
    silently discard it
524
 
  */
525
 
  if (!relay_log_purge)
526
 
    rli->log_space_limit= 0;
527
 
  pthread_cond_broadcast(&rli->data_cond);
528
 
 
529
 
  pthread_mutex_unlock(log_lock);
530
 
 
531
 
  if (need_data_lock)
532
 
    pthread_mutex_unlock(&rli->data_lock);
533
 
  if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
534
 
    *errmsg= "Invalid Format_description log event; could be out of memory";
535
 
 
536
 
  return ((*errmsg) ? 1 : 0);
537
 
}
538
 
 
539
 
 
540
 
/*
541
 
  Waits until the SQL thread reaches (has executed up to) the
542
 
  log/position or timed out.
543
 
 
544
 
  SYNOPSIS
545
 
    wait_for_pos()
546
 
    thd             client thread that sent SELECT MASTER_POS_WAIT
547
 
    log_name        log name to wait for
548
 
    log_pos         position to wait for
549
 
    timeout         timeout in seconds before giving up waiting
550
 
 
551
 
  NOTES
552
 
    timeout is int64_t whereas it should be uint32_t ; but this is
553
 
    to catch if the user submitted a negative timeout.
554
 
 
555
 
  RETURN VALUES
556
 
    -2          improper arguments (log_pos<0)
557
 
                or slave not running, or master info changed
558
 
                during the function's execution,
559
 
                or client thread killed. -2 is translated to NULL by caller
560
 
    -1          timed out
561
 
    >=0         number of log events the function had to wait
562
 
                before reaching the desired log/position
563
 
 */
564
 
 
565
 
int32_t Relay_log_info::wait_for_pos(THD* thd, String* log_name,
566
 
                                    int64_t log_pos,
567
 
                                    int64_t timeout)
568
 
{
569
 
  int32_t event_count = 0;
570
 
  uint32_t init_abort_pos_wait;
571
 
  int32_t error=0;
572
 
  struct timespec abstime; // for timeout checking
573
 
  const char *msg;
574
 
 
575
 
  if (!inited)
576
 
    return(-2);
577
 
 
578
 
  set_timespec(abstime,timeout);
579
 
  pthread_mutex_lock(&data_lock);
580
 
  msg= thd->enter_cond(&data_cond, &data_lock,
581
 
                       "Waiting for the slave SQL thread to "
582
 
                       "advance position");
583
 
  /*
584
 
     This function will abort when it notices that some CHANGE MASTER or
585
 
     RESET MASTER has changed the master info.
586
 
     To catch this, these commands modify abort_pos_wait ; We just monitor
587
 
     abort_pos_wait and see if it has changed.
588
 
     Why do we have this mechanism instead of simply monitoring slave_running
589
 
     in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
590
 
     the SQL thread be stopped?
591
 
     This is becasue if someones does:
592
 
     STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
593
 
     the change may happen very quickly and we may not notice that
594
 
     slave_running briefly switches between 1/0/1.
595
 
  */
596
 
  init_abort_pos_wait= abort_pos_wait;
597
 
 
598
 
  /*
599
 
    We'll need to
600
 
    handle all possible log names comparisons (e.g. 999 vs 1000).
601
 
    We use uint32_t for string->number conversion ; this is no
602
 
    stronger limitation than in find_uniq_filename in sql/log.cc
603
 
  */
604
 
  uint32_t log_name_extension;
605
 
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
606
 
 
607
 
  strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), (uint32_t)FN_REFLEN-1));
608
 
 
609
 
  char *p= fn_ext(log_name_tmp);
610
 
  char *p_end;
611
 
  if (!*p || log_pos<0)
612
 
  {
613
 
    error= -2; //means improper arguments
614
 
    goto err;
615
 
  }
616
 
  // Convert 0-3 to 4
617
 
  log_pos= max(log_pos, (int64_t)BIN_LOG_HEADER_SIZE);
618
 
  /* p points to '.' */
619
 
  log_name_extension= strtoul(++p, &p_end, 10);
620
 
  /*
621
 
    p_end points to the first invalid character.
622
 
    If it equals to p, no digits were found, error.
623
 
    If it contains '\0' it means conversion went ok.
624
 
  */
625
 
  if (p_end==p || *p_end)
626
 
  {
627
 
    error= -2;
628
 
    goto err;
629
 
  }
630
 
 
631
 
  /* The "compare and wait" main loop */
632
 
  while (!thd->killed &&
633
 
         init_abort_pos_wait == abort_pos_wait &&
634
 
         slave_running)
635
 
  {
636
 
    bool pos_reached;
637
 
    int32_t cmp_result= 0;
638
 
 
639
 
    /*
640
 
      group_master_log_name can be "", if we are just after a fresh
641
 
      replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
642
 
      (before we have executed one Rotate event from the master) or
643
 
      (rare) if the user is doing a weird slave setup (see next
644
 
      paragraph).  If group_master_log_name is "", we assume we don't
645
 
      have enough info to do the comparison yet, so we just wait until
646
 
      more data. In this case master_log_pos is always 0 except if
647
 
      somebody (wrongly) sets this slave to be a slave of itself
648
 
      without using --replicate-same-server-id (an unsupported
649
 
      configuration which does nothing), then group_master_log_pos
650
 
      will grow and group_master_log_name will stay "".
651
 
    */
652
 
    if (*group_master_log_name)
653
 
    {
654
 
      char *basename= (group_master_log_name +
655
 
                       dirname_length(group_master_log_name));
656
 
      /*
657
 
        First compare the parts before the extension.
658
 
        Find the dot in the master's log basename,
659
 
        and protect against user's input error :
660
 
        if the names do not match up to '.' included, return error
661
 
      */
662
 
      char *q= (char*)(fn_ext(basename)+1);
663
 
      if (strncmp(basename, log_name_tmp, (int32_t)(q-basename)))
664
 
      {
665
 
        error= -2;
666
 
        break;
667
 
      }
668
 
      // Now compare extensions.
669
 
      char *q_end;
670
 
      uint32_t group_master_log_name_extension= strtoul(q, &q_end, 10);
671
 
      if (group_master_log_name_extension < log_name_extension)
672
 
        cmp_result= -1 ;
673
 
      else
674
 
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
675
 
 
676
 
      pos_reached= ((!cmp_result && group_master_log_pos >= (uint64_t)log_pos) ||
677
 
                    cmp_result > 0);
678
 
      if (pos_reached || thd->killed)
679
 
        break;
680
 
    }
681
 
 
682
 
    //wait for master update, with optional timeout.
683
 
 
684
 
    /*
685
 
      We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
686
 
      will wake us up.
687
 
    */
688
 
    if (timeout > 0)
689
 
    {
690
 
      /*
691
 
        Note that pthread_cond_timedwait checks for the timeout
692
 
        before for the condition ; i.e. it returns ETIMEDOUT
693
 
        if the system time equals or exceeds the time specified by abstime
694
 
        before the condition variable is signaled or broadcast, _or_ if
695
 
        the absolute time specified by abstime has already passed at the time
696
 
        of the call.
697
 
        For that reason, pthread_cond_timedwait will do the "timeoutting" job
698
 
        even if its condition is always immediately signaled (case of a loaded
699
 
        master).
700
 
      */
701
 
      error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
702
 
    }
703
 
    else
704
 
      pthread_cond_wait(&data_cond, &data_lock);
705
 
    if (error == ETIMEDOUT || error == ETIME)
706
 
    {
707
 
      error= -1;
708
 
      break;
709
 
    }
710
 
    error=0;
711
 
    event_count++;
712
 
  }
713
 
 
714
 
err:
715
 
  thd->exit_cond(msg);
716
 
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
717
 
      !slave_running)
718
 
  {
719
 
    error= -2;
720
 
  }
721
 
  return( error ? error : event_count );
722
 
}
723
 
 
724
 
 
725
 
void Relay_log_info::inc_group_relay_log_pos(uint64_t log_pos,
726
 
                                                bool skip_lock)
727
 
{
728
 
  if (!skip_lock)
729
 
    pthread_mutex_lock(&data_lock);
730
 
  inc_event_relay_log_pos();
731
 
  group_relay_log_pos= event_relay_log_pos;
732
 
  strmake(group_relay_log_name,event_relay_log_name,
733
 
          sizeof(group_relay_log_name)-1);
734
 
 
735
 
  notify_group_relay_log_name_update();
736
 
 
737
 
  /*
738
 
    If the slave does not support transactions and replicates a transaction,
739
 
    users should not trust group_master_log_pos (which they can display with
740
 
    SHOW SLAVE STATUS or read from relay-log.info), because to compute
741
 
    group_master_log_pos the slave relies on log_pos stored in the master's
742
 
    binlog, but if we are in a master's transaction these positions are always
743
 
    the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
744
 
    not advance as it should on the non-transactional slave (it advances by
745
 
    big leaps, whereas it should advance by small leaps).
746
 
  */
747
 
  /*
748
 
    In 4.x we used the event's len to compute the positions here. This is
749
 
    wrong if the event was 3.23/4.0 and has been converted to 5.0, because
750
 
    then the event's len is not what is was in the master's binlog, so this
751
 
    will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
752
 
    replication: Exec_master_log_pos is wrong). Only way to solve this is to
753
 
    have the original offset of the end of the event the relay log. This is
754
 
    what we do in 5.0: log_pos has become "end_log_pos" (because the real use
755
 
    of log_pos in 4.0 was to compute the end_log_pos; so better to store
756
 
    end_log_pos instead of begin_log_pos.
757
 
    If we had not done this fix here, the problem would also have appeared
758
 
    when the slave and master are 5.0 but with different event length (for
759
 
    example the slave is more recent than the master and features the event
760
 
    UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
761
 
    SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
762
 
    value which would lead to badly broken replication.
763
 
    Even the relay_log_pos will be corrupted in this case, because the len is
764
 
    the relay log is not "val".
765
 
    With the end_log_pos solution, we avoid computations involving lengthes.
766
 
  */
767
 
  if (log_pos) // 3.23 binlogs don't have log_posx
768
 
  {
769
 
    group_master_log_pos= log_pos;
770
 
  }
771
 
  pthread_cond_broadcast(&data_cond);
772
 
  if (!skip_lock)
773
 
    pthread_mutex_unlock(&data_lock);
774
 
  return;
775
 
}
776
 
 
777
 
 
778
 
void Relay_log_info::close_temporary_tables()
779
 
{
780
 
  TABLE *table,*next;
781
 
 
782
 
  for (table=save_temporary_tables ; table ; table=next)
783
 
  {
784
 
    next=table->next;
785
 
    /*
786
 
      Don't ask for disk deletion. For now, anyway they will be deleted when
787
 
      slave restarts, but it is a better intention to not delete them.
788
 
    */
789
 
    close_temporary(table, 1, 0);
790
 
  }
791
 
  save_temporary_tables= 0;
792
 
  slave_open_temp_tables= 0;
793
 
  return;
794
 
}
795
 
 
796
 
/*
797
 
  purge_relay_logs()
798
 
 
799
 
  NOTES
800
 
    Assumes to have a run lock on rli and that no slave thread are running.
801
 
*/
802
 
 
803
 
int32_t purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
804
 
                     const char** errmsg)
805
 
{
806
 
  int32_t error=0;
807
 
 
808
 
  /*
809
 
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
810
 
    Indeed, rli->inited==0 does not imply that they already are empty.
811
 
    It could be that slave's info initialization partly succeeded :
812
 
    for example if relay-log.info existed but *relay-bin*.*
813
 
    have been manually removed, init_relay_log_info reads the old
814
 
    relay-log.info and fills rli->master_log_*, then init_relay_log_info
815
 
    checks for the existence of the relay log, this fails and
816
 
    init_relay_log_info leaves rli->inited to 0.
817
 
    In that pathological case, rli->master_log_pos* will be properly reinited
818
 
    at the next START SLAVE (as RESET SLAVE or CHANGE
819
 
    MASTER, the callers of purge_relay_logs, will delete bogus *.info files
820
 
    or replace them with correct files), however if the user does SHOW SLAVE
821
 
    STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
822
 
    In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
823
 
    to display fine in any case.
824
 
  */
825
 
 
826
 
  rli->group_master_log_name[0]= 0;
827
 
  rli->group_master_log_pos= 0;
828
 
 
829
 
  if (!rli->inited)
830
 
  {
831
 
    return(0);
832
 
  }
833
 
 
834
 
  assert(rli->slave_running == 0);
835
 
  assert(rli->mi->slave_running == 0);
836
 
 
837
 
  rli->slave_skip_counter=0;
838
 
  pthread_mutex_lock(&rli->data_lock);
839
 
 
840
 
  /*
841
 
    we close the relay log fd possibly left open by the slave SQL thread,
842
 
    to be able to delete it; the relay log fd possibly left open by the slave
843
 
    I/O thread will be closed naturally in reset_logs() by the
844
 
    close(LOG_CLOSE_TO_BE_OPENED) call
845
 
  */
846
 
  if (rli->cur_log_fd >= 0)
847
 
  {
848
 
    end_io_cache(&rli->cache_buf);
849
 
    my_close(rli->cur_log_fd, MYF(MY_WME));
850
 
    rli->cur_log_fd= -1;
851
 
  }
852
 
 
853
 
  if (rli->relay_log.reset_logs(thd))
854
 
  {
855
 
    *errmsg = "Failed during log reset";
856
 
    error=1;
857
 
    goto err;
858
 
  }
859
 
  /* Save name of used relay log file */
860
 
  strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
861
 
          sizeof(rli->group_relay_log_name)-1);
862
 
  strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
863
 
          sizeof(rli->event_relay_log_name)-1);
864
 
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
865
 
  if (count_relay_log_space(rli))
866
 
  {
867
 
    *errmsg= "Error counting relay log space";
868
 
    goto err;
869
 
  }
870
 
  if (!just_reset)
871
 
    error= init_relay_log_pos(rli, rli->group_relay_log_name,
872
 
                              rli->group_relay_log_pos,
873
 
                              0 /* do not need data lock */, errmsg, 0);
874
 
 
875
 
err:
876
 
  pthread_mutex_unlock(&rli->data_lock);
877
 
  return(error);
878
 
}
879
 
 
880
 
 
881
 
/*
882
 
     Check if condition stated in UNTIL clause of START SLAVE is reached.
883
 
   SYNOPSYS
884
 
     Relay_log_info::is_until_satisfied()
885
 
     master_beg_pos    position of the beginning of to be executed event
886
 
                       (not log_pos member of the event that points to the
887
 
                        beginning of the following event)
888
 
 
889
 
 
890
 
   DESCRIPTION
891
 
     Checks if UNTIL condition is reached. Uses caching result of last
892
 
     comparison of current log file name and target log file name. So cached
893
 
     value should be invalidated if current log file name changes
894
 
     (see Relay_log_info::notify_... functions).
895
 
 
896
 
     This caching is needed to avoid of expensive string comparisons and
897
 
     strtol() conversions needed for log names comparison. We don't need to
898
 
     compare them each time this function is called, we only need to do this
899
 
     when current log name changes. If we have UNTIL_MASTER_POS condition we
900
 
     need to do this only after Rotate_log_event::do_apply_event() (which is
901
 
     rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
902
 
     condition then we should invalidate cached comarison value after
903
 
     inc_group_relay_log_pos() which called for each group of events (so we
904
 
     have some benefit if we have something like queries that use
905
 
     autoincrement or if we have transactions).
906
 
 
907
 
     Should be called ONLY if until_condition != UNTIL_NONE !
908
 
   RETURN VALUE
909
 
     true - condition met or error happened (condition seems to have
910
 
            bad log file name)
911
 
     false - condition not met
912
 
*/
913
 
 
914
 
bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
915
 
{
916
 
  const char *log_name;
917
 
  uint64_t log_pos;
918
 
 
919
 
  assert(until_condition != UNTIL_NONE);
920
 
 
921
 
  if (until_condition == UNTIL_MASTER_POS)
922
 
  {
923
 
    log_name= group_master_log_name;
924
 
    log_pos= master_beg_pos;
925
 
  }
926
 
  else
927
 
  { /* until_condition == UNTIL_RELAY_POS */
928
 
    log_name= group_relay_log_name;
929
 
    log_pos= group_relay_log_pos;
930
 
  }
931
 
 
932
 
  if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
933
 
  {
934
 
    /*
935
 
      We have no cached comparison results so we should compare log names
936
 
      and cache result.
937
 
      If we are after RESET SLAVE, and the SQL slave thread has not processed
938
 
      any event yet, it could be that group_master_log_name is "". In that case,
939
 
      just wait for more events (as there is no sensible comparison to do).
940
 
    */
941
 
 
942
 
    if (*log_name)
943
 
    {
944
 
      const char *basename= log_name + dirname_length(log_name);
945
 
 
946
 
      const char *q= (const char*)(fn_ext(basename)+1);
947
 
      if (strncmp(basename, until_log_name, (int32_t)(q-basename)) == 0)
948
 
      {
949
 
        /* Now compare extensions. */
950
 
        char *q_end;
951
 
        uint32_t log_name_extension= strtoul(q, &q_end, 10);
952
 
        if (log_name_extension < until_log_name_extension)
953
 
          until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
954
 
        else
955
 
          until_log_names_cmp_result=
956
 
            (log_name_extension > until_log_name_extension) ?
957
 
            UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
958
 
      }
959
 
      else
960
 
      {
961
 
        /* Probably error so we aborting */
962
 
        sql_print_error("Slave SQL thread is stopped because UNTIL "
963
 
                        "condition is bad.");
964
 
        return(true);
965
 
      }
966
 
    }
967
 
    else
968
 
      return(until_log_pos == 0);
969
 
  }
970
 
 
971
 
  return(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
972
 
           log_pos >= until_log_pos) ||
973
 
          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
974
 
}
975
 
 
976
 
 
977
 
void Relay_log_info::cached_charset_invalidate()
978
 
{
979
 
  /* Full of zeroes means uninitialized. */
980
 
  memset(cached_charset, 0, sizeof(cached_charset));
981
 
  return;
982
 
}
983
 
 
984
 
 
985
 
bool Relay_log_info::cached_charset_compare(char *charset) const
986
 
{
987
 
  if (memcmp(cached_charset, charset, sizeof(cached_charset)))
988
 
  {
989
 
    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
990
 
    return(1);
991
 
  }
992
 
  return(0);
993
 
}
994
 
 
995
 
 
996
 
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
997
 
                                  time_t event_creation_time)
998
 
{
999
 
  extern uint32_t debug_not_change_ts_if_art_event;
1000
 
  clear_flag(IN_STMT);
1001
 
 
1002
 
  /*
1003
 
    If in a transaction, and if the slave supports transactions, just
1004
 
    inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
1005
 
    (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
1006
 
    BEGIN/COMMIT, not with SET AUTOCOMMIT= .
1007
 
 
1008
 
    CAUTION: opt_using_transactions means innodb || bdb ; suppose the
1009
 
    master supports InnoDB and BDB, but the slave supports only BDB,
1010
 
    problems will arise: - suppose an InnoDB table is created on the
1011
 
    master, - then it will be MyISAM on the slave - but as
1012
 
    opt_using_transactions is true, the slave will believe he is
1013
 
    transactional with the MyISAM table. And problems will come when
1014
 
    one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
1015
 
    resume at BEGIN whereas there has not been any rollback).  This is
1016
 
    the problem of using opt_using_transactions instead of a finer
1017
 
    "does the slave support _transactional handler used on the
1018
 
    master_".
1019
 
 
1020
 
    More generally, we'll have problems when a query mixes a
1021
 
    transactional handler and MyISAM and STOP SLAVE is issued in the
1022
 
    middle of the "transaction". START SLAVE will resume at BEGIN
1023
 
    while the MyISAM table has already been updated.
1024
 
  */
1025
 
  if ((sql_thd->options & OPTION_BEGIN) && opt_using_transactions)
1026
 
    inc_event_relay_log_pos();
1027
 
  else
1028
 
  {
1029
 
    inc_group_relay_log_pos(event_master_log_pos);
1030
 
    flush_relay_log_info(this);
1031
 
    /*
1032
 
      Note that Rotate_log_event::do_apply_event() does not call this
1033
 
      function, so there is no chance that a fake rotate event resets
1034
 
      last_master_timestamp.  Note that we update without mutex
1035
 
      (probably ok - except in some very rare cases, only consequence
1036
 
      is that value may take some time to display in
1037
 
      Seconds_Behind_Master - not critical).
1038
 
    */
1039
 
    if (!(event_creation_time == 0 && debug_not_change_ts_if_art_event > 0))
1040
 
      last_master_timestamp= event_creation_time;
1041
 
  }
1042
 
}
1043
 
 
1044
 
#if !defined(DRIZZLE_CLIENT) && defined(HAVE_REPLICATION)
1045
 
void Relay_log_info::cleanup_context(THD *thd, bool error)
1046
 
{
1047
 
  assert(sql_thd == thd);
1048
 
  /*
1049
 
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
1050
 
    may have opened tables, which we cannot be sure have been closed (because
1051
 
    maybe the Rows_log_event have not been found or will not be, because slave
1052
 
    SQL thread is stopping, or relay log has a missing tail etc). So we close
1053
 
    all thread's tables. And so the table mappings have to be cancelled.
1054
 
    2) Rows_log_event::do_apply_event() may even have started statements or
1055
 
    transactions on them, which we need to rollback in case of error.
1056
 
    3) If finding a Format_description_log_event after a BEGIN, we also need
1057
 
    to rollback before continuing with the next events.
1058
 
    4) so we need this "context cleanup" function.
1059
 
  */
1060
 
  if (error)
1061
 
  {
1062
 
    ha_autocommit_or_rollback(thd, 1); // if a "statement transaction"
1063
 
    end_trans(thd, ROLLBACK); // if a "real transaction"
1064
 
  }
1065
 
  m_table_map.clear_tables();
1066
 
  close_thread_tables(thd);
1067
 
  clear_tables_to_lock();
1068
 
  clear_flag(IN_STMT);
1069
 
  /*
1070
 
    Cleanup for the flags that have been set at do_apply_event.
1071
 
  */
1072
 
  thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1073
 
  thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1074
 
  last_event_start_time= 0;
1075
 
  return;
1076
 
}
1077
 
 
1078
 
void Relay_log_info::clear_tables_to_lock()
1079
 
{
1080
 
  while (tables_to_lock)
1081
 
  {
1082
 
    uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
1083
 
    if (tables_to_lock->m_tabledef_valid)
1084
 
    {
1085
 
      tables_to_lock->m_tabledef.table_def::~table_def();
1086
 
      tables_to_lock->m_tabledef_valid= false;
1087
 
    }
1088
 
    tables_to_lock=
1089
 
      static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
1090
 
    tables_to_lock_count--;
1091
 
    my_free(to_free, MYF(MY_WME));
1092
 
  }
1093
 
  assert(tables_to_lock == NULL && tables_to_lock_count == 0);
1094
 
}
1095
 
 
1096
 
#endif