~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to server/rpl_rli.cc

Removed/replaced DBUG symbols and standardized TRUE/FALSE

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
   along with this program; if not, write to the Free Software
14
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
 
#include <drizzled/server_includes.h>
17
 
 
18
 
#if TIME_WITH_SYS_TIME
19
 
# include <sys/time.h>
20
 
# include <time.h>
21
 
#else
22
 
# if HAVE_SYS_TIME_H
23
 
#  include <sys/time.h>
24
 
# else
25
 
#  include <time.h>
26
 
# endif
27
 
#endif
28
 
 
 
16
#include "mysql_priv.h"
29
17
 
30
18
#include "rpl_mi.h"
31
19
#include "rpl_rli.h"
 
20
#include <my_dir.h>
32
21
#include "sql_repl.h"  // For check_binlog_magic
33
22
#include "rpl_utility.h"
34
23
 
35
 
#include <libdrizzle/gettext.h>
36
 
 
37
 
static int32_t count_relay_log_space(Relay_log_info* rli);
 
24
static int count_relay_log_space(Relay_log_info* rli);
38
25
 
39
26
// Defined in slave.cc
40
 
int32_t init_intvar_from_file(int32_t* var, IO_CACHE* f, int32_t default_val);
41
 
int32_t init_strvar_from_file(char *var, int32_t max_size, IO_CACHE *f,
 
27
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
 
28
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
42
29
                          const char *default_val);
43
30
 
44
31
 
61
48
  group_relay_log_name[0]= event_relay_log_name[0]=
62
49
    group_master_log_name[0]= 0;
63
50
  until_log_name[0]= ign_master_log_name_end[0]= 0;
64
 
  memset(&info_file, 0, sizeof(info_file));
65
 
  memset(&cache_buf, 0, sizeof(cache_buf));
 
51
  bzero((char*) &info_file, sizeof(info_file));
 
52
  bzero((char*) &cache_buf, sizeof(cache_buf));
 
53
  cached_charset_invalidate();
66
54
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
67
55
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
68
56
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
89
77
}
90
78
 
91
79
 
92
 
int32_t init_relay_log_info(Relay_log_info* rli,
 
80
int init_relay_log_info(Relay_log_info* rli,
93
81
                        const char* info_fname)
94
82
{
95
83
  char fname[FN_REFLEN+128];
96
 
  int32_t info_fd;
 
84
  int info_fd;
97
85
  const char* msg = 0;
98
 
  int32_t error = 0;
 
86
  int error = 0;
99
87
  assert(!rli->no_storage);         // Don't init if there is no storage
100
88
 
101
89
  if (rli->inited)                       // Set if this function called
142
130
        instead require a name. But as we don't want to break many existing
143
131
        setups, we only give warning, not error.
144
132
      */
145
 
      sql_print_warning(_("Neither --relay-log nor --relay-log-index were used;"
 
133
      sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
146
134
                        " so replication "
147
135
                        "may break when this MySQL server acts as a "
148
136
                        "slave and has his hostname changed!! Please "
149
 
                        "use '--relay-log=%s' to avoid this problem."), ln);
 
137
                        "use '--relay-log=%s' to avoid this problem.", ln);
150
138
      name_warning_sent= 1;
151
139
    }
152
140
    /*
159
147
                            max_binlog_size), 1))
160
148
    {
161
149
      pthread_mutex_unlock(&rli->data_lock);
162
 
      sql_print_error(_("Failed in open_log() called from "
163
 
                        "init_relay_log_info()"));
 
150
      sql_print_error("Failed in open_log() called from init_relay_log_info()");
164
151
      return(1);
165
152
    }
166
153
  }
168
155
  /* if file does not exist */
169
156
  if (access(fname,F_OK))
170
157
  {
171
 
    /* Create a new file */
 
158
    /*
 
159
      If someone removed the file from underneath our feet, just close
 
160
      the old descriptor and re-create the old file
 
161
    */
 
162
    if (info_fd >= 0)
 
163
      my_close(info_fd, MYF(MY_WME));
 
164
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
 
165
    {
 
166
      sql_print_error("Failed to create a new relay log info file (\
 
167
file '%s', errno %d)", fname, my_errno);
 
168
      msg= current_thd->main_da.message();
 
169
      goto err;
 
170
    }
 
171
    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
 
172
                      MYF(MY_WME)))
 
173
    {
 
174
      sql_print_error("Failed to create a cache on relay log info file '%s'",
 
175
                      fname);
 
176
      msg= current_thd->main_da.message();
 
177
      goto err;
 
178
    }
 
179
 
 
180
    /* Init relay log with first entry in the relay index file */
 
181
    if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
 
182
                           &msg, 0))
 
183
    {
 
184
      sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
 
185
      goto err;
 
186
    }
 
187
    rli->group_master_log_name[0]= 0;
 
188
    rli->group_master_log_pos= 0;
 
189
    rli->info_fd= info_fd;
172
190
  }
173
191
  else // file exists
174
192
  {
175
 
    /* Open up fname here and pull out the relay.info data */
 
193
    if (info_fd >= 0)
 
194
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
 
195
    else
 
196
    {
 
197
      int error=0;
 
198
      if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
 
199
      {
 
200
        sql_print_error("\
 
201
Failed to open the existing relay log info file '%s' (errno %d)",
 
202
                        fname, my_errno);
 
203
        error= 1;
 
204
      }
 
205
      else if (init_io_cache(&rli->info_file, info_fd,
 
206
                             IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
 
207
      {
 
208
        sql_print_error("Failed to create a cache on relay log info file '%s'",
 
209
                        fname);
 
210
        error= 1;
 
211
      }
 
212
      if (error)
 
213
      {
 
214
        if (info_fd >= 0)
 
215
          my_close(info_fd, MYF(0));
 
216
        rli->info_fd= -1;
 
217
        rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
218
        pthread_mutex_unlock(&rli->data_lock);
 
219
        return(1);
 
220
      }
 
221
    }
 
222
 
 
223
    rli->info_fd = info_fd;
 
224
    int relay_log_pos, master_log_pos;
 
225
    if (init_strvar_from_file(rli->group_relay_log_name,
 
226
                              sizeof(rli->group_relay_log_name),
 
227
                              &rli->info_file, "") ||
 
228
       init_intvar_from_file(&relay_log_pos,
 
229
                             &rli->info_file, BIN_LOG_HEADER_SIZE) ||
 
230
       init_strvar_from_file(rli->group_master_log_name,
 
231
                             sizeof(rli->group_master_log_name),
 
232
                             &rli->info_file, "") ||
 
233
       init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
 
234
    {
 
235
      msg="Error reading slave log configuration";
 
236
      goto err;
 
237
    }
 
238
    strmake(rli->event_relay_log_name,rli->group_relay_log_name,
 
239
            sizeof(rli->event_relay_log_name)-1);
 
240
    rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
 
241
    rli->group_master_log_pos= master_log_pos;
 
242
 
 
243
    if (init_relay_log_pos(rli,
 
244
                           rli->group_relay_log_name,
 
245
                           rli->group_relay_log_pos,
 
246
                           0 /* no data lock*/,
 
247
                           &msg, 0))
 
248
    {
 
249
      char llbuf[22];
 
250
      sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
 
251
                      rli->group_relay_log_name,
 
252
                      llstr(rli->group_relay_log_pos, llbuf));
 
253
      goto err;
 
254
    }
176
255
  }
 
256
  char llbuf1[22], llbuf2[22];
 
257
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
 
258
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
177
259
 
178
260
  /*
179
261
    Now change the cache from READ to WRITE - must do this
181
263
  */
182
264
  reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
183
265
  if ((error= flush_relay_log_info(rli)))
184
 
    sql_print_error(_("Failed to flush relay log info file"));
 
266
    sql_print_error("Failed to flush relay log info file");
185
267
  if (count_relay_log_space(rli))
186
268
  {
187
 
    msg=_("Error counting relay log space");
 
269
    msg="Error counting relay log space";
188
270
    goto err;
189
271
  }
190
272
  rli->inited= 1;
203
285
}
204
286
 
205
287
 
206
 
static inline int32_t add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
 
288
static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
207
289
{
208
290
  struct stat s;
209
291
  if (stat(linfo->log_file_name,&s))
210
292
  {
211
 
    sql_print_error(_("log %s listed in the index, but failed to stat"),
 
293
    sql_print_error("log %s listed in the index, but failed to stat",
212
294
                    linfo->log_file_name);
213
295
    return(1);
214
296
  }
217
299
}
218
300
 
219
301
 
220
 
static int32_t count_relay_log_space(Relay_log_info* rli)
 
302
static int count_relay_log_space(Relay_log_info* rli)
221
303
{
222
304
  LOG_INFO linfo;
223
305
  rli->log_space_total= 0;
224
 
  if (rli->relay_log.find_log_pos(&linfo, NULL, 1))
 
306
  if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
225
307
  {
226
 
    sql_print_error(_("Could not find first log while counting relay "
227
 
                      "log space"));
 
308
    sql_print_error("Could not find first log while counting relay log space");
228
309
    return(1);
229
310
  }
230
311
  do
291
372
    1   error.  errmsg is set to point to the error message
292
373
*/
293
374
 
294
 
int32_t init_relay_log_pos(Relay_log_info* rli,const char* log,
295
 
                       uint64_t pos, bool need_data_lock,
 
375
int init_relay_log_pos(Relay_log_info* rli,const char* log,
 
376
                       ulonglong pos, bool need_data_lock,
296
377
                       const char** errmsg,
297
378
                       bool look_for_description_event)
298
379
{
336
417
    Test to see if the previous run was with the skip of purging
337
418
    If yes, we do not purge when we restart
338
419
  */
339
 
  if (rli->relay_log.find_log_pos(&rli->linfo, NULL, 1))
 
420
  if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
340
421
  {
341
422
    *errmsg="Could not find first log during relay log initialization";
342
423
    goto err;
347
428
    *errmsg="Could not find target log during relay log initialization";
348
429
    goto err;
349
430
  }
350
 
 
351
 
  rli->group_relay_log_name.assign(rli->linfo.log_file_name);
352
 
  rli->event_relay_log_name.assign(rli->linfo.log_file_name);
353
 
 
 
431
  strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
 
432
          sizeof(rli->group_relay_log_name)-1);
 
433
  strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
 
434
          sizeof(rli->event_relay_log_name)-1);
354
435
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
355
436
  {
356
437
    /*
470
551
    timeout         timeout in seconds before giving up waiting
471
552
 
472
553
  NOTES
473
 
    timeout is int64_t whereas it should be uint32_t ; but this is
 
554
    timeout is longlong whereas it should be ulong ; but this is
474
555
    to catch if the user submitted a negative timeout.
475
556
 
476
557
  RETURN VALUES
483
564
                before reaching the desired log/position
484
565
 */
485
566
 
486
 
int32_t Relay_log_info::wait_for_pos(THD* thd, String* log_name,
487
 
                                    int64_t log_pos,
488
 
                                    int64_t timeout)
 
567
int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
 
568
                                    longlong log_pos,
 
569
                                    longlong timeout)
489
570
{
490
 
  int32_t event_count = 0;
491
 
  uint32_t init_abort_pos_wait;
492
 
  int32_t error=0;
 
571
  int event_count = 0;
 
572
  ulong init_abort_pos_wait;
 
573
  int error=0;
493
574
  struct timespec abstime; // for timeout checking
494
575
  const char *msg;
495
576
 
519
600
  /*
520
601
    We'll need to
521
602
    handle all possible log names comparisons (e.g. 999 vs 1000).
522
 
    We use uint32_t for string->number conversion ; this is no
 
603
    We use ulong for string->number conversion ; this is no
523
604
    stronger limitation than in find_uniq_filename in sql/log.cc
524
605
  */
525
 
  uint32_t log_name_extension;
 
606
  ulong log_name_extension;
526
607
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
527
608
 
528
 
  strmake(log_name_tmp, log_name->ptr(), cmin(log_name->length(), (uint32_t)FN_REFLEN-1));
 
609
  strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), FN_REFLEN-1));
529
610
 
530
611
  char *p= fn_ext(log_name_tmp);
531
612
  char *p_end;
535
616
    goto err;
536
617
  }
537
618
  // Convert 0-3 to 4
538
 
  log_pos= cmax(log_pos, (int64_t)BIN_LOG_HEADER_SIZE);
 
619
  log_pos= max(log_pos, BIN_LOG_HEADER_SIZE);
539
620
  /* p points to '.' */
540
621
  log_name_extension= strtoul(++p, &p_end, 10);
541
622
  /*
555
636
         slave_running)
556
637
  {
557
638
    bool pos_reached;
558
 
    int32_t cmp_result= 0;
 
639
    int cmp_result= 0;
559
640
 
560
641
    /*
561
642
      group_master_log_name can be "", if we are just after a fresh
570
651
      configuration which does nothing), then group_master_log_pos
571
652
      will grow and group_master_log_name will stay "".
572
653
    */
573
 
    if (group_master_log_name.length())
 
654
    if (*group_master_log_name)
574
655
    {
575
 
      const char *basename= (group_master_log_name.c_str() +
576
 
                             dirname_length(group_master_log_name.c_str()));
 
656
      char *basename= (group_master_log_name +
 
657
                       dirname_length(group_master_log_name));
577
658
      /*
578
659
        First compare the parts before the extension.
579
660
        Find the dot in the master's log basename,
581
662
        if the names do not match up to '.' included, return error
582
663
      */
583
664
      char *q= (char*)(fn_ext(basename)+1);
584
 
      if (strncmp(basename, log_name_tmp, (int32_t)(q-basename)))
 
665
      if (strncmp(basename, log_name_tmp, (int)(q-basename)))
585
666
      {
586
667
        error= -2;
587
668
        break;
588
669
      }
589
670
      // Now compare extensions.
590
671
      char *q_end;
591
 
      uint32_t group_master_log_name_extension= strtoul(q, &q_end, 10);
 
672
      ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
592
673
      if (group_master_log_name_extension < log_name_extension)
593
674
        cmp_result= -1 ;
594
675
      else
595
676
        cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
596
677
 
597
 
      pos_reached= ((!cmp_result && group_master_log_pos >= (uint64_t)log_pos) ||
 
678
      pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
598
679
                    cmp_result > 0);
599
680
      if (pos_reached || thd->killed)
600
681
        break;
643
724
}
644
725
 
645
726
 
646
 
void Relay_log_info::inc_group_relay_log_pos(uint64_t log_pos,
 
727
void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
647
728
                                                bool skip_lock)
648
729
{
649
730
  if (!skip_lock)
650
731
    pthread_mutex_lock(&data_lock);
651
732
  inc_event_relay_log_pos();
652
733
  group_relay_log_pos= event_relay_log_pos;
653
 
  group_relay_log_name.assign(event_relay_log_name);
 
734
  strmake(group_relay_log_name,event_relay_log_name,
 
735
          sizeof(group_relay_log_name)-1);
654
736
 
655
737
  notify_group_relay_log_name_update();
656
738
 
697
779
 
698
780
void Relay_log_info::close_temporary_tables()
699
781
{
700
 
  Table *table,*next;
 
782
  TABLE *table,*next;
701
783
 
702
784
  for (table=save_temporary_tables ; table ; table=next)
703
785
  {
720
802
    Assumes to have a run lock on rli and that no slave thread are running.
721
803
*/
722
804
 
723
 
int32_t purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
 
805
int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
724
806
                     const char** errmsg)
725
807
{
726
 
  int32_t error=0;
 
808
  int error=0;
727
809
 
728
810
  /*
729
811
    Even if rli->inited==0, we still try to empty rli->master_log_* variables.
777
859
    goto err;
778
860
  }
779
861
  /* Save name of used relay log file */
780
 
  rli->group_relay_log_name.assign(rli->relay_log.get_log_fname());
781
 
  rli->event_relay_log_name.assign(rli->relay_log.get_log_fname());
 
862
  strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
 
863
          sizeof(rli->group_relay_log_name)-1);
 
864
  strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
 
865
          sizeof(rli->event_relay_log_name)-1);
782
866
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
783
867
  if (count_relay_log_space(rli))
784
868
  {
786
870
    goto err;
787
871
  }
788
872
  if (!just_reset)
789
 
    error= init_relay_log_pos(rli, rli->group_relay_log_name.c_str(),
 
873
    error= init_relay_log_pos(rli, rli->group_relay_log_name,
790
874
                              rli->group_relay_log_pos,
791
875
                              0 /* do not need data lock */, errmsg, 0);
792
876
 
832
916
bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
833
917
{
834
918
  const char *log_name;
835
 
  uint64_t log_pos;
 
919
  ulonglong log_pos;
836
920
 
837
921
  assert(until_condition != UNTIL_NONE);
838
922
 
839
923
  if (until_condition == UNTIL_MASTER_POS)
840
924
  {
841
 
    log_name= group_master_log_name.c_str();
 
925
    log_name= group_master_log_name;
842
926
    log_pos= master_beg_pos;
843
927
  }
844
928
  else
845
929
  { /* until_condition == UNTIL_RELAY_POS */
846
 
    log_name= group_relay_log_name.c_str();
 
930
    log_name= group_relay_log_name;
847
931
    log_pos= group_relay_log_pos;
848
932
  }
849
933
 
862
946
      const char *basename= log_name + dirname_length(log_name);
863
947
 
864
948
      const char *q= (const char*)(fn_ext(basename)+1);
865
 
      if (strncmp(basename, until_log_name, (int32_t)(q-basename)) == 0)
 
949
      if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
866
950
      {
867
951
        /* Now compare extensions. */
868
952
        char *q_end;
869
 
        uint32_t log_name_extension= strtoul(q, &q_end, 10);
 
953
        ulong log_name_extension= strtoul(q, &q_end, 10);
870
954
        if (log_name_extension < until_log_name_extension)
871
955
          until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
872
956
        else
877
961
      else
878
962
      {
879
963
        /* Probably error so we aborting */
880
 
        sql_print_error(_("Slave SQL thread is stopped because UNTIL "
881
 
                          "condition is bad."));
 
964
        sql_print_error("Slave SQL thread is stopped because UNTIL "
 
965
                        "condition is bad.");
882
966
        return(true);
883
967
      }
884
968
    }
892
976
}
893
977
 
894
978
 
 
979
void Relay_log_info::cached_charset_invalidate()
 
980
{
 
981
  /* Full of zeroes means uninitialized. */
 
982
  bzero(cached_charset, sizeof(cached_charset));
 
983
  return;
 
984
}
 
985
 
 
986
 
 
987
bool Relay_log_info::cached_charset_compare(char *charset) const
 
988
{
 
989
  if (bcmp((uchar*) cached_charset, (uchar*) charset,
 
990
           sizeof(cached_charset)))
 
991
  {
 
992
    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
 
993
    return(1);
 
994
  }
 
995
  return(0);
 
996
}
 
997
 
 
998
 
895
999
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
896
1000
                                  time_t event_creation_time)
897
1001
{
898
 
  extern uint32_t debug_not_change_ts_if_art_event;
 
1002
  extern uint debug_not_change_ts_if_art_event;
899
1003
  clear_flag(IN_STMT);
900
1004
 
901
1005
  /*
940
1044
  }
941
1045
}
942
1046
 
 
1047
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
943
1048
void Relay_log_info::cleanup_context(THD *thd, bool error)
944
1049
{
945
1050
  assert(sql_thd == thd);
977
1082
{
978
1083
  while (tables_to_lock)
979
1084
  {
980
 
    unsigned char* to_free= reinterpret_cast<unsigned char*>(tables_to_lock);
 
1085
    uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
981
1086
    if (tables_to_lock->m_tabledef_valid)
982
1087
    {
983
1088
      tables_to_lock->m_tabledef.table_def::~table_def();
984
1089
      tables_to_lock->m_tabledef_valid= false;
985
1090
    }
986
1091
    tables_to_lock=
987
 
      static_cast<RPL_TableList*>(tables_to_lock->next_global);
 
1092
      static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
988
1093
    tables_to_lock_count--;
989
 
    free(to_free);
 
1094
    my_free(to_free, MYF(MY_WME));
990
1095
  }
991
1096
  assert(tables_to_lock == NULL && tables_to_lock_count == 0);
992
1097
}
 
1098
 
 
1099
#endif