~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to server/sql_repl.cc

  • Committer: Jay Pipes
  • Date: 2008-07-21 17:52:33 UTC
  • mto: (201.2.1 drizzle)
  • mto: This revision was merged to the branch mainline in revision 204.
  • Revision ID: jay@mysql.com-20080721175233-mtyz298j8xl3v63y
cleanup of FAQ file

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
   along with this program; if not, write to the Free Software
14
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
 
#include <drizzled/server_includes.h>
 
16
#include "mysql_priv.h"
 
17
#ifdef HAVE_REPLICATION
17
18
 
18
19
#include "rpl_mi.h"
19
20
#include "sql_repl.h"
20
21
#include "log_event.h"
21
22
#include "rpl_filter.h"
22
 
#include <drizzled/drizzled_error_messages.h>
 
23
#include <my_dir.h>
23
24
 
24
25
int max_binlog_dump_events = 0; // unlimited
25
26
 
52
53
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
53
54
 
54
55
  char* p = log_file_name+dirname_length(log_file_name);
55
 
  uint32_t ident_len = (uint32_t) strlen(p);
56
 
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
 
56
  uint ident_len = (uint) strlen(p);
 
57
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
57
58
  int4store(header + SERVER_ID_OFFSET, server_id);
58
59
  int4store(header + EVENT_LEN_OFFSET, event_len);
59
60
  int2store(header + FLAGS_OFFSET, 0);
65
66
  int8store(buf+R_POS_OFFSET,position);
66
67
  packet->append(buf, ROTATE_HEADER_LEN);
67
68
  packet->append(p,ident_len);
68
 
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
69
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
69
70
  {
70
71
    *errmsg = "failed on my_net_write()";
71
72
    return(-1);
82
83
  const char *errmsg = 0;
83
84
  int old_timeout;
84
85
  unsigned long packet_len;
85
 
  unsigned char buf[IO_SIZE];                           // It's safe to alloc this
 
86
  uchar buf[IO_SIZE];                           // It's safe to alloc this
86
87
 
87
88
  /*
88
89
    The client might be slow loading the data, give him wait_timeout to do
97
98
  */
98
99
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
99
100
  {
100
 
    errmsg = _("Failed in send_file() while reading file name");
 
101
    errmsg = "while reading file name";
101
102
    goto err;
102
103
  }
103
104
 
110
111
 
111
112
  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
112
113
  {
113
 
    errmsg = _("Failed in send_file() on open of file");
 
114
    errmsg = "on open of file";
114
115
    goto err;
115
116
  }
116
117
 
118
119
  {
119
120
    if (my_net_write(net, buf, bytes))
120
121
    {
121
 
      errmsg = _("Failed in send_file() while writing data to client");
 
122
      errmsg = "while writing data to client";
122
123
      goto err;
123
124
    }
124
125
  }
125
126
 
126
127
 end:
127
 
  if (my_net_write(net, (unsigned char*) "", 0) || net_flush(net) ||
 
128
  if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
128
129
      (my_net_read(net) == packet_error))
129
130
  {
130
 
    errmsg = _("Failed in send_file() while negotiating file transfer close");
 
131
    errmsg = "while negotiating file transfer close";
131
132
    goto err;
132
133
  }
133
134
  error = 0;
138
139
    (void) my_close(fd, MYF(0));
139
140
  if (errmsg)
140
141
  {
141
 
    sql_print_error(errmsg);
 
142
    sql_print_error("Failed in send_file() %s", errmsg);
142
143
  }
143
144
  return(error);
144
145
}
209
210
    if ((linfo = tmp->current_linfo))
210
211
    {
211
212
      pthread_mutex_lock(&linfo->lock);
212
 
      result = !memcmp(log_name, linfo->log_file_name, log_name_len);
 
213
      result = !bcmp((uchar*) log_name, (uchar*) linfo->log_file_name,
 
214
                     log_name_len);
213
215
      pthread_mutex_unlock(&linfo->lock);
214
216
      if (result)
215
217
        break;
222
224
 
223
225
bool purge_error_message(THD* thd, int res)
224
226
{
225
 
  uint32_t errmsg= 0;
 
227
  uint errmsg= 0;
226
228
 
227
229
  switch (res)  {
228
230
  case 0: break;
315
317
*/ 
316
318
static uint64_t get_heartbeat_period(THD * thd)
317
319
{
318
 
  bool null_value;
 
320
  my_bool null_value;
319
321
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
320
322
  user_var_entry *entry= 
321
 
    (user_var_entry*) hash_search(&thd->user_vars, (unsigned char*) name.str,
 
323
    (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
322
324
                                  name.length);
323
325
  return entry? entry->val_int(&null_value) : 0;
324
326
}
351
353
 
352
354
  char* p= coord->file_name + dirname_length(coord->file_name);
353
355
 
354
 
  uint32_t ident_len = strlen(p);
355
 
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN;
 
356
  uint ident_len = strlen(p);
 
357
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
356
358
  int4store(header + SERVER_ID_OFFSET, server_id);
357
359
  int4store(header + EVENT_LEN_OFFSET, event_len);
358
360
  int2store(header + FLAGS_OFFSET, 0);
362
364
  packet->append(header, sizeof(header));
363
365
  packet->append(p, ident_len);             // log_file_name
364
366
 
365
 
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) ||
 
367
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
366
368
      net_flush(net))
367
369
  {
368
370
    return(-1);
376
378
*/
377
379
 
378
380
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
379
 
                       uint16_t flags)
 
381
                       ushort flags)
380
382
{
381
383
  LOG_INFO linfo;
382
384
  char *log_file_name = linfo.log_file_name;
390
392
  pthread_mutex_t *log_lock;
391
393
  bool binlog_can_be_corrupted= false;
392
394
 
393
 
  memset(&log, 0, sizeof(log));
 
395
  bzero((char*) &log,sizeof(log));
394
396
  /* 
395
397
     heartbeat_period from @master_heartbeat_period user variable
396
398
  */
399
401
  struct event_coordinates coord_buf;
400
402
  struct timespec *heartbeat_ts= NULL;
401
403
  struct event_coordinates *coord= NULL;
402
 
  if (heartbeat_period != 0L)
 
404
  if (heartbeat_period != 0LL)
403
405
  {
404
406
    heartbeat_ts= &heartbeat_buf;
405
407
    set_timespec_nsec(*heartbeat_ts, 0);
539
541
           to avoid destroying temp tables.
540
542
          */
541
543
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
542
 
                   ST_CREATED_OFFSET+1, (uint32_t) 0);
 
544
                   ST_CREATED_OFFSET+1, (ulong) 0);
543
545
         /* send it */
544
 
         if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
546
         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
545
547
         {
546
548
           errmsg = "Failed on my_net_write()";
547
549
           my_errno= ER_UNKNOWN_ERROR;
595
597
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
596
598
        binlog_can_be_corrupted= false;
597
599
 
598
 
      if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
600
      if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
599
601
      {
600
602
        errmsg = "Failed on my_net_write()";
601
603
        my_errno= ER_UNKNOWN_ERROR;
686
688
          {
687
689
            if (coord)
688
690
            {
689
 
              assert(heartbeat_ts && heartbeat_period != 0L);
 
691
              assert(heartbeat_ts && heartbeat_period != 0LL);
690
692
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
691
693
            }
692
694
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
693
 
            assert(ret == 0 || (heartbeat_period != 0L && coord != NULL));
 
695
            assert(ret == 0 || (heartbeat_period != 0LL && coord != NULL));
694
696
            if (ret == ETIMEDOUT || ret == ETIME)
695
697
            {
696
698
              if (send_heartbeat_event(net, packet, coord))
718
720
 
719
721
        if (read_packet)
720
722
        {
721
 
          thd->set_proc_info("Sending binlog event to slave");
722
 
          if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) )
 
723
          thd_proc_info(thd, "Sending binlog event to slave");
 
724
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
723
725
          {
724
726
            errmsg = "Failed on my_net_write()";
725
727
            my_errno= ER_UNKNOWN_ERROR;
756
758
      bool loop_breaker = 0;
757
759
      /* need this to break out of the for loop from switch */
758
760
 
759
 
      thd->set_proc_info("Finished reading one binlog; switching to next binlog");
 
761
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
760
762
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
761
763
      case LOG_INFO_EOF:
762
764
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
804
806
  (void)my_close(file, MYF(MY_WME));
805
807
 
806
808
  my_eof(thd);
807
 
  thd->set_proc_info("Waiting to finalize termination");
 
809
  thd_proc_info(thd, "Waiting to finalize termination");
808
810
  pthread_mutex_lock(&LOCK_thread_count);
809
811
  thd->current_linfo = 0;
810
812
  pthread_mutex_unlock(&LOCK_thread_count);
811
813
  return;
812
814
 
813
815
err:
814
 
  thd->set_proc_info("Waiting to finalize termination");
 
816
  thd_proc_info(thd, "Waiting to finalize termination");
815
817
  end_io_cache(&log);
816
818
  /*
817
819
    Exclude  iteration through thread list
848
850
    thread_mask&= thd->lex->slave_thd_opt;
849
851
  if (thread_mask) //some threads are stopped, start them
850
852
  {
851
 
    if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
 
853
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
 
854
                         thread_mask))
852
855
      slave_errno=ER_MASTER_INFO;
853
 
    else if (server_id_supplied && *mi->getHostname())
 
856
    else if (server_id_supplied && *mi->host)
854
857
    {
855
858
      /*
856
859
        If we will start SQL thread we will care about UNTIL options If
908
911
 
909
912
          /* Issuing warning then started without --skip-slave-start */
910
913
          if (!opt_skip_slave_start)
911
 
            push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
914
            push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
912
915
                         ER_MISSING_SKIP_SLAVE,
913
916
                         ER(ER_MISSING_SKIP_SLAVE));
914
917
        }
916
919
        pthread_mutex_unlock(&mi->rli.data_lock);
917
920
      }
918
921
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
919
 
        push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
 
922
        push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
920
923
                     ER(ER_UNTIL_COND_IGNORED));
921
924
 
922
925
      if (!slave_errno)
932
935
  else
933
936
  {
934
937
    /* no error if all threads are already started, only a warning */
935
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
 
938
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
936
939
                 ER(ER_SLAVE_WAS_RUNNING));
937
940
  }
938
941
 
957
960
  if (!thd)
958
961
    thd = current_thd;
959
962
 
960
 
  thd->set_proc_info("Killing slave");
 
963
  thd_proc_info(thd, "Killing slave");
961
964
  int thread_mask;
962
965
  lock_slave_threads(mi);
963
966
  // Get a mask of _running_ threads
980
983
  {
981
984
    //no error if both threads are already stopped, only a warning
982
985
    slave_errno= 0;
983
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
 
986
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
984
987
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
985
988
  }
986
989
  unlock_slave_threads(mi);
987
 
  thd->set_proc_info(0);
 
990
  thd_proc_info(thd, 0);
988
991
 
989
992
  if (slave_errno)
990
993
  {
1018
1021
  struct stat stat_area;
1019
1022
  char fname[FN_REFLEN];
1020
1023
  int thread_mask= 0, error= 0;
1021
 
  uint32_t sql_errno=0;
 
1024
  uint sql_errno=0;
1022
1025
  const char* errmsg=0;
1023
1026
 
1024
1027
  lock_slave_threads(mi);
1030
1033
    goto err;
1031
1034
  }
1032
1035
 
 
1036
  ha_reset_slave(thd);
 
1037
 
1033
1038
  // delete relay logs, clear relay log coordinates
1034
1039
  if ((error= purge_relay_logs(&mi->rli, thd,
1035
1040
                               1 /* just reset */,
1037
1042
    goto err;
1038
1043
 
1039
1044
  /* Clear master's log coordinates */
1040
 
  mi->reset();
 
1045
  init_master_log_pos(mi);
1041
1046
  /*
1042
1047
     Reset errors (the idea is that we forget about the
1043
1048
     old master).
1046
1051
  mi->rli.clear_until_condition();
1047
1052
 
1048
1053
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1049
 
  mi->end_master_info();
 
1054
  end_master_info(mi);
1050
1055
  // and delete these two files
1051
1056
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
1052
1057
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1089
1094
*/
1090
1095
 
1091
1096
 
1092
 
void kill_zombie_dump_threads(uint32_t slave_server_id)
 
1097
void kill_zombie_dump_threads(uint32 slave_server_id)
1093
1098
{
1094
1099
  pthread_mutex_lock(&LOCK_thread_count);
1095
1100
  I_List_iterator<THD> it(threads);
1133
1138
    return(true);
1134
1139
  }
1135
1140
 
1136
 
  thd->set_proc_info("Changing master");
 
1141
  thd_proc_info(thd, "Changing master");
1137
1142
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1138
1143
  // TODO: see if needs re-write
1139
 
  if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
 
1144
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
 
1145
                       thread_mask))
1140
1146
  {
1141
1147
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1142
1148
    unlock_slave_threads(mi);
1155
1161
  */
1156
1162
 
1157
1163
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1158
 
    mi->reset();
 
1164
  {
 
1165
    mi->master_log_name[0] = 0;
 
1166
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
 
1167
  }
1159
1168
 
1160
1169
  if (lex_mi->log_file_name)
1161
 
    mi->setLogName(lex_mi->log_file_name);
 
1170
    strmake(mi->master_log_name, lex_mi->log_file_name,
 
1171
            sizeof(mi->master_log_name)-1);
1162
1172
  if (lex_mi->pos)
1163
1173
  {
1164
 
    mi->setLogPosition(lex_mi->pos);
 
1174
    mi->master_log_pos= lex_mi->pos;
1165
1175
  }
1166
1176
 
1167
1177
  if (lex_mi->host)
1168
 
    mi->setHost(lex_mi->host, lex_mi->port);
 
1178
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1169
1179
  if (lex_mi->user)
1170
 
    mi->setUsername(lex_mi->user);
 
1180
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
1171
1181
  if (lex_mi->password)
1172
 
    mi->setPassword(lex_mi->password);
 
1182
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
 
1183
  if (lex_mi->port)
 
1184
    mi->port = lex_mi->port;
1173
1185
  if (lex_mi->connect_retry)
1174
1186
    mi->connect_retry = lex_mi->connect_retry;
1175
1187
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1176
1188
    mi->heartbeat_period = lex_mi->heartbeat_period;
1177
1189
  else
1178
 
    mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
 
1190
    mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
1179
1191
                                      (slave_net_timeout/2.0));
1180
 
  mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
 
1192
  mi->received_heartbeats= 0LL; // counter lives until master is CHANGEd
 
1193
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1194
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
1195
 
 
1196
  if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
 
1197
    mi->ssl_verify_server_cert=
 
1198
      (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
 
1199
 
 
1200
  if (lex_mi->ssl_ca)
 
1201
    strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
 
1202
  if (lex_mi->ssl_capath)
 
1203
    strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
 
1204
  if (lex_mi->ssl_cert)
 
1205
    strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
 
1206
  if (lex_mi->ssl_cipher)
 
1207
    strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
 
1208
  if (lex_mi->ssl_key)
 
1209
    strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
 
1210
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
 
1211
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
 
1212
      lex_mi->ssl_verify_server_cert )
 
1213
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
 
1214
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1181
1215
 
1182
1216
  if (lex_mi->relay_log_name)
1183
1217
  {
1184
1218
    need_relay_log_purge= 0;
1185
 
    mi->rli.event_relay_log_name.assign(lex_mi->relay_log_name);
 
1219
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
 
1220
            sizeof(mi->rli.group_relay_log_name)-1);
 
1221
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
 
1222
            sizeof(mi->rli.event_relay_log_name)-1);
1186
1223
  }
1187
1224
 
1188
1225
  if (lex_mi->relay_log_pos)
1211
1248
   {
1212
1249
     /*
1213
1250
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1214
 
       not initialized), so we use a cmax().
 
1251
       not initialized), so we use a max().
1215
1252
       What happens to mi->rli.master_log_pos during the initialization stages
1216
1253
       of replication is not 100% clear, so we guard against problems using
1217
 
       cmax().
 
1254
       max().
1218
1255
      */
1219
 
     mi->setLogPosition(((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1220
 
                         ? BIN_LOG_HEADER_SIZE
1221
 
                         : mi->rli.group_master_log_pos));
1222
 
     mi->setLogName(mi->rli.group_master_log_name.c_str());
 
1256
     mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
 
1257
                              mi->rli.group_master_log_pos);
 
1258
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
 
1259
             sizeof(mi->master_log_name)-1);
1223
1260
  }
1224
1261
  /*
1225
1262
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1226
1263
    a slave before).
1227
1264
  */
1228
 
  if (mi->flush())
 
1265
  if (flush_master_info(mi, 0))
1229
1266
  {
1230
1267
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1231
1268
    unlock_slave_threads(mi);
1234
1271
  if (need_relay_log_purge)
1235
1272
  {
1236
1273
    relay_log_purge= 1;
1237
 
    thd->set_proc_info("Purging old relay logs");
 
1274
    thd_proc_info(thd, "Purging old relay logs");
1238
1275
    if (purge_relay_logs(&mi->rli, thd,
1239
1276
                         0 /* not only reset, but also reinit */,
1240
1277
                         &errmsg))
1250
1287
    relay_log_purge= 0;
1251
1288
    /* Relay log is already initialized */
1252
1289
    if (init_relay_log_pos(&mi->rli,
1253
 
                           mi->rli.group_relay_log_name.c_str(),
 
1290
                           mi->rli.group_relay_log_name,
1254
1291
                           mi->rli.group_relay_log_pos,
1255
1292
                           0 /*no data lock*/,
1256
1293
                           &msg, 0))
1270
1307
    ''/0: we have lost all copies of the original good coordinates.
1271
1308
    That's why we always save good coords in rli.
1272
1309
  */
1273
 
  mi->rli.group_master_log_pos= mi->getLogPosition();
1274
 
  mi->rli.group_master_log_name.assign(mi->getLogName());
 
1310
  mi->rli.group_master_log_pos= mi->master_log_pos;
 
1311
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
 
1312
          sizeof(mi->rli.group_master_log_name)-1);
1275
1313
 
1276
 
  if (mi->rli.group_master_log_name.size() == 0) // uninitialized case
1277
 
    mi->rli.group_master_log_pos= 0;
 
1314
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
 
1315
    mi->rli.group_master_log_pos=0;
1278
1316
 
1279
1317
  pthread_mutex_lock(&mi->rli.data_lock);
1280
1318
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1293
1331
  pthread_mutex_unlock(&mi->rli.data_lock);
1294
1332
 
1295
1333
  unlock_slave_threads(mi);
1296
 
  thd->set_proc_info(0);
 
1334
  thd_proc_info(thd, 0);
1297
1335
  my_ok(thd);
1298
1336
  return(false);
1299
1337
}
1313
1351
                   const char* log_file_name2, uint64_t log_pos2)
1314
1352
{
1315
1353
  int res;
1316
 
  uint32_t log_file_name1_len=  strlen(log_file_name1);
1317
 
  uint32_t log_file_name2_len=  strlen(log_file_name2);
 
1354
  uint log_file_name1_len=  strlen(log_file_name1);
 
1355
  uint log_file_name2_len=  strlen(log_file_name2);
1318
1356
 
1319
1357
  //  We assume that both log names match up to '.'
1320
1358
  if (log_file_name1_len == log_file_name2_len)
1327
1365
}
1328
1366
 
1329
1367
 
 
1368
bool mysql_show_binlog_events(THD* thd)
 
1369
{
 
1370
  Protocol *protocol= thd->protocol;
 
1371
  List<Item> field_list;
 
1372
  const char *errmsg= 0;
 
1373
  bool ret= true;
 
1374
  IO_CACHE log;
 
1375
  File file= -1;
 
1376
 
 
1377
  Log_event::init_show_field_list(&field_list);
 
1378
  if (protocol->send_fields(&field_list,
 
1379
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
 
1380
    return(true);
 
1381
 
 
1382
  Format_description_log_event *description_event= new
 
1383
    Format_description_log_event(3); /* MySQL 4.0 by default */
 
1384
 
 
1385
  /*
 
1386
    Wait for handlers to insert any pending information
 
1387
    into the binlog.  For e.g. ndb which updates the binlog asynchronously
 
1388
    this is needed so that the uses sees all its own commands in the binlog
 
1389
  */
 
1390
  ha_binlog_wait(thd);
 
1391
 
 
1392
  if (mysql_bin_log.is_open())
 
1393
  {
 
1394
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
 
1395
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
 
1396
    ha_rows event_count, limit_start, limit_end;
 
1397
    my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
 
1398
    char search_file_name[FN_REFLEN], *name;
 
1399
    const char *log_file_name = lex_mi->log_file_name;
 
1400
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
 
1401
    LOG_INFO linfo;
 
1402
    Log_event* ev;
 
1403
 
 
1404
    unit->set_limit(thd->lex->current_select);
 
1405
    limit_start= unit->offset_limit_cnt;
 
1406
    limit_end= unit->select_limit_cnt;
 
1407
 
 
1408
    name= search_file_name;
 
1409
    if (log_file_name)
 
1410
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
 
1411
    else
 
1412
      name=0;                                   // Find first log
 
1413
 
 
1414
    linfo.index_file_offset = 0;
 
1415
 
 
1416
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
 
1417
    {
 
1418
      errmsg = "Could not find target log";
 
1419
      goto err;
 
1420
    }
 
1421
 
 
1422
    pthread_mutex_lock(&LOCK_thread_count);
 
1423
    thd->current_linfo = &linfo;
 
1424
    pthread_mutex_unlock(&LOCK_thread_count);
 
1425
 
 
1426
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
 
1427
      goto err;
 
1428
 
 
1429
    /*
 
1430
      to account binlog event header size
 
1431
    */
 
1432
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
 
1433
 
 
1434
    pthread_mutex_lock(log_lock);
 
1435
 
 
1436
    /*
 
1437
      open_binlog() sought to position 4.
 
1438
      Read the first event in case it's a Format_description_log_event, to
 
1439
      know the format. If there's no such event, we are 3.23 or 4.x. This
 
1440
      code, like before, can't read 3.23 binlogs.
 
1441
      This code will fail on a mixed relay log (one which has Format_desc then
 
1442
      Rotate then Format_desc).
 
1443
    */
 
1444
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
 
1445
    if (ev)
 
1446
    {
 
1447
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
 
1448
      {
 
1449
        delete description_event;
 
1450
        description_event= (Format_description_log_event*) ev;
 
1451
      }
 
1452
      else
 
1453
        delete ev;
 
1454
    }
 
1455
 
 
1456
    my_b_seek(&log, pos);
 
1457
 
 
1458
    if (!description_event->is_valid())
 
1459
    {
 
1460
      errmsg="Invalid Format_description event; could be out of memory";
 
1461
      goto err;
 
1462
    }
 
1463
 
 
1464
    for (event_count = 0;
 
1465
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
 
1466
                                         description_event)); )
 
1467
    {
 
1468
      if (event_count >= limit_start &&
 
1469
          ev->net_send(protocol, linfo.log_file_name, pos))
 
1470
      {
 
1471
        errmsg = "Net error";
 
1472
        delete ev;
 
1473
        pthread_mutex_unlock(log_lock);
 
1474
        goto err;
 
1475
      }
 
1476
 
 
1477
      pos = my_b_tell(&log);
 
1478
      delete ev;
 
1479
 
 
1480
      if (++event_count >= limit_end)
 
1481
        break;
 
1482
    }
 
1483
 
 
1484
    if (event_count < limit_end && log.error)
 
1485
    {
 
1486
      errmsg = "Wrong offset or I/O error";
 
1487
      pthread_mutex_unlock(log_lock);
 
1488
      goto err;
 
1489
    }
 
1490
 
 
1491
    pthread_mutex_unlock(log_lock);
 
1492
  }
 
1493
 
 
1494
  ret= false;
 
1495
 
 
1496
err:
 
1497
  delete description_event;
 
1498
  if (file >= 0)
 
1499
  {
 
1500
    end_io_cache(&log);
 
1501
    (void) my_close(file, MYF(MY_WME));
 
1502
  }
 
1503
 
 
1504
  if (errmsg)
 
1505
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
 
1506
             "SHOW BINLOG EVENTS", errmsg);
 
1507
  else
 
1508
    my_eof(thd);
 
1509
 
 
1510
  pthread_mutex_lock(&LOCK_thread_count);
 
1511
  thd->current_linfo = 0;
 
1512
  pthread_mutex_unlock(&LOCK_thread_count);
 
1513
  return(ret);
 
1514
}
 
1515
 
 
1516
 
1330
1517
bool show_binlog_info(THD* thd)
1331
1518
{
1332
1519
  Protocol *protocol= thd->protocol;
1333
1520
  List<Item> field_list;
1334
1521
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1335
1522
  field_list.push_back(new Item_return_int("Position",20,
1336
 
                                           DRIZZLE_TYPE_LONGLONG));
 
1523
                                           MYSQL_TYPE_LONGLONG));
1337
1524
  field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
1338
1525
  field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
1339
1526
 
1378
1565
  File file;
1379
1566
  char fname[FN_REFLEN];
1380
1567
  List<Item> field_list;
1381
 
  uint32_t length;
 
1568
  uint length;
1382
1569
  int cur_dir_len;
1383
1570
  Protocol *protocol= thd->protocol;
1384
1571
 
1390
1577
 
1391
1578
  field_list.push_back(new Item_empty_string("Log_name", 255));
1392
1579
  field_list.push_back(new Item_return_int("File_size", 20,
1393
 
                                           DRIZZLE_TYPE_LONGLONG));
 
1580
                                           MYSQL_TYPE_LONGLONG));
1394
1581
  if (protocol->send_fields(&field_list,
1395
1582
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1396
1583
    return(true);
1423
1610
    else
1424
1611
    {
1425
1612
      /* this is an old log, open it and find the size */
1426
 
      if ((file= my_open(fname, O_RDONLY,
 
1613
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
1427
1614
                         MYF(0))) >= 0)
1428
1615
      {
1429
1616
        file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));
1455
1642
int log_loaded_block(IO_CACHE* file)
1456
1643
{
1457
1644
  LOAD_FILE_INFO *lf_info;
1458
 
  uint32_t block_len;
 
1645
  uint block_len;
1459
1646
  /* buffer contains position where we started last read */
1460
 
  unsigned char* buffer= (unsigned char*) my_b_get_buffer_start(file);
1461
 
  uint32_t max_event_size= current_thd->variables.max_allowed_packet;
 
1647
  uchar* buffer= (uchar*) my_b_get_buffer_start(file);
 
1648
  uint max_event_size= current_thd->variables.max_allowed_packet;
1462
1649
  lf_info= (LOAD_FILE_INFO*) file->arg;
1463
1650
  if (lf_info->thd->current_stmt_binlog_row_based)
1464
1651
    return(0);
1467
1654
    return(0);
1468
1655
  
1469
1656
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1470
 
       buffer += cmin(block_len, max_event_size),
1471
 
       block_len -= cmin(block_len, max_event_size))
 
1657
       buffer += min(block_len, max_event_size),
 
1658
       block_len -= min(block_len, max_event_size))
1472
1659
  {
1473
1660
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1474
1661
    if (lf_info->wrote_create_file)
1475
1662
    {
1476
1663
      Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
1477
 
                               cmin(block_len, max_event_size),
 
1664
                               min(block_len, max_event_size),
1478
1665
                               lf_info->log_delayed);
1479
1666
      mysql_bin_log.write(&a);
1480
1667
    }
1482
1669
    {
1483
1670
      Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1484
1671
                                   buffer,
1485
 
                                   cmin(block_len, max_event_size),
 
1672
                                   min(block_len, max_event_size),
1486
1673
                                   lf_info->log_delayed);
1487
1674
      mysql_bin_log.write(&b);
1488
1675
      lf_info->wrote_create_file= 1;
1520
1707
};
1521
1708
 
1522
1709
static void fix_slave_net_timeout(THD *thd,
1523
 
                                  enum_var_type type __attribute__((unused)))
 
1710
                                  enum_var_type type __attribute__((__unused__)))
1524
1711
{
 
1712
#ifdef HAVE_REPLICATION
1525
1713
  pthread_mutex_lock(&LOCK_active_mi);
1526
1714
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1527
 
    push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1715
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
1528
1716
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1529
1717
                        "The currect value for master_heartbeat_period"
1530
1718
                        " exceeds the new value of `slave_net_timeout' sec."
1531
1719
                        " A sensible value for the period should be"
1532
1720
                        " less than the timeout.");
1533
1721
  pthread_mutex_unlock(&LOCK_active_mi);
 
1722
#endif
1534
1723
  return;
1535
1724
}
1536
1725
 
1549
1738
static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
1550
1739
 
1551
1740
 
1552
 
static int show_slave_skip_errors(THD *thd __attribute__((unused)),
 
1741
static int show_slave_skip_errors(THD *thd __attribute__((__unused__)),
1553
1742
                                  SHOW_VAR *var, char *buff)
1554
1743
{
1555
1744
  var->type=SHOW_CHAR;
1581
1770
    if (var->value != buff)
1582
1771
      buff--;                           // Remove last ','
1583
1772
    if (i < MAX_SLAVE_ERROR)
1584
 
      buff= my_stpcpy(buff, "...");  // Couldn't show all errors
 
1773
      buff= strmov(buff, "...");  // Couldn't show all errors
1585
1774
    *buff=0;
1586
1775
  }
1587
1776
  return 0;
1601
1790
  {"slave_skip_errors",       (char*) &show_slave_skip_errors_cont,      SHOW_FUNC},
1602
1791
};
1603
1792
 
1604
 
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((unused)),
 
1793
bool sys_var_slave_skip_counter::check(THD *thd __attribute__((__unused__)),
1605
1794
                                       set_var *var)
1606
1795
{
1607
1796
  int result= 0;
1619
1808
}
1620
1809
 
1621
1810
 
1622
 
bool sys_var_slave_skip_counter::update(THD *thd __attribute__((unused)),
 
1811
bool sys_var_slave_skip_counter::update(THD *thd __attribute__((__unused__)),
1623
1812
                                        set_var *var)
1624
1813
{
1625
1814
  pthread_mutex_lock(&LOCK_active_mi);
1641
1830
}
1642
1831
 
1643
1832
 
1644
 
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((unused)),
 
1833
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((__unused__)),
1645
1834
                                        set_var *var)
1646
1835
{
1647
 
  sync_binlog_period= (uint32_t) var->save_result.uint64_t_value;
 
1836
  sync_binlog_period= (ulong) var->save_result.uint64_t_value;
1648
1837
  return 0;
1649
1838
}
1650
1839
 
1660
1849
  }
1661
1850
  return 0;
1662
1851
}
 
1852
 
 
1853
#endif /* HAVE_REPLICATION */
 
1854
 
 
1855