~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to sql/rpl_rli.cc

  • Committer: Monty Taylor
  • Date: 2008-07-05 18:10:38 UTC
  • mto: This revision was merged to the branch mainline in revision 63.
  • Revision ID: monty@inaugust.com-20080705181038-0ih0nnamu5qrut0y
Fixed prototypes. Cleaned define a little bit.

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
   along with this program; if not, write to the Free Software
14
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
 
#include <drizzled/server_includes.h>
17
 
 
18
 
#if TIME_WITH_SYS_TIME
19
 
# include <sys/time.h>
20
 
# include <time.h>
21
 
#else
22
 
# if HAVE_SYS_TIME_H
23
 
#  include <sys/time.h>
24
 
# else
25
 
#  include <time.h>
26
 
# endif
27
 
#endif
28
 
 
 
16
#include "mysql_priv.h"
29
17
 
30
18
#include "rpl_mi.h"
31
19
#include "rpl_rli.h"
 
20
#include <my_dir.h>
32
21
#include "sql_repl.h"  // For check_binlog_magic
33
22
#include "rpl_utility.h"
34
23
 
35
 
#include <libdrizzle/gettext.h>
36
 
 
37
 
static int32_t count_relay_log_space(Relay_log_info* rli);
 
24
static int count_relay_log_space(Relay_log_info* rli);
38
25
 
39
26
// Defined in slave.cc
40
 
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val);
41
 
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
 
27
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
 
28
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
42
29
                          const char *default_val);
43
30
 
44
31
 
45
32
Relay_log_info::Relay_log_info()
46
33
  :Slave_reporting_capability("SQL"),
47
 
   no_storage(false), replicate_same_server_id(::replicate_same_server_id),
 
34
   no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id),
48
35
   info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
49
 
#if defined(HAVE_purify) && HAVE_purify
50
 
   is_fake(false),
 
36
#if HAVE_purify
 
37
   is_fake(FALSE),
51
38
#endif
52
39
   cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
53
40
   group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
58
45
   tables_to_lock(0), tables_to_lock_count(0),
59
46
   last_event_start_time(0), m_flags(0)
60
47
{
 
48
  DBUG_ENTER("Relay_log_info::Relay_log_info");
 
49
 
61
50
  group_relay_log_name[0]= event_relay_log_name[0]=
62
51
    group_master_log_name[0]= 0;
63
52
  until_log_name[0]= ign_master_log_name_end[0]= 0;
64
 
  memset(&info_file, 0, sizeof(info_file));
65
 
  memset(&cache_buf, 0, sizeof(cache_buf));
 
53
  bzero((char*) &info_file, sizeof(info_file));
 
54
  bzero((char*) &cache_buf, sizeof(cache_buf));
 
55
  cached_charset_invalidate();
66
56
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
67
57
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
68
58
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
71
61
  pthread_cond_init(&stop_cond, NULL);
72
62
  pthread_cond_init(&log_space_cond, NULL);
73
63
  relay_log.init_pthread_objects();
74
 
  return;
 
64
  DBUG_VOID_RETURN;
75
65
}
76
66
 
77
67
 
78
68
Relay_log_info::~Relay_log_info()
79
69
{
 
70
  DBUG_ENTER("Relay_log_info::~Relay_log_info");
 
71
 
80
72
  pthread_mutex_destroy(&run_lock);
81
73
  pthread_mutex_destroy(&data_lock);
82
74
  pthread_mutex_destroy(&log_space_lock);
85
77
  pthread_cond_destroy(&stop_cond);
86
78
  pthread_cond_destroy(&log_space_cond);
87
79
  relay_log.cleanup();
88
 
  return;
 
80
  DBUG_VOID_RETURN;
89
81
}
90
82
 
91
83
 
92
 
int32_t init_relay_log_info(Relay_log_info* rli,
 
84
int init_relay_log_info(Relay_log_info* rli,
93
85
                        const char* info_fname)
94
86
{
95
87
  char fname[FN_REFLEN+128];
96
 
  int32_t info_fd;
 
88
  int info_fd;
97
89
  const char* msg = 0;
98
 
  int32_t error = 0;
99
 
  assert(!rli->no_storage);         // Don't init if there is no storage
 
90
  int error = 0;
 
91
  DBUG_ENTER("init_relay_log_info");
 
92
  DBUG_ASSERT(!rli->no_storage);         // Don't init if there is no storage
100
93
 
101
94
  if (rli->inited)                       // Set if this function called
102
 
    return(0);
 
95
    DBUG_RETURN(0);
103
96
  fn_format(fname, info_fname, mysql_data_home, "", 4+32);
104
97
  pthread_mutex_lock(&rli->data_lock);
105
98
  info_fd = rli->info_fd;
142
135
        instead require a name. But as we don't want to break many existing
143
136
        setups, we only give warning, not error.
144
137
      */
145
 
      sql_print_warning(_("Neither --relay-log nor --relay-log-index were used;"
 
138
      sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
146
139
                        " so replication "
147
140
                        "may break when this MySQL server acts as a "
148
141
                        "slave and has his hostname changed!! Please "
149
 
                        "use '--relay-log=%s' to avoid this problem."), ln);
 
142
                        "use '--relay-log=%s' to avoid this problem.", ln);
150
143
      name_warning_sent= 1;
151
144
    }
152
145
    /*
159
152
                            max_binlog_size), 1))
160
153
    {
161
154
      pthread_mutex_unlock(&rli->data_lock);
162
 
      sql_print_error(_("Failed in open_log() called from "
163
 
                        "init_relay_log_info()"));
164
 
      return(1);
 
155
      sql_print_error("Failed in open_log() called from init_relay_log_info()");
 
156
      DBUG_RETURN(1);
165
157
    }
166
158
  }
167
159
 
168
160
  /* if file does not exist */
169
161
  if (access(fname,F_OK))
170
162
  {
171
 
    /* Create a new file */
 
163
    /*
 
164
      If someone removed the file from underneath our feet, just close
 
165
      the old descriptor and re-create the old file
 
166
    */
 
167
    if (info_fd >= 0)
 
168
      my_close(info_fd, MYF(MY_WME));
 
169
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
 
170
    {
 
171
      sql_print_error("Failed to create a new relay log info file (\
 
172
file '%s', errno %d)", fname, my_errno);
 
173
      msg= current_thd->main_da.message();
 
174
      goto err;
 
175
    }
 
176
    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
 
177
                      MYF(MY_WME)))
 
178
    {
 
179
      sql_print_error("Failed to create a cache on relay log info file '%s'",
 
180
                      fname);
 
181
      msg= current_thd->main_da.message();
 
182
      goto err;
 
183
    }
 
184
 
 
185
    /* Init relay log with first entry in the relay index file */
 
186
    if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
 
187
                           &msg, 0))
 
188
    {
 
189
      sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
 
190
      goto err;
 
191
    }
 
192
    rli->group_master_log_name[0]= 0;
 
193
    rli->group_master_log_pos= 0;
 
194
    rli->info_fd= info_fd;
172
195
  }
173
196
  else // file exists
174
197
  {
175
 
    /* Open up fname here and pull out the relay.info data */
176
 
  }
 
198
    if (info_fd >= 0)
 
199
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
 
200
    else
 
201
    {
 
202
      int error=0;
 
203
      if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
 
204
      {
 
205
        sql_print_error("\
 
206
Failed to open the existing relay log info file '%s' (errno %d)",
 
207
                        fname, my_errno);
 
208
        error= 1;
 
209
      }
 
210
      else if (init_io_cache(&rli->info_file, info_fd,
 
211
                             IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
 
212
      {
 
213
        sql_print_error("Failed to create a cache on relay log info file '%s'",
 
214
                        fname);
 
215
        error= 1;
 
216
      }
 
217
      if (error)
 
218
      {
 
219
        if (info_fd >= 0)
 
220
          my_close(info_fd, MYF(0));
 
221
        rli->info_fd= -1;
 
222
        rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
223
        pthread_mutex_unlock(&rli->data_lock);
 
224
        DBUG_RETURN(1);
 
225
      }
 
226
    }
 
227
 
 
228
    rli->info_fd = info_fd;
 
229
    int relay_log_pos, master_log_pos;
 
230
    if (init_strvar_from_file(rli->group_relay_log_name,
 
231
                              sizeof(rli->group_relay_log_name),
 
232
                              &rli->info_file, "") ||
 
233
       init_intvar_from_file(&relay_log_pos,
 
234
                             &rli->info_file, BIN_LOG_HEADER_SIZE) ||
 
235
       init_strvar_from_file(rli->group_master_log_name,
 
236
                             sizeof(rli->group_master_log_name),
 
237
                             &rli->info_file, "") ||
 
238
       init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
 
239
    {
 
240
      msg="Error reading slave log configuration";
 
241
      goto err;
 
242
    }
 
243
    strmake(rli->event_relay_log_name,rli->group_relay_log_name,
 
244
            sizeof(rli->event_relay_log_name)-1);
 
245
    rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
 
246
    rli->group_master_log_pos= master_log_pos;
 
247
 
 
248
    if (init_relay_log_pos(rli,
 
249
                           rli->group_relay_log_name,
 
250
                           rli->group_relay_log_pos,
 
251
                           0 /* no data lock*/,
 
252
                           &msg, 0))
 
253
    {
 
254
      char llbuf[22];
 
255
      sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
 
256
                      rli->group_relay_log_name,
 
257
                      llstr(rli->group_relay_log_pos, llbuf));
 
258
      goto err;
 
259
    }
 
260
  }
 
261
 
 
262
#ifndef DBUG_OFF
 
263
  {
 
264
    char llbuf1[22], llbuf2[22];
 
265
    DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
 
266
                        llstr(my_b_tell(rli->cur_log),llbuf1),
 
267
                        llstr(rli->event_relay_log_pos,llbuf2)));
 
268
    DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
 
269
    DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
 
270
  }
 
271
#endif
177
272
 
178
273
  /*
179
274
    Now change the cache from READ to WRITE - must do this
181
276
  */
182
277
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
183
278
  if ((error= flush_relay_log_info(rli)))
184
 
    sql_print_error(_("Failed to flush relay log info file"));
 
279
    sql_print_error("Failed to flush relay log info file");
185
280
  if (count_relay_log_space(rli))
186
281
  {
187
 
    msg=_("Error counting relay log space");
 
282
    msg="Error counting relay log space";
188
283
    goto err;
189
284
  }
190
285
  rli->inited= 1;
191
286
  pthread_mutex_unlock(&rli->data_lock);
192
 
  return(error);
 
287
  DBUG_RETURN(error);
193
288
 
194
289
err:
195
290
  sql_print_error(msg);
199
294
  rli->info_fd= -1;
200
295
  rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
201
296
  pthread_mutex_unlock(&rli->data_lock);
202
 
  return(1);
 
297
  DBUG_RETURN(1);
203
298
}
204
299
 
205
300
 
206
 
static inline int32_t add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
 
301
static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
207
302
{
208
303
  struct stat s;
 
304
  DBUG_ENTER("add_relay_log");
209
305
  if (stat(linfo->log_file_name,&s))
210
306
  {
211
 
    sql_print_error(_("log %s listed in the index, but failed to stat"),
 
307
    sql_print_error("log %s listed in the index, but failed to stat",
212
308
                    linfo->log_file_name);
213
 
    return(1);
 
309
    DBUG_RETURN(1);
214
310
  }
215
311
  rli->log_space_total += s.st_size;
216
 
  return(0);
 
312
#ifndef DBUG_OFF
 
313
  char buf[22];
 
314
  DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
 
315
#endif
 
316
  DBUG_RETURN(0);
217
317
}
218
318
 
219
319
 
220
 
static int32_t count_relay_log_space(Relay_log_info* rli)
 
320
static int count_relay_log_space(Relay_log_info* rli)
221
321
{
222
322
  LOG_INFO linfo;
 
323
  DBUG_ENTER("count_relay_log_space");
223
324
  rli->log_space_total= 0;
224
 
  if (rli->relay_log.find_log_pos(&linfo, NULL, 1))
 
325
  if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
225
326
  {
226
 
    sql_print_error(_("Could not find first log while counting relay "
227
 
                      "log space"));
228
 
    return(1);
 
327
    sql_print_error("Could not find first log while counting relay log space");
 
328
    DBUG_RETURN(1);
229
329
  }
230
330
  do
231
331
  {
232
332
    if (add_relay_log(rli,&linfo))
233
 
      return(1);
 
333
      DBUG_RETURN(1);
234
334
  } while (!rli->relay_log.find_next_log(&linfo, 1));
235
335
  /*
236
336
     As we have counted everything, including what may have written in a
238
338
     twice.
239
339
  */
240
340
  rli->relay_log.reset_bytes_written();
241
 
  return(0);
 
341
  DBUG_RETURN(0);
242
342
}
243
343
 
244
344
 
252
352
 
253
353
void Relay_log_info::clear_until_condition()
254
354
{
 
355
  DBUG_ENTER("clear_until_condition");
 
356
 
255
357
  until_condition= Relay_log_info::UNTIL_NONE;
256
358
  until_log_name[0]= 0;
257
359
  until_log_pos= 0;
258
 
  return;
 
360
  DBUG_VOID_RETURN;
259
361
}
260
362
 
261
363
 
291
393
    1   error.  errmsg is set to point to the error message
292
394
*/
293
395
 
294
 
int32_t init_relay_log_pos(Relay_log_info* rli,const char* log,
295
 
                       uint64_t pos, bool need_data_lock,
 
396
int init_relay_log_pos(Relay_log_info* rli,const char* log,
 
397
                       ulonglong pos, bool need_data_lock,
296
398
                       const char** errmsg,
297
399
                       bool look_for_description_event)
298
400
{
 
401
  DBUG_ENTER("init_relay_log_pos");
 
402
  DBUG_PRINT("info", ("pos: %lu", (ulong) pos));
 
403
 
299
404
  *errmsg=0;
300
405
  pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
301
406
 
336
441
    Test to see if the previous run was with the skip of purging
337
442
    If yes, we do not purge when we restart
338
443
  */
339
 
  if (rli->relay_log.find_log_pos(&rli->linfo, NULL, 1))
 
444
  if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
340
445
  {
341
446
    *errmsg="Could not find first log during relay log initialization";
342
447
    goto err;
347
452
    *errmsg="Could not find target log during relay log initialization";
348
453
    goto err;
349
454
  }
350
 
 
351
 
  rli->group_relay_log_name.assign(rli->linfo.log_file_name);
352
 
  rli->event_relay_log_name.assign(rli->linfo.log_file_name);
353
 
 
 
455
  strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
 
456
          sizeof(rli->group_relay_log_name)-1);
 
457
  strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
 
458
          sizeof(rli->event_relay_log_name)-1);
354
459
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
355
460
  {
356
461
    /*
386
491
        Read the possible Format_description_log_event; if position
387
492
        was 4, no need, it will be read naturally.
388
493
      */
 
494
      DBUG_PRINT("info",("looking for a Format_description_log_event"));
 
495
 
389
496
      if (my_b_tell(rli->cur_log) >= pos)
390
497
        break;
391
498
 
396
503
      if (!(ev=Log_event::read_log_event(rli->cur_log,0,
397
504
                                         rli->relay_log.description_event_for_exec)))
398
505
      {
 
506
        DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d",
 
507
                           rli->cur_log->error));
399
508
        if (rli->cur_log->error) /* not EOF */
400
509
        {
401
510
          *errmsg= "I/O error reading event at position 4";
405
514
      }
406
515
      else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
407
516
      {
 
517
        DBUG_PRINT("info",("found Format_description_log_event"));
408
518
        delete rli->relay_log.description_event_for_exec;
409
519
        rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
410
520
        /*
430
540
      }
431
541
      else
432
542
      {
 
543
        DBUG_PRINT("info",("found event of another type=%d",
 
544
                           ev->get_type_code()));
433
545
        look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
434
546
        delete ev;
435
547
      }
436
548
    }
437
549
    my_b_seek(rli->cur_log,(off_t)pos);
 
550
#ifndef DBUG_OFF
 
551
  {
 
552
    char llbuf1[22], llbuf2[22];
 
553
    DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
 
554
                        llstr(my_b_tell(rli->cur_log),llbuf1),
 
555
                        llstr(rli->event_relay_log_pos,llbuf2)));
 
556
  }
 
557
#endif
438
558
 
439
559
  }
440
560
 
454
574
  if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
455
575
    *errmsg= "Invalid Format_description log event; could be out of memory";
456
576
 
457
 
  return ((*errmsg) ? 1 : 0);
 
577
  DBUG_RETURN ((*errmsg) ? 1 : 0);
458
578
}
459
579
 
460
580
 
470
590
    timeout         timeout in seconds before giving up waiting
471
591
 
472
592
  NOTES
473
 
    timeout is int64_t whereas it should be uint32_t ; but this is
 
593
    timeout is longlong whereas it should be ulong ; but this is
474
594
    to catch if the user submitted a negative timeout.
475
595
 
476
596
  RETURN VALUES
483
603
                before reaching the desired log/position
484
604
 */
485
605
 
486
 
int32_t Relay_log_info::wait_for_pos(THD* thd, String* log_name,
487
 
                                    int64_t log_pos,
488
 
                                    int64_t timeout)
 
606
int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
 
607
                                    longlong log_pos,
 
608
                                    longlong timeout)
489
609
{
490
 
  int32_t event_count = 0;
491
 
  uint32_t init_abort_pos_wait;
492
 
  int32_t error=0;
 
610
  int event_count = 0;
 
611
  ulong init_abort_pos_wait;
 
612
  int error=0;
493
613
  struct timespec abstime; // for timeout checking
494
614
  const char *msg;
 
615
  DBUG_ENTER("Relay_log_info::wait_for_pos");
495
616
 
496
617
  if (!inited)
497
 
    return(-2);
 
618
    DBUG_RETURN(-2);
 
619
 
 
620
  DBUG_PRINT("enter",("log_name: '%s'  log_pos: %lu  timeout: %lu",
 
621
                      log_name->c_ptr(), (ulong) log_pos, (ulong) timeout));
498
622
 
499
623
  set_timespec(abstime,timeout);
500
624
  pthread_mutex_lock(&data_lock);
519
643
  /*
520
644
    We'll need to
521
645
    handle all possible log names comparisons (e.g. 999 vs 1000).
522
 
    We use uint32_t for string->number conversion ; this is no
 
646
    We use ulong for string->number conversion ; this is no
523
647
    stronger limitation than in find_uniq_filename in sql/log.cc
524
648
  */
525
 
  uint32_t log_name_extension;
 
649
  ulong log_name_extension;
526
650
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
527
651
 
528
 
  strmake(log_name_tmp, log_name->ptr(), cmin(log_name->length(), (uint32_t)FN_REFLEN-1));
 
652
  strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), FN_REFLEN-1));
529
653
 
530
654
  char *p= fn_ext(log_name_tmp);
531
655
  char *p_end;
535
659
    goto err;
536
660
  }
537
661
  // Convert 0-3 to 4
538
 
  log_pos= cmax(log_pos, (int64_t)BIN_LOG_HEADER_SIZE);
 
662
  log_pos= max(log_pos, BIN_LOG_HEADER_SIZE);
539
663
  /* p points to '.' */
540
664
  log_name_extension= strtoul(++p, &p_end, 10);
541
665
  /*
555
679
         slave_running)
556
680
  {
557
681
    bool pos_reached;
558
 
    int32_t cmp_result= 0;
 
682
    int cmp_result= 0;
 
683
 
 
684
    DBUG_PRINT("info",
 
685
               ("init_abort_pos_wait: %ld  abort_pos_wait: %ld",
 
686
                init_abort_pos_wait, abort_pos_wait));
 
687
    DBUG_PRINT("info",("group_master_log_name: '%s'  pos: %lu",
 
688
                       group_master_log_name, (ulong) group_master_log_pos));
559
689
 
560
690
    /*
561
691
      group_master_log_name can be "", if we are just after a fresh
570
700
      configuration which does nothing), then group_master_log_pos
571
701
      will grow and group_master_log_name will stay "".
572
702
    */
573
 
    if (group_master_log_name.length())
 
703
    if (*group_master_log_name)
574
704
    {
575
 
      const char *basename= (group_master_log_name.c_str() +
576
 
                             dirname_length(group_master_log_name.c_str()));
 
705
      char *basename= (group_master_log_name +
 
706
                       dirname_length(group_master_log_name));
577
707
      /*
578
708
        First compare the parts before the extension.
579
709
        Find the dot in the master's log basename,
581
711
        if the names do not match up to '.' included, return error
582
712
      */
583
713
      char *q= (char*)(fn_ext(basename)+1);
584
 
      if (strncmp(basename, log_name_tmp, (int32_t)(q-basename)))
 
714
      if (strncmp(basename, log_name_tmp, (int)(q-basename)))
585
715
      {
586
716
        error= -2;
587
717
        break;
588
718
      }
589
719
      // Now compare extensions.
590
720
      char *q_end;
591
 
      uint32_t group_master_log_name_extension= strtoul(q, &q_end, 10);
 
721
      ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
592
722
      if (group_master_log_name_extension < log_name_extension)
593
723
        cmp_result= -1 ;
594
724
      else
595
725
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
596
726
 
597
 
      pos_reached= ((!cmp_result && group_master_log_pos >= (uint64_t)log_pos) ||
 
727
      pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
598
728
                    cmp_result > 0);
599
729
      if (pos_reached || thd->killed)
600
730
        break;
602
732
 
603
733
    //wait for master update, with optional timeout.
604
734
 
 
735
    DBUG_PRINT("info",("Waiting for master update"));
605
736
    /*
606
737
      We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
607
738
      will wake us up.
623
754
    }
624
755
    else
625
756
      pthread_cond_wait(&data_cond, &data_lock);
 
757
    DBUG_PRINT("info",("Got signal of master update or timed out"));
626
758
    if (error == ETIMEDOUT || error == ETIME)
627
759
    {
628
760
      error= -1;
630
762
    }
631
763
    error=0;
632
764
    event_count++;
 
765
    DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
633
766
  }
634
767
 
635
768
err:
636
769
  thd->exit_cond(msg);
 
770
  DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d \
 
771
improper_arguments: %d  timed_out: %d",
 
772
                     thd->killed_errno(),
 
773
                     (int) (init_abort_pos_wait != abort_pos_wait),
 
774
                     (int) slave_running,
 
775
                     (int) (error == -2),
 
776
                     (int) (error == -1)));
637
777
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
638
778
      !slave_running)
639
779
  {
640
780
    error= -2;
641
781
  }
642
 
  return( error ? error : event_count );
 
782
  DBUG_RETURN( error ? error : event_count );
643
783
}
644
784
 
645
785
 
646
 
void Relay_log_info::inc_group_relay_log_pos(uint64_t log_pos,
 
786
void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
647
787
                                                bool skip_lock)
648
788
{
 
789
  DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
 
790
 
649
791
  if (!skip_lock)
650
792
    pthread_mutex_lock(&data_lock);
651
793
  inc_event_relay_log_pos();
652
794
  group_relay_log_pos= event_relay_log_pos;
653
 
  group_relay_log_name.assign(event_relay_log_name);
 
795
  strmake(group_relay_log_name,event_relay_log_name,
 
796
          sizeof(group_relay_log_name)-1);
654
797
 
655
798
  notify_group_relay_log_name_update();
656
799
 
684
827
    the relay log is not "val".
685
828
    With the end_log_pos solution, we avoid computations involving lengthes.
686
829
  */
 
830
  DBUG_PRINT("info", ("log_pos: %lu  group_master_log_pos: %lu",
 
831
                      (long) log_pos, (long) group_master_log_pos));
687
832
  if (log_pos) // 3.23 binlogs don't have log_posx
688
833
  {
689
834
    group_master_log_pos= log_pos;
691
836
  pthread_cond_broadcast(&data_cond);
692
837
  if (!skip_lock)
693
838
    pthread_mutex_unlock(&data_lock);
694
 
  return;
 
839
  DBUG_VOID_RETURN;
695
840
}
696
841
 
697
842
 
698
843
void Relay_log_info::close_temporary_tables()
699
844
{
700
 
  Table *table,*next;
 
845
  TABLE *table,*next;
 
846
  DBUG_ENTER("Relay_log_info::close_temporary_tables");
701
847
 
702
848
  for (table=save_temporary_tables ; table ; table=next)
703
849
  {
706
852
      Don't ask for disk deletion. For now, anyway they will be deleted when
707
853
      slave restarts, but it is a better intention to not delete them.
708
854
    */
 
855
    DBUG_PRINT("info", ("table: 0x%lx", (long) table));
709
856
    close_temporary(table, 1, 0);
710
857
  }
711
858
  save_temporary_tables= 0;
712
859
  slave_open_temp_tables= 0;
713
 
  return;
 
860
  DBUG_VOID_RETURN;
714
861
}
715
862
 
716
863
/*
720
867
    Assumes to have a run lock on rli and that no slave thread are running.
721
868
*/
722
869
 
723
 
int32_t purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
 
870
int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
724
871
                     const char** errmsg)
725
872
{
726
 
  int32_t error=0;
 
873
  int error=0;
 
874
  DBUG_ENTER("purge_relay_logs");
727
875
 
728
876
  /*
729
877
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
748
896
 
749
897
  if (!rli->inited)
750
898
  {
751
 
    return(0);
 
899
    DBUG_PRINT("info", ("rli->inited == 0"));
 
900
    DBUG_RETURN(0);
752
901
  }
753
902
 
754
 
  assert(rli->slave_running == 0);
755
 
  assert(rli->mi->slave_running == 0);
 
903
  DBUG_ASSERT(rli->slave_running == 0);
 
904
  DBUG_ASSERT(rli->mi->slave_running == 0);
756
905
 
757
906
  rli->slave_skip_counter=0;
758
907
  pthread_mutex_lock(&rli->data_lock);
777
926
    goto err;
778
927
  }
779
928
  /* Save name of used relay log file */
780
 
  rli->group_relay_log_name.assign(rli->relay_log.get_log_fname());
781
 
  rli->event_relay_log_name.assign(rli->relay_log.get_log_fname());
 
929
  strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
 
930
          sizeof(rli->group_relay_log_name)-1);
 
931
  strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
 
932
          sizeof(rli->event_relay_log_name)-1);
782
933
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
783
934
  if (count_relay_log_space(rli))
784
935
  {
786
937
    goto err;
787
938
  }
788
939
  if (!just_reset)
789
 
    error= init_relay_log_pos(rli, rli->group_relay_log_name.c_str(),
 
940
    error= init_relay_log_pos(rli, rli->group_relay_log_name,
790
941
                              rli->group_relay_log_pos,
791
942
                              0 /* do not need data lock */, errmsg, 0);
792
943
 
793
944
err:
 
945
#ifndef DBUG_OFF
 
946
  char buf[22];
 
947
#endif
 
948
  DBUG_PRINT("info",("log_space_total: %s",llstr(rli->log_space_total,buf)));
794
949
  pthread_mutex_unlock(&rli->data_lock);
795
 
  return(error);
 
950
  DBUG_RETURN(error);
796
951
}
797
952
 
798
953
 
832
987
bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
833
988
{
834
989
  const char *log_name;
835
 
  uint64_t log_pos;
 
990
  ulonglong log_pos;
 
991
  DBUG_ENTER("Relay_log_info::is_until_satisfied");
836
992
 
837
 
  assert(until_condition != UNTIL_NONE);
 
993
  DBUG_ASSERT(until_condition != UNTIL_NONE);
838
994
 
839
995
  if (until_condition == UNTIL_MASTER_POS)
840
996
  {
841
 
    log_name= group_master_log_name.c_str();
 
997
    log_name= group_master_log_name;
842
998
    log_pos= master_beg_pos;
843
999
  }
844
1000
  else
845
1001
  { /* until_condition == UNTIL_RELAY_POS */
846
 
    log_name= group_relay_log_name.c_str();
 
1002
    log_name= group_relay_log_name;
847
1003
    log_pos= group_relay_log_pos;
848
1004
  }
849
1005
 
 
1006
#ifndef DBUG_OFF
 
1007
  {
 
1008
    char buf[32];
 
1009
    DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
 
1010
                        group_master_log_name, llstr(group_master_log_pos, buf)));
 
1011
    DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
 
1012
                        group_relay_log_name, llstr(group_relay_log_pos, buf)));
 
1013
    DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
 
1014
                        until_condition == UNTIL_MASTER_POS ? "master" : "relay",
 
1015
                        log_name, llstr(log_pos, buf)));
 
1016
    DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
 
1017
                        until_condition == UNTIL_MASTER_POS ? "master" : "relay",
 
1018
                        until_log_name, llstr(until_log_pos, buf)));
 
1019
  }
 
1020
#endif
 
1021
 
850
1022
  if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
851
1023
  {
852
1024
    /*
862
1034
      const char *basename= log_name + dirname_length(log_name);
863
1035
 
864
1036
      const char *q= (const char*)(fn_ext(basename)+1);
865
 
      if (strncmp(basename, until_log_name, (int32_t)(q-basename)) == 0)
 
1037
      if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
866
1038
      {
867
1039
        /* Now compare extensions. */
868
1040
        char *q_end;
869
 
        uint32_t log_name_extension= strtoul(q, &q_end, 10);
 
1041
        ulong log_name_extension= strtoul(q, &q_end, 10);
870
1042
        if (log_name_extension < until_log_name_extension)
871
1043
          until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
872
1044
        else
877
1049
      else
878
1050
      {
879
1051
        /* Probably error so we aborting */
880
 
        sql_print_error(_("Slave SQL thread is stopped because UNTIL "
881
 
                          "condition is bad."));
882
 
        return(true);
 
1052
        sql_print_error("Slave SQL thread is stopped because UNTIL "
 
1053
                        "condition is bad.");
 
1054
        DBUG_RETURN(TRUE);
883
1055
      }
884
1056
    }
885
1057
    else
886
 
      return(until_log_pos == 0);
 
1058
      DBUG_RETURN(until_log_pos == 0);
887
1059
  }
888
1060
 
889
 
  return(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
 
1061
  DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
890
1062
           log_pos >= until_log_pos) ||
891
1063
          until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
892
1064
}
893
1065
 
894
1066
 
 
1067
void Relay_log_info::cached_charset_invalidate()
 
1068
{
 
1069
  DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
 
1070
 
 
1071
  /* Full of zeroes means uninitialized. */
 
1072
  bzero(cached_charset, sizeof(cached_charset));
 
1073
  DBUG_VOID_RETURN;
 
1074
}
 
1075
 
 
1076
 
 
1077
bool Relay_log_info::cached_charset_compare(char *charset) const
 
1078
{
 
1079
  DBUG_ENTER("Relay_log_info::cached_charset_compare");
 
1080
 
 
1081
  if (bcmp((uchar*) cached_charset, (uchar*) charset,
 
1082
           sizeof(cached_charset)))
 
1083
  {
 
1084
    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
 
1085
    DBUG_RETURN(1);
 
1086
  }
 
1087
  DBUG_RETURN(0);
 
1088
}
 
1089
 
 
1090
 
895
1091
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
896
1092
                                  time_t event_creation_time)
897
1093
{
898
 
  extern uint32_t debug_not_change_ts_if_art_event;
 
1094
#ifndef DBUG_OFF
 
1095
  extern uint debug_not_change_ts_if_art_event;
 
1096
#endif
899
1097
  clear_flag(IN_STMT);
900
1098
 
901
1099
  /*
935
1133
      is that value may take some time to display in
936
1134
      Seconds_Behind_Master - not critical).
937
1135
    */
 
1136
#ifndef DBUG_OFF
938
1137
    if (!(event_creation_time == 0 && debug_not_change_ts_if_art_event > 0))
939
 
      last_master_timestamp= event_creation_time;
 
1138
#else
 
1139
      if (event_creation_time != 0)
 
1140
#endif
 
1141
        last_master_timestamp= event_creation_time;
940
1142
  }
941
1143
}
942
1144
 
 
1145
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
943
1146
void Relay_log_info::cleanup_context(THD *thd, bool error)
944
1147
{
945
 
  assert(sql_thd == thd);
 
1148
  DBUG_ENTER("Relay_log_info::cleanup_context");
 
1149
 
 
1150
  DBUG_ASSERT(sql_thd == thd);
946
1151
  /*
947
1152
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
948
1153
    may have opened tables, which we cannot be sure have been closed (because
970
1175
  thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
971
1176
  thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
972
1177
  last_event_start_time= 0;
973
 
  return;
 
1178
  DBUG_VOID_RETURN;
974
1179
}
975
1180
 
976
1181
void Relay_log_info::clear_tables_to_lock()
977
1182
{
978
1183
  while (tables_to_lock)
979
1184
  {
980
 
    unsigned char* to_free= reinterpret_cast<unsigned char*>(tables_to_lock);
 
1185
    uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
981
1186
    if (tables_to_lock->m_tabledef_valid)
982
1187
    {
983
1188
      tables_to_lock->m_tabledef.table_def::~table_def();
984
 
      tables_to_lock->m_tabledef_valid= false;
 
1189
      tables_to_lock->m_tabledef_valid= FALSE;
985
1190
    }
986
1191
    tables_to_lock=
987
 
      static_cast<RPL_TableList*>(tables_to_lock->next_global);
 
1192
      static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
988
1193
    tables_to_lock_count--;
989
 
    free(to_free);
 
1194
    my_free(to_free, MYF(MY_WME));
990
1195
  }
991
 
  assert(tables_to_lock == NULL && tables_to_lock_count == 0);
 
1196
  DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
992
1197
}
 
1198
 
 
1199
#endif