~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_repl.cc

  • Committer: Brian Aker
  • Date: 2008-11-04 15:39:09 UTC
  • mfrom: (575.1.2 devel)
  • Revision ID: brian@tangent.org-20081104153909-c72hn65udxs1ccal
Merge of Monty's work

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
 
16
16
#include <drizzled/server_includes.h>
17
17
 
18
 
#ifdef HAVE_REPLICATION
19
 
#include "rpl_mi.h"
20
 
#include "sql_repl.h"
21
 
#include "log_event.h"
22
 
#include "rpl_filter.h"
23
 
#include <drizzled/drizzled_error_messages.h>
 
18
#include <drizzled/rpl_mi.h>
 
19
#include <drizzled/sql_repl.h>
 
20
#include <drizzled/log_event.h>
 
21
#include <drizzled/rpl_filter.h>
 
22
#include <drizzled/error.h>
 
23
#include <drizzled/gettext.h>
 
24
#include <drizzled/data_home.h>
24
25
 
25
26
int max_binlog_dump_events = 0; // unlimited
26
27
 
53
54
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
54
55
 
55
56
  char* p = log_file_name+dirname_length(log_file_name);
56
 
  uint ident_len = (uint) strlen(p);
 
57
  uint32_t ident_len = (uint32_t) strlen(p);
57
58
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
58
59
  int4store(header + SERVER_ID_OFFSET, server_id);
59
60
  int4store(header + EVENT_LEN_OFFSET, event_len);
66
67
  int8store(buf+R_POS_OFFSET,position);
67
68
  packet->append(buf, ROTATE_HEADER_LEN);
68
69
  packet->append(p,ident_len);
69
 
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
 
70
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
70
71
  {
71
72
    *errmsg = "failed on my_net_write()";
72
73
    return(-1);
74
75
  return(0);
75
76
}
76
77
 
77
 
static int send_file(THD *thd)
 
78
static int send_file(Session *session)
78
79
{
79
 
  NET* net = &thd->net;
 
80
  NET* net = &session->net;
80
81
  int fd = -1, error = 1;
81
82
  size_t bytes;
82
83
  char fname[FN_REFLEN+1];
83
84
  const char *errmsg = 0;
84
85
  int old_timeout;
85
86
  unsigned long packet_len;
86
 
  uchar buf[IO_SIZE];                           // It's safe to alloc this
 
87
  unsigned char buf[IO_SIZE];                           // It's safe to alloc this
87
88
 
88
89
  /*
89
90
    The client might be slow loading the data, give him wait_timeout to do
90
91
    the job
91
92
  */
92
93
  old_timeout= net->read_timeout;
93
 
  my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
 
94
  my_net_set_read_timeout(net, session->variables.net_wait_timeout);
94
95
 
95
96
  /*
96
97
    We need net_flush here because the client will not know it needs to send
125
126
  }
126
127
 
127
128
 end:
128
 
  if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
 
129
  if (my_net_write(net, (unsigned char*) "", 0) || net_flush(net) ||
129
130
      (my_net_read(net) == packet_error))
130
131
  {
131
132
    errmsg = _("Failed in send_file() while negotiating file transfer close");
139
140
    (void) my_close(fd, MYF(0));
140
141
  if (errmsg)
141
142
  {
142
 
    sql_print_error(errmsg);
 
143
    sql_print_error("%s",errmsg);
143
144
  }
144
145
  return(error);
145
146
}
168
169
 
169
170
void adjust_linfo_offsets(my_off_t purge_offset)
170
171
{
171
 
  THD *tmp;
 
172
  Session *tmp;
172
173
 
173
174
  pthread_mutex_lock(&LOCK_thread_count);
174
 
  I_List_iterator<THD> it(threads);
 
175
  I_List_iterator<Session> it(threads);
175
176
 
176
177
  while ((tmp=it++))
177
178
  {
198
199
bool log_in_use(const char* log_name)
199
200
{
200
201
  int log_name_len = strlen(log_name) + 1;
201
 
  THD *tmp;
 
202
  Session *tmp;
202
203
  bool result = 0;
203
204
 
204
205
  pthread_mutex_lock(&LOCK_thread_count);
205
 
  I_List_iterator<THD> it(threads);
 
206
  I_List_iterator<Session> it(threads);
206
207
 
207
208
  while ((tmp=it++))
208
209
  {
221
222
  return result;
222
223
}
223
224
 
224
 
bool purge_error_message(THD* thd, int res)
 
225
bool purge_error_message(Session* session, int res)
225
226
{
226
 
  uint errmsg= 0;
 
227
  uint32_t errmsg= 0;
227
228
 
228
229
  switch (res)  {
229
230
  case 0: break;
243
244
    my_message(errmsg, ER(errmsg), MYF(0));
244
245
    return true;
245
246
  }
246
 
  my_ok(thd);
 
247
  my_ok(session);
247
248
  return false;
248
249
}
249
250
 
250
251
 
251
 
bool purge_master_logs(THD* thd, const char* to_log)
 
252
bool purge_master_logs(Session* session, const char* to_log)
252
253
{
253
254
  char search_file_name[FN_REFLEN];
254
255
  if (!mysql_bin_log.is_open())
255
256
  {
256
 
    my_ok(thd);
 
257
    my_ok(session);
257
258
    return false;
258
259
  }
259
260
 
260
261
  mysql_bin_log.make_log_name(search_file_name, to_log);
261
 
  return purge_error_message(thd,
 
262
  return purge_error_message(session,
262
263
                             mysql_bin_log.purge_logs(search_file_name, 0, 1,
263
264
                                                      1, NULL));
264
265
}
265
266
 
266
267
 
267
 
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
 
268
bool purge_master_logs_before_date(Session* session, time_t purge_time)
268
269
{
269
270
  if (!mysql_bin_log.is_open())
270
271
  {
271
 
    my_ok(thd);
 
272
    my_ok(session);
272
273
    return 0;
273
274
  }
274
 
  return purge_error_message(thd,
 
275
  return purge_error_message(session,
275
276
                             mysql_bin_log.purge_logs_before_date(purge_time));
276
277
}
277
278
 
309
310
  An auxiliary function for calling in mysql_binlog_send
310
311
  to initialize the heartbeat timeout in waiting for a binlogged event.
311
312
 
312
 
  @param[in]    thd  THD to access a user variable
 
313
  @param[in]    session  Session to access a user variable
313
314
 
314
315
  @return        heartbeat period an uint64_t of nanoseconds
315
316
                 or zero if heartbeat was not demanded by slave
316
317
*/ 
317
 
static uint64_t get_heartbeat_period(THD * thd)
 
318
static uint64_t get_heartbeat_period(Session * session)
318
319
{
319
320
  bool null_value;
320
321
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
321
322
  user_var_entry *entry= 
322
 
    (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
 
323
    (user_var_entry*) hash_search(&session->user_vars, (unsigned char*) name.str,
323
324
                                  name.length);
324
325
  return entry? entry->val_int(&null_value) : 0;
325
326
}
327
328
/*
328
329
  Function prepares and sends repliation heartbeat event.
329
330
 
330
 
  @param net                net object of THD
 
331
  @param net                net object of Session
331
332
  @param packet             buffer to store the heartbeat instance
332
333
  @param event_coordinates  binlog file name and position of the last
333
334
                            real event master sent from binlog
352
353
 
353
354
  char* p= coord->file_name + dirname_length(coord->file_name);
354
355
 
355
 
  uint ident_len = strlen(p);
 
356
  uint32_t ident_len = strlen(p);
356
357
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN;
357
358
  int4store(header + SERVER_ID_OFFSET, server_id);
358
359
  int4store(header + EVENT_LEN_OFFSET, event_len);
363
364
  packet->append(header, sizeof(header));
364
365
  packet->append(p, ident_len);             // log_file_name
365
366
 
366
 
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
 
367
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) ||
367
368
      net_flush(net))
368
369
  {
369
370
    return(-1);
376
377
  TODO: Clean up loop to only have one call to send_file()
377
378
*/
378
379
 
379
 
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
380
 
                       ushort flags)
 
380
void mysql_binlog_send(Session* session, char* log_ident, my_off_t pos,
 
381
                       uint16_t flags)
381
382
{
382
383
  LOG_INFO linfo;
383
384
  char *log_file_name = linfo.log_file_name;
384
385
  char search_file_name[FN_REFLEN], *name;
385
386
  IO_CACHE log;
386
387
  File file = -1;
387
 
  String* packet = &thd->packet;
 
388
  String* packet = &session->packet;
388
389
  int error;
389
390
  const char *errmsg = "Unknown error";
390
 
  NET* net = &thd->net;
 
391
  NET* net = &session->net;
391
392
  pthread_mutex_t *log_lock;
392
393
  bool binlog_can_be_corrupted= false;
393
394
 
395
396
  /* 
396
397
     heartbeat_period from @master_heartbeat_period user variable
397
398
  */
398
 
  uint64_t heartbeat_period= get_heartbeat_period(thd);
 
399
  uint64_t heartbeat_period= get_heartbeat_period(session);
399
400
  struct timespec heartbeat_buf;
400
401
  struct event_coordinates coord_buf;
401
402
  struct timespec *heartbeat_ts= NULL;
402
403
  struct event_coordinates *coord= NULL;
403
 
  if (heartbeat_period != 0LL)
 
404
  if (heartbeat_period != 0L)
404
405
  {
405
406
    heartbeat_ts= &heartbeat_buf;
406
407
    set_timespec_nsec(*heartbeat_ts, 0);
438
439
  }
439
440
 
440
441
  pthread_mutex_lock(&LOCK_thread_count);
441
 
  thd->current_linfo = &linfo;
 
442
  session->current_linfo = &linfo;
442
443
  pthread_mutex_unlock(&LOCK_thread_count);
443
444
 
444
445
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
504
505
    this larger than the corresponding packet (query) sent 
505
506
    from client to master.
506
507
  */
507
 
  thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
 
508
  session->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
508
509
 
509
510
  /*
510
511
    We can set log_lock now, it does not move (it's a member of
542
543
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
543
544
                   ST_CREATED_OFFSET+1, (uint32_t) 0);
544
545
         /* send it */
545
 
         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
 
546
         if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
546
547
         {
547
548
           errmsg = "Failed on my_net_write()";
548
549
           my_errno= ER_UNKNOWN_ERROR;
577
578
  /* seek to the requested position, to start the requested dump */
578
579
  my_b_seek(&log, pos);                 // Seek will done on next read
579
580
 
580
 
  while (!net->error && net->vio != 0 && !thd->killed)
 
581
  while (!net->error && net->vio != 0 && !session->killed)
581
582
  {
582
583
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
583
584
    {
596
597
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
597
598
        binlog_can_be_corrupted= false;
598
599
 
599
 
      if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
 
600
      if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
600
601
      {
601
602
        errmsg = "Failed on my_net_write()";
602
603
        my_errno= ER_UNKNOWN_ERROR;
605
606
 
606
607
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
607
608
      {
608
 
        if (send_file(thd))
 
609
        if (send_file(session))
609
610
        {
610
611
          errmsg = "failed in send_file()";
611
612
          my_errno= ER_UNKNOWN_ERROR;
677
678
        case LOG_READ_EOF:
678
679
        {
679
680
          int ret;
680
 
          if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
 
681
          if (session->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
681
682
          {
682
683
            pthread_mutex_unlock(log_lock);
683
684
            goto end;
687
688
          {
688
689
            if (coord)
689
690
            {
690
 
              assert(heartbeat_ts && heartbeat_period != 0LL);
 
691
              assert(heartbeat_ts && heartbeat_period != 0L);
691
692
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
692
693
            }
693
 
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
694
 
            assert(ret == 0 || (heartbeat_period != 0LL && coord != NULL));
 
694
            ret= mysql_bin_log.wait_for_update_bin_log(session, heartbeat_ts);
 
695
            assert(ret == 0 || (heartbeat_period != 0L && coord != NULL));
695
696
            if (ret == ETIMEDOUT || ret == ETIME)
696
697
            {
697
698
              if (send_heartbeat_event(net, packet, coord))
706
707
            {
707
708
              assert(ret == 0);
708
709
            }
709
 
          } while (ret != 0 && coord != NULL && !thd->killed);
 
710
          } while (ret != 0 && coord != NULL && !session->killed);
710
711
          pthread_mutex_unlock(log_lock);
711
712
        }    
712
713
        break;
719
720
 
720
721
        if (read_packet)
721
722
        {
722
 
          thd_proc_info(thd, "Sending binlog event to slave");
723
 
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
 
723
          session->set_proc_info("Sending binlog event to slave");
 
724
          if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) )
724
725
          {
725
726
            errmsg = "Failed on my_net_write()";
726
727
            my_errno= ER_UNKNOWN_ERROR;
729
730
 
730
731
          if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
731
732
          {
732
 
            if (send_file(thd))
 
733
            if (send_file(session))
733
734
            {
734
735
              errmsg = "failed in send_file()";
735
736
              my_errno= ER_UNKNOWN_ERROR;
757
758
      bool loop_breaker = 0;
758
759
      /* need this to break out of the for loop from switch */
759
760
 
760
 
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
 
761
      session->set_proc_info("Finished reading one binlog; switching to next binlog");
761
762
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
762
763
      case LOG_INFO_EOF:
763
764
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
804
805
  end_io_cache(&log);
805
806
  (void)my_close(file, MYF(MY_WME));
806
807
 
807
 
  my_eof(thd);
808
 
  thd_proc_info(thd, "Waiting to finalize termination");
 
808
  my_eof(session);
 
809
  session->set_proc_info("Waiting to finalize termination");
809
810
  pthread_mutex_lock(&LOCK_thread_count);
810
 
  thd->current_linfo = 0;
 
811
  session->current_linfo = 0;
811
812
  pthread_mutex_unlock(&LOCK_thread_count);
812
813
  return;
813
814
 
814
815
err:
815
 
  thd_proc_info(thd, "Waiting to finalize termination");
 
816
  session->set_proc_info("Waiting to finalize termination");
816
817
  end_io_cache(&log);
817
818
  /*
818
819
    Exclude  iteration through thread list
819
820
    this is needed for purge_logs() - it will iterate through
820
 
    thread list and update thd->current_linfo->index_file_offset
 
821
    thread list and update session->current_linfo->index_file_offset
821
822
    this mutex will make sure that it never tried to update our linfo
822
823
    after we return from this stack frame
823
824
  */
824
825
  pthread_mutex_lock(&LOCK_thread_count);
825
 
  thd->current_linfo = 0;
 
826
  session->current_linfo = 0;
826
827
  pthread_mutex_unlock(&LOCK_thread_count);
827
828
  if (file >= 0)
828
829
    (void) my_close(file, MYF(MY_WME));
831
832
  return;
832
833
}
833
834
 
834
 
int start_slave(THD* thd , Master_info* mi,  bool net_report)
 
835
int start_slave(Session* session , Master_info* mi,  bool net_report)
835
836
{
836
837
  int slave_errno= 0;
837
838
  int thread_mask;
845
846
    don't wan't to touch the other thread), so set the bit to 0 for the
846
847
    other thread
847
848
  */
848
 
  if (thd->lex->slave_thd_opt)
849
 
    thread_mask&= thd->lex->slave_thd_opt;
 
849
  if (session->lex->slave_session_opt)
 
850
    thread_mask&= session->lex->slave_session_opt;
850
851
  if (thread_mask) //some threads are stopped, start them
851
852
  {
852
 
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
853
 
                         thread_mask))
 
853
    if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
854
854
      slave_errno=ER_MASTER_INFO;
855
 
    else if (server_id_supplied && *mi->host)
 
855
    else if (server_id_supplied && *mi->getHostname())
856
856
    {
857
857
      /*
858
858
        If we will start SQL thread we will care about UNTIL options If
863
863
      {
864
864
        pthread_mutex_lock(&mi->rli.data_lock);
865
865
 
866
 
        if (thd->lex->mi.pos)
 
866
        if (session->lex->mi.pos)
867
867
        {
868
868
          mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
869
 
          mi->rli.until_log_pos= thd->lex->mi.pos;
 
869
          mi->rli.until_log_pos= session->lex->mi.pos;
870
870
          /*
871
 
             We don't check thd->lex->mi.log_file_name for NULL here
 
871
             We don't check session->lex->mi.log_file_name for NULL here
872
872
             since it is checked in sql_yacc.yy
873
873
          */
874
 
          strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
 
874
          strmake(mi->rli.until_log_name, session->lex->mi.log_file_name,
875
875
                  sizeof(mi->rli.until_log_name)-1);
876
876
        }
877
 
        else if (thd->lex->mi.relay_log_pos)
 
877
        else if (session->lex->mi.relay_log_pos)
878
878
        {
879
879
          mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
880
 
          mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
881
 
          strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
 
880
          mi->rli.until_log_pos= session->lex->mi.relay_log_pos;
 
881
          strmake(mi->rli.until_log_name, session->lex->mi.relay_log_name,
882
882
                  sizeof(mi->rli.until_log_name)-1);
883
883
        }
884
884
        else
910
910
 
911
911
          /* Issuing warning then started without --skip-slave-start */
912
912
          if (!opt_skip_slave_start)
913
 
            push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
913
            push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
914
914
                         ER_MISSING_SKIP_SLAVE,
915
915
                         ER(ER_MISSING_SKIP_SLAVE));
916
916
        }
917
917
 
918
918
        pthread_mutex_unlock(&mi->rli.data_lock);
919
919
      }
920
 
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
921
 
        push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
 
920
      else if (session->lex->mi.pos || session->lex->mi.relay_log_pos)
 
921
        push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
922
922
                     ER(ER_UNTIL_COND_IGNORED));
923
923
 
924
924
      if (!slave_errno)
934
934
  else
935
935
  {
936
936
    /* no error if all threads are already started, only a warning */
937
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
 
937
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
938
938
                 ER(ER_SLAVE_WAS_RUNNING));
939
939
  }
940
940
 
947
947
    return(1);
948
948
  }
949
949
  else if (net_report)
950
 
    my_ok(thd);
 
950
    my_ok(session);
951
951
 
952
952
  return(0);
953
953
}
954
954
 
955
955
 
956
 
int stop_slave(THD* thd, Master_info* mi, bool net_report )
 
956
int stop_slave(Session* session, Master_info* mi, bool net_report )
957
957
{
958
958
  int slave_errno;
959
 
  if (!thd)
960
 
    thd = current_thd;
 
959
  if (!session)
 
960
    session = current_session;
961
961
 
962
 
  thd_proc_info(thd, "Killing slave");
 
962
  session->set_proc_info("Killing slave");
963
963
  int thread_mask;
964
964
  lock_slave_threads(mi);
965
965
  // Get a mask of _running_ threads
970
970
    was stopped (as we don't wan't to touch the other thread), so set the
971
971
    bit to 0 for the other thread
972
972
  */
973
 
  if (thd->lex->slave_thd_opt)
974
 
    thread_mask &= thd->lex->slave_thd_opt;
 
973
  if (session->lex->slave_session_opt)
 
974
    thread_mask &= session->lex->slave_session_opt;
975
975
 
976
976
  if (thread_mask)
977
977
  {
982
982
  {
983
983
    //no error if both threads are already stopped, only a warning
984
984
    slave_errno= 0;
985
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
 
985
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
986
986
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
987
987
  }
988
988
  unlock_slave_threads(mi);
989
 
  thd_proc_info(thd, 0);
 
989
  session->set_proc_info(0);
990
990
 
991
991
  if (slave_errno)
992
992
  {
995
995
    return(1);
996
996
  }
997
997
  else if (net_report)
998
 
    my_ok(thd);
 
998
    my_ok(session);
999
999
 
1000
1000
  return(0);
1001
1001
}
1006
1006
 
1007
1007
  SYNOPSIS
1008
1008
    reset_slave()
1009
 
    thd                 Thread handler
 
1009
    session                     Thread handler
1010
1010
    mi                  Master info for the slave
1011
1011
 
1012
1012
  RETURN
1015
1015
*/
1016
1016
 
1017
1017
 
1018
 
int reset_slave(THD *thd, Master_info* mi)
 
1018
int reset_slave(Session *session, Master_info* mi)
1019
1019
{
1020
1020
  struct stat stat_area;
1021
1021
  char fname[FN_REFLEN];
1022
1022
  int thread_mask= 0, error= 0;
1023
 
  uint sql_errno=0;
 
1023
  uint32_t sql_errno=0;
1024
1024
  const char* errmsg=0;
1025
1025
 
1026
1026
  lock_slave_threads(mi);
1032
1032
    goto err;
1033
1033
  }
1034
1034
 
1035
 
  ha_reset_slave(thd);
1036
 
 
1037
1035
  // delete relay logs, clear relay log coordinates
1038
 
  if ((error= purge_relay_logs(&mi->rli, thd,
 
1036
  if ((error= purge_relay_logs(&mi->rli, session,
1039
1037
                               1 /* just reset */,
1040
1038
                               &errmsg)))
1041
1039
    goto err;
1042
1040
 
1043
1041
  /* Clear master's log coordinates */
1044
 
  init_master_log_pos(mi);
 
1042
  mi->reset();
1045
1043
  /*
1046
1044
     Reset errors (the idea is that we forget about the
1047
1045
     old master).
1050
1048
  mi->rli.clear_until_condition();
1051
1049
 
1052
1050
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1053
 
  end_master_info(mi);
 
1051
  mi->end_master_info();
1054
1052
  // and delete these two files
1055
1053
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
1056
1054
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1096
1094
void kill_zombie_dump_threads(uint32_t slave_server_id)
1097
1095
{
1098
1096
  pthread_mutex_lock(&LOCK_thread_count);
1099
 
  I_List_iterator<THD> it(threads);
1100
 
  THD *tmp;
 
1097
  I_List_iterator<Session> it(threads);
 
1098
  Session *tmp;
1101
1099
 
1102
1100
  while ((tmp=it++))
1103
1101
  {
1116
1114
      it will be slow because it will iterate through the list
1117
1115
      again. We just to do kill the thread ourselves.
1118
1116
    */
1119
 
    tmp->awake(THD::KILL_QUERY);
 
1117
    tmp->awake(Session::KILL_QUERY);
1120
1118
    pthread_mutex_unlock(&tmp->LOCK_delete);
1121
1119
  }
1122
1120
}
1123
1121
 
1124
1122
 
1125
 
bool change_master(THD* thd, Master_info* mi)
 
1123
bool change_master(Session* session, Master_info* mi)
1126
1124
{
1127
1125
  int thread_mask;
1128
1126
  const char* errmsg= 0;
1137
1135
    return(true);
1138
1136
  }
1139
1137
 
1140
 
  thd_proc_info(thd, "Changing master");
1141
 
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
 
1138
  session->set_proc_info("Changing master");
 
1139
  LEX_MASTER_INFO* lex_mi= &session->lex->mi;
1142
1140
  // TODO: see if needs re-write
1143
 
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
1144
 
                       thread_mask))
 
1141
  if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
1145
1142
  {
1146
1143
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1147
1144
    unlock_slave_threads(mi);
1160
1157
  */
1161
1158
 
1162
1159
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1163
 
  {
1164
 
    mi->master_log_name[0] = 0;
1165
 
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
1166
 
  }
 
1160
    mi->reset();
1167
1161
 
1168
1162
  if (lex_mi->log_file_name)
1169
 
    strmake(mi->master_log_name, lex_mi->log_file_name,
1170
 
            sizeof(mi->master_log_name)-1);
 
1163
    mi->setLogName(lex_mi->log_file_name);
1171
1164
  if (lex_mi->pos)
1172
1165
  {
1173
 
    mi->master_log_pos= lex_mi->pos;
 
1166
    mi->setLogPosition(lex_mi->pos);
1174
1167
  }
1175
1168
 
1176
1169
  if (lex_mi->host)
1177
 
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
 
1170
    mi->setHost(lex_mi->host, lex_mi->port);
1178
1171
  if (lex_mi->user)
1179
 
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
 
1172
    mi->setUsername(lex_mi->user);
1180
1173
  if (lex_mi->password)
1181
 
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
1182
 
  if (lex_mi->port)
1183
 
    mi->port = lex_mi->port;
 
1174
    mi->setPassword(lex_mi->password);
1184
1175
  if (lex_mi->connect_retry)
1185
1176
    mi->connect_retry = lex_mi->connect_retry;
1186
1177
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1187
1178
    mi->heartbeat_period = lex_mi->heartbeat_period;
1188
1179
  else
1189
 
    mi->heartbeat_period= (float) min((double)SLAVE_MAX_HEARTBEAT_PERIOD,
 
1180
    mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
1190
1181
                                      (slave_net_timeout/2.0));
1191
 
  mi->received_heartbeats= 0LL; // counter lives until master is CHANGEd
1192
 
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1193
 
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
1194
 
 
1195
 
  if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1196
 
    mi->ssl_verify_server_cert=
1197
 
      (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
1198
 
 
1199
 
  if (lex_mi->ssl_ca)
1200
 
    strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
1201
 
  if (lex_mi->ssl_capath)
1202
 
    strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
1203
 
  if (lex_mi->ssl_cert)
1204
 
    strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
1205
 
  if (lex_mi->ssl_cipher)
1206
 
    strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
1207
 
  if (lex_mi->ssl_key)
1208
 
    strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
1209
 
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
1210
 
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
1211
 
      lex_mi->ssl_verify_server_cert )
1212
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1213
 
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
 
1182
  mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
1214
1183
 
1215
1184
  if (lex_mi->relay_log_name)
1216
1185
  {
1217
1186
    need_relay_log_purge= 0;
1218
 
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
1219
 
            sizeof(mi->rli.group_relay_log_name)-1);
1220
 
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
1221
 
            sizeof(mi->rli.event_relay_log_name)-1);
 
1187
    mi->rli.event_relay_log_name.assign(lex_mi->relay_log_name);
1222
1188
  }
1223
1189
 
1224
1190
  if (lex_mi->relay_log_pos)
1247
1213
   {
1248
1214
     /*
1249
1215
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1250
 
       not initialized), so we use a max().
 
1216
       not initialized), so we use a cmax().
1251
1217
       What happens to mi->rli.master_log_pos during the initialization stages
1252
1218
       of replication is not 100% clear, so we guard against problems using
1253
 
       max().
 
1219
       cmax().
1254
1220
      */
1255
 
     mi->master_log_pos = ((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1256
 
                           ? BIN_LOG_HEADER_SIZE
1257
 
                           : mi->rli.group_master_log_pos);
1258
 
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
1259
 
             sizeof(mi->master_log_name)-1);
 
1221
     mi->setLogPosition(((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
 
1222
                         ? BIN_LOG_HEADER_SIZE
 
1223
                         : mi->rli.group_master_log_pos));
 
1224
     mi->setLogName(mi->rli.group_master_log_name.c_str());
1260
1225
  }
1261
1226
  /*
1262
1227
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1263
1228
    a slave before).
1264
1229
  */
1265
 
  if (flush_master_info(mi, 0))
 
1230
  if (mi->flush())
1266
1231
  {
1267
1232
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1268
1233
    unlock_slave_threads(mi);
1271
1236
  if (need_relay_log_purge)
1272
1237
  {
1273
1238
    relay_log_purge= 1;
1274
 
    thd_proc_info(thd, "Purging old relay logs");
1275
 
    if (purge_relay_logs(&mi->rli, thd,
 
1239
    session->set_proc_info("Purging old relay logs");
 
1240
    if (purge_relay_logs(&mi->rli, session,
1276
1241
                         0 /* not only reset, but also reinit */,
1277
1242
                         &errmsg))
1278
1243
    {
1287
1252
    relay_log_purge= 0;
1288
1253
    /* Relay log is already initialized */
1289
1254
    if (init_relay_log_pos(&mi->rli,
1290
 
                           mi->rli.group_relay_log_name,
 
1255
                           mi->rli.group_relay_log_name.c_str(),
1291
1256
                           mi->rli.group_relay_log_pos,
1292
1257
                           0 /*no data lock*/,
1293
1258
                           &msg, 0))
1307
1272
    ''/0: we have lost all copies of the original good coordinates.
1308
1273
    That's why we always save good coords in rli.
1309
1274
  */
1310
 
  mi->rli.group_master_log_pos= mi->master_log_pos;
1311
 
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
1312
 
          sizeof(mi->rli.group_master_log_name)-1);
 
1275
  mi->rli.group_master_log_pos= mi->getLogPosition();
 
1276
  mi->rli.group_master_log_name.assign(mi->getLogName());
1313
1277
 
1314
 
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
1315
 
    mi->rli.group_master_log_pos=0;
 
1278
  if (mi->rli.group_master_log_name.size() == 0) // uninitialized case
 
1279
    mi->rli.group_master_log_pos= 0;
1316
1280
 
1317
1281
  pthread_mutex_lock(&mi->rli.data_lock);
1318
1282
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1331
1295
  pthread_mutex_unlock(&mi->rli.data_lock);
1332
1296
 
1333
1297
  unlock_slave_threads(mi);
1334
 
  thd_proc_info(thd, 0);
1335
 
  my_ok(thd);
 
1298
  session->set_proc_info(0);
 
1299
  my_ok(session);
1336
1300
  return(false);
1337
1301
}
1338
1302
 
1339
 
int reset_master(THD* thd)
 
1303
int reset_master(Session* session)
1340
1304
{
1341
1305
  if (!mysql_bin_log.is_open())
1342
1306
  {
1344
1308
               ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
1345
1309
    return 1;
1346
1310
  }
1347
 
  return mysql_bin_log.reset_logs(thd);
 
1311
  return mysql_bin_log.reset_logs(session);
1348
1312
}
1349
1313
 
1350
1314
int cmp_master_pos(const char* log_file_name1, uint64_t log_pos1,
1351
1315
                   const char* log_file_name2, uint64_t log_pos2)
1352
1316
{
1353
1317
  int res;
1354
 
  uint log_file_name1_len=  strlen(log_file_name1);
1355
 
  uint log_file_name2_len=  strlen(log_file_name2);
 
1318
  uint32_t log_file_name1_len=  strlen(log_file_name1);
 
1319
  uint32_t log_file_name2_len=  strlen(log_file_name2);
1356
1320
 
1357
1321
  //  We assume that both log names match up to '.'
1358
1322
  if (log_file_name1_len == log_file_name2_len)
1365
1329
}
1366
1330
 
1367
1331
 
1368
 
bool mysql_show_binlog_events(THD* thd)
1369
 
{
1370
 
  Protocol *protocol= thd->protocol;
1371
 
  List<Item> field_list;
1372
 
  const char *errmsg= 0;
1373
 
  bool ret= true;
1374
 
  IO_CACHE log;
1375
 
  File file= -1;
1376
 
 
1377
 
  Log_event::init_show_field_list(&field_list);
1378
 
  if (protocol->send_fields(&field_list,
1379
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1380
 
    return(true);
1381
 
 
1382
 
  Format_description_log_event *description_event= new
1383
 
    Format_description_log_event(3); /* MySQL 4.0 by default */
1384
 
 
1385
 
  /*
1386
 
    Wait for handlers to insert any pending information
1387
 
    into the binlog.  For e.g. ndb which updates the binlog asynchronously
1388
 
    this is needed so that the uses sees all its own commands in the binlog
1389
 
  */
1390
 
  ha_binlog_wait(thd);
1391
 
 
1392
 
  if (mysql_bin_log.is_open())
1393
 
  {
1394
 
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1395
 
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
1396
 
    ha_rows event_count, limit_start, limit_end;
1397
 
    my_off_t pos = max((uint64_t)BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1398
 
    char search_file_name[FN_REFLEN], *name;
1399
 
    const char *log_file_name = lex_mi->log_file_name;
1400
 
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1401
 
    LOG_INFO linfo;
1402
 
    Log_event* ev;
1403
 
 
1404
 
    unit->set_limit(thd->lex->current_select);
1405
 
    limit_start= unit->offset_limit_cnt;
1406
 
    limit_end= unit->select_limit_cnt;
1407
 
 
1408
 
    name= search_file_name;
1409
 
    if (log_file_name)
1410
 
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
1411
 
    else
1412
 
      name=0;                                   // Find first log
1413
 
 
1414
 
    linfo.index_file_offset = 0;
1415
 
 
1416
 
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1417
 
    {
1418
 
      errmsg = "Could not find target log";
1419
 
      goto err;
1420
 
    }
1421
 
 
1422
 
    pthread_mutex_lock(&LOCK_thread_count);
1423
 
    thd->current_linfo = &linfo;
1424
 
    pthread_mutex_unlock(&LOCK_thread_count);
1425
 
 
1426
 
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
1427
 
      goto err;
1428
 
 
1429
 
    /*
1430
 
      to account binlog event header size
1431
 
    */
1432
 
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
1433
 
 
1434
 
    pthread_mutex_lock(log_lock);
1435
 
 
1436
 
    /*
1437
 
      open_binlog() sought to position 4.
1438
 
      Read the first event in case it's a Format_description_log_event, to
1439
 
      know the format. If there's no such event, we are 3.23 or 4.x. This
1440
 
      code, like before, can't read 3.23 binlogs.
1441
 
      This code will fail on a mixed relay log (one which has Format_desc then
1442
 
      Rotate then Format_desc).
1443
 
    */
1444
 
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
1445
 
    if (ev)
1446
 
    {
1447
 
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
1448
 
      {
1449
 
        delete description_event;
1450
 
        description_event= (Format_description_log_event*) ev;
1451
 
      }
1452
 
      else
1453
 
        delete ev;
1454
 
    }
1455
 
 
1456
 
    my_b_seek(&log, pos);
1457
 
 
1458
 
    if (!description_event->is_valid())
1459
 
    {
1460
 
      errmsg="Invalid Format_description event; could be out of memory";
1461
 
      goto err;
1462
 
    }
1463
 
 
1464
 
    for (event_count = 0;
1465
 
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
1466
 
                                         description_event)); )
1467
 
    {
1468
 
      if (event_count >= limit_start &&
1469
 
          ev->net_send(protocol, linfo.log_file_name, pos))
1470
 
      {
1471
 
        errmsg = "Net error";
1472
 
        delete ev;
1473
 
        pthread_mutex_unlock(log_lock);
1474
 
        goto err;
1475
 
      }
1476
 
 
1477
 
      pos = my_b_tell(&log);
1478
 
      delete ev;
1479
 
 
1480
 
      if (++event_count >= limit_end)
1481
 
        break;
1482
 
    }
1483
 
 
1484
 
    if (event_count < limit_end && log.error)
1485
 
    {
1486
 
      errmsg = "Wrong offset or I/O error";
1487
 
      pthread_mutex_unlock(log_lock);
1488
 
      goto err;
1489
 
    }
1490
 
 
1491
 
    pthread_mutex_unlock(log_lock);
1492
 
  }
1493
 
 
1494
 
  ret= false;
1495
 
 
1496
 
err:
1497
 
  delete description_event;
1498
 
  if (file >= 0)
1499
 
  {
1500
 
    end_io_cache(&log);
1501
 
    (void) my_close(file, MYF(MY_WME));
1502
 
  }
1503
 
 
1504
 
  if (errmsg)
1505
 
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
1506
 
             "SHOW BINLOG EVENTS", errmsg);
1507
 
  else
1508
 
    my_eof(thd);
1509
 
 
1510
 
  pthread_mutex_lock(&LOCK_thread_count);
1511
 
  thd->current_linfo = 0;
1512
 
  pthread_mutex_unlock(&LOCK_thread_count);
1513
 
  return(ret);
1514
 
}
1515
 
 
1516
 
 
1517
 
bool show_binlog_info(THD* thd)
1518
 
{
1519
 
  Protocol *protocol= thd->protocol;
 
1332
bool show_binlog_info(Session* session)
 
1333
{
 
1334
  Protocol *protocol= session->protocol;
1520
1335
  List<Item> field_list;
1521
1336
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1522
1337
  field_list.push_back(new Item_return_int("Position",20,
1541
1356
    if (protocol->write())
1542
1357
      return(true);
1543
1358
  }
1544
 
  my_eof(thd);
 
1359
  my_eof(session);
1545
1360
  return(false);
1546
1361
}
1547
1362
 
1551
1366
 
1552
1367
  SYNOPSIS
1553
1368
    show_binlogs()
1554
 
    thd         Thread specific variable
 
1369
    session             Thread specific variable
1555
1370
 
1556
1371
  RETURN VALUES
1557
1372
    false OK
1558
1373
    true  error
1559
1374
*/
1560
1375
 
1561
 
bool show_binlogs(THD* thd)
 
1376
bool show_binlogs(Session* session)
1562
1377
{
1563
1378
  IO_CACHE *index_file;
1564
1379
  LOG_INFO cur;
1565
1380
  File file;
1566
1381
  char fname[FN_REFLEN];
1567
1382
  List<Item> field_list;
1568
 
  uint length;
 
1383
  uint32_t length;
1569
1384
  int cur_dir_len;
1570
 
  Protocol *protocol= thd->protocol;
 
1385
  Protocol *protocol= session->protocol;
1571
1386
 
1572
1387
  if (!mysql_bin_log.is_open())
1573
1388
  {
1610
1425
    else
1611
1426
    {
1612
1427
      /* this is an old log, open it and find the size */
1613
 
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
 
1428
      if ((file= my_open(fname, O_RDONLY,
1614
1429
                         MYF(0))) >= 0)
1615
1430
      {
1616
1431
        file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));
1622
1437
      goto err;
1623
1438
  }
1624
1439
  mysql_bin_log.unlock_index();
1625
 
  my_eof(thd);
 
1440
  my_eof(session);
1626
1441
  return(false);
1627
1442
 
1628
1443
err:
1642
1457
int log_loaded_block(IO_CACHE* file)
1643
1458
{
1644
1459
  LOAD_FILE_INFO *lf_info;
1645
 
  uint block_len;
 
1460
  uint32_t block_len;
1646
1461
  /* buffer contains position where we started last read */
1647
 
  uchar* buffer= (uchar*) my_b_get_buffer_start(file);
1648
 
  uint max_event_size= current_thd->variables.max_allowed_packet;
 
1462
  unsigned char* buffer= (unsigned char*) my_b_get_buffer_start(file);
 
1463
  uint32_t max_event_size= current_session->variables.max_allowed_packet;
1649
1464
  lf_info= (LOAD_FILE_INFO*) file->arg;
1650
 
  if (lf_info->thd->current_stmt_binlog_row_based)
 
1465
  if (lf_info->session->current_stmt_binlog_row_based)
1651
1466
    return(0);
1652
1467
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1653
1468
      lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1654
1469
    return(0);
1655
1470
  
1656
1471
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1657
 
       buffer += min(block_len, max_event_size),
1658
 
       block_len -= min(block_len, max_event_size))
 
1472
       buffer += cmin(block_len, max_event_size),
 
1473
       block_len -= cmin(block_len, max_event_size))
1659
1474
  {
1660
1475
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1661
1476
    if (lf_info->wrote_create_file)
1662
1477
    {
1663
 
      Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
1664
 
                               min(block_len, max_event_size),
 
1478
      Append_block_log_event a(lf_info->session, lf_info->session->db, buffer,
 
1479
                               cmin(block_len, max_event_size),
1665
1480
                               lf_info->log_delayed);
1666
1481
      mysql_bin_log.write(&a);
1667
1482
    }
1668
1483
    else
1669
1484
    {
1670
 
      Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
 
1485
      Begin_load_query_log_event b(lf_info->session, lf_info->session->db,
1671
1486
                                   buffer,
1672
 
                                   min(block_len, max_event_size),
 
1487
                                   cmin(block_len, max_event_size),
1673
1488
                                   lf_info->log_delayed);
1674
1489
      mysql_bin_log.write(&b);
1675
1490
      lf_info->wrote_create_file= 1;
1688
1503
  sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
1689
1504
    :sys_var(name_arg)
1690
1505
  { chain_sys_var(chain); }
1691
 
  bool check(THD *thd, set_var *var);
1692
 
  bool update(THD *thd, set_var *var);
 
1506
  bool check(Session *session, set_var *var);
 
1507
  bool update(Session *session, set_var *var);
1693
1508
  bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
1694
1509
  /*
1695
1510
    We can't retrieve the value of this, so we don't have to define
1703
1518
  sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
1704
1519
                             ulong *value_ptr)
1705
1520
    :sys_var_long_ptr(chain, name_arg,value_ptr) {}
1706
 
  bool update(THD *thd, set_var *var);
 
1521
  bool update(Session *session, set_var *var);
1707
1522
};
1708
1523
 
1709
 
static void fix_slave_net_timeout(THD *thd,
 
1524
static void fix_slave_net_timeout(Session *session,
1710
1525
                                  enum_var_type type __attribute__((unused)))
1711
1526
{
1712
 
#ifdef HAVE_REPLICATION
1713
1527
  pthread_mutex_lock(&LOCK_active_mi);
1714
1528
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1715
 
    push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1529
    push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1716
1530
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1717
1531
                        "The currect value for master_heartbeat_period"
1718
1532
                        " exceeds the new value of `slave_net_timeout' sec."
1719
1533
                        " A sensible value for the period should be"
1720
1534
                        " less than the timeout.");
1721
1535
  pthread_mutex_unlock(&LOCK_active_mi);
1722
 
#endif
1723
1536
  return;
1724
1537
}
1725
1538
 
1735
1548
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
1736
1549
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
1737
1550
 
1738
 
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
1739
 
 
1740
 
 
1741
 
static int show_slave_skip_errors(THD *thd __attribute__((unused)),
 
1551
static int show_slave_skip_errors(Session *session, SHOW_VAR *var, char *buff);
 
1552
 
 
1553
 
 
1554
static int show_slave_skip_errors(Session *session __attribute__((unused)),
1742
1555
                                  SHOW_VAR *var, char *buff)
1743
1556
{
1744
1557
  var->type=SHOW_CHAR;
1770
1583
    if (var->value != buff)
1771
1584
      buff--;                           // Remove last ','
1772
1585
    if (i < MAX_SLAVE_ERROR)
1773
 
      buff= stpcpy(buff, "...");  // Couldn't show all errors
 
1586
      buff= my_stpcpy(buff, "...");  // Couldn't show all errors
1774
1587
    *buff=0;
1775
1588
  }
1776
1589
  return 0;
1790
1603
  {"slave_skip_errors",       (char*) &show_slave_skip_errors_cont,      SHOW_FUNC},
1791
1604
};
1792
1605
 
1793
 
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((unused)),
 
1606
bool sys_var_slave_skip_counter::check(Session *session __attribute__((unused)),
1794
1607
                                       set_var *var)
1795
1608
{
1796
1609
  int result= 0;
1803
1616
  }
1804
1617
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1805
1618
  pthread_mutex_unlock(&LOCK_active_mi);
1806
 
  var->save_result.ulong_value= (ulong) var->value->val_int();
 
1619
  var->save_result.uint32_t_value= (ulong) var->value->val_int();
1807
1620
  return result;
1808
1621
}
1809
1622
 
1810
1623
 
1811
 
bool sys_var_slave_skip_counter::update(THD *thd __attribute__((unused)),
 
1624
bool sys_var_slave_skip_counter::update(Session *session __attribute__((unused)),
1812
1625
                                        set_var *var)
1813
1626
{
1814
1627
  pthread_mutex_lock(&LOCK_active_mi);
1821
1634
  if (!active_mi->rli.slave_running)
1822
1635
  {
1823
1636
    pthread_mutex_lock(&active_mi->rli.data_lock);
1824
 
    active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
 
1637
    active_mi->rli.slave_skip_counter= var->save_result.uint32_t_value;
1825
1638
    pthread_mutex_unlock(&active_mi->rli.data_lock);
1826
1639
  }
1827
1640
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1830
1643
}
1831
1644
 
1832
1645
 
1833
 
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((unused)),
 
1646
bool sys_var_sync_binlog_period::update(Session *session __attribute__((unused)),
1834
1647
                                        set_var *var)
1835
1648
{
1836
1649
  sync_binlog_period= (uint32_t) var->save_result.uint64_t_value;
1849
1662
  }
1850
1663
  return 0;
1851
1664
}
1852
 
 
1853
 
#endif /* HAVE_REPLICATION */