~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_repl.cc

  • Committer: Monty Taylor
  • Date: 2008-10-16 06:32:30 UTC
  • mto: (511.1.5 codestyle)
  • mto: This revision was merged to the branch mainline in revision 521.
  • Revision ID: monty@inaugust.com-20081016063230-4brxsra0qsmsg84q
Added -Wunused-macros.

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
   along with this program; if not, write to the Free Software
14
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
 
#include "mysql_priv.h"
17
 
#ifdef HAVE_REPLICATION
 
16
#include <drizzled/server_includes.h>
18
17
 
19
18
#include "rpl_mi.h"
20
19
#include "sql_repl.h"
21
20
#include "log_event.h"
22
21
#include "rpl_filter.h"
23
 
#include <my_dir.h>
 
22
#include <drizzled/drizzled_error_messages.h>
24
23
 
25
24
int max_binlog_dump_events = 0; // unlimited
26
25
 
53
52
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
54
53
 
55
54
  char* p = log_file_name+dirname_length(log_file_name);
56
 
  uint ident_len = (uint) strlen(p);
57
 
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
 
55
  uint32_t ident_len = (uint32_t) strlen(p);
 
56
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
58
57
  int4store(header + SERVER_ID_OFFSET, server_id);
59
58
  int4store(header + EVENT_LEN_OFFSET, event_len);
60
59
  int2store(header + FLAGS_OFFSET, 0);
66
65
  int8store(buf+R_POS_OFFSET,position);
67
66
  packet->append(buf, ROTATE_HEADER_LEN);
68
67
  packet->append(p,ident_len);
69
 
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
 
68
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
70
69
  {
71
70
    *errmsg = "failed on my_net_write()";
72
71
    return(-1);
83
82
  const char *errmsg = 0;
84
83
  int old_timeout;
85
84
  unsigned long packet_len;
86
 
  uchar buf[IO_SIZE];                           // It's safe to alloc this
 
85
  unsigned char buf[IO_SIZE];                           // It's safe to alloc this
87
86
 
88
87
  /*
89
88
    The client might be slow loading the data, give him wait_timeout to do
98
97
  */
99
98
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
100
99
  {
101
 
    errmsg = "while reading file name";
 
100
    errmsg = _("Failed in send_file() while reading file name");
102
101
    goto err;
103
102
  }
104
103
 
111
110
 
112
111
  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
113
112
  {
114
 
    errmsg = "on open of file";
 
113
    errmsg = _("Failed in send_file() on open of file");
115
114
    goto err;
116
115
  }
117
116
 
119
118
  {
120
119
    if (my_net_write(net, buf, bytes))
121
120
    {
122
 
      errmsg = "while writing data to client";
 
121
      errmsg = _("Failed in send_file() while writing data to client");
123
122
      goto err;
124
123
    }
125
124
  }
126
125
 
127
126
 end:
128
 
  if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
 
127
  if (my_net_write(net, (unsigned char*) "", 0) || net_flush(net) ||
129
128
      (my_net_read(net) == packet_error))
130
129
  {
131
 
    errmsg = "while negotiating file transfer close";
 
130
    errmsg = _("Failed in send_file() while negotiating file transfer close");
132
131
    goto err;
133
132
  }
134
133
  error = 0;
139
138
    (void) my_close(fd, MYF(0));
140
139
  if (errmsg)
141
140
  {
142
 
    sql_print_error("Failed in send_file() %s", errmsg);
 
141
    sql_print_error(errmsg);
143
142
  }
144
143
  return(error);
145
144
}
210
209
    if ((linfo = tmp->current_linfo))
211
210
    {
212
211
      pthread_mutex_lock(&linfo->lock);
213
 
      result = !bcmp((uchar*) log_name, (uchar*) linfo->log_file_name,
214
 
                     log_name_len);
 
212
      result = !memcmp(log_name, linfo->log_file_name, log_name_len);
215
213
      pthread_mutex_unlock(&linfo->lock);
216
214
      if (result)
217
215
        break;
224
222
 
225
223
bool purge_error_message(THD* thd, int res)
226
224
{
227
 
  uint errmsg= 0;
 
225
  uint32_t errmsg= 0;
228
226
 
229
227
  switch (res)  {
230
228
  case 0: break;
317
315
*/ 
318
316
static uint64_t get_heartbeat_period(THD * thd)
319
317
{
320
 
  my_bool null_value;
 
318
  bool null_value;
321
319
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
322
320
  user_var_entry *entry= 
323
 
    (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
 
321
    (user_var_entry*) hash_search(&thd->user_vars, (unsigned char*) name.str,
324
322
                                  name.length);
325
323
  return entry? entry->val_int(&null_value) : 0;
326
324
}
353
351
 
354
352
  char* p= coord->file_name + dirname_length(coord->file_name);
355
353
 
356
 
  uint ident_len = strlen(p);
357
 
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
 
354
  uint32_t ident_len = strlen(p);
 
355
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN;
358
356
  int4store(header + SERVER_ID_OFFSET, server_id);
359
357
  int4store(header + EVENT_LEN_OFFSET, event_len);
360
358
  int2store(header + FLAGS_OFFSET, 0);
364
362
  packet->append(header, sizeof(header));
365
363
  packet->append(p, ident_len);             // log_file_name
366
364
 
367
 
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
 
365
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) ||
368
366
      net_flush(net))
369
367
  {
370
368
    return(-1);
378
376
*/
379
377
 
380
378
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
381
 
                       ushort flags)
 
379
                       uint16_t flags)
382
380
{
383
381
  LOG_INFO linfo;
384
382
  char *log_file_name = linfo.log_file_name;
392
390
  pthread_mutex_t *log_lock;
393
391
  bool binlog_can_be_corrupted= false;
394
392
 
395
 
  bzero((char*) &log,sizeof(log));
 
393
  memset(&log, 0, sizeof(log));
396
394
  /* 
397
395
     heartbeat_period from @master_heartbeat_period user variable
398
396
  */
401
399
  struct event_coordinates coord_buf;
402
400
  struct timespec *heartbeat_ts= NULL;
403
401
  struct event_coordinates *coord= NULL;
404
 
  if (heartbeat_period != 0LL)
 
402
  if (heartbeat_period != 0L)
405
403
  {
406
404
    heartbeat_ts= &heartbeat_buf;
407
405
    set_timespec_nsec(*heartbeat_ts, 0);
541
539
           to avoid destroying temp tables.
542
540
          */
543
541
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
544
 
                   ST_CREATED_OFFSET+1, (ulong) 0);
 
542
                   ST_CREATED_OFFSET+1, (uint32_t) 0);
545
543
         /* send it */
546
 
         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
 
544
         if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
547
545
         {
548
546
           errmsg = "Failed on my_net_write()";
549
547
           my_errno= ER_UNKNOWN_ERROR;
597
595
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
598
596
        binlog_can_be_corrupted= false;
599
597
 
600
 
      if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
 
598
      if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
601
599
      {
602
600
        errmsg = "Failed on my_net_write()";
603
601
        my_errno= ER_UNKNOWN_ERROR;
688
686
          {
689
687
            if (coord)
690
688
            {
691
 
              assert(heartbeat_ts && heartbeat_period != 0LL);
 
689
              assert(heartbeat_ts && heartbeat_period != 0L);
692
690
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
693
691
            }
694
692
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
695
 
            assert(ret == 0 || (heartbeat_period != 0LL && coord != NULL));
 
693
            assert(ret == 0 || (heartbeat_period != 0L && coord != NULL));
696
694
            if (ret == ETIMEDOUT || ret == ETIME)
697
695
            {
698
696
              if (send_heartbeat_event(net, packet, coord))
720
718
 
721
719
        if (read_packet)
722
720
        {
723
 
          thd_proc_info(thd, "Sending binlog event to slave");
724
 
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
 
721
          thd->set_proc_info("Sending binlog event to slave");
 
722
          if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) )
725
723
          {
726
724
            errmsg = "Failed on my_net_write()";
727
725
            my_errno= ER_UNKNOWN_ERROR;
758
756
      bool loop_breaker = 0;
759
757
      /* need this to break out of the for loop from switch */
760
758
 
761
 
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
 
759
      thd->set_proc_info("Finished reading one binlog; switching to next binlog");
762
760
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
763
761
      case LOG_INFO_EOF:
764
762
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
806
804
  (void)my_close(file, MYF(MY_WME));
807
805
 
808
806
  my_eof(thd);
809
 
  thd_proc_info(thd, "Waiting to finalize termination");
 
807
  thd->set_proc_info("Waiting to finalize termination");
810
808
  pthread_mutex_lock(&LOCK_thread_count);
811
809
  thd->current_linfo = 0;
812
810
  pthread_mutex_unlock(&LOCK_thread_count);
813
811
  return;
814
812
 
815
813
err:
816
 
  thd_proc_info(thd, "Waiting to finalize termination");
 
814
  thd->set_proc_info("Waiting to finalize termination");
817
815
  end_io_cache(&log);
818
816
  /*
819
817
    Exclude  iteration through thread list
850
848
    thread_mask&= thd->lex->slave_thd_opt;
851
849
  if (thread_mask) //some threads are stopped, start them
852
850
  {
853
 
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
854
 
                         thread_mask))
 
851
    if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
855
852
      slave_errno=ER_MASTER_INFO;
856
 
    else if (server_id_supplied && *mi->host)
 
853
    else if (server_id_supplied && *mi->getHostname())
857
854
    {
858
855
      /*
859
856
        If we will start SQL thread we will care about UNTIL options If
911
908
 
912
909
          /* Issuing warning then started without --skip-slave-start */
913
910
          if (!opt_skip_slave_start)
914
 
            push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
 
911
            push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
915
912
                         ER_MISSING_SKIP_SLAVE,
916
913
                         ER(ER_MISSING_SKIP_SLAVE));
917
914
        }
919
916
        pthread_mutex_unlock(&mi->rli.data_lock);
920
917
      }
921
918
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
922
 
        push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
 
919
        push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
923
920
                     ER(ER_UNTIL_COND_IGNORED));
924
921
 
925
922
      if (!slave_errno)
935
932
  else
936
933
  {
937
934
    /* no error if all threads are already started, only a warning */
938
 
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
 
935
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
939
936
                 ER(ER_SLAVE_WAS_RUNNING));
940
937
  }
941
938
 
960
957
  if (!thd)
961
958
    thd = current_thd;
962
959
 
963
 
  thd_proc_info(thd, "Killing slave");
 
960
  thd->set_proc_info("Killing slave");
964
961
  int thread_mask;
965
962
  lock_slave_threads(mi);
966
963
  // Get a mask of _running_ threads
983
980
  {
984
981
    //no error if both threads are already stopped, only a warning
985
982
    slave_errno= 0;
986
 
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
 
983
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
987
984
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
988
985
  }
989
986
  unlock_slave_threads(mi);
990
 
  thd_proc_info(thd, 0);
 
987
  thd->set_proc_info(0);
991
988
 
992
989
  if (slave_errno)
993
990
  {
1021
1018
  struct stat stat_area;
1022
1019
  char fname[FN_REFLEN];
1023
1020
  int thread_mask= 0, error= 0;
1024
 
  uint sql_errno=0;
 
1021
  uint32_t sql_errno=0;
1025
1022
  const char* errmsg=0;
1026
1023
 
1027
1024
  lock_slave_threads(mi);
1033
1030
    goto err;
1034
1031
  }
1035
1032
 
1036
 
  ha_reset_slave(thd);
1037
 
 
1038
1033
  // delete relay logs, clear relay log coordinates
1039
1034
  if ((error= purge_relay_logs(&mi->rli, thd,
1040
1035
                               1 /* just reset */,
1042
1037
    goto err;
1043
1038
 
1044
1039
  /* Clear master's log coordinates */
1045
 
  init_master_log_pos(mi);
 
1040
  mi->reset();
1046
1041
  /*
1047
1042
     Reset errors (the idea is that we forget about the
1048
1043
     old master).
1051
1046
  mi->rli.clear_until_condition();
1052
1047
 
1053
1048
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1054
 
  end_master_info(mi);
 
1049
  mi->end_master_info();
1055
1050
  // and delete these two files
1056
1051
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
1057
1052
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1094
1089
*/
1095
1090
 
1096
1091
 
1097
 
void kill_zombie_dump_threads(uint32 slave_server_id)
 
1092
void kill_zombie_dump_threads(uint32_t slave_server_id)
1098
1093
{
1099
1094
  pthread_mutex_lock(&LOCK_thread_count);
1100
1095
  I_List_iterator<THD> it(threads);
1138
1133
    return(true);
1139
1134
  }
1140
1135
 
1141
 
  thd_proc_info(thd, "Changing master");
 
1136
  thd->set_proc_info("Changing master");
1142
1137
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1143
1138
  // TODO: see if needs re-write
1144
 
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
1145
 
                       thread_mask))
 
1139
  if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
1146
1140
  {
1147
1141
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1148
1142
    unlock_slave_threads(mi);
1161
1155
  */
1162
1156
 
1163
1157
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1164
 
  {
1165
 
    mi->master_log_name[0] = 0;
1166
 
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
1167
 
  }
 
1158
    mi->reset();
1168
1159
 
1169
1160
  if (lex_mi->log_file_name)
1170
 
    strmake(mi->master_log_name, lex_mi->log_file_name,
1171
 
            sizeof(mi->master_log_name)-1);
 
1161
    mi->setLogName(lex_mi->log_file_name);
1172
1162
  if (lex_mi->pos)
1173
1163
  {
1174
 
    mi->master_log_pos= lex_mi->pos;
 
1164
    mi->setLogPosition(lex_mi->pos);
1175
1165
  }
1176
1166
 
1177
1167
  if (lex_mi->host)
1178
 
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
 
1168
    mi->setHost(lex_mi->host, lex_mi->port);
1179
1169
  if (lex_mi->user)
1180
 
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
 
1170
    mi->setUsername(lex_mi->user);
1181
1171
  if (lex_mi->password)
1182
 
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
1183
 
  if (lex_mi->port)
1184
 
    mi->port = lex_mi->port;
 
1172
    mi->setPassword(lex_mi->password);
1185
1173
  if (lex_mi->connect_retry)
1186
1174
    mi->connect_retry = lex_mi->connect_retry;
1187
1175
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1188
1176
    mi->heartbeat_period = lex_mi->heartbeat_period;
1189
1177
  else
1190
 
    mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
 
1178
    mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
1191
1179
                                      (slave_net_timeout/2.0));
1192
 
  mi->received_heartbeats= 0LL; // counter lives until master is CHANGEd
1193
 
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1194
 
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
1195
 
 
1196
 
  if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1197
 
    mi->ssl_verify_server_cert=
1198
 
      (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
1199
 
 
1200
 
  if (lex_mi->ssl_ca)
1201
 
    strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
1202
 
  if (lex_mi->ssl_capath)
1203
 
    strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
1204
 
  if (lex_mi->ssl_cert)
1205
 
    strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
1206
 
  if (lex_mi->ssl_cipher)
1207
 
    strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
1208
 
  if (lex_mi->ssl_key)
1209
 
    strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
1210
 
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
1211
 
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
1212
 
      lex_mi->ssl_verify_server_cert )
1213
 
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
1214
 
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
 
1180
  mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
1215
1181
 
1216
1182
  if (lex_mi->relay_log_name)
1217
1183
  {
1218
1184
    need_relay_log_purge= 0;
1219
 
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
1220
 
            sizeof(mi->rli.group_relay_log_name)-1);
1221
 
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
1222
 
            sizeof(mi->rli.event_relay_log_name)-1);
 
1185
    mi->rli.event_relay_log_name.assign(lex_mi->relay_log_name);
1223
1186
  }
1224
1187
 
1225
1188
  if (lex_mi->relay_log_pos)
1248
1211
   {
1249
1212
     /*
1250
1213
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1251
 
       not initialized), so we use a max().
 
1214
       not initialized), so we use a cmax().
1252
1215
       What happens to mi->rli.master_log_pos during the initialization stages
1253
1216
       of replication is not 100% clear, so we guard against problems using
1254
 
       max().
 
1217
       cmax().
1255
1218
      */
1256
 
     mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
1257
 
                              mi->rli.group_master_log_pos);
1258
 
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
1259
 
             sizeof(mi->master_log_name)-1);
 
1219
     mi->setLogPosition(((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
 
1220
                         ? BIN_LOG_HEADER_SIZE
 
1221
                         : mi->rli.group_master_log_pos));
 
1222
     mi->setLogName(mi->rli.group_master_log_name.c_str());
1260
1223
  }
1261
1224
  /*
1262
1225
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1263
1226
    a slave before).
1264
1227
  */
1265
 
  if (flush_master_info(mi, 0))
 
1228
  if (mi->flush())
1266
1229
  {
1267
1230
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1268
1231
    unlock_slave_threads(mi);
1271
1234
  if (need_relay_log_purge)
1272
1235
  {
1273
1236
    relay_log_purge= 1;
1274
 
    thd_proc_info(thd, "Purging old relay logs");
 
1237
    thd->set_proc_info("Purging old relay logs");
1275
1238
    if (purge_relay_logs(&mi->rli, thd,
1276
1239
                         0 /* not only reset, but also reinit */,
1277
1240
                         &errmsg))
1287
1250
    relay_log_purge= 0;
1288
1251
    /* Relay log is already initialized */
1289
1252
    if (init_relay_log_pos(&mi->rli,
1290
 
                           mi->rli.group_relay_log_name,
 
1253
                           mi->rli.group_relay_log_name.c_str(),
1291
1254
                           mi->rli.group_relay_log_pos,
1292
1255
                           0 /*no data lock*/,
1293
1256
                           &msg, 0))
1307
1270
    ''/0: we have lost all copies of the original good coordinates.
1308
1271
    That's why we always save good coords in rli.
1309
1272
  */
1310
 
  mi->rli.group_master_log_pos= mi->master_log_pos;
1311
 
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
1312
 
          sizeof(mi->rli.group_master_log_name)-1);
 
1273
  mi->rli.group_master_log_pos= mi->getLogPosition();
 
1274
  mi->rli.group_master_log_name.assign(mi->getLogName());
1313
1275
 
1314
 
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
1315
 
    mi->rli.group_master_log_pos=0;
 
1276
  if (mi->rli.group_master_log_name.size() == 0) // uninitialized case
 
1277
    mi->rli.group_master_log_pos= 0;
1316
1278
 
1317
1279
  pthread_mutex_lock(&mi->rli.data_lock);
1318
1280
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1331
1293
  pthread_mutex_unlock(&mi->rli.data_lock);
1332
1294
 
1333
1295
  unlock_slave_threads(mi);
1334
 
  thd_proc_info(thd, 0);
 
1296
  thd->set_proc_info(0);
1335
1297
  my_ok(thd);
1336
1298
  return(false);
1337
1299
}
1351
1313
                   const char* log_file_name2, uint64_t log_pos2)
1352
1314
{
1353
1315
  int res;
1354
 
  uint log_file_name1_len=  strlen(log_file_name1);
1355
 
  uint log_file_name2_len=  strlen(log_file_name2);
 
1316
  uint32_t log_file_name1_len=  strlen(log_file_name1);
 
1317
  uint32_t log_file_name2_len=  strlen(log_file_name2);
1356
1318
 
1357
1319
  //  We assume that both log names match up to '.'
1358
1320
  if (log_file_name1_len == log_file_name2_len)
1365
1327
}
1366
1328
 
1367
1329
 
1368
 
bool mysql_show_binlog_events(THD* thd)
1369
 
{
1370
 
  Protocol *protocol= thd->protocol;
1371
 
  List<Item> field_list;
1372
 
  const char *errmsg= 0;
1373
 
  bool ret= true;
1374
 
  IO_CACHE log;
1375
 
  File file= -1;
1376
 
 
1377
 
  Log_event::init_show_field_list(&field_list);
1378
 
  if (protocol->send_fields(&field_list,
1379
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1380
 
    return(true);
1381
 
 
1382
 
  Format_description_log_event *description_event= new
1383
 
    Format_description_log_event(3); /* MySQL 4.0 by default */
1384
 
 
1385
 
  /*
1386
 
    Wait for handlers to insert any pending information
1387
 
    into the binlog.  For e.g. ndb which updates the binlog asynchronously
1388
 
    this is needed so that the uses sees all its own commands in the binlog
1389
 
  */
1390
 
  ha_binlog_wait(thd);
1391
 
 
1392
 
  if (mysql_bin_log.is_open())
1393
 
  {
1394
 
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1395
 
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
1396
 
    ha_rows event_count, limit_start, limit_end;
1397
 
    my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1398
 
    char search_file_name[FN_REFLEN], *name;
1399
 
    const char *log_file_name = lex_mi->log_file_name;
1400
 
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1401
 
    LOG_INFO linfo;
1402
 
    Log_event* ev;
1403
 
 
1404
 
    unit->set_limit(thd->lex->current_select);
1405
 
    limit_start= unit->offset_limit_cnt;
1406
 
    limit_end= unit->select_limit_cnt;
1407
 
 
1408
 
    name= search_file_name;
1409
 
    if (log_file_name)
1410
 
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
1411
 
    else
1412
 
      name=0;                                   // Find first log
1413
 
 
1414
 
    linfo.index_file_offset = 0;
1415
 
 
1416
 
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1417
 
    {
1418
 
      errmsg = "Could not find target log";
1419
 
      goto err;
1420
 
    }
1421
 
 
1422
 
    pthread_mutex_lock(&LOCK_thread_count);
1423
 
    thd->current_linfo = &linfo;
1424
 
    pthread_mutex_unlock(&LOCK_thread_count);
1425
 
 
1426
 
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
1427
 
      goto err;
1428
 
 
1429
 
    /*
1430
 
      to account binlog event header size
1431
 
    */
1432
 
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
1433
 
 
1434
 
    pthread_mutex_lock(log_lock);
1435
 
 
1436
 
    /*
1437
 
      open_binlog() sought to position 4.
1438
 
      Read the first event in case it's a Format_description_log_event, to
1439
 
      know the format. If there's no such event, we are 3.23 or 4.x. This
1440
 
      code, like before, can't read 3.23 binlogs.
1441
 
      This code will fail on a mixed relay log (one which has Format_desc then
1442
 
      Rotate then Format_desc).
1443
 
    */
1444
 
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
1445
 
    if (ev)
1446
 
    {
1447
 
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
1448
 
      {
1449
 
        delete description_event;
1450
 
        description_event= (Format_description_log_event*) ev;
1451
 
      }
1452
 
      else
1453
 
        delete ev;
1454
 
    }
1455
 
 
1456
 
    my_b_seek(&log, pos);
1457
 
 
1458
 
    if (!description_event->is_valid())
1459
 
    {
1460
 
      errmsg="Invalid Format_description event; could be out of memory";
1461
 
      goto err;
1462
 
    }
1463
 
 
1464
 
    for (event_count = 0;
1465
 
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
1466
 
                                         description_event)); )
1467
 
    {
1468
 
      if (event_count >= limit_start &&
1469
 
          ev->net_send(protocol, linfo.log_file_name, pos))
1470
 
      {
1471
 
        errmsg = "Net error";
1472
 
        delete ev;
1473
 
        pthread_mutex_unlock(log_lock);
1474
 
        goto err;
1475
 
      }
1476
 
 
1477
 
      pos = my_b_tell(&log);
1478
 
      delete ev;
1479
 
 
1480
 
      if (++event_count >= limit_end)
1481
 
        break;
1482
 
    }
1483
 
 
1484
 
    if (event_count < limit_end && log.error)
1485
 
    {
1486
 
      errmsg = "Wrong offset or I/O error";
1487
 
      pthread_mutex_unlock(log_lock);
1488
 
      goto err;
1489
 
    }
1490
 
 
1491
 
    pthread_mutex_unlock(log_lock);
1492
 
  }
1493
 
 
1494
 
  ret= false;
1495
 
 
1496
 
err:
1497
 
  delete description_event;
1498
 
  if (file >= 0)
1499
 
  {
1500
 
    end_io_cache(&log);
1501
 
    (void) my_close(file, MYF(MY_WME));
1502
 
  }
1503
 
 
1504
 
  if (errmsg)
1505
 
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
1506
 
             "SHOW BINLOG EVENTS", errmsg);
1507
 
  else
1508
 
    my_eof(thd);
1509
 
 
1510
 
  pthread_mutex_lock(&LOCK_thread_count);
1511
 
  thd->current_linfo = 0;
1512
 
  pthread_mutex_unlock(&LOCK_thread_count);
1513
 
  return(ret);
1514
 
}
1515
 
 
1516
 
 
1517
1330
bool show_binlog_info(THD* thd)
1518
1331
{
1519
1332
  Protocol *protocol= thd->protocol;
1520
1333
  List<Item> field_list;
1521
1334
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1522
1335
  field_list.push_back(new Item_return_int("Position",20,
1523
 
                                           MYSQL_TYPE_LONGLONG));
 
1336
                                           DRIZZLE_TYPE_LONGLONG));
1524
1337
  field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
1525
1338
  field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
1526
1339
 
1565
1378
  File file;
1566
1379
  char fname[FN_REFLEN];
1567
1380
  List<Item> field_list;
1568
 
  uint length;
 
1381
  uint32_t length;
1569
1382
  int cur_dir_len;
1570
1383
  Protocol *protocol= thd->protocol;
1571
1384
 
1577
1390
 
1578
1391
  field_list.push_back(new Item_empty_string("Log_name", 255));
1579
1392
  field_list.push_back(new Item_return_int("File_size", 20,
1580
 
                                           MYSQL_TYPE_LONGLONG));
 
1393
                                           DRIZZLE_TYPE_LONGLONG));
1581
1394
  if (protocol->send_fields(&field_list,
1582
1395
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1583
1396
    return(true);
1610
1423
    else
1611
1424
    {
1612
1425
      /* this is an old log, open it and find the size */
1613
 
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
 
1426
      if ((file= my_open(fname, O_RDONLY,
1614
1427
                         MYF(0))) >= 0)
1615
1428
      {
1616
1429
        file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));
1642
1455
int log_loaded_block(IO_CACHE* file)
1643
1456
{
1644
1457
  LOAD_FILE_INFO *lf_info;
1645
 
  uint block_len;
 
1458
  uint32_t block_len;
1646
1459
  /* buffer contains position where we started last read */
1647
 
  uchar* buffer= (uchar*) my_b_get_buffer_start(file);
1648
 
  uint max_event_size= current_thd->variables.max_allowed_packet;
 
1460
  unsigned char* buffer= (unsigned char*) my_b_get_buffer_start(file);
 
1461
  uint32_t max_event_size= current_thd->variables.max_allowed_packet;
1649
1462
  lf_info= (LOAD_FILE_INFO*) file->arg;
1650
1463
  if (lf_info->thd->current_stmt_binlog_row_based)
1651
1464
    return(0);
1654
1467
    return(0);
1655
1468
  
1656
1469
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1657
 
       buffer += min(block_len, max_event_size),
1658
 
       block_len -= min(block_len, max_event_size))
 
1470
       buffer += cmin(block_len, max_event_size),
 
1471
       block_len -= cmin(block_len, max_event_size))
1659
1472
  {
1660
1473
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1661
1474
    if (lf_info->wrote_create_file)
1662
1475
    {
1663
1476
      Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
1664
 
                               min(block_len, max_event_size),
 
1477
                               cmin(block_len, max_event_size),
1665
1478
                               lf_info->log_delayed);
1666
1479
      mysql_bin_log.write(&a);
1667
1480
    }
1669
1482
    {
1670
1483
      Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1671
1484
                                   buffer,
1672
 
                                   min(block_len, max_event_size),
 
1485
                                   cmin(block_len, max_event_size),
1673
1486
                                   lf_info->log_delayed);
1674
1487
      mysql_bin_log.write(&b);
1675
1488
      lf_info->wrote_create_file= 1;
1707
1520
};
1708
1521
 
1709
1522
static void fix_slave_net_timeout(THD *thd,
1710
 
                                  enum_var_type type __attribute__((__unused__)))
 
1523
                                  enum_var_type type __attribute__((unused)))
1711
1524
{
1712
 
#ifdef HAVE_REPLICATION
1713
1525
  pthread_mutex_lock(&LOCK_active_mi);
1714
1526
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1715
 
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
 
1527
    push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1716
1528
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1717
1529
                        "The currect value for master_heartbeat_period"
1718
1530
                        " exceeds the new value of `slave_net_timeout' sec."
1719
1531
                        " A sensible value for the period should be"
1720
1532
                        " less than the timeout.");
1721
1533
  pthread_mutex_unlock(&LOCK_active_mi);
1722
 
#endif
1723
1534
  return;
1724
1535
}
1725
1536
 
1738
1549
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
1739
1550
 
1740
1551
 
1741
 
static int show_slave_skip_errors(THD *thd __attribute__((__unused__)),
 
1552
static int show_slave_skip_errors(THD *thd __attribute__((unused)),
1742
1553
                                  SHOW_VAR *var, char *buff)
1743
1554
{
1744
1555
  var->type=SHOW_CHAR;
1770
1581
    if (var->value != buff)
1771
1582
      buff--;                           // Remove last ','
1772
1583
    if (i < MAX_SLAVE_ERROR)
1773
 
      buff= strmov(buff, "...");  // Couldn't show all errors
 
1584
      buff= my_stpcpy(buff, "...");  // Couldn't show all errors
1774
1585
    *buff=0;
1775
1586
  }
1776
1587
  return 0;
1790
1601
  {"slave_skip_errors",       (char*) &show_slave_skip_errors_cont,      SHOW_FUNC},
1791
1602
};
1792
1603
 
1793
 
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((__unused__)),
 
1604
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((unused)),
1794
1605
                                       set_var *var)
1795
1606
{
1796
1607
  int result= 0;
1808
1619
}
1809
1620
 
1810
1621
 
1811
 
bool sys_var_slave_skip_counter::update(THD *thd __attribute__((__unused__)),
 
1622
bool sys_var_slave_skip_counter::update(THD *thd __attribute__((unused)),
1812
1623
                                        set_var *var)
1813
1624
{
1814
1625
  pthread_mutex_lock(&LOCK_active_mi);
1830
1641
}
1831
1642
 
1832
1643
 
1833
 
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((__unused__)),
 
1644
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((unused)),
1834
1645
                                        set_var *var)
1835
1646
{
1836
 
  sync_binlog_period= (ulong) var->save_result.uint64_t_value;
 
1647
  sync_binlog_period= (uint32_t) var->save_result.uint64_t_value;
1837
1648
  return 0;
1838
1649
}
1839
1650
 
1849
1660
  }
1850
1661
  return 0;
1851
1662
}
1852
 
 
1853
 
#endif /* HAVE_REPLICATION */
1854
 
 
1855