~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/slave.cc

enable remaining subselect tests, merge with latest from the trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
#include <drizzled/server_includes.h>
27
27
 
28
28
#include <storage/myisam/myisam.h>
29
 
#include "rpl_mi.h"
30
 
#include "rpl_rli.h"
31
 
#include "sql_repl.h"
32
 
#include "rpl_filter.h"
33
 
#include "repl_failsafe.h"
 
29
#include <drizzled/replication/mi.h>
 
30
#include <drizzled/replication/rli.h>
 
31
#include <drizzled/replication/replication.h>
 
32
#include <libdrizzle/libdrizzle.h>
 
33
#include <mysys/hash.h>
34
34
#include <mysys/thr_alarm.h>
35
 
#include <libdrizzle/sql_common.h>
36
35
#include <libdrizzle/errmsg.h>
37
36
#include <mysys/mysys_err.h>
38
 
#include <drizzled/drizzled_error_messages.h>
39
 
 
40
 
#ifdef HAVE_REPLICATION
41
 
 
42
 
#include "rpl_tblmap.h"
43
 
 
44
 
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
 
37
#include <drizzled/error.h>
 
38
#include <drizzled/sql_parse.h>
 
39
#include <drizzled/gettext.h>
 
40
#include <signal.h>
 
41
#include <drizzled/session.h>
 
42
#include <drizzled/log_event.h>
 
43
#include <drizzled/item/empty_string.h>
 
44
#include <drizzled/item/return_int.h>
 
45
 
 
46
#if TIME_WITH_SYS_TIME
 
47
# include <sys/time.h>
 
48
# include <time.h>
 
49
#else
 
50
# if HAVE_SYS_TIME_H
 
51
#  include <sys/time.h>
 
52
# else
 
53
#  include <time.h>
 
54
# endif
 
55
#endif
 
56
 
 
57
#include <drizzled/tztime.h>
 
58
 
 
59
#include <drizzled/replication/tblmap.h>
45
60
 
46
61
#define MAX_SLAVE_RETRY_PAUSE 5
47
62
bool use_slave_mask = 0;
48
63
MY_BITMAP slave_error_mask;
49
64
 
50
 
typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
 
65
typedef bool (*CHECK_KILLED_FUNC)(Session*,void*);
51
66
 
52
67
char* slave_load_tmpdir = 0;
53
68
Master_info *active_mi= 0;
87
102
{
88
103
  {
89
104
    N_("Waiting to reconnect after a failed registration on master"),
90
 
    N_("Slave I/O thread killed while waitnig to reconnect after a "
 
105
    N_("Slave I/O thread killed while waiting to reconnect after a "
91
106
                 "failed registration on master"),
92
107
    N_("Reconnecting after a failed registration on master"),
93
108
    N_("failed registering on master, reconnecting to try again, "
94
 
                 "log '%s' at postion %s"),
 
109
                 "log '%s' at position %s"),
95
110
    "COM_REGISTER_SLAVE",
96
111
    N_("Slave I/O thread killed during or after reconnect")
97
112
  },
100
115
    N_("Slave I/O thread killed while retrying master dump"),
101
116
    N_("Reconnecting after a failed binlog dump request"),
102
117
    N_("failed dump request, reconnecting to try again, "
103
 
                 "log '%s' at postion %s"),
 
118
                 "log '%s' at position %s"),
104
119
    "COM_BINLOG_DUMP",
105
120
    N_("Slave I/O thread killed during or after reconnect")
106
121
  },
110
125
                 "after a failed read"),
111
126
    N_("Reconnecting after a failed master event read"),
112
127
    N_("Slave I/O thread: Failed reading log event, "
113
 
                 "reconnecting to retry, log '%s' at postion %s"),
 
128
                 "reconnecting to retry, log '%s' at position %s"),
114
129
    "",
115
130
    N_("Slave I/O thread killed during or after a "
116
131
                 "reconnect done to recover from failed read")
117
132
  }
118
133
};
119
 
 
120
 
 
121
 
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
 
134
 
 
135
 
 
136
typedef enum { SLAVE_Session_IO, SLAVE_Session_SQL} SLAVE_Session_TYPE;
122
137
 
123
138
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
124
139
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
125
140
static bool wait_for_relay_log_space(Relay_log_info* rli);
126
 
static inline bool io_slave_killed(THD* thd,Master_info* mi);
127
 
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
128
 
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
129
 
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi);
130
 
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
141
static inline bool io_slave_killed(Session* session,Master_info* mi);
 
142
static inline bool sql_slave_killed(Session* session,Relay_log_info* rli);
 
143
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type);
 
144
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi);
 
145
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
131
146
                          bool suppress_warnings);
132
 
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
147
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
133
148
                             bool reconnect, bool suppress_warnings);
134
 
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
149
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
135
150
                      void* thread_killed_arg);
136
151
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
137
152
static Log_event* next_event(Relay_log_info* rli);
138
153
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
139
 
static int32_t terminate_slave_thread(THD *thd,
 
154
static int32_t terminate_slave_thread(Session *session,
140
155
                                  pthread_mutex_t* term_lock,
141
156
                                  pthread_cond_t* term_cond,
142
157
                                  volatile uint32_t *slave_running,
143
158
                                  bool skip_lock);
144
 
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
 
159
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info);
145
160
 
146
161
/*
147
162
  Find out which replication threads are running
230
245
    goto err;
231
246
  }
232
247
 
233
 
  if (init_master_info(active_mi,master_info_file,relay_log_info_file,
234
 
                       1, (SLAVE_IO | SLAVE_SQL)))
 
248
  if (active_mi->init_master_info(master_info_file, relay_log_info_file, (SLAVE_IO | SLAVE_SQL)))
235
249
  {
236
250
    sql_print_error(_("Failed to initialize the master info structure"));
237
251
    goto err;
284
298
  use_slave_mask = 1;
285
299
  for (;my_isspace(system_charset_info,*arg);++arg)
286
300
    /* empty */;
287
 
  if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
 
301
  if (!my_strnncoll(system_charset_info,(unsigned char*)arg,4,(const unsigned char*)"all",4))
288
302
  {
289
303
    bitmap_set_all(&slave_error_mask);
290
304
    return;
313
327
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
314
328
  {
315
329
    mi->abort_slave=1;
316
 
    if ((error=terminate_slave_thread(mi->io_thd,io_lock,
 
330
    if ((error=terminate_slave_thread(mi->io_session,io_lock,
317
331
                                      &mi->stop_cond,
318
332
                                      &mi->slave_running,
319
333
                                      skip_lock)) &&
323
337
  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
324
338
  {
325
339
    mi->rli.abort_slave=1;
326
 
    if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
 
340
    if ((error=terminate_slave_thread(mi->rli.sql_session,sql_lock,
327
341
                                      &mi->rli.stop_cond,
328
342
                                      &mi->rli.slave_running,
329
343
                                      skip_lock)) &&
364
378
   @retval 0 All OK
365
379
 */
366
380
static int32_t
367
 
terminate_slave_thread(THD *thd,
 
381
terminate_slave_thread(Session *session,
368
382
                       pthread_mutex_t* term_lock,
369
383
                       pthread_cond_t* term_cond,
370
384
                       volatile uint32_t *slave_running,
383
397
      pthread_mutex_unlock(term_lock);
384
398
    return(ER_SLAVE_NOT_RUNNING);
385
399
  }
386
 
  assert(thd != 0);
387
 
  THD_CHECK_SENTRY(thd);
 
400
  assert(session != 0);
 
401
  Session_CHECK_SENTRY(session);
388
402
 
389
403
  /*
390
404
    It is critical to test if the slave is running. Otherwise, we might
393
407
 
394
408
  while (*slave_running)                        // Should always be true
395
409
  {
396
 
    pthread_mutex_lock(&thd->LOCK_delete);
 
410
    pthread_mutex_lock(&session->LOCK_delete);
397
411
#ifndef DONT_USE_THR_ALARM
398
412
    /*
399
413
      Error codes from pthread_kill are:
400
414
      EINVAL: invalid signal number (can't happen)
401
415
      ESRCH: thread already killed (can happen, should be ignored)
402
416
    */
403
 
    int32_t err= pthread_kill(thd->real_id, thr_client_alarm);
 
417
    int32_t err= pthread_kill(session->real_id, thr_client_alarm);
404
418
    assert(err != EINVAL);
405
419
#endif
406
 
    thd->awake(THD::NOT_KILLED);
407
 
    pthread_mutex_unlock(&thd->LOCK_delete);
 
420
    session->awake(Session::NOT_KILLED);
 
421
    pthread_mutex_unlock(&session->LOCK_delete);
408
422
 
409
423
    /*
410
424
      There is a small chance that slave thread might miss the first
474
488
  }
475
489
  if (start_cond && cond_lock) // caller has cond_lock
476
490
  {
477
 
    THD* thd = current_thd;
 
491
    Session* session = current_session;
478
492
    while (start_id == *slave_run_id)
479
493
    {
480
 
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
 
494
      const char* old_msg = session->enter_cond(start_cond,cond_lock,
481
495
                                            "Waiting for slave thread to start");
482
496
      pthread_cond_wait(start_cond,cond_lock);
483
 
      thd->exit_cond(old_msg);
 
497
      session->exit_cond(old_msg);
484
498
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
485
 
      if (thd->killed)
486
 
        return(thd->killed_errno());
 
499
      if (session->killed)
 
500
        return(session->killed_errno());
487
501
    }
488
502
  }
489
503
  if (start_lock)
502
516
*/
503
517
 
504
518
int32_t start_slave_threads(bool need_slave_mutex, bool wait_for_start,
505
 
                        Master_info* mi,
506
 
                        const char* master_info_fname __attribute__((unused)),
507
 
                        const char* slave_info_fname __attribute__((unused)),
 
519
                        Master_info* mi, const char*, const char*,
508
520
                        int32_t thread_mask)
509
521
{
510
522
  pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
543
555
 
544
556
 
545
557
#ifdef NOT_USED_YET
546
 
static int32_t end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
 
558
static int32_t end_slave_on_walk(Master_info* mi, unsigned char* /*unused*/)
547
559
{
548
560
  end_master_info(mi);
549
561
  return(0);
576
588
      once multi-master code is ready.
577
589
    */
578
590
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
579
 
    end_master_info(active_mi);
 
591
    active_mi->end_master_info();
580
592
    delete active_mi;
581
593
    active_mi= 0;
582
594
  }
585
597
}
586
598
 
587
599
 
588
 
static bool io_slave_killed(THD* thd, Master_info* mi)
 
600
static bool io_slave_killed(Session* session, Master_info* mi)
589
601
{
590
 
  assert(mi->io_thd == thd);
 
602
  assert(mi->io_session == session);
591
603
  assert(mi->slave_running); // tracking buffer overrun
592
 
  return(mi->abort_slave || abort_loop || thd->killed);
 
604
  return(mi->abort_slave || abort_loop || session->killed);
593
605
}
594
606
 
595
607
 
596
 
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
 
608
static bool sql_slave_killed(Session* session, Relay_log_info* rli)
597
609
{
598
 
  assert(rli->sql_thd == thd);
 
610
  assert(rli->sql_session == session);
599
611
  assert(rli->slave_running == 1);// tracking buffer overrun
600
 
  if (abort_loop || thd->killed || rli->abort_slave)
 
612
  if (abort_loop || session->killed || rli->abort_slave)
601
613
  {
602
614
    /*
603
615
      If we are in an unsafe situation (stopping could corrupt replication),
610
622
    */
611
623
    if (rli->last_event_start_time == 0)
612
624
      return(1);
613
 
    if (difftime(time(0), rli->last_event_start_time) > 60)
 
625
    if (difftime(time(NULL), rli->last_event_start_time) > 60)
614
626
    {
615
627
      rli->report(ERROR_LEVEL, 0,
616
628
                  _("SQL thread had to stop in an unsafe situation, in "
637
649
{
638
650
  (void)net_request_file(net, "/dev/null");
639
651
  (void)my_net_read(net);                               // discard response
640
 
  (void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok
 
652
  (void)net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0); // ok
641
653
  return;
642
654
}
643
655
 
644
656
 
645
657
bool net_request_file(NET* net, const char* fname)
646
658
{
647
 
  return(net_write_command(net, 251, (uchar*) fname, strlen(fname),
648
 
                                (uchar*) "", 0));
 
659
  return(net_write_command(net, 251, (unsigned char*) fname, strlen(fname),
 
660
                                (unsigned char*) "", 0));
649
661
}
650
662
 
651
663
/*
683
695
  }
684
696
  else if (default_val)
685
697
  {
686
 
    strmake(var,  default_val, max_size-1);
 
698
    strncpy(var,  default_val, max_size-1);
687
699
    return(0);
688
700
  }
689
701
  return(1);
728
740
  return(1);
729
741
}
730
742
 
731
 
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
 
743
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info)
732
744
{
733
 
  if (io_slave_killed(thd, mi))
 
745
  if (io_slave_killed(session, mi))
734
746
  {
735
747
    if (info && global_system_variables.log_warnings)
736
 
      sql_print_information(info);
 
748
      sql_print_information("%s",info);
737
749
    return true;
738
750
  }
739
751
  return false;
847
859
      (master_row= drizzle_fetch_row(master_res)))
848
860
  {
849
861
    mi->clock_diff_with_master=
850
 
      (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
 
862
      (long) (time(NULL) - strtoul(master_row[0], 0, 10));
851
863
  }
852
 
  else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
 
864
  else if (!check_io_slave_killed(mi->io_session, mi, NULL))
853
865
  {
854
866
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
855
867
    sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
863
875
  /*
864
876
    Check that the master's server id and ours are different. Because if they
865
877
    are equal (which can result from a simple copy of master's datadir to slave,
866
 
    thus copying some my.cnf), replication will work but all events will be
 
878
    thus copying some drizzle.cnf), replication will work but all events will be
867
879
    skipped.
868
880
    Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
869
881
    master?).
883
895
          "DRIZZLE server ids; these ids must be different "
884
896
          "for replication to work (or "
885
897
          "the --replicate-same-server-id option must be used "
886
 
          "on slave but this does"
 
898
          "on slave but this does "
887
899
          "not always make sense; please check the manual before using it).");
888
900
      err_code= ER_SLAVE_FATAL_ERROR;
889
901
      sprintf(err_buff, ER(err_code), errmsg);
978
990
    char llbuf[22];
979
991
    const char query_format[]= "SET @master_heartbeat_period= %s";
980
992
    char query[sizeof(query_format) - 2 + sizeof(llbuf)];
981
 
    /* 
982
 
       the period is an uint64_t of nano-secs. 
 
993
    /*
 
994
       the period is an uint64_t of nano-secs.
983
995
    */
984
996
    llstr((uint64_t) (mi->heartbeat_period*1000000000UL), llbuf);
985
997
    sprintf(query, query_format, llbuf);
986
998
 
987
999
    if (drizzle_real_query(drizzle, query, strlen(query))
988
 
        && !check_io_slave_killed(mi->io_thd, mi, NULL))
 
1000
        && !check_io_slave_killed(mi->io_session, mi, NULL))
989
1001
    {
990
1002
      err_msg.append("The slave I/O thread stops because querying master with '");
991
1003
      err_msg.append(query);
1001
1013
    }
1002
1014
    drizzle_free_result(drizzle_store_result(drizzle));
1003
1015
  }
1004
 
  
 
1016
 
1005
1017
err:
1006
1018
  if (err_msg.length() != 0)
1007
1019
  {
1008
 
    sql_print_error(err_msg.ptr());
 
1020
    sql_print_error("%s",err_msg.ptr());
1009
1021
    assert(err_code != 0);
1010
 
    mi->report(ERROR_LEVEL, err_code, err_msg.ptr());
 
1022
    mi->report(ERROR_LEVEL, err_code, "%s",err_msg.ptr());
1011
1023
    return(1);
1012
1024
  }
1013
1025
 
1020
1032
  bool slave_killed=0;
1021
1033
  Master_info* mi = rli->mi;
1022
1034
  const char *save_proc_info;
1023
 
  THD* thd = mi->io_thd;
 
1035
  Session* session = mi->io_session;
1024
1036
 
1025
1037
  pthread_mutex_lock(&rli->log_space_lock);
1026
 
  save_proc_info= thd->enter_cond(&rli->log_space_cond,
 
1038
  save_proc_info= session->enter_cond(&rli->log_space_cond,
1027
1039
                                  &rli->log_space_lock,
1028
1040
                                  _("Waiting for the slave SQL thread "
1029
1041
                                    "to free enough relay log space"));
1030
1042
  while (rli->log_space_limit < rli->log_space_total &&
1031
 
         !(slave_killed=io_slave_killed(thd,mi)) &&
 
1043
         !(slave_killed=io_slave_killed(session,mi)) &&
1032
1044
         !rli->ignore_log_space_limit)
1033
1045
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1034
 
  thd->exit_cond(save_proc_info);
 
1046
  session->exit_cond(save_proc_info);
1035
1047
  return(slave_killed);
1036
1048
}
1037
1049
 
1041
1053
 
1042
1054
  SYNOPSIS
1043
1055
  write_ignored_events_info_to_relay_log()
1044
 
    thd             pointer to I/O thread's thd
 
1056
    session             pointer to I/O thread's session
1045
1057
    mi
1046
1058
 
1047
1059
  DESCRIPTION
1049
1061
    ignored events' end position for the use of the slave SQL thread, by
1050
1062
    calling this function. Only that thread can call it (see assertion).
1051
1063
 */
1052
 
static void write_ignored_events_info_to_relay_log(THD *thd __attribute__((unused)),
 
1064
static void write_ignored_events_info_to_relay_log(Session *session,
1053
1065
                                                   Master_info *mi)
1054
1066
{
1055
1067
  Relay_log_info *rli= &mi->rli;
1056
1068
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1057
1069
 
1058
 
  assert(thd == mi->io_thd);
 
1070
  assert(session == mi->io_session);
1059
1071
  pthread_mutex_lock(log_lock);
1060
1072
  if (rli->ign_master_log_name_end[0])
1061
1073
  {
1063
1075
                                               0, rli->ign_master_log_pos_end,
1064
1076
                                               Rotate_log_event::DUP_NAME);
1065
1077
    rli->ign_master_log_name_end[0]= 0;
1066
 
    /* can unlock before writing as slave SQL thd will soon see our Rotate */
 
1078
    /* can unlock before writing as slave SQL session will soon see our Rotate */
1067
1079
    pthread_mutex_unlock(log_lock);
1068
1080
    if (likely((bool)ev))
1069
1081
    {
1075
1087
                     " to the relay log, SHOW SLAVE STATUS may be"
1076
1088
                     " inaccurate"));
1077
1089
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1078
 
      if (flush_master_info(mi, 1))
 
1090
      if (mi->flush())
1079
1091
        sql_print_error(_("Failed to flush master info file"));
1080
1092
      delete ev;
1081
1093
    }
1094
1106
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
1095
1107
                             bool *suppress_warnings)
1096
1108
{
1097
 
  uchar buf[1024], *pos= buf;
 
1109
  unsigned char buf[1024], *pos= buf;
1098
1110
  uint32_t report_host_len, report_user_len=0, report_password_len=0;
1099
1111
 
1100
1112
  *suppress_warnings= false;
1101
1113
  if (!report_host)
1102
1114
    return(0);
1103
1115
  report_host_len= strlen(report_host);
1104
 
  if (report_user)
1105
 
    report_user_len= strlen(report_user);
1106
 
  if (report_password)
1107
 
    report_password_len= strlen(report_password);
1108
1116
  /* 30 is a good safety margin */
1109
1117
  if (report_host_len + report_user_len + report_password_len + 30 >
1110
1118
      sizeof(buf))
1111
1119
    return(0);                                     // safety
1112
1120
 
1113
1121
  int4store(pos, server_id); pos+= 4;
1114
 
  pos= net_store_data(pos, (uchar*) report_host, report_host_len);
1115
 
  pos= net_store_data(pos, (uchar*) report_user, report_user_len);
1116
 
  pos= net_store_data(pos, (uchar*) report_password, report_password_len);
 
1122
  pos= net_store_data(pos, (unsigned char*) report_host, report_host_len);
 
1123
  pos= net_store_data(pos, NULL, report_user_len);
 
1124
  pos= net_store_data(pos, NULL, report_password_len);
1117
1125
  int2store(pos, (uint16_t) report_port); pos+= 2;
1118
 
  int4store(pos, rpl_recovery_rank);    pos+= 4;
 
1126
  int4store(pos, 0);    pos+= 4;
1119
1127
  /* The master will fill in master_id */
1120
1128
  int4store(pos, 0);                    pos+= 4;
1121
1129
 
1125
1133
    {
1126
1134
      *suppress_warnings= true;                 // Suppress reconnect warning
1127
1135
    }
1128
 
    else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
 
1136
    else if (!check_io_slave_killed(mi->io_session, mi, NULL))
1129
1137
    {
1130
1138
      char buf[256];
1131
 
      snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle), 
 
1139
      snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle),
1132
1140
               drizzle_errno(drizzle));
1133
1141
      mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1134
1142
                 ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1139
1147
}
1140
1148
 
1141
1149
 
1142
 
bool show_master_info(THD* thd, Master_info* mi)
 
1150
bool show_master_info(Session* session, Master_info* mi)
1143
1151
{
1144
1152
  // TODO: fix this for multi-master
1145
1153
  List<Item> field_list;
1146
 
  Protocol *protocol= thd->protocol;
 
1154
  Protocol *protocol= session->protocol;
1147
1155
 
1148
1156
  field_list.push_back(new Item_empty_string("Slave_IO_State",
1149
1157
                                                     14));
1167
1175
                                             FN_REFLEN));
1168
1176
  field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
1169
1177
  field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1170
 
  field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
1171
 
  field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
1172
 
  field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
1173
 
  field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
1174
 
  field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
1175
 
  field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
1176
 
                                             28));
1177
1178
  field_list.push_back(new Item_return_int("Last_Errno", 4, DRIZZLE_TYPE_LONG));
1178
1179
  field_list.push_back(new Item_empty_string("Last_Error", 20));
1179
1180
  field_list.push_back(new Item_return_int("Skip_Counter", 10,
1186
1187
  field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1187
1188
  field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1188
1189
                                           DRIZZLE_TYPE_LONGLONG));
1189
 
  field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
1190
 
  field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
1191
 
                                             sizeof(mi->ssl_ca)));
1192
 
  field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
1193
 
                                             sizeof(mi->ssl_capath)));
1194
 
  field_list.push_back(new Item_empty_string("Master_SSL_Cert",
1195
 
                                             sizeof(mi->ssl_cert)));
1196
 
  field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
1197
 
                                             sizeof(mi->ssl_cipher)));
1198
 
  field_list.push_back(new Item_empty_string("Master_SSL_Key",
1199
 
                                             sizeof(mi->ssl_key)));
1200
1190
  field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1201
1191
                                           DRIZZLE_TYPE_LONGLONG));
1202
 
  field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
1203
 
                                             3));
1204
1192
  field_list.push_back(new Item_return_int("Last_IO_Errno", 4, DRIZZLE_TYPE_LONG));
1205
1193
  field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1206
1194
  field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, DRIZZLE_TYPE_LONG));
1212
1200
 
1213
1201
  if (mi->host[0])
1214
1202
  {
1215
 
    String *packet= &thd->packet;
 
1203
    String *packet= &session->packet;
1216
1204
    protocol->prepare_for_resend();
1217
1205
 
1218
1206
    /*
1219
1207
      slave_running can be accessed without run_lock but not other
1220
 
      non-volotile members like mi->io_thd, which is guarded by the mutex.
 
1208
      non-volatile members like mi->io_session, which is guarded by the mutex.
1221
1209
    */
1222
1210
    pthread_mutex_lock(&mi->run_lock);
1223
 
    protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
 
1211
    protocol->store(mi->io_session ? mi->io_session->get_proc_info() : "", &my_charset_bin);
1224
1212
    pthread_mutex_unlock(&mi->run_lock);
1225
1213
 
1226
1214
    pthread_mutex_lock(&mi->data_lock);
1227
1215
    pthread_mutex_lock(&mi->rli.data_lock);
1228
 
    protocol->store(mi->host, &my_charset_bin);
1229
 
    protocol->store(mi->user, &my_charset_bin);
1230
 
    protocol->store((uint32_t) mi->port);
1231
 
    protocol->store((uint32_t) mi->connect_retry);
1232
 
    protocol->store(mi->master_log_name, &my_charset_bin);
1233
 
    protocol->store((uint64_t) mi->master_log_pos);
1234
 
    protocol->store(mi->rli.group_relay_log_name +
1235
 
                    dirname_length(mi->rli.group_relay_log_name),
 
1216
    protocol->store(mi->getHostname(), &my_charset_bin);
 
1217
    protocol->store(mi->getUsername(), &my_charset_bin);
 
1218
    protocol->store((uint32_t) mi->getPort());
 
1219
    protocol->store(mi->getConnectionRetry());
 
1220
    protocol->store(mi->getLogName(), &my_charset_bin);
 
1221
    protocol->store((uint64_t) mi->getLogPosition());
 
1222
    protocol->store(mi->rli.group_relay_log_name.c_str() +
 
1223
                    dirname_length(mi->rli.group_relay_log_name.c_str()),
1236
1224
                    &my_charset_bin);
1237
1225
    protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1238
 
    protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
 
1226
    protocol->store(mi->rli.group_master_log_name.c_str(), &my_charset_bin);
1239
1227
    protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1240
1228
                    "Yes" : "No", &my_charset_bin);
1241
1229
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1242
 
    protocol->store(rpl_filter->get_do_db());
1243
 
    protocol->store(rpl_filter->get_ignore_db());
1244
 
 
1245
 
    char buf[256];
1246
 
    String tmp(buf, sizeof(buf), &my_charset_bin);
1247
 
    rpl_filter->get_do_table(&tmp);
1248
 
    protocol->store(&tmp);
1249
 
    rpl_filter->get_ignore_table(&tmp);
1250
 
    protocol->store(&tmp);
1251
 
    rpl_filter->get_wild_do_table(&tmp);
1252
 
    protocol->store(&tmp);
1253
 
    rpl_filter->get_wild_ignore_table(&tmp);
1254
 
    protocol->store(&tmp);
1255
1230
 
1256
1231
    protocol->store(mi->rli.last_error().number);
1257
1232
    protocol->store(mi->rli.last_error().message, &my_charset_bin);
1266
1241
    protocol->store(mi->rli.until_log_name, &my_charset_bin);
1267
1242
    protocol->store((uint64_t) mi->rli.until_log_pos);
1268
1243
 
1269
 
    protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
1270
 
    protocol->store(mi->ssl_ca, &my_charset_bin);
1271
 
    protocol->store(mi->ssl_capath, &my_charset_bin);
1272
 
    protocol->store(mi->ssl_cert, &my_charset_bin);
1273
 
    protocol->store(mi->ssl_cipher, &my_charset_bin);
1274
 
    protocol->store(mi->ssl_key, &my_charset_bin);
1275
 
 
1276
1244
    /*
1277
1245
      Seconds_Behind_Master: if SQL thread is running and I/O thread is
1278
1246
      connected, we can compute it otherwise show NULL (i.e. unknown).
1280
1248
    if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
1281
1249
        mi->rli.slave_running)
1282
1250
    {
1283
 
      long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
 
1251
      long time_diff= ((long)(time(NULL) - mi->rli.last_master_timestamp)
1284
1252
                       - mi->clock_diff_with_master);
1285
1253
      /*
1286
1254
        Apparently on some systems time_diff can be <0. Here are possible
1297
1265
        slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1298
1266
        between timestamp of slave and rli->last_master_timestamp is 0
1299
1267
        (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1300
 
        This confuses users, so we don't go below 0: hence the max().
 
1268
        This confuses users, so we don't go below 0: hence the cmax().
1301
1269
 
1302
1270
        last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1303
1271
        special marker to say "consider we have caught up".
1304
1272
      */
1305
1273
      protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1306
 
                                 max((long)0, time_diff) : 0));
 
1274
                                 cmax((long)0, time_diff) : 0));
1307
1275
    }
1308
1276
    else
1309
1277
    {
1310
1278
      protocol->store_null();
1311
1279
    }
1312
 
    protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
1313
1280
 
1314
1281
    // Last_IO_Errno
1315
1282
    protocol->store(mi->last_error().number);
1323
1290
    pthread_mutex_unlock(&mi->rli.data_lock);
1324
1291
    pthread_mutex_unlock(&mi->data_lock);
1325
1292
 
1326
 
    if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
 
1293
    if (my_net_write(&session->net, (unsigned char*) session->packet.ptr(), packet->length()))
1327
1294
      return(true);
1328
1295
  }
1329
 
  my_eof(thd);
 
1296
  my_eof(session);
1330
1297
  return(false);
1331
1298
}
1332
1299
 
1333
1300
 
1334
 
void set_slave_thread_options(THD* thd)
 
1301
void set_slave_thread_options(Session* session)
1335
1302
{
1336
1303
  /*
1337
1304
     It's nonsense to constrain the slave threads with max_join_size; if a
1342
1309
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1343
1310
     only for client threads.
1344
1311
  */
1345
 
  uint64_t options= thd->options | OPTION_BIG_SELECTS;
 
1312
  uint64_t options= session->options | OPTION_BIG_SELECTS;
1346
1313
  if (opt_log_slave_updates)
1347
1314
    options|= OPTION_BIN_LOG;
1348
1315
  else
1349
1316
    options&= ~OPTION_BIN_LOG;
1350
 
  thd->options= options;
1351
 
  thd->variables.completion_type= 0;
1352
 
  return;
1353
 
}
1354
 
 
1355
 
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
1356
 
{
1357
 
  thd->variables.character_set_client=
1358
 
    global_system_variables.character_set_client;
1359
 
  thd->variables.collation_connection=
1360
 
    global_system_variables.collation_connection;
1361
 
  thd->variables.collation_server=
1362
 
    global_system_variables.collation_server;
1363
 
  thd->update_charset();
1364
 
 
1365
 
  /*
1366
 
    We use a const cast here since the conceptual (and externally
1367
 
    visible) behavior of the function is to set the default charset of
1368
 
    the thread.  That the cache has to be invalidated is a secondary
1369
 
    effect.
1370
 
   */
1371
 
  const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
 
1317
  session->options= options;
 
1318
  session->variables.completion_type= 0;
1372
1319
  return;
1373
1320
}
1374
1321
 
1376
1323
  init_slave_thread()
1377
1324
*/
1378
1325
 
1379
 
static int32_t init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
 
1326
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type)
1380
1327
{
1381
1328
  int32_t simulate_error= 0;
1382
 
  thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
 
1329
  session->system_thread = (session_type == SLAVE_Session_SQL) ?
1383
1330
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1384
 
  thd->security_ctx->skip_grants();
1385
 
  my_net_init(&thd->net, 0);
 
1331
  session->security_ctx.skip_grants();
 
1332
  my_net_init(&session->net, 0);
1386
1333
/*
1387
1334
  Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1388
1335
  slave threads, since a replication event can become this much larger
1389
1336
  than the corresponding packet (query) sent from client to master.
1390
1337
*/
1391
 
  thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
 
1338
  session->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1392
1339
    + MAX_LOG_EVENT_HEADER;  /* note, incr over the global not session var */
1393
 
  thd->slave_thread = 1;
1394
 
  thd->enable_slow_log= opt_log_slow_slave_statements;
1395
 
  set_slave_thread_options(thd);
1396
 
  thd->client_capabilities = CLIENT_LOCAL_FILES;
 
1340
  session->slave_thread = 1;
 
1341
  set_slave_thread_options(session);
 
1342
  session->client_capabilities = CLIENT_LOCAL_FILES;
1397
1343
  pthread_mutex_lock(&LOCK_thread_count);
1398
 
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
 
1344
  session->thread_id= session->variables.pseudo_thread_id= thread_id++;
1399
1345
  pthread_mutex_unlock(&LOCK_thread_count);
1400
1346
 
1401
 
 simulate_error|= (1 << SLAVE_THD_IO);
1402
 
 simulate_error|= (1 << SLAVE_THD_SQL);
1403
 
  if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
 
1347
 simulate_error|= (1 << SLAVE_Session_IO);
 
1348
 simulate_error|= (1 << SLAVE_Session_SQL);
 
1349
  if (init_thr_lock() || session->store_globals() || simulate_error & (1<< session_type))
1404
1350
  {
1405
 
    thd->cleanup();
 
1351
    session->cleanup();
1406
1352
    return(-1);
1407
1353
  }
1408
 
  lex_start(thd);
 
1354
  lex_start(session);
1409
1355
 
1410
 
  if (thd_type == SLAVE_THD_SQL)
1411
 
    thd_proc_info(thd, "Waiting for the next event in relay log");
 
1356
  if (session_type == SLAVE_Session_SQL)
 
1357
    session->set_proc_info("Waiting for the next event in relay log");
1412
1358
  else
1413
 
    thd_proc_info(thd, "Waiting for master update");
1414
 
  thd->version=refresh_version;
1415
 
  thd->set_time();
 
1359
    session->set_proc_info("Waiting for master update");
 
1360
  session->version=refresh_version;
 
1361
  session->set_time();
1416
1362
  return(0);
1417
1363
}
1418
1364
 
1419
 
 
1420
 
static int32_t safe_sleep(THD* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1421
 
                      void* thread_killed_arg)
 
1365
/* Returns non zero on error */
 
1366
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
1367
                          void* thread_killed_arg)
1422
1368
{
1423
1369
  int32_t nap_time;
1424
1370
  thr_alarm_t alarmed;
1425
1371
 
1426
1372
  thr_alarm_init(&alarmed);
1427
 
  time_t start_time= my_time(0);
1428
 
  time_t end_time= start_time+sec;
 
1373
  time_t start_time, end_time;
 
1374
 
 
1375
  start_time= time(NULL);
 
1376
  if (start_time == (time_t)-1)
 
1377
    return -1;
 
1378
  end_time= start_time+sec;
1429
1379
 
1430
1380
  while ((nap_time= (int32_t) (end_time - start_time)) > 0)
1431
1381
  {
1439
1389
    sleep(nap_time);
1440
1390
    thr_end_alarm(&alarmed);
1441
1391
 
1442
 
    if ((*thread_killed)(thd,thread_killed_arg))
 
1392
    if ((*thread_killed)(session,thread_killed_arg))
1443
1393
      return(1);
1444
 
    start_time= my_time(0);
 
1394
 
 
1395
    start_time= time(NULL);
 
1396
    if (start_time == (time_t)-1)
 
1397
      return -1;
1445
1398
  }
1446
1399
  return(0);
1447
1400
}
1450
1403
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
1451
1404
                        bool *suppress_warnings)
1452
1405
{
1453
 
  uchar buf[FN_REFLEN + 10];
 
1406
  unsigned char buf[FN_REFLEN + 10];
1454
1407
  int32_t len;
1455
1408
  int32_t binlog_flags = 0; // for now
1456
 
  char* logname = mi->master_log_name;
1457
 
  
 
1409
  const char* logname = mi->getLogName();
 
1410
 
1458
1411
  *suppress_warnings= false;
1459
1412
 
1460
1413
  // TODO if big log files: Change next to int8store()
1461
 
  int4store(buf, (uint32_t) mi->master_log_pos);
 
1414
  int4store(buf, (uint32_t) mi->getLogPosition());
1462
1415
  int2store(buf + 4, binlog_flags);
1463
1416
  int4store(buf + 6, server_id);
1464
1417
  len = (uint32_t) strlen(logname);
1500
1453
*/
1501
1454
 
1502
1455
static uint32_t read_event(DRIZZLE *drizzle,
1503
 
                        Master_info *mi __attribute__((unused)),
1504
 
                        bool* suppress_warnings)
 
1456
                           Master_info *mi,
 
1457
                           bool* suppress_warnings)
1505
1458
{
1506
1459
  uint32_t len;
1507
1460
 
1544
1497
}
1545
1498
 
1546
1499
 
1547
 
int32_t check_expected_error(THD* thd __attribute__((unused)),
1548
 
                         Relay_log_info const *rli __attribute__((unused)),
1549
 
                         int32_t expected_error)
 
1500
int32_t check_expected_error(Session*, Relay_log_info const *,
 
1501
                             int32_t expected_error)
1550
1502
{
1551
1503
  switch (expected_error) {
1552
1504
  case ER_NET_READ_ERROR:
1568
1520
  that the error is temporary by pushing a warning with the error code
1569
1521
  ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1570
1522
*/
1571
 
static int32_t has_temporary_error(THD *thd)
 
1523
static int32_t has_temporary_error(Session *session)
1572
1524
{
1573
 
  if (thd->is_fatal_error)
 
1525
  if (session->is_fatal_error)
1574
1526
    return(0);
1575
1527
 
1576
 
  if (thd->main_da.is_error())
 
1528
  if (session->main_da.is_error())
1577
1529
  {
1578
 
    thd->clear_error();
 
1530
    session->clear_error();
1579
1531
    my_error(ER_LOCK_DEADLOCK, MYF(0));
1580
1532
  }
1581
1533
 
1582
1534
  /*
1583
 
    If there is no message in THD, we can't say if it's a temporary
 
1535
    If there is no message in Session, we can't say if it's a temporary
1584
1536
    error or not. This is currently the case for Incident_log_event,
1585
1537
    which sets no message. Return FALSE.
1586
1538
  */
1587
 
  if (!thd->is_error())
 
1539
  if (!session->is_error())
1588
1540
    return(0);
1589
1541
 
1590
1542
  /*
1592
1544
    currently, InnoDB deadlock detected by InnoDB or lock
1593
1545
    wait timeout (innodb_lock_wait_timeout exceeded)
1594
1546
  */
1595
 
  if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
1596
 
      thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
 
1547
  if (session->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
 
1548
      session->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
1597
1549
    return(1);
1598
1550
 
1599
1551
  return(0);
1626
1578
  @retval 2 No error calling ev->apply_event(), but error calling
1627
1579
  ev->update_pos().
1628
1580
*/
1629
 
int32_t apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
 
1581
int32_t apply_event_and_update_pos(Log_event* ev, Session* session, Relay_log_info* rli,
1630
1582
                               bool skip)
1631
1583
{
1632
1584
  int32_t exec_res= 0;
1655
1607
    has a Rotate etc).
1656
1608
  */
1657
1609
 
1658
 
  thd->server_id = ev->server_id; // use the original server id for logging
1659
 
  thd->set_time();                            // time the query
1660
 
  thd->lex->current_select= 0;
 
1610
  session->server_id = ev->server_id; // use the original server id for logging
 
1611
  session->set_time();                            // time the query
 
1612
  session->lex->current_select= 0;
1661
1613
  if (!ev->when)
1662
 
    ev->when= my_time(0);
1663
 
  ev->thd = thd; // because up to this point, ev->thd == 0
 
1614
  {
 
1615
    ev->when= time(NULL);
 
1616
    if(ev->when == (time_t)-1)
 
1617
      return 2;
 
1618
  }
 
1619
 
 
1620
  ev->session = session; // because up to this point, ev->session == 0
1664
1621
 
1665
1622
  if (skip)
1666
1623
  {
1692
1649
                  " of the relay log information: the slave may"
1693
1650
                  " be in an inconsistent state."
1694
1651
                  " Stopped in %s position %s"),
1695
 
                  rli->group_relay_log_name,
 
1652
                  rli->group_relay_log_name.c_str(),
1696
1653
                  llstr(rli->group_relay_log_pos, buf));
1697
1654
      return(2);
1698
1655
    }
1730
1687
 
1731
1688
  @retval 1 The event was not applied.
1732
1689
*/
1733
 
static int32_t exec_relay_log_event(THD* thd, Relay_log_info* rli)
 
1690
static int32_t exec_relay_log_event(Session* session, Relay_log_info* rli)
1734
1691
{
1735
1692
  /*
1736
1693
     We acquire this mutex since we need it for all operations except
1741
1698
 
1742
1699
  Log_event * ev = next_event(rli);
1743
1700
 
1744
 
  assert(rli->sql_thd==thd);
 
1701
  assert(rli->sql_session==session);
1745
1702
 
1746
 
  if (sql_slave_killed(thd,rli))
 
1703
  if (sql_slave_killed(session,rli))
1747
1704
  {
1748
1705
    pthread_mutex_unlock(&rli->data_lock);
1749
1706
    delete ev;
1775
1732
      delete ev;
1776
1733
      return(1);
1777
1734
    }
1778
 
    exec_res= apply_event_and_update_pos(ev, thd, rli, true);
 
1735
    exec_res= apply_event_and_update_pos(ev, session, rli, true);
1779
1736
 
1780
1737
    /*
1781
1738
      Format_description_log_event should not be deleted because it will be
1797
1754
    if (slave_trans_retries)
1798
1755
    {
1799
1756
      int32_t temp_err= 0;
1800
 
      if (exec_res && (temp_err= has_temporary_error(thd)))
 
1757
      if (exec_res && (temp_err= has_temporary_error(session)))
1801
1758
      {
1802
1759
        const char *errmsg;
1803
1760
        /*
1816
1773
        */
1817
1774
        if (rli->trans_retries < slave_trans_retries)
1818
1775
        {
1819
 
          if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
 
1776
          if (rli->mi->init_master_info(0, 0, SLAVE_SQL))
1820
1777
            sql_print_error(_("Failed to initialize the master info structure"));
1821
1778
          else if (init_relay_log_pos(rli,
1822
 
                                      rli->group_relay_log_name,
 
1779
                                      rli->group_relay_log_name.c_str(),
1823
1780
                                      rli->group_relay_log_pos,
1824
1781
                                      1, &errmsg, 1))
1825
1782
            sql_print_error(_("Error initializing relay log position: %s"),
1827
1784
          else
1828
1785
          {
1829
1786
            exec_res= 0;
1830
 
            end_trans(thd, ROLLBACK);
 
1787
            end_trans(session, ROLLBACK);
1831
1788
            /* chance for concurrent connection to get more locks */
1832
 
            safe_sleep(thd, min(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
 
1789
            safe_sleep(session, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1833
1790
                       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1834
1791
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1835
1792
            rli->trans_retries++;
1838
1795
          }
1839
1796
        }
1840
1797
        else
1841
 
          sql_print_error(_("Slave SQL thread retried transaction %lu time(s) "
 
1798
          sql_print_error(_("Slave SQL thread retried transaction %"PRIu64" time(s) "
1842
1799
                            "in vain, giving up. Consider raising the value of "
1843
1800
                            "the slave_transaction_retries variable."),
1844
1801
                          slave_trans_retries);
1887
1844
  are taken from @c messages array. In case @c master_retry_count is exceeded,
1888
1845
  no messages are added to the log.
1889
1846
 
1890
 
  @param[in]     thd                 Thread context.
 
1847
  @param[in]     session                 Thread context.
1891
1848
  @param[in]     drizzle             DRIZZLE connection.
1892
1849
  @param[in]     mi                  Master connection information.
1893
1850
  @param[in,out] retry_count         Number of attempts to reconnect.
1894
 
  @param[in]     suppress_warnings   TRUE when a normal net read timeout 
 
1851
  @param[in]     suppress_warnings   TRUE when a normal net read timeout
1895
1852
                                     has caused the reconnect.
1896
 
  @param[in]     messages            Messages to print/log, see 
 
1853
  @param[in]     messages            Messages to print/log, see
1897
1854
                                     reconnect_messages[] array.
1898
1855
 
1899
1856
  @retval        0                   OK.
1900
1857
  @retval        1                   There was an error.
1901
1858
*/
1902
1859
 
1903
 
static int32_t try_to_reconnect(THD *thd, DRIZZLE *drizzle, Master_info *mi,
1904
 
                            uint32_t *retry_count, bool suppress_warnings,
1905
 
                            const char *messages[SLAVE_RECON_MSG_MAX])
 
1860
static int32_t try_to_reconnect(Session *session, DRIZZLE *drizzle, Master_info *mi,
 
1861
                                uint32_t *retry_count, bool suppress_warnings,
 
1862
                                const char *messages[SLAVE_RECON_MSG_MAX])
1906
1863
{
1907
1864
  mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1908
 
  thd->proc_info= _(messages[SLAVE_RECON_MSG_WAIT]);
1909
 
#ifdef SIGNAL_WITH_VIO_CLOSE
1910
 
  thd->clear_active_vio();
1911
 
#endif
1912
 
  end_server(drizzle);
 
1865
  session->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
 
1866
  drizzle_disconnect(drizzle);
1913
1867
  if ((*retry_count)++)
1914
1868
  {
1915
1869
    if (*retry_count > master_retry_count)
1916
1870
      return 1;                             // Don't retry forever
1917
 
    safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
 
1871
    safe_sleep(session, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1918
1872
               (void *) mi);
1919
1873
  }
1920
 
  if (check_io_slave_killed(thd, mi,
 
1874
  if (check_io_slave_killed(session, mi,
1921
1875
                            _(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1922
1876
    return 1;
1923
 
  thd->proc_info = _(messages[SLAVE_RECON_MSG_AFTER]);
 
1877
  session->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
1924
1878
  if (!suppress_warnings)
1925
1879
  {
1926
1880
    char buf[256], llbuff[22];
1927
1881
    snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1928
 
             IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
 
1882
             IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
1929
1883
    /*
1930
1884
      Raise a warning while registering on master/requesting dump.
1931
1885
      Log a message when reading an event.
1938
1892
    }
1939
1893
    else
1940
1894
    {
1941
 
      sql_print_information(buf);
 
1895
      sql_print_information("%s",buf);
1942
1896
    }
1943
1897
  }
1944
 
  if (safe_reconnect(thd, drizzle, mi, 1) || io_slave_killed(thd, mi))
 
1898
  if (safe_reconnect(session, drizzle, mi, 1) || io_slave_killed(session, mi))
1945
1899
  {
1946
1900
    if (global_system_variables.log_warnings)
1947
 
      sql_print_information(_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
 
1901
      sql_print_information("%s",_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1948
1902
    return 1;
1949
1903
  }
1950
1904
  return 0;
1955
1909
 
1956
1910
pthread_handler_t handle_slave_io(void *arg)
1957
1911
{
1958
 
  THD *thd; // needs to be first for thread_stack
 
1912
  Session *session; // needs to be first for thread_stack
1959
1913
  DRIZZLE *drizzle;
1960
1914
  Master_info *mi = (Master_info*)arg;
1961
1915
  Relay_log_info *rli= &mi->rli;
1975
1929
 
1976
1930
  mi->events_till_disconnect = disconnect_slave_event_count;
1977
1931
 
1978
 
  thd= new THD;
1979
 
  THD_CHECK_SENTRY(thd);
1980
 
  mi->io_thd = thd;
 
1932
  session= new Session;
 
1933
  Session_CHECK_SENTRY(session);
 
1934
  mi->io_session = session;
1981
1935
 
1982
1936
  pthread_detach_this_thread();
1983
 
  thd->thread_stack= (char*) &thd; // remember where our stack is
1984
 
  if (init_slave_thread(thd, SLAVE_THD_IO))
 
1937
  session->thread_stack= (char*) &session; // remember where our stack is
 
1938
  if (init_slave_thread(session, SLAVE_Session_IO))
1985
1939
  {
1986
1940
    pthread_cond_broadcast(&mi->start_cond);
1987
1941
    pthread_mutex_unlock(&mi->run_lock);
1989
1943
    goto err;
1990
1944
  }
1991
1945
  pthread_mutex_lock(&LOCK_thread_count);
1992
 
  threads.append(thd);
 
1946
  threads.append(session);
1993
1947
  pthread_mutex_unlock(&LOCK_thread_count);
1994
1948
  mi->slave_running = 1;
1995
1949
  mi->abort_slave = 0;
2003
1957
    goto err;
2004
1958
  }
2005
1959
 
2006
 
  thd_proc_info(thd, "Connecting to master");
 
1960
  session->set_proc_info("Connecting to master");
2007
1961
  // we can get killed during safe_connect
2008
 
  if (!safe_connect(thd, drizzle, mi))
 
1962
  if (!safe_connect(session, drizzle, mi))
2009
1963
  {
2010
1964
    sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
2011
1965
                            "replication started in log '%s' at position %s"),
2012
 
                          mi->user, mi->host, mi->port,
 
1966
                          mi->getUsername(), mi->getHostname(), mi->getPort(),
2013
1967
                          IO_RPL_LOG_NAME,
2014
 
                          llstr(mi->master_log_pos,llbuff));
 
1968
                          llstr(mi->getLogPosition(), llbuff));
2015
1969
  /*
2016
1970
    Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
2017
1971
    thread, since a replication event can become this much larger than
2018
1972
    the corresponding packet (query) sent from client to master.
2019
1973
  */
2020
 
    drizzle->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
 
1974
    drizzle->net.max_packet_size= session->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
2021
1975
  }
2022
1976
  else
2023
1977
  {
2029
1983
 
2030
1984
  // TODO: the assignment below should be under mutex (5.0)
2031
1985
  mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
2032
 
  thd->slave_net = &drizzle->net;
2033
 
  thd_proc_info(thd, "Checking master version");
 
1986
  session->slave_net = &drizzle->net;
 
1987
  session->set_proc_info("Checking master version");
2034
1988
  if (get_master_version_and_clock(drizzle, mi))
2035
1989
    goto err;
2036
 
  
 
1990
 
2037
1991
  if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
2038
1992
  {
2039
1993
    /*
2040
1994
      Register ourselves with the master.
2041
1995
    */
2042
 
    thd_proc_info(thd, "Registering slave on master");
 
1996
    session->set_proc_info("Registering slave on master");
2043
1997
    if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2044
1998
    {
2045
 
      if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
 
1999
      if (!check_io_slave_killed(session, mi, "Slave I/O thread killed "
2046
2000
                                 "while registering slave on master"))
2047
2001
      {
2048
2002
        sql_print_error(_("Slave I/O thread couldn't register on master"));
2049
 
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2003
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2050
2004
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
2051
2005
          goto err;
2052
2006
      }
2058
2012
    {
2059
2013
      retry_count_reg++;
2060
2014
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
2061
 
      if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2015
      if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2062
2016
                           reconnect_messages[SLAVE_RECON_ACT_REG]))
2063
2017
        goto err;
2064
2018
      goto connected;
2065
2019
    }
2066
2020
  }
2067
2021
 
2068
 
  while (!io_slave_killed(thd,mi))
 
2022
  while (!io_slave_killed(session,mi))
2069
2023
  {
2070
 
    thd_proc_info(thd, "Requesting binlog dump");
 
2024
    session->set_proc_info("Requesting binlog dump");
2071
2025
    if (request_dump(drizzle, mi, &suppress_warnings))
2072
2026
    {
2073
2027
      sql_print_error(_("Failed on request_dump()"));
2074
 
      if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while \
 
2028
      if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while \
2075
2029
requesting master dump")) ||
2076
 
          try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2030
          try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2077
2031
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2078
2032
        goto err;
2079
2033
      goto connected;
2082
2036
    {
2083
2037
      retry_count_dump++;
2084
2038
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
2085
 
      if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2039
      if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2086
2040
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2087
2041
        goto err;
2088
2042
      goto connected;
2089
2043
    }
2090
2044
 
2091
 
    while (!io_slave_killed(thd,mi))
 
2045
    while (!io_slave_killed(session,mi))
2092
2046
    {
2093
2047
      uint32_t event_len;
2094
2048
      /*
2097
2051
        important thing is to not confuse users by saying "reading" whereas
2098
2052
        we're in fact receiving nothing.
2099
2053
      */
2100
 
      thd_proc_info(thd, _("Waiting for master to send event"));
 
2054
      session->set_proc_info(_("Waiting for master to send event"));
2101
2055
      event_len= read_event(drizzle, mi, &suppress_warnings);
2102
 
      if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while "
 
2056
      if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while "
2103
2057
                                           "reading event")))
2104
2058
        goto err;
2105
2059
      if (!retry_count_event)
2106
2060
      {
2107
2061
        retry_count_event++;
2108
2062
        sql_print_information(_("Forcing to reconnect slave I/O thread"));
2109
 
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2063
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2110
2064
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2111
2065
          goto err;
2112
2066
        goto connected;
2118
2072
        switch (drizzle_error_number) {
2119
2073
        case CR_NET_PACKET_TOO_LARGE:
2120
2074
          sql_print_error(_("Log entry on master is longer than "
2121
 
                            "max_allowed_packet (%ld) on "
 
2075
                            "max_allowed_packet (%u) on "
2122
2076
                            "slave. If the entry is correct, restart the "
2123
2077
                            "server with a higher value of "
2124
2078
                            "max_allowed_packet"),
2125
 
                          thd->variables.max_allowed_packet);
 
2079
                          session->variables.max_allowed_packet);
2126
2080
          goto err;
2127
2081
        case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2128
2082
          sql_print_error(ER(drizzle_error_number), drizzle_error_number,
2134
2088
       _("Stopping slave I/O thread due to out-of-memory error from master"));
2135
2089
          goto err;
2136
2090
        }
2137
 
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2091
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2138
2092
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2139
2093
          goto err;
2140
2094
        goto connected;
2141
2095
      } // if (event_len == packet_error)
2142
2096
 
2143
2097
      retry_count=0;                    // ok event, reset retry counter
2144
 
      thd_proc_info(thd, _("Queueing master event to the relay log"));
 
2098
      session->set_proc_info(_("Queuing master event to the relay log"));
2145
2099
      if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2146
2100
      {
2147
2101
        goto err;
2148
2102
      }
2149
 
      if (flush_master_info(mi, 1))
 
2103
      if (mi->flush())
2150
2104
      {
2151
2105
        sql_print_error(_("Failed to flush master info file"));
2152
2106
        goto err;
2180
2134
// print the current replication position
2181
2135
  sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2182
2136
                          "position %s"),
2183
 
                        IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
2184
 
  VOID(pthread_mutex_lock(&LOCK_thread_count));
2185
 
  thd->query = thd->db = 0; // extra safety
2186
 
  thd->query_length= thd->db_length= 0;
2187
 
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
 
2137
                        IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
 
2138
  pthread_mutex_lock(&LOCK_thread_count);
 
2139
  session->query = session->db = 0; // extra safety
 
2140
  session->query_length= session->db_length= 0;
 
2141
  pthread_mutex_unlock(&LOCK_thread_count);
2188
2142
  if (drizzle)
2189
2143
  {
2190
2144
    /*
2191
2145
      Here we need to clear the active VIO before closing the
2192
 
      connection with the master.  The reason is that THD::awake()
 
2146
      connection with the master.  The reason is that Session::awake()
2193
2147
      might be called from terminate_slave_thread() because somebody
2194
2148
      issued a STOP SLAVE.  If that happens, the close_active_vio()
2195
2149
      can be called in the middle of closing the VIO associated with
2196
2150
      the 'mysql' object, causing a crash.
2197
2151
    */
2198
 
#ifdef SIGNAL_WITH_VIO_CLOSE
2199
 
    thd->clear_active_vio();
2200
 
#endif
2201
2152
    drizzle_close(drizzle);
2202
2153
    mi->drizzle=0;
2203
2154
  }
2204
 
  write_ignored_events_info_to_relay_log(thd, mi);
2205
 
  thd_proc_info(thd, _("Waiting for slave mutex on exit"));
 
2155
  write_ignored_events_info_to_relay_log(session, mi);
 
2156
  session->set_proc_info(_("Waiting for slave mutex on exit"));
2206
2157
  pthread_mutex_lock(&mi->run_lock);
2207
2158
 
2208
2159
  /* Forget the relay log's format */
2209
2160
  delete mi->rli.relay_log.description_event_for_queue;
2210
2161
  mi->rli.relay_log.description_event_for_queue= 0;
2211
 
  // TODO: make rpl_status part of Master_info
2212
 
  change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2213
 
  assert(thd->net.buff != 0);
2214
 
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
2215
 
  close_thread_tables(thd);
 
2162
  assert(session->net.buff != 0);
 
2163
  net_end(&session->net); // destructor will not free it, because net.vio is 0
 
2164
  close_thread_tables(session);
2216
2165
  pthread_mutex_lock(&LOCK_thread_count);
2217
 
  THD_CHECK_SENTRY(thd);
2218
 
  delete thd;
 
2166
  Session_CHECK_SENTRY(session);
 
2167
  delete session;
2219
2168
  pthread_mutex_unlock(&LOCK_thread_count);
2220
2169
  mi->abort_slave= 0;
2221
2170
  mi->slave_running= 0;
2222
 
  mi->io_thd= 0;
 
2171
  mi->io_session= 0;
2223
2172
  /*
2224
2173
    Note: the order of the two following calls (first broadcast, then unlock)
2225
2174
    is important. Otherwise a killer_thread can execute between the calls and
2226
2175
    delete the mi structure leading to a crash! (see BUG#25306 for details)
2227
 
   */ 
 
2176
   */
2228
2177
  pthread_cond_broadcast(&mi->stop_cond);       // tell the world we are done
2229
2178
  pthread_mutex_unlock(&mi->run_lock);
2230
2179
  my_thread_end();
2237
2186
 
2238
2187
pthread_handler_t handle_slave_sql(void *arg)
2239
2188
{
2240
 
  THD *thd;                     /* needs to be first for thread_stack */
 
2189
  Session *session;                     /* needs to be first for thread_stack */
2241
2190
  char llbuff[22],llbuff1[22];
2242
2191
 
2243
2192
  Relay_log_info* rli = &((Master_info*)arg)->rli;
2251
2200
  errmsg= 0;
2252
2201
  rli->events_till_abort = abort_slave_event_count;
2253
2202
 
2254
 
  thd = new THD;
2255
 
  thd->thread_stack = (char*)&thd; // remember where our stack is
2256
 
  rli->sql_thd= thd;
2257
 
  
 
2203
  session = new Session;
 
2204
  session->thread_stack = (char*)&session; // remember where our stack is
 
2205
  rli->sql_session= session;
 
2206
 
2258
2207
  /* Inform waiting threads that slave has started */
2259
2208
  rli->slave_run_id++;
2260
2209
  rli->slave_running = 1;
2261
2210
 
2262
2211
  pthread_detach_this_thread();
2263
 
  if (init_slave_thread(thd, SLAVE_THD_SQL))
 
2212
  if (init_slave_thread(session, SLAVE_Session_SQL))
2264
2213
  {
2265
2214
    /*
2266
2215
      TODO: this is currently broken - slave start and change master
2271
2220
    sql_print_error(_("Failed during slave thread initialization"));
2272
2221
    goto err;
2273
2222
  }
2274
 
  thd->init_for_queries();
2275
 
  thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
 
2223
  session->init_for_queries();
 
2224
  session->temporary_tables = rli->save_temporary_tables; // restore temp tables
2276
2225
  pthread_mutex_lock(&LOCK_thread_count);
2277
 
  threads.append(thd);
 
2226
  threads.append(session);
2278
2227
  pthread_mutex_unlock(&LOCK_thread_count);
2279
2228
  /*
2280
2229
    We are going to set slave_running to 1. Assuming slave I/O thread is
2307
2256
  rli->trans_retries= 0; // start from "no error"
2308
2257
 
2309
2258
  if (init_relay_log_pos(rli,
2310
 
                         rli->group_relay_log_name,
 
2259
                         rli->group_relay_log_name.c_str(),
2311
2260
                         rli->group_relay_log_pos,
2312
2261
                         1 /*need data lock*/, &errmsg,
2313
2262
                         1 /*look for a description_event*/))
2316
2265
                    errmsg);
2317
2266
    goto err;
2318
2267
  }
2319
 
  THD_CHECK_SENTRY(thd);
 
2268
  Session_CHECK_SENTRY(session);
2320
2269
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2321
2270
  /*
2322
2271
    Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2331
2280
    assert().
2332
2281
  */
2333
2282
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2334
 
  assert(rli->sql_thd == thd);
 
2283
  assert(rli->sql_session == session);
2335
2284
 
2336
2285
  if (global_system_variables.log_warnings)
2337
2286
    sql_print_information(_("Slave SQL thread initialized, "
2339
2288
                            "position %s, relay log '%s' position: %s"),
2340
2289
                            RPL_LOG_NAME,
2341
2290
                          llstr(rli->group_master_log_pos,llbuff),
2342
 
                          rli->group_relay_log_name,
 
2291
                          rli->group_relay_log_name.c_str(),
2343
2292
                          llstr(rli->group_relay_log_pos,llbuff1));
2344
2293
 
2345
2294
  /* execute init_slave variable */
2346
2295
  if (sys_init_slave.value_length)
2347
2296
  {
2348
 
    execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
2349
 
    if (thd->is_slave_error)
 
2297
    execute_init_command(session, &sys_init_slave, &LOCK_sys_init_slave);
 
2298
    if (session->is_slave_error)
2350
2299
    {
2351
2300
      sql_print_error(_("Slave SQL thread aborted. "
2352
2301
                        "Can't execute init_slave query"));
2372
2321
 
2373
2322
  /* Read queries from the IO/THREAD until this thread is killed */
2374
2323
 
2375
 
  while (!sql_slave_killed(thd,rli))
 
2324
  while (!sql_slave_killed(session,rli))
2376
2325
  {
2377
 
    thd_proc_info(thd, _("Reading event from the relay log"));
2378
 
    assert(rli->sql_thd == thd);
2379
 
    THD_CHECK_SENTRY(thd);
2380
 
    if (exec_relay_log_event(thd,rli))
 
2326
    session->set_proc_info(_("Reading event from the relay log"));
 
2327
    assert(rli->sql_session == session);
 
2328
    Session_CHECK_SENTRY(session);
 
2329
    if (exec_relay_log_event(session,rli))
2381
2330
    {
2382
2331
      // do not scare the user if SQL thread was simply killed or stopped
2383
 
      if (!sql_slave_killed(thd,rli))
 
2332
      if (!sql_slave_killed(session,rli))
2384
2333
      {
2385
2334
        /*
2386
 
          retrieve as much info as possible from the thd and, error
 
2335
          retrieve as much info as possible from the session and, error
2387
2336
          codes and warnings and print this to the error log as to
2388
2337
          allow the user to locate the error
2389
2338
        */
2390
2339
        uint32_t const last_errno= rli->last_error().number;
2391
2340
 
2392
 
        if (thd->is_error())
 
2341
        if (session->is_error())
2393
2342
        {
2394
 
          char const *const errmsg= thd->main_da.message();
 
2343
          char const *const errmsg= session->main_da.message();
2395
2344
 
2396
2345
          if (last_errno == 0)
2397
2346
          {
2398
 
            rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), errmsg);
 
2347
            rli->report(ERROR_LEVEL, session->main_da.sql_errno(), "%s", errmsg);
2399
2348
          }
2400
 
          else if (last_errno != thd->main_da.sql_errno())
 
2349
          else if (last_errno != session->main_da.sql_errno())
2401
2350
          {
2402
2351
            sql_print_error(_("Slave (additional info): %s Error_code: %d"),
2403
 
                            errmsg, thd->main_da.sql_errno());
 
2352
                            errmsg, session->main_da.sql_errno());
2404
2353
          }
2405
2354
        }
2406
2355
 
2407
2356
        /* Print any warnings issued */
2408
 
        List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
 
2357
        List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
2409
2358
        DRIZZLE_ERROR *err;
2410
2359
        /*
2411
2360
          Added controlled slave thread cancel for replication
2452
2401
    request is detected only by the present function, not by events), so we
2453
2402
    must "proactively" clear playgrounds:
2454
2403
  */
2455
 
  rli->cleanup_context(thd, 1);
2456
 
  VOID(pthread_mutex_lock(&LOCK_thread_count));
 
2404
  rli->cleanup_context(session, 1);
 
2405
  pthread_mutex_lock(&LOCK_thread_count);
2457
2406
  /*
2458
2407
    Some extra safety, which should not have been needed (normally, event deletion
2459
2408
    should already have done these assignments (each event which sets these
2460
2409
    variables is supposed to set them to 0 before terminating)).
2461
2410
  */
2462
 
  thd->query= thd->db= thd->catalog= 0;
2463
 
  thd->query_length= thd->db_length= 0;
2464
 
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
2465
 
  thd_proc_info(thd, "Waiting for slave mutex on exit");
 
2411
  session->query= session->db= session->catalog= 0;
 
2412
  session->query_length= session->db_length= 0;
 
2413
  pthread_mutex_unlock(&LOCK_thread_count);
 
2414
  session->set_proc_info("Waiting for slave mutex on exit");
2466
2415
  pthread_mutex_lock(&rli->run_lock);
2467
2416
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
2468
2417
  pthread_mutex_lock(&rli->data_lock);
2476
2425
  pthread_mutex_unlock(&rli->data_lock);
2477
2426
  pthread_cond_broadcast(&rli->data_cond);
2478
2427
  rli->ignore_log_space_limit= 0; /* don't need any lock */
2479
 
  /* we die so won't remember charset - re-update them on next thread start */
2480
 
  rli->cached_charset_invalidate();
2481
 
  rli->save_temporary_tables = thd->temporary_tables;
 
2428
  rli->save_temporary_tables = session->temporary_tables;
2482
2429
 
2483
2430
  /*
2484
2431
    TODO: see if we can do this conditionally in next_event() instead
2485
2432
    to avoid unneeded position re-init
2486
2433
  */
2487
 
  thd->temporary_tables = 0; // remove tempation from destructor to close them
2488
 
  assert(thd->net.buff != 0);
2489
 
  net_end(&thd->net); // destructor will not free it, because we are weird
2490
 
  assert(rli->sql_thd == thd);
2491
 
  THD_CHECK_SENTRY(thd);
2492
 
  rli->sql_thd= 0;
 
2434
  session->temporary_tables = 0; // remove temptation from destructor to close them
 
2435
  assert(session->net.buff != 0);
 
2436
  net_end(&session->net); // destructor will not free it, because we are weird
 
2437
  assert(rli->sql_session == session);
 
2438
  Session_CHECK_SENTRY(session);
 
2439
  rli->sql_session= 0;
2493
2440
  pthread_mutex_lock(&LOCK_thread_count);
2494
 
  THD_CHECK_SENTRY(thd);
2495
 
  delete thd;
 
2441
  Session_CHECK_SENTRY(session);
 
2442
  delete session;
2496
2443
  pthread_mutex_unlock(&LOCK_thread_count);
2497
2444
 /*
2498
2445
  Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
2499
2446
  is important. Otherwise a killer_thread can execute between the calls and
2500
2447
  delete the mi structure leading to a crash! (see BUG#25306 for details)
2501
 
 */ 
 
2448
 */
2502
2449
  pthread_cond_broadcast(&rli->stop_cond);
2503
2450
  pthread_mutex_unlock(&rli->run_lock);  // tell the world we are done
2504
 
  
 
2451
 
2505
2452
  my_thread_end();
2506
2453
  pthread_exit(0);
2507
2454
  return(0);                               // Can't return anything here
2517
2464
  int32_t error = 1;
2518
2465
  uint32_t num_bytes;
2519
2466
  bool cev_not_written;
2520
 
  THD *thd = mi->io_thd;
 
2467
  Session *session = mi->io_session;
2521
2468
  NET *net = &mi->drizzle->net;
2522
2469
 
2523
2470
  if (unlikely(!cev->is_valid()))
2524
2471
    return(1);
2525
2472
 
2526
 
  if (!rpl_filter->db_ok(cev->db))
2527
 
  {
2528
 
    skip_load_data_infile(net);
2529
 
    return(0);
2530
 
  }
2531
2473
  assert(cev->inited_from_old);
2532
 
  thd->file_id = cev->file_id = mi->file_id++;
2533
 
  thd->server_id = cev->server_id;
 
2474
  session->file_id = cev->file_id = mi->file_id++;
 
2475
  session->server_id = cev->server_id;
2534
2476
  cev_not_written = 1;
2535
2477
 
2536
2478
  if (unlikely(net_request_file(net,cev->fname)))
2546
2488
    in the loop
2547
2489
  */
2548
2490
  {
2549
 
    Append_block_log_event aev(thd,0,0,0,0);
 
2491
    Append_block_log_event aev(session,0,0,0,0);
2550
2492
 
2551
2493
    for (;;)
2552
2494
    {
2559
2501
      if (unlikely(!num_bytes)) /* eof */
2560
2502
      {
2561
2503
        /* 3.23 master wants it */
2562
 
        net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0);
 
2504
        net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0);
2563
2505
        /*
2564
2506
          If we wrote Create_file_log_event, then we need to write
2565
2507
          Execute_load_log_event. If we did not write Create_file_log_event,
2568
2510
        */
2569
2511
        if (unlikely(cev_not_written))
2570
2512
          break;
2571
 
        Execute_load_log_event xev(thd,0,0);
 
2513
        Execute_load_log_event xev(session,0,0);
2572
2514
        xev.log_pos = cev->log_pos;
2573
2515
        if (unlikely(mi->rli.relay_log.append(&xev)))
2574
2516
        {
2646
2588
    return(1);
2647
2589
 
2648
2590
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2649
 
  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
2650
 
  mi->master_log_pos= rev->pos;
 
2591
  mi->setLogName(rev->new_log_ident.c_str());
 
2592
  mi->setLogPosition(rev->pos);
2651
2593
  /*
2652
2594
    If we do not do this, we will be getting the first
2653
2595
    rotate event forever, so we need to not disconnect after one.
2696
2638
  */
2697
2639
  if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
2698
2640
  {
2699
 
    if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
 
2641
    if (unlikely(!(tmp_buf=(char*)malloc(event_len+1))))
2700
2642
    {
2701
2643
      mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2702
2644
                 ER(ER_SLAVE_FATAL_ERROR), _("Memory allocation failed"));
2728
2670
                      "master could be corrupt but a more likely cause "
2729
2671
                      "of this is a bug"),
2730
2672
                    errmsg);
2731
 
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
 
2673
    free((char*) tmp_buf);
2732
2674
    return(1);
2733
2675
  }
2734
2676
 
2735
2677
  pthread_mutex_lock(&mi->data_lock);
2736
 
  ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
 
2678
  ev->log_pos= mi->getLogPosition(); /* 3.23 events don't contain log_pos */
2737
2679
  switch (ev->get_type_code()) {
2738
2680
  case STOP_EVENT:
2739
2681
    ignore_event= 1;
2762
2704
    ev->log_pos+= inc_pos;
2763
2705
    int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2764
2706
    delete ev;
2765
 
    mi->master_log_pos += inc_pos;
 
2707
    mi->incrementLogPosition(inc_pos);
2766
2708
    pthread_mutex_unlock(&mi->data_lock);
2767
 
    my_free((char*)tmp_buf, MYF(0));
 
2709
    free((char*)tmp_buf);
2768
2710
    return(error);
2769
2711
  }
2770
2712
  default:
2788
2730
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2789
2731
  }
2790
2732
  delete ev;
2791
 
  mi->master_log_pos+= inc_pos;
 
2733
  mi->incrementLogPosition(inc_pos);
2792
2734
  pthread_mutex_unlock(&mi->data_lock);
2793
2735
  return(0);
2794
2736
}
2814
2756
                      "master could be corrupt but a more likely cause of "
2815
2757
                      "this is a bug"),
2816
2758
                    errmsg);
2817
 
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
 
2759
    free((char*) tmp_buf);
2818
2760
    return(1);
2819
2761
  }
2820
2762
  pthread_mutex_lock(&mi->data_lock);
2842
2784
  }
2843
2785
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2844
2786
  delete ev;
2845
 
  mi->master_log_pos+= inc_pos;
 
2787
  mi->incrementLogPosition(inc_pos);
2846
2788
err:
2847
2789
  pthread_mutex_unlock(&mi->data_lock);
2848
2790
  return(0);
2982
2924
      goto err;
2983
2925
    }
2984
2926
    mi->received_heartbeats++;
2985
 
    /* 
 
2927
    /*
2986
2928
       compare local and event's versions of log_file, log_pos.
2987
 
       
 
2929
 
2988
2930
       Heartbeat is sent only after an event corresponding to the coordinates
2989
2931
       the heartbeat carries.
2990
2932
       Slave can not have a difference in coordinates except in the only
2994
2936
 
2995
2937
       TODO: handling `when' for SHOW SLAVE STATUS' snds behind
2996
2938
    */
2997
 
    if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
2998
 
         && mi->master_log_name != NULL)
2999
 
        || mi->master_log_pos != hb.log_pos)
 
2939
    if ((mi->setLogName(hb.get_log_ident()) && mi->getLogName() != NULL)
 
2940
        || mi->getLogPosition() != hb.log_pos)
3000
2941
    {
3001
2942
      /* missed events of heartbeat from the past */
3002
2943
      error= ER_SLAVE_HEARTBEAT_FAILURE;
3011
2952
    goto skip_relay_logging;
3012
2953
  }
3013
2954
  break;
3014
 
    
 
2955
 
3015
2956
  default:
3016
2957
    inc_pos= event_len;
3017
2958
    break;
3053
2994
        buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3054
2995
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3055
2996
    {
3056
 
      mi->master_log_pos+= inc_pos;
3057
 
      memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
 
2997
      mi->incrementLogPosition(inc_pos);
 
2998
      memcpy(rli->ign_master_log_name_end, mi->getLogName(), FN_REFLEN);
3058
2999
      assert(rli->ign_master_log_name_end[0]);
3059
 
      rli->ign_master_log_pos_end= mi->master_log_pos;
 
3000
      rli->ign_master_log_pos_end= mi->getLogPosition();
3060
3001
    }
3061
3002
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3062
3003
  }
3065
3006
    /* write the event to the relay log */
3066
3007
    if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3067
3008
    {
3068
 
      mi->master_log_pos+= inc_pos;
 
3009
      mi->incrementLogPosition(inc_pos);
3069
3010
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3070
3011
    }
3071
3012
    else
3077
3018
  pthread_mutex_unlock(log_lock);
3078
3019
 
3079
3020
skip_relay_logging:
3080
 
  
 
3021
 
3081
3022
err:
3082
3023
  pthread_mutex_unlock(&mi->data_lock);
3083
3024
  if (error)
3122
3063
 
3123
3064
  SYNOPSIS
3124
3065
    safe_connect()
3125
 
    thd                 Thread handler for slave
 
3066
    session                 Thread handler for slave
3126
3067
    DRIZZLE               DRIZZLE connection handle
3127
3068
    mi                  Replication handle
3128
3069
 
3131
3072
    #   Error
3132
3073
*/
3133
3074
 
3134
 
static int32_t safe_connect(THD* thd, DRIZZLE *drizzle, Master_info* mi)
 
3075
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi)
3135
3076
{
3136
 
  return(connect_to_master(thd, drizzle, mi, 0, 0));
 
3077
  return(connect_to_master(session, drizzle, mi, 0, 0));
3137
3078
}
3138
3079
 
3139
3080
 
3146
3087
    master_retry_count times
3147
3088
*/
3148
3089
 
3149
 
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3150
 
                             bool reconnect, bool suppress_warnings)
 
3090
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
 
3091
                                 bool reconnect, bool suppress_warnings)
3151
3092
{
3152
3093
  int32_t slave_was_killed;
3153
3094
  int32_t last_errno= -2;                           // impossible error
3162
3103
  drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3163
3104
  drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3164
3105
 
3165
 
  drizzle_options(drizzle, DRIZZLE_SET_CHARSET_NAME, default_charset_info->csname);
3166
 
  /* This one is not strictly needed but we have it here for completeness */
3167
 
  drizzle_options(drizzle, DRIZZLE_SET_CHARSET_DIR, (char *) charsets_dir);
3168
 
 
3169
 
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
 
3106
  while (!(slave_was_killed = io_slave_killed(session,mi)) &&
3170
3107
         (reconnect ? drizzle_reconnect(drizzle) != 0 :
3171
 
          drizzle_connect(drizzle, mi->host, mi->user, mi->password, 0,
3172
 
                             mi->port, 0, client_flag) == 0))
 
3108
          drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
 
3109
                             mi->getPort(), 0, client_flag) == 0))
3173
3110
  {
3174
3111
    /* Don't repeat last error */
3175
3112
    if ((int32_t)drizzle_errno(drizzle) != last_errno)
3180
3117
                 _("error %s to master '%s@%s:%d'"
3181
3118
                   " - retry-time: %d  retries: %u"),
3182
3119
                 (reconnect ? _("reconnecting") : _("connecting")),
3183
 
                 mi->user, mi->host, mi->port,
3184
 
                 mi->connect_retry, master_retry_count);
 
3120
                 mi->getUsername(), mi->getHostname(), mi->getPort(),
 
3121
                 mi->getConnectionRetry(), master_retry_count);
3185
3122
    }
3186
3123
    /*
3187
3124
      By default we try forever. The reason is that failure will trigger
3192
3129
    if (++err_count == master_retry_count)
3193
3130
    {
3194
3131
      slave_was_killed=1;
3195
 
      if (reconnect)
3196
 
        change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
3197
3132
      break;
3198
3133
    }
3199
 
    safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
 
3134
    safe_sleep(session,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3200
3135
               (void*)mi);
3201
3136
  }
3202
3137
 
3207
3142
      if (!suppress_warnings && global_system_variables.log_warnings)
3208
3143
        sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3209
3144
                                "replication resumed in log '%s' at "
3210
 
                                "position %s"), mi->user,
3211
 
                                mi->host, mi->port,
 
3145
                                "position %s"), mi->getUsername(),
 
3146
                                mi->getHostname(), mi->getPort(),
3212
3147
                                IO_RPL_LOG_NAME,
3213
 
                                llstr(mi->master_log_pos,llbuff));
3214
 
    }
3215
 
    else
3216
 
    {
3217
 
      change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
3218
 
      general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d",
3219
 
                        mi->user, mi->host, mi->port);
3220
 
    }
3221
 
#ifdef SIGNAL_WITH_VIO_CLOSE
3222
 
    thd->set_active_vio(drizzle->net.vio);
3223
 
#endif
 
3148
                                llstr(mi->getLogPosition(),llbuff));
 
3149
    }
3224
3150
  }
3225
3151
  drizzle->reconnect= 1;
3226
3152
  return(slave_was_killed);
3235
3161
    master_retry_count times
3236
3162
*/
3237
3163
 
3238
 
static int32_t safe_reconnect(THD* thd, DRIZZLE *drizzle, Master_info* mi,
 
3164
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
3239
3165
                          bool suppress_warnings)
3240
3166
{
3241
 
  return(connect_to_master(thd, drizzle, mi, 1, suppress_warnings));
 
3167
  return(connect_to_master(session, drizzle, mi, 1, suppress_warnings));
3242
3168
}
3243
3169
 
3244
3170
 
3273
3199
 
3274
3200
bool flush_relay_log_info(Relay_log_info* rli)
3275
3201
{
3276
 
  bool error=0;
 
3202
  bool error= 0;
3277
3203
 
3278
3204
  if (unlikely(rli->no_storage))
3279
3205
    return(0);
3280
3206
 
3281
 
  IO_CACHE *file = &rli->info_file;
3282
 
  char buff[FN_REFLEN*2+22*2+4], *pos;
3283
 
 
3284
 
  my_b_seek(file, 0L);
3285
 
  pos=stpcpy(buff, rli->group_relay_log_name);
3286
 
  *pos++='\n';
3287
 
  pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
3288
 
  *pos++='\n';
3289
 
  pos=stpcpy(pos, rli->group_master_log_name);
3290
 
  *pos++='\n';
3291
 
  pos=int64_t2str(rli->group_master_log_pos, pos, 10);
3292
 
  *pos='\n';
3293
 
  if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
3294
 
    error=1;
3295
 
  if (flush_io_cache(file))
3296
 
    error=1;
3297
 
 
3298
 
  /* Flushing the relay log is done by the slave I/O thread */
3299
3207
  return(error);
3300
3208
}
3301
3209
 
3310
3218
  assert(rli->cur_log_fd == -1);
3311
3219
 
3312
3220
  IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3313
 
  if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
3314
 
                                   errmsg)) <0)
 
3221
  if ((rli->cur_log_fd=open_binlog(cur_log, rli->event_relay_log_name.c_str(), errmsg)) < 0)
3315
3222
    return(0);
3316
3223
  /*
3317
3224
    We want to start exactly where we were before:
3318
3225
    relay_log_pos       Current log pos
3319
3226
    pending             Number of bytes already processed from the event
3320
3227
  */
3321
 
  rli->event_relay_log_pos= max(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
 
3228
  rli->event_relay_log_pos= cmax(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
3322
3229
  my_b_seek(cur_log,rli->event_relay_log_pos);
3323
3230
  return(cur_log);
3324
3231
}
3330
3237
  IO_CACHE* cur_log = rli->cur_log;
3331
3238
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3332
3239
  const char* errmsg=0;
3333
 
  THD* thd = rli->sql_thd;
 
3240
  Session* session = rli->sql_session;
3334
3241
 
3335
 
  assert(thd != 0);
 
3242
  assert(session != 0);
3336
3243
 
3337
3244
  if (abort_slave_event_count && !rli->events_till_abort--)
3338
3245
    return(0);
3346
3253
  */
3347
3254
  safe_mutex_assert_owner(&rli->data_lock);
3348
3255
 
3349
 
  while (!sql_slave_killed(thd,rli))
 
3256
  while (!sql_slave_killed(session,rli))
3350
3257
  {
3351
3258
    /*
3352
3259
      We can have two kinds of log reading:
3379
3286
        hot_log=0;                              // Using old binary log
3380
3287
      }
3381
3288
    }
3382
 
    /* 
 
3289
    /*
3383
3290
      As there is no guarantee that the relay is open (for example, an I/O
3384
3291
      error during a write by the slave I/O thread may have closed it), we
3385
3292
      have to test it.
3405
3312
                                      rli->relay_log.description_event_for_exec)))
3406
3313
 
3407
3314
    {
3408
 
      assert(thd==rli->sql_thd);
 
3315
      assert(session==rli->sql_session);
3409
3316
      /*
3410
3317
        read it while we have a lock, to avoid a mutex lock in
3411
3318
        inc_event_relay_log_pos()
3415
3322
        pthread_mutex_unlock(log_lock);
3416
3323
      return(ev);
3417
3324
    }
3418
 
    assert(thd==rli->sql_thd);
 
3325
    assert(session==rli->sql_session);
3419
3326
    if (opt_reckless_slave)                     // For mysql-test
3420
3327
      cur_log->error = 0;
3421
3328
    if (cur_log->error < 0)
3521
3428
        pthread_mutex_unlock(&rli->log_space_lock);
3522
3429
        pthread_cond_broadcast(&rli->log_space_cond);
3523
3430
        // Note that wait_for_update_relay_log unlocks lock_log !
3524
 
        rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
 
3431
        rli->relay_log.wait_for_update_relay_log(rli->sql_session);
3525
3432
        // re-acquire data lock since we released it earlier
3526
3433
        pthread_mutex_lock(&rli->data_lock);
3527
3434
        rli->last_master_timestamp= save_timestamp;
3553
3460
        if (rli->relay_log.purge_first_log
3554
3461
            (rli,
3555
3462
             rli->group_relay_log_pos == rli->event_relay_log_pos
3556
 
             && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
 
3463
             && !strcmp(rli->group_relay_log_name.c_str(), rli->event_relay_log_name.c_str())))
3557
3464
        {
3558
3465
          errmsg = "Error purging processed logs";
3559
3466
          goto err;
3574
3481
          goto err;
3575
3482
        }
3576
3483
        rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3577
 
        strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
3578
 
                sizeof(rli->event_relay_log_name)-1);
 
3484
        rli->event_relay_log_name.assign(rli->linfo.log_file_name);
3579
3485
        flush_relay_log_info(rli);
3580
3486
      }
3581
3487
 
3723
3629
{
3724
3630
  struct st_version_range_for_one_bug {
3725
3631
    uint32_t        bug_id;
3726
 
    const uchar introduced_in[3]; // first version with bug
3727
 
    const uchar fixed_in[3];      // first version with fix
 
3632
    const unsigned char introduced_in[3]; // first version with bug
 
3633
    const unsigned char fixed_in[3];      // first version with fix
3728
3634
  };
3729
3635
  static struct st_version_range_for_one_bug versions_for_all_bugs[]=
3730
3636
  {
3733
3639
    {33029, { 5, 0,  0 }, { 5, 0, 58 } },
3734
3640
    {33029, { 5, 1,  0 }, { 5, 1, 12 } },
3735
3641
  };
3736
 
  const uchar *master_ver=
 
3642
  const unsigned char *master_ver=
3737
3643
    rli->relay_log.description_event_for_exec->server_version_split;
3738
3644
 
3739
3645
  assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3741
3647
  for (uint32_t i= 0;
3742
3648
       i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3743
3649
  {
3744
 
    const uchar *introduced_in= versions_for_all_bugs[i].introduced_in,
 
3650
    const unsigned char *introduced_in= versions_for_all_bugs[i].introduced_in,
3745
3651
      *fixed_in= versions_for_all_bugs[i].fixed_in;
3746
3652
    if ((versions_for_all_bugs[i].bug_id == bug_id) &&
3747
3653
        (memcmp(introduced_in, master_ver, 3) <= 0) &&
3792
3698
 
3793
3699
   Detect buggy master to work around.
3794
3700
 */
3795
 
bool rpl_master_erroneous_autoinc(THD *thd)
 
3701
bool rpl_master_erroneous_autoinc(Session *session)
3796
3702
{
3797
 
  if (active_mi && active_mi->rli.sql_thd == thd)
 
3703
  if (active_mi && active_mi->rli.sql_session == session)
3798
3704
  {
3799
3705
    Relay_log_info *rli= &active_mi->rli;
3800
3706
    return rpl_master_has_bug(rli, 33029, false);
3810
3716
/**
3811
3717
  @} (end of group Replication)
3812
3718
*/
3813
 
 
3814
 
#endif /* HAVE_REPLICATION */