~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/rpl_rli.cc

  • Committer: Monty Taylor
  • Date: 2008-10-23 23:53:49 UTC
  • mto: This revision was merged to the branch mainline in revision 557.
  • Revision ID: monty@inaugust.com-20081023235349-317wgwqwgccuacmq
SplitĀ outĀ nested_join.h.

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
 
16
16
#include <drizzled/server_includes.h>
17
17
 
 
18
#if TIME_WITH_SYS_TIME
 
19
# include <sys/time.h>
 
20
# include <time.h>
 
21
#else
 
22
# if HAVE_SYS_TIME_H
 
23
#  include <sys/time.h>
 
24
# else
 
25
#  include <time.h>
 
26
# endif
 
27
#endif
 
28
 
 
29
 
18
30
#include "rpl_mi.h"
19
31
#include "rpl_rli.h"
20
32
#include "sql_repl.h"  // For check_binlog_magic
21
33
#include "rpl_utility.h"
22
34
 
23
 
#include <libdrizzle/gettext.h>
 
35
#include <drizzled/gettext.h>
24
36
 
25
37
static int32_t count_relay_log_space(Relay_log_info* rli);
26
38
 
40
52
   cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
41
53
   group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
42
54
   last_master_timestamp(0), slave_skip_counter(0),
43
 
   abort_pos_wait(0), slave_run_id(0), sql_thd(0),
 
55
   abort_pos_wait(0), slave_run_id(0), sql_session(0),
44
56
   inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
45
57
   until_log_pos(0), retried_trans(0),
46
58
   tables_to_lock(0), tables_to_lock_count(0),
51
63
  until_log_name[0]= ign_master_log_name_end[0]= 0;
52
64
  memset(&info_file, 0, sizeof(info_file));
53
65
  memset(&cache_buf, 0, sizeof(cache_buf));
54
 
  cached_charset_invalidate();
55
66
  pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
56
67
  pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
57
68
  pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
157
168
  /* if file does not exist */
158
169
  if (access(fname,F_OK))
159
170
  {
160
 
    /*
161
 
      If someone removed the file from underneath our feet, just close
162
 
      the old descriptor and re-create the old file
163
 
    */
164
 
    if (info_fd >= 0)
165
 
      my_close(info_fd, MYF(MY_WME));
166
 
    if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
167
 
    {
168
 
      sql_print_error(_("Failed to create a new relay log info file "
169
 
                        "( file '%s', errno %d)"), fname, my_errno);
170
 
      msg= current_thd->main_da.message();
171
 
      goto err;
172
 
    }
173
 
    if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
174
 
                      MYF(MY_WME)))
175
 
    {
176
 
      sql_print_error(_("Failed to create a cache on relay log info file '%s'"),
177
 
                      fname);
178
 
      msg= current_thd->main_da.message();
179
 
      goto err;
180
 
    }
181
 
 
182
 
    /* Init relay log with first entry in the relay index file */
183
 
    if (init_relay_log_pos(rli,NULL,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
184
 
                           &msg, 0))
185
 
    {
186
 
      sql_print_error(_("Failed to open the relay log 'FIRST' (relay_log_pos 4)"));
187
 
      goto err;
188
 
    }
189
 
    rli->group_master_log_name[0]= 0;
190
 
    rli->group_master_log_pos= 0;
191
 
    rli->info_fd= info_fd;
 
171
    /* Create a new file */
192
172
  }
193
173
  else // file exists
194
174
  {
195
 
    if (info_fd >= 0)
196
 
      reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
197
 
    else
198
 
    {
199
 
      int32_t error=0;
200
 
      if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
201
 
      {
202
 
        sql_print_error(_("Failed to open the existing relay log info "
203
 
                          "file '%s' (errno %d)"),
204
 
                        fname, my_errno);
205
 
        error= 1;
206
 
      }
207
 
      else if (init_io_cache(&rli->info_file, info_fd,
208
 
                             IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
209
 
      {
210
 
        sql_print_error(_("Failed to create a cache on relay log info "
211
 
                          "file '%s'"),
212
 
                        fname);
213
 
        error= 1;
214
 
      }
215
 
      if (error)
216
 
      {
217
 
        if (info_fd >= 0)
218
 
          my_close(info_fd, MYF(0));
219
 
        rli->info_fd= -1;
220
 
        rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
221
 
        pthread_mutex_unlock(&rli->data_lock);
222
 
        return(1);
223
 
      }
224
 
    }
225
 
 
226
 
    rli->info_fd = info_fd;
227
 
    int32_t relay_log_pos, master_log_pos;
228
 
    if (init_strvar_from_file(rli->group_relay_log_name,
229
 
                              sizeof(rli->group_relay_log_name),
230
 
                              &rli->info_file, "") ||
231
 
       init_intvar_from_file(&relay_log_pos,
232
 
                             &rli->info_file, BIN_LOG_HEADER_SIZE) ||
233
 
       init_strvar_from_file(rli->group_master_log_name,
234
 
                             sizeof(rli->group_master_log_name),
235
 
                             &rli->info_file, "") ||
236
 
       init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
237
 
    {
238
 
      msg="Error reading slave log configuration";
239
 
      goto err;
240
 
    }
241
 
    strmake(rli->event_relay_log_name,rli->group_relay_log_name,
242
 
            sizeof(rli->event_relay_log_name)-1);
243
 
    rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
244
 
    rli->group_master_log_pos= master_log_pos;
245
 
 
246
 
    if (init_relay_log_pos(rli,
247
 
                           rli->group_relay_log_name,
248
 
                           rli->group_relay_log_pos,
249
 
                           0 /* no data lock*/,
250
 
                           &msg, 0))
251
 
    {
252
 
      char llbuf[22];
253
 
      sql_print_error(_("Failed to open the relay log '%s' (relay_log_pos %s)"),
254
 
                      rli->group_relay_log_name,
255
 
                      llstr(rli->group_relay_log_pos, llbuf));
256
 
      goto err;
257
 
    }
 
175
    /* Open up fname here and pull out the relay.info data */
258
176
  }
259
 
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
260
 
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
261
177
 
262
178
  /*
263
179
    Now change the cache from READ to WRITE - must do this
276
192
  return(error);
277
193
 
278
194
err:
279
 
  sql_print_error(msg);
 
195
  sql_print_error("%s",msg);
280
196
  end_io_cache(&rli->info_file);
281
197
  if (info_fd >= 0)
282
198
    my_close(info_fd, MYF(0));
431
347
    *errmsg="Could not find target log during relay log initialization";
432
348
    goto err;
433
349
  }
434
 
  strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
435
 
          sizeof(rli->group_relay_log_name)-1);
436
 
  strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
437
 
          sizeof(rli->event_relay_log_name)-1);
 
350
 
 
351
  rli->group_relay_log_name.assign(rli->linfo.log_file_name);
 
352
  rli->event_relay_log_name.assign(rli->linfo.log_file_name);
 
353
 
438
354
  if (rli->relay_log.is_active(rli->linfo.log_file_name))
439
355
  {
440
356
    /*
548
464
 
549
465
  SYNOPSIS
550
466
    wait_for_pos()
551
 
    thd             client thread that sent SELECT MASTER_POS_WAIT
 
467
    session             client thread that sent SELECT MASTER_POS_WAIT
552
468
    log_name        log name to wait for
553
469
    log_pos         position to wait for
554
470
    timeout         timeout in seconds before giving up waiting
567
483
                before reaching the desired log/position
568
484
 */
569
485
 
570
 
int32_t Relay_log_info::wait_for_pos(THD* thd, String* log_name,
 
486
int32_t Relay_log_info::wait_for_pos(Session* session, String* log_name,
571
487
                                    int64_t log_pos,
572
488
                                    int64_t timeout)
573
489
{
582
498
 
583
499
  set_timespec(abstime,timeout);
584
500
  pthread_mutex_lock(&data_lock);
585
 
  msg= thd->enter_cond(&data_cond, &data_lock,
 
501
  msg= session->enter_cond(&data_cond, &data_lock,
586
502
                       "Waiting for the slave SQL thread to "
587
503
                       "advance position");
588
504
  /*
634
550
  }
635
551
 
636
552
  /* The "compare and wait" main loop */
637
 
  while (!thd->killed &&
 
553
  while (!session->killed &&
638
554
         init_abort_pos_wait == abort_pos_wait &&
639
555
         slave_running)
640
556
  {
654
570
      configuration which does nothing), then group_master_log_pos
655
571
      will grow and group_master_log_name will stay "".
656
572
    */
657
 
    if (*group_master_log_name)
 
573
    if (group_master_log_name.length())
658
574
    {
659
 
      char *basename= (group_master_log_name +
660
 
                       dirname_length(group_master_log_name));
 
575
      const char *basename= (group_master_log_name.c_str() +
 
576
                             dirname_length(group_master_log_name.c_str()));
661
577
      /*
662
578
        First compare the parts before the extension.
663
579
        Find the dot in the master's log basename,
680
596
 
681
597
      pos_reached= ((!cmp_result && group_master_log_pos >= (uint64_t)log_pos) ||
682
598
                    cmp_result > 0);
683
 
      if (pos_reached || thd->killed)
 
599
      if (pos_reached || session->killed)
684
600
        break;
685
601
    }
686
602
 
717
633
  }
718
634
 
719
635
err:
720
 
  thd->exit_cond(msg);
721
 
  if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
 
636
  session->exit_cond(msg);
 
637
  if (session->killed || init_abort_pos_wait != abort_pos_wait ||
722
638
      !slave_running)
723
639
  {
724
640
    error= -2;
734
650
    pthread_mutex_lock(&data_lock);
735
651
  inc_event_relay_log_pos();
736
652
  group_relay_log_pos= event_relay_log_pos;
737
 
  strmake(group_relay_log_name,event_relay_log_name,
738
 
          sizeof(group_relay_log_name)-1);
 
653
  group_relay_log_name.assign(event_relay_log_name);
739
654
 
740
655
  notify_group_relay_log_name_update();
741
656
 
805
720
    Assumes to have a run lock on rli and that no slave thread are running.
806
721
*/
807
722
 
808
 
int32_t purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
 
723
int32_t purge_relay_logs(Relay_log_info* rli, Session *session, bool just_reset,
809
724
                     const char** errmsg)
810
725
{
811
726
  int32_t error=0;
855
770
    rli->cur_log_fd= -1;
856
771
  }
857
772
 
858
 
  if (rli->relay_log.reset_logs(thd))
 
773
  if (rli->relay_log.reset_logs(session))
859
774
  {
860
775
    *errmsg = "Failed during log reset";
861
776
    error=1;
862
777
    goto err;
863
778
  }
864
779
  /* Save name of used relay log file */
865
 
  strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
866
 
          sizeof(rli->group_relay_log_name)-1);
867
 
  strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
868
 
          sizeof(rli->event_relay_log_name)-1);
 
780
  rli->group_relay_log_name.assign(rli->relay_log.get_log_fname());
 
781
  rli->event_relay_log_name.assign(rli->relay_log.get_log_fname());
869
782
  rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
870
783
  if (count_relay_log_space(rli))
871
784
  {
873
786
    goto err;
874
787
  }
875
788
  if (!just_reset)
876
 
    error= init_relay_log_pos(rli, rli->group_relay_log_name,
 
789
    error= init_relay_log_pos(rli, rli->group_relay_log_name.c_str(),
877
790
                              rli->group_relay_log_pos,
878
791
                              0 /* do not need data lock */, errmsg, 0);
879
792
 
925
838
 
926
839
  if (until_condition == UNTIL_MASTER_POS)
927
840
  {
928
 
    log_name= group_master_log_name;
 
841
    log_name= group_master_log_name.c_str();
929
842
    log_pos= master_beg_pos;
930
843
  }
931
844
  else
932
845
  { /* until_condition == UNTIL_RELAY_POS */
933
 
    log_name= group_relay_log_name;
 
846
    log_name= group_relay_log_name.c_str();
934
847
    log_pos= group_relay_log_pos;
935
848
  }
936
849
 
979
892
}
980
893
 
981
894
 
982
 
void Relay_log_info::cached_charset_invalidate()
983
 
{
984
 
  /* Full of zeroes means uninitialized. */
985
 
  memset(cached_charset, 0, sizeof(cached_charset));
986
 
  return;
987
 
}
988
 
 
989
 
 
990
 
bool Relay_log_info::cached_charset_compare(char *charset) const
991
 
{
992
 
  if (memcmp(cached_charset, charset, sizeof(cached_charset)))
993
 
  {
994
 
    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
995
 
    return(1);
996
 
  }
997
 
  return(0);
998
 
}
999
 
 
1000
 
 
1001
895
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
1002
896
                                  time_t event_creation_time)
1003
897
{
1027
921
    middle of the "transaction". START SLAVE will resume at BEGIN
1028
922
    while the MyISAM table has already been updated.
1029
923
  */
1030
 
  if ((sql_thd->options & OPTION_BEGIN) && opt_using_transactions)
 
924
  if ((sql_session->options & OPTION_BEGIN) && opt_using_transactions)
1031
925
    inc_event_relay_log_pos();
1032
926
  else
1033
927
  {
1046
940
  }
1047
941
}
1048
942
 
1049
 
void Relay_log_info::cleanup_context(THD *thd, bool error)
 
943
void Relay_log_info::cleanup_context(Session *session, bool error)
1050
944
{
1051
 
  assert(sql_thd == thd);
 
945
  assert(sql_session == session);
1052
946
  /*
1053
947
    1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
1054
948
    may have opened tables, which we cannot be sure have been closed (because
1063
957
  */
1064
958
  if (error)
1065
959
  {
1066
 
    ha_autocommit_or_rollback(thd, 1); // if a "statement transaction"
1067
 
    end_trans(thd, ROLLBACK); // if a "real transaction"
 
960
    ha_autocommit_or_rollback(session, 1); // if a "statement transaction"
 
961
    end_trans(session, ROLLBACK); // if a "real transaction"
1068
962
  }
1069
963
  m_table_map.clear_tables();
1070
 
  close_thread_tables(thd);
 
964
  close_thread_tables(session);
1071
965
  clear_tables_to_lock();
1072
966
  clear_flag(IN_STMT);
1073
967
  /*
1074
968
    Cleanup for the flags that have been set at do_apply_event.
1075
969
  */
1076
 
  thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
1077
 
  thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
 
970
  session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
 
971
  session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
1078
972
  last_event_start_time= 0;
1079
973
  return;
1080
974
}