~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/rpl_rli.cc

  • Committer: Monty Taylor
  • Date: 2008-09-15 17:24:04 UTC
  • Revision ID: monty@inaugust.com-20080915172404-ygh6hiyu0q7qpa9x
Removed strndup calls.

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
#include <drizzled/server_includes.h>
17
 
#include <drizzled/rpl_mi.h>
18
 
#include <drizzled/rpl_rli.h>
19
 
// For check_binlog_magic
20
 
#include <drizzled/sql_repl.h>
21
 
#include <drizzled/rpl_utility.h>
22
 
#include <drizzled/data_home.h>
23
 
#include <drizzled/sql_parse.h>
24
 
#include <drizzled/gettext.h>
25
 
 
26
 
#if TIME_WITH_SYS_TIME
27
 
# include <sys/time.h>
28
 
# include <time.h>
29
 
#else
30
 
# if HAVE_SYS_TIME_H
31
 
#  include <sys/time.h>
32
 
# else
33
 
#  include <time.h>
34
 
# endif
35
 
#endif
36
 
 
37
 
 
 
17
 
 
18
#include "rpl_mi.h"
 
19
#include "rpl_rli.h"
 
20
#include "sql_repl.h"  // For check_binlog_magic
 
21
#include "rpl_utility.h"
 
22
 
 
23
#include <libdrizzle/gettext.h>
38
24
 
39
25
static int32_t count_relay_log_space(Relay_log_info* rli);
40
26
 
54
40
   cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
55
41
   group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
56
42
   last_master_timestamp(0), slave_skip_counter(0),
57
 
   abort_pos_wait(0), slave_run_id(0), sql_session(0),
 
43
   abort_pos_wait(0), slave_run_id(0), sql_thd(0),
58
44
   inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
59
45
   until_log_pos(0), retried_trans(0),
60
46
   tables_to_lock(0), tables_to_lock_count(0),
65
51
  until_log_name[0]= ign_master_log_name_end[0]= 0;
66
52
  memset(&info_file, 0, sizeof(info_file));
67
53
  memset(&cache_buf, 0, sizeof(cache_buf));
 
54
  cached_charset_invalidate();
68
55
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
69
56
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
70
57
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
92
79
 
93
80
 
94
81
int32_t init_relay_log_info(Relay_log_info* rli,
95
 
                            const char* info_fname)
 
82
                        const char* info_fname)
96
83
{
97
84
  char fname[FN_REFLEN+128];
98
85
  int32_t info_fd;
170
157
  /* if file does not exist */
171
158
  if (access(fname,F_OK))
172
159
  {
173
 
    /* Create a new file */
 
160
    /*
 
161
      If someone removed the file from underneath our feet, just close
 
162
      the old descriptor and re-create the old file
 
163
    */
 
164
    if (info_fd >= 0)
 
165
      my_close(info_fd, MYF(MY_WME));
 
166
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
 
167
    {
 
168
      sql_print_error(_("Failed to create a new relay log info file "
 
169
                        "( file '%s', errno %d)"), fname, my_errno);
 
170
      msg= current_thd->main_da.message();
 
171
      goto err;
 
172
    }
 
173
    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
 
174
                      MYF(MY_WME)))
 
175
    {
 
176
      sql_print_error(_("Failed to create a cache on relay log info file '%s'"),
 
177
                      fname);
 
178
      msg= current_thd->main_da.message();
 
179
      goto err;
 
180
    }
 
181
 
 
182
    /* Init relay log with first entry in the relay index file */
 
183
    if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
 
184
                           &msg, 0))
 
185
    {
 
186
      sql_print_error(_("Failed to open the relay log 'FIRST' (relay_log_pos 4)"));
 
187
      goto err;
 
188
    }
 
189
    rli->group_master_log_name[0]= 0;
 
190
    rli->group_master_log_pos= 0;
 
191
    rli->info_fd= info_fd;
174
192
  }
175
193
  else // file exists
176
194
  {
177
 
    /* Open up fname here and pull out the relay.info data */
 
195
    if (info_fd >= 0)
 
196
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
 
197
    else
 
198
    {
 
199
      int32_t error=0;
 
200
      if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
 
201
      {
 
202
        sql_print_error(_("Failed to open the existing relay log info "
 
203
                          "file '%s' (errno %d)"),
 
204
                        fname, my_errno);
 
205
        error= 1;
 
206
      }
 
207
      else if (init_io_cache(&rli->info_file, info_fd,
 
208
                             IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
 
209
      {
 
210
        sql_print_error(_("Failed to create a cache on relay log info "
 
211
                          "file '%s'"),
 
212
                        fname);
 
213
        error= 1;
 
214
      }
 
215
      if (error)
 
216
      {
 
217
        if (info_fd >= 0)
 
218
          my_close(info_fd, MYF(0));
 
219
        rli->info_fd= -1;
 
220
        rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
 
221
        pthread_mutex_unlock(&rli->data_lock);
 
222
        return(1);
 
223
      }
 
224
    }
 
225
 
 
226
    rli->info_fd = info_fd;
 
227
    int32_t relay_log_pos, master_log_pos;
 
228
    if (init_strvar_from_file(rli->group_relay_log_name,
 
229
                              sizeof(rli->group_relay_log_name),
 
230
                              &rli->info_file, "") ||
 
231
       init_intvar_from_file(&relay_log_pos,
 
232
                             &rli->info_file, BIN_LOG_HEADER_SIZE) ||
 
233
       init_strvar_from_file(rli->group_master_log_name,
 
234
                             sizeof(rli->group_master_log_name),
 
235
                             &rli->info_file, "") ||
 
236
       init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
 
237
    {
 
238
      msg="Error reading slave log configuration";
 
239
      goto err;
 
240
    }
 
241
    strmake(rli->event_relay_log_name,rli->group_relay_log_name,
 
242
            sizeof(rli->event_relay_log_name)-1);
 
243
    rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
 
244
    rli->group_master_log_pos= master_log_pos;
 
245
 
 
246
    if (init_relay_log_pos(rli,
 
247
                           rli->group_relay_log_name,
 
248
                           rli->group_relay_log_pos,
 
249
                           0 /* no data lock*/,
 
250
                           &msg, 0))
 
251
    {
 
252
      char llbuf[22];
 
253
      sql_print_error(_("Failed to open the relay log '%s' (relay_log_pos %s)"),
 
254
                      rli->group_relay_log_name,
 
255
                      llstr(rli->group_relay_log_pos, llbuf));
 
256
      goto err;
 
257
    }
178
258
  }
 
259
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
 
260
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
179
261
 
180
262
  /*
181
263
    Now change the cache from READ to WRITE - must do this
194
276
  return(error);
195
277
 
196
278
err:
197
 
  sql_print_error("%s",msg);
 
279
  sql_print_error(msg);
198
280
  end_io_cache(&rli->info_file);
199
281
  if (info_fd >= 0)
200
282
    my_close(info_fd, MYF(0));
223
305
{
224
306
  LOG_INFO linfo;
225
307
  rli->log_space_total= 0;
226
 
  if (rli->relay_log.find_log_pos(&linfo, NULL, 1))
 
308
  if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
227
309
  {
228
310
    sql_print_error(_("Could not find first log while counting relay "
229
311
                      "log space"));
338
420
    Test to see if the previous run was with the skip of purging
339
421
    If yes, we do not purge when we restart
340
422
  */
341
 
  if (rli->relay_log.find_log_pos(&rli->linfo, NULL, 1))
 
423
  if (rli->relay_log.find_log_pos(&rli->linfo, NullS, 1))
342
424
  {
343
425
    *errmsg="Could not find first log during relay log initialization";
344
426
    goto err;
349
431
    *errmsg="Could not find target log during relay log initialization";
350
432
    goto err;
351
433
  }
352
 
 
353
 
  rli->group_relay_log_name.assign(rli->linfo.log_file_name);
354
 
  rli->event_relay_log_name.assign(rli->linfo.log_file_name);
355
 
 
 
434
  strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
 
435
          sizeof(rli->group_relay_log_name)-1);
 
436
  strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
 
437
          sizeof(rli->event_relay_log_name)-1);
356
438
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
357
439
  {
358
440
    /*
466
548
 
467
549
  SYNOPSIS
468
550
    wait_for_pos()
469
 
    session             client thread that sent SELECT MASTER_POS_WAIT
 
551
    thd             client thread that sent SELECT MASTER_POS_WAIT
470
552
    log_name        log name to wait for
471
553
    log_pos         position to wait for
472
554
    timeout         timeout in seconds before giving up waiting
485
567
                before reaching the desired log/position
486
568
 */
487
569
 
488
 
int32_t Relay_log_info::wait_for_pos(Session* session, String* log_name,
 
570
int32_t Relay_log_info::wait_for_pos(THD* thd, String* log_name,
489
571
                                    int64_t log_pos,
490
572
                                    int64_t timeout)
491
573
{
500
582
 
501
583
  set_timespec(abstime,timeout);
502
584
  pthread_mutex_lock(&data_lock);
503
 
  msg= session->enter_cond(&data_cond, &data_lock,
 
585
  msg= thd->enter_cond(&data_cond, &data_lock,
504
586
                       "Waiting for the slave SQL thread to "
505
587
                       "advance position");
506
588
  /*
527
609
  uint32_t log_name_extension;
528
610
  char log_name_tmp[FN_REFLEN]; //make a char[] from String
529
611
 
530
 
  strmake(log_name_tmp, log_name->ptr(), cmin(log_name->length(), (uint32_t)FN_REFLEN-1));
 
612
  strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), (uint32_t)FN_REFLEN-1));
531
613
 
532
614
  char *p= fn_ext(log_name_tmp);
533
615
  char *p_end;
537
619
    goto err;
538
620
  }
539
621
  // Convert 0-3 to 4
540
 
  log_pos= cmax(log_pos, (int64_t)BIN_LOG_HEADER_SIZE);
 
622
  log_pos= max(log_pos, (int64_t)BIN_LOG_HEADER_SIZE);
541
623
  /* p points to '.' */
542
624
  log_name_extension= strtoul(++p, &p_end, 10);
543
625
  /*
552
634
  }
553
635
 
554
636
  /* The "compare and wait" main loop */
555
 
  while (!session->killed &&
 
637
  while (!thd->killed &&
556
638
         init_abort_pos_wait == abort_pos_wait &&
557
639
         slave_running)
558
640
  {
572
654
      configuration which does nothing), then group_master_log_pos
573
655
      will grow and group_master_log_name will stay "".
574
656
    */
575
 
    if (group_master_log_name.length())
 
657
    if (*group_master_log_name)
576
658
    {
577
 
      const char *basename= (group_master_log_name.c_str() +
578
 
                             dirname_length(group_master_log_name.c_str()));
 
659
      char *basename= (group_master_log_name +
 
660
                       dirname_length(group_master_log_name));
579
661
      /*
580
662
        First compare the parts before the extension.
581
663
        Find the dot in the master's log basename,
598
680
 
599
681
      pos_reached= ((!cmp_result && group_master_log_pos >= (uint64_t)log_pos) ||
600
682
                    cmp_result > 0);
601
 
      if (pos_reached || session->killed)
 
683
      if (pos_reached || thd->killed)
602
684
        break;
603
685
    }
604
686
 
635
717
  }
636
718
 
637
719
err:
638
 
  session->exit_cond(msg);
639
 
  if (session->killed || init_abort_pos_wait != abort_pos_wait ||
 
720
  thd->exit_cond(msg);
 
721
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
640
722
      !slave_running)
641
723
  {
642
724
    error= -2;
652
734
    pthread_mutex_lock(&data_lock);
653
735
  inc_event_relay_log_pos();
654
736
  group_relay_log_pos= event_relay_log_pos;
655
 
  group_relay_log_name.assign(event_relay_log_name);
 
737
  strmake(group_relay_log_name,event_relay_log_name,
 
738
          sizeof(group_relay_log_name)-1);
656
739
 
657
740
  notify_group_relay_log_name_update();
658
741
 
722
805
    Assumes to have a run lock on rli and that no slave thread are running.
723
806
*/
724
807
 
725
 
int32_t purge_relay_logs(Relay_log_info* rli, Session *session, bool just_reset,
 
808
int32_t purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
726
809
                     const char** errmsg)
727
810
{
728
811
  int32_t error=0;
772
855
    rli->cur_log_fd= -1;
773
856
  }
774
857
 
775
 
  if (rli->relay_log.reset_logs(session))
 
858
  if (rli->relay_log.reset_logs(thd))
776
859
  {
777
860
    *errmsg = "Failed during log reset";
778
861
    error=1;
779
862
    goto err;
780
863
  }
781
864
  /* Save name of used relay log file */
782
 
  rli->group_relay_log_name.assign(rli->relay_log.get_log_fname());
783
 
  rli->event_relay_log_name.assign(rli->relay_log.get_log_fname());
 
865
  strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
 
866
          sizeof(rli->group_relay_log_name)-1);
 
867
  strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
 
868
          sizeof(rli->event_relay_log_name)-1);
784
869
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
785
870
  if (count_relay_log_space(rli))
786
871
  {
788
873
    goto err;
789
874
  }
790
875
  if (!just_reset)
791
 
    error= init_relay_log_pos(rli, rli->group_relay_log_name.c_str(),
 
876
    error= init_relay_log_pos(rli, rli->group_relay_log_name,
792
877
                              rli->group_relay_log_pos,
793
878
                              0 /* do not need data lock */, errmsg, 0);
794
879
 
840
925
 
841
926
  if (until_condition == UNTIL_MASTER_POS)
842
927
  {
843
 
    log_name= group_master_log_name.c_str();
 
928
    log_name= group_master_log_name;
844
929
    log_pos= master_beg_pos;
845
930
  }
846
931
  else
847
932
  { /* until_condition == UNTIL_RELAY_POS */
848
 
    log_name= group_relay_log_name.c_str();
 
933
    log_name= group_relay_log_name;
849
934
    log_pos= group_relay_log_pos;
850
935
  }
851
936
 
894
979
}
895
980
 
896
981
 
 
982
void Relay_log_info::cached_charset_invalidate()
 
983
{
 
984
  /* Full of zeroes means uninitialized. */
 
985
  memset(cached_charset, 0, sizeof(cached_charset));
 
986
  return;
 
987
}
 
988
 
 
989
 
 
990
bool Relay_log_info::cached_charset_compare(char *charset) const
 
991
{
 
992
  if (memcmp(cached_charset, charset, sizeof(cached_charset)))
 
993
  {
 
994
    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
 
995
    return(1);
 
996
  }
 
997
  return(0);
 
998
}
 
999
 
 
1000
 
897
1001
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
898
1002
                                  time_t event_creation_time)
899
1003
{
923
1027
    middle of the "transaction". START SLAVE will resume at BEGIN
924
1028
    while the MyISAM table has already been updated.
925
1029
  */
926
 
  if ((sql_session->options & OPTION_BEGIN) && opt_using_transactions)
 
1030
  if ((sql_thd->options & OPTION_BEGIN) && opt_using_transactions)
927
1031
    inc_event_relay_log_pos();
928
1032
  else
929
1033
  {
942
1046
  }
943
1047
}
944
1048
 
945
 
void Relay_log_info::cleanup_context(Session *session, bool error)
 
1049
#if !defined(DRIZZLE_CLIENT) && defined(HAVE_REPLICATION)
 
1050
void Relay_log_info::cleanup_context(THD *thd, bool error)
946
1051
{
947
 
  assert(sql_session == session);
 
1052
  assert(sql_thd == thd);
948
1053
  /*
949
1054
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
950
1055
    may have opened tables, which we cannot be sure have been closed (because
959
1064
  */
960
1065
  if (error)
961
1066
  {
962
 
    ha_autocommit_or_rollback(session, 1); // if a "statement transaction"
963
 
    end_trans(session, ROLLBACK); // if a "real transaction"
 
1067
    ha_autocommit_or_rollback(thd, 1); // if a "statement transaction"
 
1068
    end_trans(thd, ROLLBACK); // if a "real transaction"
964
1069
  }
965
1070
  m_table_map.clear_tables();
966
 
  close_thread_tables(session);
 
1071
  close_thread_tables(thd);
967
1072
  clear_tables_to_lock();
968
1073
  clear_flag(IN_STMT);
969
1074
  /*
970
1075
    Cleanup for the flags that have been set at do_apply_event.
971
1076
  */
972
 
  session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
973
 
  session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
 
1077
  thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
 
1078
  thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
974
1079
  last_event_start_time= 0;
975
1080
  return;
976
1081
}
979
1084
{
980
1085
  while (tables_to_lock)
981
1086
  {
982
 
    unsigned char* to_free= reinterpret_cast<unsigned char*>(tables_to_lock);
 
1087
    uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
983
1088
    if (tables_to_lock->m_tabledef_valid)
984
1089
    {
985
1090
      tables_to_lock->m_tabledef.table_def::~table_def();
988
1093
    tables_to_lock=
989
1094
      static_cast<RPL_TableList*>(tables_to_lock->next_global);
990
1095
    tables_to_lock_count--;
991
 
    free(to_free);
 
1096
    my_free(to_free, MYF(MY_WME));
992
1097
  }
993
1098
  assert(tables_to_lock == NULL && tables_to_lock_count == 0);
994
1099
}
 
1100
 
 
1101
#endif