~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_repl.cc

MergeĀ fromĀ Jim

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
 
16
16
#include <drizzled/server_includes.h>
17
17
 
18
 
#include <drizzled/rpl_mi.h>
19
 
#include <drizzled/sql_repl.h>
20
 
#include <drizzled/log_event.h>
21
 
#include <drizzled/rpl_filter.h>
22
 
#include <drizzled/error.h>
23
 
#include <drizzled/gettext.h>
24
 
#include <drizzled/data_home.h>
 
18
#ifdef HAVE_REPLICATION
 
19
#include "rpl_mi.h"
 
20
#include "sql_repl.h"
 
21
#include "log_event.h"
 
22
#include "rpl_filter.h"
 
23
#include <drizzled/drizzled_error_messages.h>
25
24
 
26
25
int max_binlog_dump_events = 0; // unlimited
27
26
 
54
53
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
55
54
 
56
55
  char* p = log_file_name+dirname_length(log_file_name);
57
 
  uint32_t ident_len = (uint32_t) strlen(p);
58
 
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
 
56
  uint ident_len = (uint) strlen(p);
 
57
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
59
58
  int4store(header + SERVER_ID_OFFSET, server_id);
60
59
  int4store(header + EVENT_LEN_OFFSET, event_len);
61
60
  int2store(header + FLAGS_OFFSET, 0);
67
66
  int8store(buf+R_POS_OFFSET,position);
68
67
  packet->append(buf, ROTATE_HEADER_LEN);
69
68
  packet->append(p,ident_len);
70
 
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
69
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
71
70
  {
72
71
    *errmsg = "failed on my_net_write()";
73
72
    return(-1);
75
74
  return(0);
76
75
}
77
76
 
78
 
static int send_file(Session *session)
 
77
static int send_file(THD *thd)
79
78
{
80
 
  NET* net = &session->net;
 
79
  NET* net = &thd->net;
81
80
  int fd = -1, error = 1;
82
81
  size_t bytes;
83
82
  char fname[FN_REFLEN+1];
84
83
  const char *errmsg = 0;
85
84
  int old_timeout;
86
85
  unsigned long packet_len;
87
 
  unsigned char buf[IO_SIZE];                           // It's safe to alloc this
 
86
  uchar buf[IO_SIZE];                           // It's safe to alloc this
88
87
 
89
88
  /*
90
89
    The client might be slow loading the data, give him wait_timeout to do
91
90
    the job
92
91
  */
93
92
  old_timeout= net->read_timeout;
94
 
  my_net_set_read_timeout(net, session->variables.net_wait_timeout);
 
93
  my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
95
94
 
96
95
  /*
97
96
    We need net_flush here because the client will not know it needs to send
126
125
  }
127
126
 
128
127
 end:
129
 
  if (my_net_write(net, (unsigned char*) "", 0) || net_flush(net) ||
 
128
  if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
130
129
      (my_net_read(net) == packet_error))
131
130
  {
132
131
    errmsg = _("Failed in send_file() while negotiating file transfer close");
140
139
    (void) my_close(fd, MYF(0));
141
140
  if (errmsg)
142
141
  {
143
 
    sql_print_error("%s",errmsg);
 
142
    sql_print_error(errmsg);
144
143
  }
145
144
  return(error);
146
145
}
169
168
 
170
169
void adjust_linfo_offsets(my_off_t purge_offset)
171
170
{
172
 
  Session *tmp;
 
171
  THD *tmp;
173
172
 
174
173
  pthread_mutex_lock(&LOCK_thread_count);
175
 
  I_List_iterator<Session> it(threads);
 
174
  I_List_iterator<THD> it(threads);
176
175
 
177
176
  while ((tmp=it++))
178
177
  {
199
198
bool log_in_use(const char* log_name)
200
199
{
201
200
  int log_name_len = strlen(log_name) + 1;
202
 
  Session *tmp;
 
201
  THD *tmp;
203
202
  bool result = 0;
204
203
 
205
204
  pthread_mutex_lock(&LOCK_thread_count);
206
 
  I_List_iterator<Session> it(threads);
 
205
  I_List_iterator<THD> it(threads);
207
206
 
208
207
  while ((tmp=it++))
209
208
  {
222
221
  return result;
223
222
}
224
223
 
225
 
bool purge_error_message(Session* session, int res)
 
224
bool purge_error_message(THD* thd, int res)
226
225
{
227
 
  uint32_t errmsg= 0;
 
226
  uint errmsg= 0;
228
227
 
229
228
  switch (res)  {
230
229
  case 0: break;
244
243
    my_message(errmsg, ER(errmsg), MYF(0));
245
244
    return true;
246
245
  }
247
 
  my_ok(session);
 
246
  my_ok(thd);
248
247
  return false;
249
248
}
250
249
 
251
250
 
252
 
bool purge_master_logs(Session* session, const char* to_log)
 
251
bool purge_master_logs(THD* thd, const char* to_log)
253
252
{
254
253
  char search_file_name[FN_REFLEN];
255
254
  if (!mysql_bin_log.is_open())
256
255
  {
257
 
    my_ok(session);
 
256
    my_ok(thd);
258
257
    return false;
259
258
  }
260
259
 
261
260
  mysql_bin_log.make_log_name(search_file_name, to_log);
262
 
  return purge_error_message(session,
 
261
  return purge_error_message(thd,
263
262
                             mysql_bin_log.purge_logs(search_file_name, 0, 1,
264
263
                                                      1, NULL));
265
264
}
266
265
 
267
266
 
268
 
bool purge_master_logs_before_date(Session* session, time_t purge_time)
 
267
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
269
268
{
270
269
  if (!mysql_bin_log.is_open())
271
270
  {
272
 
    my_ok(session);
 
271
    my_ok(thd);
273
272
    return 0;
274
273
  }
275
 
  return purge_error_message(session,
 
274
  return purge_error_message(thd,
276
275
                             mysql_bin_log.purge_logs_before_date(purge_time));
277
276
}
278
277
 
310
309
  An auxiliary function for calling in mysql_binlog_send
311
310
  to initialize the heartbeat timeout in waiting for a binlogged event.
312
311
 
313
 
  @param[in]    session  Session to access a user variable
 
312
  @param[in]    thd  THD to access a user variable
314
313
 
315
314
  @return        heartbeat period an uint64_t of nanoseconds
316
315
                 or zero if heartbeat was not demanded by slave
317
316
*/ 
318
 
static uint64_t get_heartbeat_period(Session * session)
 
317
static uint64_t get_heartbeat_period(THD * thd)
319
318
{
320
 
  bool null_value;
 
319
  my_bool null_value;
321
320
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
322
321
  user_var_entry *entry= 
323
 
    (user_var_entry*) hash_search(&session->user_vars, (unsigned char*) name.str,
 
322
    (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
324
323
                                  name.length);
325
324
  return entry? entry->val_int(&null_value) : 0;
326
325
}
328
327
/*
329
328
  Function prepares and sends repliation heartbeat event.
330
329
 
331
 
  @param net                net object of Session
 
330
  @param net                net object of THD
332
331
  @param packet             buffer to store the heartbeat instance
333
332
  @param event_coordinates  binlog file name and position of the last
334
333
                            real event master sent from binlog
353
352
 
354
353
  char* p= coord->file_name + dirname_length(coord->file_name);
355
354
 
356
 
  uint32_t ident_len = strlen(p);
357
 
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN;
 
355
  uint ident_len = strlen(p);
 
356
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
358
357
  int4store(header + SERVER_ID_OFFSET, server_id);
359
358
  int4store(header + EVENT_LEN_OFFSET, event_len);
360
359
  int2store(header + FLAGS_OFFSET, 0);
364
363
  packet->append(header, sizeof(header));
365
364
  packet->append(p, ident_len);             // log_file_name
366
365
 
367
 
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) ||
 
366
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
368
367
      net_flush(net))
369
368
  {
370
369
    return(-1);
377
376
  TODO: Clean up loop to only have one call to send_file()
378
377
*/
379
378
 
380
 
void mysql_binlog_send(Session* session, char* log_ident, my_off_t pos,
381
 
                       uint16_t flags)
 
379
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
 
380
                       ushort flags)
382
381
{
383
382
  LOG_INFO linfo;
384
383
  char *log_file_name = linfo.log_file_name;
385
384
  char search_file_name[FN_REFLEN], *name;
386
385
  IO_CACHE log;
387
386
  File file = -1;
388
 
  String* packet = &session->packet;
 
387
  String* packet = &thd->packet;
389
388
  int error;
390
389
  const char *errmsg = "Unknown error";
391
 
  NET* net = &session->net;
 
390
  NET* net = &thd->net;
392
391
  pthread_mutex_t *log_lock;
393
392
  bool binlog_can_be_corrupted= false;
394
393
 
396
395
  /* 
397
396
     heartbeat_period from @master_heartbeat_period user variable
398
397
  */
399
 
  uint64_t heartbeat_period= get_heartbeat_period(session);
 
398
  uint64_t heartbeat_period= get_heartbeat_period(thd);
400
399
  struct timespec heartbeat_buf;
401
400
  struct event_coordinates coord_buf;
402
401
  struct timespec *heartbeat_ts= NULL;
403
402
  struct event_coordinates *coord= NULL;
404
 
  if (heartbeat_period != 0L)
 
403
  if (heartbeat_period != 0LL)
405
404
  {
406
405
    heartbeat_ts= &heartbeat_buf;
407
406
    set_timespec_nsec(*heartbeat_ts, 0);
439
438
  }
440
439
 
441
440
  pthread_mutex_lock(&LOCK_thread_count);
442
 
  session->current_linfo = &linfo;
 
441
  thd->current_linfo = &linfo;
443
442
  pthread_mutex_unlock(&LOCK_thread_count);
444
443
 
445
444
  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
505
504
    this larger than the corresponding packet (query) sent 
506
505
    from client to master.
507
506
  */
508
 
  session->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
 
507
  thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
509
508
 
510
509
  /*
511
510
    We can set log_lock now, it does not move (it's a member of
541
540
           to avoid destroying temp tables.
542
541
          */
543
542
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
544
 
                   ST_CREATED_OFFSET+1, (uint32_t) 0);
 
543
                   ST_CREATED_OFFSET+1, (ulong) 0);
545
544
         /* send it */
546
 
         if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
545
         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
547
546
         {
548
547
           errmsg = "Failed on my_net_write()";
549
548
           my_errno= ER_UNKNOWN_ERROR;
578
577
  /* seek to the requested position, to start the requested dump */
579
578
  my_b_seek(&log, pos);                 // Seek will done on next read
580
579
 
581
 
  while (!net->error && net->vio != 0 && !session->killed)
 
580
  while (!net->error && net->vio != 0 && !thd->killed)
582
581
  {
583
582
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
584
583
    {
597
596
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
598
597
        binlog_can_be_corrupted= false;
599
598
 
600
 
      if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
599
      if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
601
600
      {
602
601
        errmsg = "Failed on my_net_write()";
603
602
        my_errno= ER_UNKNOWN_ERROR;
606
605
 
607
606
      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
608
607
      {
609
 
        if (send_file(session))
 
608
        if (send_file(thd))
610
609
        {
611
610
          errmsg = "failed in send_file()";
612
611
          my_errno= ER_UNKNOWN_ERROR;
678
677
        case LOG_READ_EOF:
679
678
        {
680
679
          int ret;
681
 
          if (session->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
 
680
          if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
682
681
          {
683
682
            pthread_mutex_unlock(log_lock);
684
683
            goto end;
688
687
          {
689
688
            if (coord)
690
689
            {
691
 
              assert(heartbeat_ts && heartbeat_period != 0L);
 
690
              assert(heartbeat_ts && heartbeat_period != 0LL);
692
691
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
693
692
            }
694
 
            ret= mysql_bin_log.wait_for_update_bin_log(session, heartbeat_ts);
695
 
            assert(ret == 0 || (heartbeat_period != 0L && coord != NULL));
 
693
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
 
694
            assert(ret == 0 || (heartbeat_period != 0LL && coord != NULL));
696
695
            if (ret == ETIMEDOUT || ret == ETIME)
697
696
            {
698
697
              if (send_heartbeat_event(net, packet, coord))
707
706
            {
708
707
              assert(ret == 0);
709
708
            }
710
 
          } while (ret != 0 && coord != NULL && !session->killed);
 
709
          } while (ret != 0 && coord != NULL && !thd->killed);
711
710
          pthread_mutex_unlock(log_lock);
712
711
        }    
713
712
        break;
720
719
 
721
720
        if (read_packet)
722
721
        {
723
 
          session->set_proc_info("Sending binlog event to slave");
724
 
          if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) )
 
722
          thd_proc_info(thd, "Sending binlog event to slave");
 
723
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
725
724
          {
726
725
            errmsg = "Failed on my_net_write()";
727
726
            my_errno= ER_UNKNOWN_ERROR;
730
729
 
731
730
          if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
732
731
          {
733
 
            if (send_file(session))
 
732
            if (send_file(thd))
734
733
            {
735
734
              errmsg = "failed in send_file()";
736
735
              my_errno= ER_UNKNOWN_ERROR;
758
757
      bool loop_breaker = 0;
759
758
      /* need this to break out of the for loop from switch */
760
759
 
761
 
      session->set_proc_info("Finished reading one binlog; switching to next binlog");
 
760
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
762
761
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
763
762
      case LOG_INFO_EOF:
764
763
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
805
804
  end_io_cache(&log);
806
805
  (void)my_close(file, MYF(MY_WME));
807
806
 
808
 
  my_eof(session);
809
 
  session->set_proc_info("Waiting to finalize termination");
 
807
  my_eof(thd);
 
808
  thd_proc_info(thd, "Waiting to finalize termination");
810
809
  pthread_mutex_lock(&LOCK_thread_count);
811
 
  session->current_linfo = 0;
 
810
  thd->current_linfo = 0;
812
811
  pthread_mutex_unlock(&LOCK_thread_count);
813
812
  return;
814
813
 
815
814
err:
816
 
  session->set_proc_info("Waiting to finalize termination");
 
815
  thd_proc_info(thd, "Waiting to finalize termination");
817
816
  end_io_cache(&log);
818
817
  /*
819
818
    Exclude  iteration through thread list
820
819
    this is needed for purge_logs() - it will iterate through
821
 
    thread list and update session->current_linfo->index_file_offset
 
820
    thread list and update thd->current_linfo->index_file_offset
822
821
    this mutex will make sure that it never tried to update our linfo
823
822
    after we return from this stack frame
824
823
  */
825
824
  pthread_mutex_lock(&LOCK_thread_count);
826
 
  session->current_linfo = 0;
 
825
  thd->current_linfo = 0;
827
826
  pthread_mutex_unlock(&LOCK_thread_count);
828
827
  if (file >= 0)
829
828
    (void) my_close(file, MYF(MY_WME));
832
831
  return;
833
832
}
834
833
 
835
 
int start_slave(Session* session , Master_info* mi,  bool net_report)
 
834
int start_slave(THD* thd , Master_info* mi,  bool net_report)
836
835
{
837
836
  int slave_errno= 0;
838
837
  int thread_mask;
846
845
    don't wan't to touch the other thread), so set the bit to 0 for the
847
846
    other thread
848
847
  */
849
 
  if (session->lex->slave_session_opt)
850
 
    thread_mask&= session->lex->slave_session_opt;
 
848
  if (thd->lex->slave_thd_opt)
 
849
    thread_mask&= thd->lex->slave_thd_opt;
851
850
  if (thread_mask) //some threads are stopped, start them
852
851
  {
853
 
    if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
 
852
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
 
853
                         thread_mask))
854
854
      slave_errno=ER_MASTER_INFO;
855
 
    else if (server_id_supplied && *mi->getHostname())
 
855
    else if (server_id_supplied && *mi->host)
856
856
    {
857
857
      /*
858
858
        If we will start SQL thread we will care about UNTIL options If
863
863
      {
864
864
        pthread_mutex_lock(&mi->rli.data_lock);
865
865
 
866
 
        if (session->lex->mi.pos)
 
866
        if (thd->lex->mi.pos)
867
867
        {
868
868
          mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
869
 
          mi->rli.until_log_pos= session->lex->mi.pos;
 
869
          mi->rli.until_log_pos= thd->lex->mi.pos;
870
870
          /*
871
 
             We don't check session->lex->mi.log_file_name for NULL here
 
871
             We don't check thd->lex->mi.log_file_name for NULL here
872
872
             since it is checked in sql_yacc.yy
873
873
          */
874
 
          strmake(mi->rli.until_log_name, session->lex->mi.log_file_name,
 
874
          strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
875
875
                  sizeof(mi->rli.until_log_name)-1);
876
876
        }
877
 
        else if (session->lex->mi.relay_log_pos)
 
877
        else if (thd->lex->mi.relay_log_pos)
878
878
        {
879
879
          mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
880
 
          mi->rli.until_log_pos= session->lex->mi.relay_log_pos;
881
 
          strmake(mi->rli.until_log_name, session->lex->mi.relay_log_name,
 
880
          mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
 
881
          strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
882
882
                  sizeof(mi->rli.until_log_name)-1);
883
883
        }
884
884
        else
910
910
 
911
911
          /* Issuing warning then started without --skip-slave-start */
912
912
          if (!opt_skip_slave_start)
913
 
            push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
913
            push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
914
914
                         ER_MISSING_SKIP_SLAVE,
915
915
                         ER(ER_MISSING_SKIP_SLAVE));
916
916
        }
917
917
 
918
918
        pthread_mutex_unlock(&mi->rli.data_lock);
919
919
      }
920
 
      else if (session->lex->mi.pos || session->lex->mi.relay_log_pos)
921
 
        push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
 
920
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
 
921
        push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
922
922
                     ER(ER_UNTIL_COND_IGNORED));
923
923
 
924
924
      if (!slave_errno)
934
934
  else
935
935
  {
936
936
    /* no error if all threads are already started, only a warning */
937
 
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
 
937
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
938
938
                 ER(ER_SLAVE_WAS_RUNNING));
939
939
  }
940
940
 
947
947
    return(1);
948
948
  }
949
949
  else if (net_report)
950
 
    my_ok(session);
 
950
    my_ok(thd);
951
951
 
952
952
  return(0);
953
953
}
954
954
 
955
955
 
956
 
int stop_slave(Session* session, Master_info* mi, bool net_report )
 
956
int stop_slave(THD* thd, Master_info* mi, bool net_report )
957
957
{
958
958
  int slave_errno;
959
 
  if (!session)
960
 
    session = current_session;
 
959
  if (!thd)
 
960
    thd = current_thd;
961
961
 
962
 
  session->set_proc_info("Killing slave");
 
962
  thd_proc_info(thd, "Killing slave");
963
963
  int thread_mask;
964
964
  lock_slave_threads(mi);
965
965
  // Get a mask of _running_ threads
970
970
    was stopped (as we don't wan't to touch the other thread), so set the
971
971
    bit to 0 for the other thread
972
972
  */
973
 
  if (session->lex->slave_session_opt)
974
 
    thread_mask &= session->lex->slave_session_opt;
 
973
  if (thd->lex->slave_thd_opt)
 
974
    thread_mask &= thd->lex->slave_thd_opt;
975
975
 
976
976
  if (thread_mask)
977
977
  {
982
982
  {
983
983
    //no error if both threads are already stopped, only a warning
984
984
    slave_errno= 0;
985
 
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
 
985
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
986
986
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
987
987
  }
988
988
  unlock_slave_threads(mi);
989
 
  session->set_proc_info(0);
 
989
  thd_proc_info(thd, 0);
990
990
 
991
991
  if (slave_errno)
992
992
  {
995
995
    return(1);
996
996
  }
997
997
  else if (net_report)
998
 
    my_ok(session);
 
998
    my_ok(thd);
999
999
 
1000
1000
  return(0);
1001
1001
}
1006
1006
 
1007
1007
  SYNOPSIS
1008
1008
    reset_slave()
1009
 
    session                     Thread handler
 
1009
    thd                 Thread handler
1010
1010
    mi                  Master info for the slave
1011
1011
 
1012
1012
  RETURN
1015
1015
*/
1016
1016
 
1017
1017
 
1018
 
int reset_slave(Session *session, Master_info* mi)
 
1018
int reset_slave(THD *thd, Master_info* mi)
1019
1019
{
1020
1020
  struct stat stat_area;
1021
1021
  char fname[FN_REFLEN];
1022
1022
  int thread_mask= 0, error= 0;
1023
 
  uint32_t sql_errno=0;
 
1023
  uint sql_errno=0;
1024
1024
  const char* errmsg=0;
1025
1025
 
1026
1026
  lock_slave_threads(mi);
1032
1032
    goto err;
1033
1033
  }
1034
1034
 
 
1035
  ha_reset_slave(thd);
 
1036
 
1035
1037
  // delete relay logs, clear relay log coordinates
1036
 
  if ((error= purge_relay_logs(&mi->rli, session,
 
1038
  if ((error= purge_relay_logs(&mi->rli, thd,
1037
1039
                               1 /* just reset */,
1038
1040
                               &errmsg)))
1039
1041
    goto err;
1040
1042
 
1041
1043
  /* Clear master's log coordinates */
1042
 
  mi->reset();
 
1044
  init_master_log_pos(mi);
1043
1045
  /*
1044
1046
     Reset errors (the idea is that we forget about the
1045
1047
     old master).
1048
1050
  mi->rli.clear_until_condition();
1049
1051
 
1050
1052
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1051
 
  mi->end_master_info();
 
1053
  end_master_info(mi);
1052
1054
  // and delete these two files
1053
1055
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
1054
1056
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1094
1096
void kill_zombie_dump_threads(uint32_t slave_server_id)
1095
1097
{
1096
1098
  pthread_mutex_lock(&LOCK_thread_count);
1097
 
  I_List_iterator<Session> it(threads);
1098
 
  Session *tmp;
 
1099
  I_List_iterator<THD> it(threads);
 
1100
  THD *tmp;
1099
1101
 
1100
1102
  while ((tmp=it++))
1101
1103
  {
1114
1116
      it will be slow because it will iterate through the list
1115
1117
      again. We just to do kill the thread ourselves.
1116
1118
    */
1117
 
    tmp->awake(Session::KILL_QUERY);
 
1119
    tmp->awake(THD::KILL_QUERY);
1118
1120
    pthread_mutex_unlock(&tmp->LOCK_delete);
1119
1121
  }
1120
1122
}
1121
1123
 
1122
1124
 
1123
 
bool change_master(Session* session, Master_info* mi)
 
1125
bool change_master(THD* thd, Master_info* mi)
1124
1126
{
1125
1127
  int thread_mask;
1126
1128
  const char* errmsg= 0;
1135
1137
    return(true);
1136
1138
  }
1137
1139
 
1138
 
  session->set_proc_info("Changing master");
1139
 
  LEX_MASTER_INFO* lex_mi= &session->lex->mi;
 
1140
  thd_proc_info(thd, "Changing master");
 
1141
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1140
1142
  // TODO: see if needs re-write
1141
 
  if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
 
1143
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
 
1144
                       thread_mask))
1142
1145
  {
1143
1146
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1144
1147
    unlock_slave_threads(mi);
1157
1160
  */
1158
1161
 
1159
1162
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1160
 
    mi->reset();
 
1163
  {
 
1164
    mi->master_log_name[0] = 0;
 
1165
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
 
1166
  }
1161
1167
 
1162
1168
  if (lex_mi->log_file_name)
1163
 
    mi->setLogName(lex_mi->log_file_name);
 
1169
    strmake(mi->master_log_name, lex_mi->log_file_name,
 
1170
            sizeof(mi->master_log_name)-1);
1164
1171
  if (lex_mi->pos)
1165
1172
  {
1166
 
    mi->setLogPosition(lex_mi->pos);
 
1173
    mi->master_log_pos= lex_mi->pos;
1167
1174
  }
1168
1175
 
1169
1176
  if (lex_mi->host)
1170
 
    mi->setHost(lex_mi->host, lex_mi->port);
 
1177
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1171
1178
  if (lex_mi->user)
1172
 
    mi->setUsername(lex_mi->user);
 
1179
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
1173
1180
  if (lex_mi->password)
1174
 
    mi->setPassword(lex_mi->password);
 
1181
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
 
1182
  if (lex_mi->port)
 
1183
    mi->port = lex_mi->port;
1175
1184
  if (lex_mi->connect_retry)
1176
1185
    mi->connect_retry = lex_mi->connect_retry;
1177
1186
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1178
1187
    mi->heartbeat_period = lex_mi->heartbeat_period;
1179
1188
  else
1180
 
    mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
 
1189
    mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
1181
1190
                                      (slave_net_timeout/2.0));
1182
 
  mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
 
1191
  mi->received_heartbeats= 0LL; // counter lives until master is CHANGEd
 
1192
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1193
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
1194
 
 
1195
  if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1196
    mi->ssl_verify_server_cert=
 
1197
      (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
1198
 
 
1199
  if (lex_mi->ssl_ca)
 
1200
    strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
 
1201
  if (lex_mi->ssl_capath)
 
1202
    strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
 
1203
  if (lex_mi->ssl_cert)
 
1204
    strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
 
1205
  if (lex_mi->ssl_cipher)
 
1206
    strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
 
1207
  if (lex_mi->ssl_key)
 
1208
    strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
 
1209
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
 
1210
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
 
1211
      lex_mi->ssl_verify_server_cert )
 
1212
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
1213
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1183
1214
 
1184
1215
  if (lex_mi->relay_log_name)
1185
1216
  {
1186
1217
    need_relay_log_purge= 0;
1187
 
    mi->rli.event_relay_log_name.assign(lex_mi->relay_log_name);
 
1218
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
 
1219
            sizeof(mi->rli.group_relay_log_name)-1);
 
1220
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
 
1221
            sizeof(mi->rli.event_relay_log_name)-1);
1188
1222
  }
1189
1223
 
1190
1224
  if (lex_mi->relay_log_pos)
1213
1247
   {
1214
1248
     /*
1215
1249
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1216
 
       not initialized), so we use a cmax().
 
1250
       not initialized), so we use a max().
1217
1251
       What happens to mi->rli.master_log_pos during the initialization stages
1218
1252
       of replication is not 100% clear, so we guard against problems using
1219
 
       cmax().
 
1253
       max().
1220
1254
      */
1221
 
     mi->setLogPosition(((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1222
 
                         ? BIN_LOG_HEADER_SIZE
1223
 
                         : mi->rli.group_master_log_pos));
1224
 
     mi->setLogName(mi->rli.group_master_log_name.c_str());
 
1255
     mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
 
1256
                              mi->rli.group_master_log_pos);
 
1257
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
 
1258
             sizeof(mi->master_log_name)-1);
1225
1259
  }
1226
1260
  /*
1227
1261
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1228
1262
    a slave before).
1229
1263
  */
1230
 
  if (mi->flush())
 
1264
  if (flush_master_info(mi, 0))
1231
1265
  {
1232
1266
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1233
1267
    unlock_slave_threads(mi);
1236
1270
  if (need_relay_log_purge)
1237
1271
  {
1238
1272
    relay_log_purge= 1;
1239
 
    session->set_proc_info("Purging old relay logs");
1240
 
    if (purge_relay_logs(&mi->rli, session,
 
1273
    thd_proc_info(thd, "Purging old relay logs");
 
1274
    if (purge_relay_logs(&mi->rli, thd,
1241
1275
                         0 /* not only reset, but also reinit */,
1242
1276
                         &errmsg))
1243
1277
    {
1252
1286
    relay_log_purge= 0;
1253
1287
    /* Relay log is already initialized */
1254
1288
    if (init_relay_log_pos(&mi->rli,
1255
 
                           mi->rli.group_relay_log_name.c_str(),
 
1289
                           mi->rli.group_relay_log_name,
1256
1290
                           mi->rli.group_relay_log_pos,
1257
1291
                           0 /*no data lock*/,
1258
1292
                           &msg, 0))
1272
1306
    ''/0: we have lost all copies of the original good coordinates.
1273
1307
    That's why we always save good coords in rli.
1274
1308
  */
1275
 
  mi->rli.group_master_log_pos= mi->getLogPosition();
1276
 
  mi->rli.group_master_log_name.assign(mi->getLogName());
 
1309
  mi->rli.group_master_log_pos= mi->master_log_pos;
 
1310
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
 
1311
          sizeof(mi->rli.group_master_log_name)-1);
1277
1312
 
1278
 
  if (mi->rli.group_master_log_name.size() == 0) // uninitialized case
1279
 
    mi->rli.group_master_log_pos= 0;
 
1313
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
 
1314
    mi->rli.group_master_log_pos=0;
1280
1315
 
1281
1316
  pthread_mutex_lock(&mi->rli.data_lock);
1282
1317
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1295
1330
  pthread_mutex_unlock(&mi->rli.data_lock);
1296
1331
 
1297
1332
  unlock_slave_threads(mi);
1298
 
  session->set_proc_info(0);
1299
 
  my_ok(session);
 
1333
  thd_proc_info(thd, 0);
 
1334
  my_ok(thd);
1300
1335
  return(false);
1301
1336
}
1302
1337
 
1303
 
int reset_master(Session* session)
 
1338
int reset_master(THD* thd)
1304
1339
{
1305
1340
  if (!mysql_bin_log.is_open())
1306
1341
  {
1308
1343
               ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
1309
1344
    return 1;
1310
1345
  }
1311
 
  return mysql_bin_log.reset_logs(session);
 
1346
  return mysql_bin_log.reset_logs(thd);
1312
1347
}
1313
1348
 
1314
1349
int cmp_master_pos(const char* log_file_name1, uint64_t log_pos1,
1315
1350
                   const char* log_file_name2, uint64_t log_pos2)
1316
1351
{
1317
1352
  int res;
1318
 
  uint32_t log_file_name1_len=  strlen(log_file_name1);
1319
 
  uint32_t log_file_name2_len=  strlen(log_file_name2);
 
1353
  uint log_file_name1_len=  strlen(log_file_name1);
 
1354
  uint log_file_name2_len=  strlen(log_file_name2);
1320
1355
 
1321
1356
  //  We assume that both log names match up to '.'
1322
1357
  if (log_file_name1_len == log_file_name2_len)
1329
1364
}
1330
1365
 
1331
1366
 
1332
 
bool show_binlog_info(Session* session)
1333
 
{
1334
 
  Protocol *protocol= session->protocol;
 
1367
bool mysql_show_binlog_events(THD* thd)
 
1368
{
 
1369
  Protocol *protocol= thd->protocol;
 
1370
  List<Item> field_list;
 
1371
  const char *errmsg= 0;
 
1372
  bool ret= true;
 
1373
  IO_CACHE log;
 
1374
  File file= -1;
 
1375
 
 
1376
  Log_event::init_show_field_list(&field_list);
 
1377
  if (protocol->send_fields(&field_list,
 
1378
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1379
    return(true);
 
1380
 
 
1381
  Format_description_log_event *description_event= new
 
1382
    Format_description_log_event(3); /* MySQL 4.0 by default */
 
1383
 
 
1384
  /*
 
1385
    Wait for handlers to insert any pending information
 
1386
    into the binlog.  For e.g. ndb which updates the binlog asynchronously
 
1387
    this is needed so that the uses sees all its own commands in the binlog
 
1388
  */
 
1389
  ha_binlog_wait(thd);
 
1390
 
 
1391
  if (mysql_bin_log.is_open())
 
1392
  {
 
1393
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
 
1394
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
 
1395
    ha_rows event_count, limit_start, limit_end;
 
1396
    my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
 
1397
    char search_file_name[FN_REFLEN], *name;
 
1398
    const char *log_file_name = lex_mi->log_file_name;
 
1399
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
 
1400
    LOG_INFO linfo;
 
1401
    Log_event* ev;
 
1402
 
 
1403
    unit->set_limit(thd->lex->current_select);
 
1404
    limit_start= unit->offset_limit_cnt;
 
1405
    limit_end= unit->select_limit_cnt;
 
1406
 
 
1407
    name= search_file_name;
 
1408
    if (log_file_name)
 
1409
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
 
1410
    else
 
1411
      name=0;                                   // Find first log
 
1412
 
 
1413
    linfo.index_file_offset = 0;
 
1414
 
 
1415
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
 
1416
    {
 
1417
      errmsg = "Could not find target log";
 
1418
      goto err;
 
1419
    }
 
1420
 
 
1421
    pthread_mutex_lock(&LOCK_thread_count);
 
1422
    thd->current_linfo = &linfo;
 
1423
    pthread_mutex_unlock(&LOCK_thread_count);
 
1424
 
 
1425
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
 
1426
      goto err;
 
1427
 
 
1428
    /*
 
1429
      to account binlog event header size
 
1430
    */
 
1431
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
 
1432
 
 
1433
    pthread_mutex_lock(log_lock);
 
1434
 
 
1435
    /*
 
1436
      open_binlog() sought to position 4.
 
1437
      Read the first event in case it's a Format_description_log_event, to
 
1438
      know the format. If there's no such event, we are 3.23 or 4.x. This
 
1439
      code, like before, can't read 3.23 binlogs.
 
1440
      This code will fail on a mixed relay log (one which has Format_desc then
 
1441
      Rotate then Format_desc).
 
1442
    */
 
1443
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
 
1444
    if (ev)
 
1445
    {
 
1446
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
 
1447
      {
 
1448
        delete description_event;
 
1449
        description_event= (Format_description_log_event*) ev;
 
1450
      }
 
1451
      else
 
1452
        delete ev;
 
1453
    }
 
1454
 
 
1455
    my_b_seek(&log, pos);
 
1456
 
 
1457
    if (!description_event->is_valid())
 
1458
    {
 
1459
      errmsg="Invalid Format_description event; could be out of memory";
 
1460
      goto err;
 
1461
    }
 
1462
 
 
1463
    for (event_count = 0;
 
1464
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
 
1465
                                         description_event)); )
 
1466
    {
 
1467
      if (event_count >= limit_start &&
 
1468
          ev->net_send(protocol, linfo.log_file_name, pos))
 
1469
      {
 
1470
        errmsg = "Net error";
 
1471
        delete ev;
 
1472
        pthread_mutex_unlock(log_lock);
 
1473
        goto err;
 
1474
      }
 
1475
 
 
1476
      pos = my_b_tell(&log);
 
1477
      delete ev;
 
1478
 
 
1479
      if (++event_count >= limit_end)
 
1480
        break;
 
1481
    }
 
1482
 
 
1483
    if (event_count < limit_end && log.error)
 
1484
    {
 
1485
      errmsg = "Wrong offset or I/O error";
 
1486
      pthread_mutex_unlock(log_lock);
 
1487
      goto err;
 
1488
    }
 
1489
 
 
1490
    pthread_mutex_unlock(log_lock);
 
1491
  }
 
1492
 
 
1493
  ret= false;
 
1494
 
 
1495
err:
 
1496
  delete description_event;
 
1497
  if (file >= 0)
 
1498
  {
 
1499
    end_io_cache(&log);
 
1500
    (void) my_close(file, MYF(MY_WME));
 
1501
  }
 
1502
 
 
1503
  if (errmsg)
 
1504
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
 
1505
             "SHOW BINLOG EVENTS", errmsg);
 
1506
  else
 
1507
    my_eof(thd);
 
1508
 
 
1509
  pthread_mutex_lock(&LOCK_thread_count);
 
1510
  thd->current_linfo = 0;
 
1511
  pthread_mutex_unlock(&LOCK_thread_count);
 
1512
  return(ret);
 
1513
}
 
1514
 
 
1515
 
 
1516
bool show_binlog_info(THD* thd)
 
1517
{
 
1518
  Protocol *protocol= thd->protocol;
1335
1519
  List<Item> field_list;
1336
1520
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1337
1521
  field_list.push_back(new Item_return_int("Position",20,
1356
1540
    if (protocol->write())
1357
1541
      return(true);
1358
1542
  }
1359
 
  my_eof(session);
 
1543
  my_eof(thd);
1360
1544
  return(false);
1361
1545
}
1362
1546
 
1366
1550
 
1367
1551
  SYNOPSIS
1368
1552
    show_binlogs()
1369
 
    session             Thread specific variable
 
1553
    thd         Thread specific variable
1370
1554
 
1371
1555
  RETURN VALUES
1372
1556
    false OK
1373
1557
    true  error
1374
1558
*/
1375
1559
 
1376
 
bool show_binlogs(Session* session)
 
1560
bool show_binlogs(THD* thd)
1377
1561
{
1378
1562
  IO_CACHE *index_file;
1379
1563
  LOG_INFO cur;
1380
1564
  File file;
1381
1565
  char fname[FN_REFLEN];
1382
1566
  List<Item> field_list;
1383
 
  uint32_t length;
 
1567
  uint length;
1384
1568
  int cur_dir_len;
1385
 
  Protocol *protocol= session->protocol;
 
1569
  Protocol *protocol= thd->protocol;
1386
1570
 
1387
1571
  if (!mysql_bin_log.is_open())
1388
1572
  {
1425
1609
    else
1426
1610
    {
1427
1611
      /* this is an old log, open it and find the size */
1428
 
      if ((file= my_open(fname, O_RDONLY,
 
1612
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
1429
1613
                         MYF(0))) >= 0)
1430
1614
      {
1431
1615
        file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));
1437
1621
      goto err;
1438
1622
  }
1439
1623
  mysql_bin_log.unlock_index();
1440
 
  my_eof(session);
 
1624
  my_eof(thd);
1441
1625
  return(false);
1442
1626
 
1443
1627
err:
1457
1641
int log_loaded_block(IO_CACHE* file)
1458
1642
{
1459
1643
  LOAD_FILE_INFO *lf_info;
1460
 
  uint32_t block_len;
 
1644
  uint block_len;
1461
1645
  /* buffer contains position where we started last read */
1462
 
  unsigned char* buffer= (unsigned char*) my_b_get_buffer_start(file);
1463
 
  uint32_t max_event_size= current_session->variables.max_allowed_packet;
 
1646
  uchar* buffer= (uchar*) my_b_get_buffer_start(file);
 
1647
  uint max_event_size= current_thd->variables.max_allowed_packet;
1464
1648
  lf_info= (LOAD_FILE_INFO*) file->arg;
1465
 
  if (lf_info->session->current_stmt_binlog_row_based)
 
1649
  if (lf_info->thd->current_stmt_binlog_row_based)
1466
1650
    return(0);
1467
1651
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1468
1652
      lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1469
1653
    return(0);
1470
1654
  
1471
1655
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1472
 
       buffer += cmin(block_len, max_event_size),
1473
 
       block_len -= cmin(block_len, max_event_size))
 
1656
       buffer += min(block_len, max_event_size),
 
1657
       block_len -= min(block_len, max_event_size))
1474
1658
  {
1475
1659
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1476
1660
    if (lf_info->wrote_create_file)
1477
1661
    {
1478
 
      Append_block_log_event a(lf_info->session, lf_info->session->db, buffer,
1479
 
                               cmin(block_len, max_event_size),
 
1662
      Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
 
1663
                               min(block_len, max_event_size),
1480
1664
                               lf_info->log_delayed);
1481
1665
      mysql_bin_log.write(&a);
1482
1666
    }
1483
1667
    else
1484
1668
    {
1485
 
      Begin_load_query_log_event b(lf_info->session, lf_info->session->db,
 
1669
      Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1486
1670
                                   buffer,
1487
 
                                   cmin(block_len, max_event_size),
 
1671
                                   min(block_len, max_event_size),
1488
1672
                                   lf_info->log_delayed);
1489
1673
      mysql_bin_log.write(&b);
1490
1674
      lf_info->wrote_create_file= 1;
1503
1687
  sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
1504
1688
    :sys_var(name_arg)
1505
1689
  { chain_sys_var(chain); }
1506
 
  bool check(Session *session, set_var *var);
1507
 
  bool update(Session *session, set_var *var);
 
1690
  bool check(THD *thd, set_var *var);
 
1691
  bool update(THD *thd, set_var *var);
1508
1692
  bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
1509
1693
  /*
1510
1694
    We can't retrieve the value of this, so we don't have to define
1518
1702
  sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
1519
1703
                             ulong *value_ptr)
1520
1704
    :sys_var_long_ptr(chain, name_arg,value_ptr) {}
1521
 
  bool update(Session *session, set_var *var);
 
1705
  bool update(THD *thd, set_var *var);
1522
1706
};
1523
1707
 
1524
 
static void fix_slave_net_timeout(Session *session,
 
1708
static void fix_slave_net_timeout(THD *thd,
1525
1709
                                  enum_var_type type __attribute__((unused)))
1526
1710
{
 
1711
#ifdef HAVE_REPLICATION
1527
1712
  pthread_mutex_lock(&LOCK_active_mi);
1528
1713
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1529
 
    push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1714
    push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1530
1715
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1531
1716
                        "The currect value for master_heartbeat_period"
1532
1717
                        " exceeds the new value of `slave_net_timeout' sec."
1533
1718
                        " A sensible value for the period should be"
1534
1719
                        " less than the timeout.");
1535
1720
  pthread_mutex_unlock(&LOCK_active_mi);
 
1721
#endif
1536
1722
  return;
1537
1723
}
1538
1724
 
1548
1734
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
1549
1735
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
1550
1736
 
1551
 
static int show_slave_skip_errors(Session *session, SHOW_VAR *var, char *buff);
1552
 
 
1553
 
 
1554
 
static int show_slave_skip_errors(Session *session __attribute__((unused)),
 
1737
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
 
1738
 
 
1739
 
 
1740
static int show_slave_skip_errors(THD *thd __attribute__((unused)),
1555
1741
                                  SHOW_VAR *var, char *buff)
1556
1742
{
1557
1743
  var->type=SHOW_CHAR;
1583
1769
    if (var->value != buff)
1584
1770
      buff--;                           // Remove last ','
1585
1771
    if (i < MAX_SLAVE_ERROR)
1586
 
      buff= my_stpcpy(buff, "...");  // Couldn't show all errors
 
1772
      buff= strmov(buff, "...");  // Couldn't show all errors
1587
1773
    *buff=0;
1588
1774
  }
1589
1775
  return 0;
1603
1789
  {"slave_skip_errors",       (char*) &show_slave_skip_errors_cont,      SHOW_FUNC},
1604
1790
};
1605
1791
 
1606
 
bool sys_var_slave_skip_counter::check(Session *session __attribute__((unused)),
 
1792
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((unused)),
1607
1793
                                       set_var *var)
1608
1794
{
1609
1795
  int result= 0;
1616
1802
  }
1617
1803
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1618
1804
  pthread_mutex_unlock(&LOCK_active_mi);
1619
 
  var->save_result.uint32_t_value= (ulong) var->value->val_int();
 
1805
  var->save_result.ulong_value= (ulong) var->value->val_int();
1620
1806
  return result;
1621
1807
}
1622
1808
 
1623
1809
 
1624
 
bool sys_var_slave_skip_counter::update(Session *session __attribute__((unused)),
 
1810
bool sys_var_slave_skip_counter::update(THD *thd __attribute__((unused)),
1625
1811
                                        set_var *var)
1626
1812
{
1627
1813
  pthread_mutex_lock(&LOCK_active_mi);
1634
1820
  if (!active_mi->rli.slave_running)
1635
1821
  {
1636
1822
    pthread_mutex_lock(&active_mi->rli.data_lock);
1637
 
    active_mi->rli.slave_skip_counter= var->save_result.uint32_t_value;
 
1823
    active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
1638
1824
    pthread_mutex_unlock(&active_mi->rli.data_lock);
1639
1825
  }
1640
1826
  pthread_mutex_unlock(&active_mi->rli.run_lock);
1643
1829
}
1644
1830
 
1645
1831
 
1646
 
bool sys_var_sync_binlog_period::update(Session *session __attribute__((unused)),
 
1832
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((unused)),
1647
1833
                                        set_var *var)
1648
1834
{
1649
 
  sync_binlog_period= (uint32_t) var->save_result.uint64_t_value;
 
1835
  sync_binlog_period= (ulong) var->save_result.uint64_t_value;
1650
1836
  return 0;
1651
1837
}
1652
1838
 
1662
1848
  }
1663
1849
  return 0;
1664
1850
}
 
1851
 
 
1852
#endif /* HAVE_REPLICATION */