~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_repl.cc

Moved base64.h to mysys.

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
   along with this program; if not, write to the Free Software
14
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
 
#include <drizzled/server_includes.h>
 
16
#include "mysql_priv.h"
 
17
#ifdef HAVE_REPLICATION
17
18
 
18
19
#include "rpl_mi.h"
19
20
#include "sql_repl.h"
20
21
#include "log_event.h"
21
22
#include "rpl_filter.h"
22
 
#include <drizzled/drizzled_error_messages.h>
 
23
#include <my_dir.h>
23
24
 
24
25
int max_binlog_dump_events = 0; // unlimited
25
26
 
52
53
  header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
53
54
 
54
55
  char* p = log_file_name+dirname_length(log_file_name);
55
 
  uint32_t ident_len = (uint32_t) strlen(p);
56
 
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
 
56
  uint ident_len = (uint) strlen(p);
 
57
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
57
58
  int4store(header + SERVER_ID_OFFSET, server_id);
58
59
  int4store(header + EVENT_LEN_OFFSET, event_len);
59
60
  int2store(header + FLAGS_OFFSET, 0);
65
66
  int8store(buf+R_POS_OFFSET,position);
66
67
  packet->append(buf, ROTATE_HEADER_LEN);
67
68
  packet->append(p,ident_len);
68
 
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
69
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
69
70
  {
70
71
    *errmsg = "failed on my_net_write()";
71
72
    return(-1);
82
83
  const char *errmsg = 0;
83
84
  int old_timeout;
84
85
  unsigned long packet_len;
85
 
  unsigned char buf[IO_SIZE];                           // It's safe to alloc this
 
86
  uchar buf[IO_SIZE];                           // It's safe to alloc this
86
87
 
87
88
  /*
88
89
    The client might be slow loading the data, give him wait_timeout to do
97
98
  */
98
99
  if (net_flush(net) || (packet_len = my_net_read(net)) == packet_error)
99
100
  {
100
 
    errmsg = _("Failed in send_file() while reading file name");
 
101
    errmsg = "while reading file name";
101
102
    goto err;
102
103
  }
103
104
 
110
111
 
111
112
  if ((fd = my_open(fname, O_RDONLY, MYF(0))) < 0)
112
113
  {
113
 
    errmsg = _("Failed in send_file() on open of file");
 
114
    errmsg = "on open of file";
114
115
    goto err;
115
116
  }
116
117
 
118
119
  {
119
120
    if (my_net_write(net, buf, bytes))
120
121
    {
121
 
      errmsg = _("Failed in send_file() while writing data to client");
 
122
      errmsg = "while writing data to client";
122
123
      goto err;
123
124
    }
124
125
  }
125
126
 
126
127
 end:
127
 
  if (my_net_write(net, (unsigned char*) "", 0) || net_flush(net) ||
 
128
  if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
128
129
      (my_net_read(net) == packet_error))
129
130
  {
130
 
    errmsg = _("Failed in send_file() while negotiating file transfer close");
 
131
    errmsg = "while negotiating file transfer close";
131
132
    goto err;
132
133
  }
133
134
  error = 0;
138
139
    (void) my_close(fd, MYF(0));
139
140
  if (errmsg)
140
141
  {
141
 
    sql_print_error(errmsg);
 
142
    sql_print_error("Failed in send_file() %s", errmsg);
142
143
  }
143
144
  return(error);
144
145
}
209
210
    if ((linfo = tmp->current_linfo))
210
211
    {
211
212
      pthread_mutex_lock(&linfo->lock);
212
 
      result = !memcmp(log_name, linfo->log_file_name, log_name_len);
 
213
      result = !bcmp((uchar*) log_name, (uchar*) linfo->log_file_name,
 
214
                     log_name_len);
213
215
      pthread_mutex_unlock(&linfo->lock);
214
216
      if (result)
215
217
        break;
222
224
 
223
225
bool purge_error_message(THD* thd, int res)
224
226
{
225
 
  uint32_t errmsg= 0;
 
227
  uint errmsg= 0;
226
228
 
227
229
  switch (res)  {
228
230
  case 0: break;
315
317
*/ 
316
318
static uint64_t get_heartbeat_period(THD * thd)
317
319
{
318
 
  bool null_value;
 
320
  my_bool null_value;
319
321
  LEX_STRING name=  { C_STRING_WITH_LEN("master_heartbeat_period")};
320
322
  user_var_entry *entry= 
321
 
    (user_var_entry*) hash_search(&thd->user_vars, (unsigned char*) name.str,
 
323
    (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
322
324
                                  name.length);
323
325
  return entry? entry->val_int(&null_value) : 0;
324
326
}
351
353
 
352
354
  char* p= coord->file_name + dirname_length(coord->file_name);
353
355
 
354
 
  uint32_t ident_len = strlen(p);
355
 
  uint32_t event_len = ident_len + LOG_EVENT_HEADER_LEN;
 
356
  uint ident_len = strlen(p);
 
357
  ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
356
358
  int4store(header + SERVER_ID_OFFSET, server_id);
357
359
  int4store(header + EVENT_LEN_OFFSET, event_len);
358
360
  int2store(header + FLAGS_OFFSET, 0);
362
364
  packet->append(header, sizeof(header));
363
365
  packet->append(p, ident_len);             // log_file_name
364
366
 
365
 
  if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) ||
 
367
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
366
368
      net_flush(net))
367
369
  {
368
370
    return(-1);
376
378
*/
377
379
 
378
380
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
379
 
                       uint16_t flags)
 
381
                       ushort flags)
380
382
{
381
383
  LOG_INFO linfo;
382
384
  char *log_file_name = linfo.log_file_name;
390
392
  pthread_mutex_t *log_lock;
391
393
  bool binlog_can_be_corrupted= false;
392
394
 
393
 
  memset(&log, 0, sizeof(log));
 
395
  bzero((char*) &log,sizeof(log));
394
396
  /* 
395
397
     heartbeat_period from @master_heartbeat_period user variable
396
398
  */
399
401
  struct event_coordinates coord_buf;
400
402
  struct timespec *heartbeat_ts= NULL;
401
403
  struct event_coordinates *coord= NULL;
402
 
  if (heartbeat_period != 0L)
 
404
  if (heartbeat_period != 0LL)
403
405
  {
404
406
    heartbeat_ts= &heartbeat_buf;
405
407
    set_timespec_nsec(*heartbeat_ts, 0);
539
541
           to avoid destroying temp tables.
540
542
          */
541
543
         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
542
 
                   ST_CREATED_OFFSET+1, (uint32_t) 0);
 
544
                   ST_CREATED_OFFSET+1, (ulong) 0);
543
545
         /* send it */
544
 
         if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
546
         if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
545
547
         {
546
548
           errmsg = "Failed on my_net_write()";
547
549
           my_errno= ER_UNKNOWN_ERROR;
595
597
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
596
598
        binlog_can_be_corrupted= false;
597
599
 
598
 
      if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()))
 
600
      if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
599
601
      {
600
602
        errmsg = "Failed on my_net_write()";
601
603
        my_errno= ER_UNKNOWN_ERROR;
686
688
          {
687
689
            if (coord)
688
690
            {
689
 
              assert(heartbeat_ts && heartbeat_period != 0L);
 
691
              assert(heartbeat_ts && heartbeat_period != 0LL);
690
692
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
691
693
            }
692
694
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
693
 
            assert(ret == 0 || (heartbeat_period != 0L && coord != NULL));
 
695
            assert(ret == 0 || (heartbeat_period != 0LL && coord != NULL));
694
696
            if (ret == ETIMEDOUT || ret == ETIME)
695
697
            {
696
698
              if (send_heartbeat_event(net, packet, coord))
719
721
        if (read_packet)
720
722
        {
721
723
          thd_proc_info(thd, "Sending binlog event to slave");
722
 
          if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) )
 
724
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
723
725
          {
724
726
            errmsg = "Failed on my_net_write()";
725
727
            my_errno= ER_UNKNOWN_ERROR;
909
911
 
910
912
          /* Issuing warning then started without --skip-slave-start */
911
913
          if (!opt_skip_slave_start)
912
 
            push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
914
            push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
913
915
                         ER_MISSING_SKIP_SLAVE,
914
916
                         ER(ER_MISSING_SKIP_SLAVE));
915
917
        }
917
919
        pthread_mutex_unlock(&mi->rli.data_lock);
918
920
      }
919
921
      else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
920
 
        push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
 
922
        push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
921
923
                     ER(ER_UNTIL_COND_IGNORED));
922
924
 
923
925
      if (!slave_errno)
933
935
  else
934
936
  {
935
937
    /* no error if all threads are already started, only a warning */
936
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
 
938
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
937
939
                 ER(ER_SLAVE_WAS_RUNNING));
938
940
  }
939
941
 
981
983
  {
982
984
    //no error if both threads are already stopped, only a warning
983
985
    slave_errno= 0;
984
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
 
986
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
985
987
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
986
988
  }
987
989
  unlock_slave_threads(mi);
1019
1021
  struct stat stat_area;
1020
1022
  char fname[FN_REFLEN];
1021
1023
  int thread_mask= 0, error= 0;
1022
 
  uint32_t sql_errno=0;
 
1024
  uint sql_errno=0;
1023
1025
  const char* errmsg=0;
1024
1026
 
1025
1027
  lock_slave_threads(mi);
1031
1033
    goto err;
1032
1034
  }
1033
1035
 
 
1036
  ha_reset_slave(thd);
 
1037
 
1034
1038
  // delete relay logs, clear relay log coordinates
1035
1039
  if ((error= purge_relay_logs(&mi->rli, thd,
1036
1040
                               1 /* just reset */,
1183
1187
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1184
1188
    mi->heartbeat_period = lex_mi->heartbeat_period;
1185
1189
  else
1186
 
    mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
 
1190
    mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
1187
1191
                                      (slave_net_timeout/2.0));
1188
 
  mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
 
1192
  mi->received_heartbeats= 0LL; // counter lives until master is CHANGEd
1189
1193
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1190
1194
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
1191
1195
 
1206
1210
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
1207
1211
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
1208
1212
      lex_mi->ssl_verify_server_cert )
1209
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
1213
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
1210
1214
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1211
1215
 
1212
1216
  if (lex_mi->relay_log_name)
1244
1248
   {
1245
1249
     /*
1246
1250
       Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1247
 
       not initialized), so we use a cmax().
 
1251
       not initialized), so we use a max().
1248
1252
       What happens to mi->rli.master_log_pos during the initialization stages
1249
1253
       of replication is not 100% clear, so we guard against problems using
1250
 
       cmax().
 
1254
       max().
1251
1255
      */
1252
 
     mi->master_log_pos = ((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1253
 
                           ? BIN_LOG_HEADER_SIZE
1254
 
                           : mi->rli.group_master_log_pos);
 
1256
     mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
 
1257
                              mi->rli.group_master_log_pos);
1255
1258
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
1256
1259
             sizeof(mi->master_log_name)-1);
1257
1260
  }
1348
1351
                   const char* log_file_name2, uint64_t log_pos2)
1349
1352
{
1350
1353
  int res;
1351
 
  uint32_t log_file_name1_len=  strlen(log_file_name1);
1352
 
  uint32_t log_file_name2_len=  strlen(log_file_name2);
 
1354
  uint log_file_name1_len=  strlen(log_file_name1);
 
1355
  uint log_file_name2_len=  strlen(log_file_name2);
1353
1356
 
1354
1357
  //  We assume that both log names match up to '.'
1355
1358
  if (log_file_name1_len == log_file_name2_len)
1379
1382
  Format_description_log_event *description_event= new
1380
1383
    Format_description_log_event(3); /* MySQL 4.0 by default */
1381
1384
 
 
1385
  /*
 
1386
    Wait for handlers to insert any pending information
 
1387
    into the binlog.  For e.g. ndb which updates the binlog asynchronously
 
1388
    this is needed so that the uses sees all its own commands in the binlog
 
1389
  */
 
1390
  ha_binlog_wait(thd);
 
1391
 
1382
1392
  if (mysql_bin_log.is_open())
1383
1393
  {
1384
1394
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1385
1395
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
1386
1396
    ha_rows event_count, limit_start, limit_end;
1387
 
    my_off_t pos = cmax((uint64_t)BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
 
1397
    my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1388
1398
    char search_file_name[FN_REFLEN], *name;
1389
1399
    const char *log_file_name = lex_mi->log_file_name;
1390
1400
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1555
1565
  File file;
1556
1566
  char fname[FN_REFLEN];
1557
1567
  List<Item> field_list;
1558
 
  uint32_t length;
 
1568
  uint length;
1559
1569
  int cur_dir_len;
1560
1570
  Protocol *protocol= thd->protocol;
1561
1571
 
1632
1642
int log_loaded_block(IO_CACHE* file)
1633
1643
{
1634
1644
  LOAD_FILE_INFO *lf_info;
1635
 
  uint32_t block_len;
 
1645
  uint block_len;
1636
1646
  /* buffer contains position where we started last read */
1637
 
  unsigned char* buffer= (unsigned char*) my_b_get_buffer_start(file);
1638
 
  uint32_t max_event_size= current_thd->variables.max_allowed_packet;
 
1647
  uchar* buffer= (uchar*) my_b_get_buffer_start(file);
 
1648
  uint max_event_size= current_thd->variables.max_allowed_packet;
1639
1649
  lf_info= (LOAD_FILE_INFO*) file->arg;
1640
1650
  if (lf_info->thd->current_stmt_binlog_row_based)
1641
1651
    return(0);
1644
1654
    return(0);
1645
1655
  
1646
1656
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1647
 
       buffer += cmin(block_len, max_event_size),
1648
 
       block_len -= cmin(block_len, max_event_size))
 
1657
       buffer += min(block_len, max_event_size),
 
1658
       block_len -= min(block_len, max_event_size))
1649
1659
  {
1650
1660
    lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1651
1661
    if (lf_info->wrote_create_file)
1652
1662
    {
1653
1663
      Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
1654
 
                               cmin(block_len, max_event_size),
 
1664
                               min(block_len, max_event_size),
1655
1665
                               lf_info->log_delayed);
1656
1666
      mysql_bin_log.write(&a);
1657
1667
    }
1659
1669
    {
1660
1670
      Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1661
1671
                                   buffer,
1662
 
                                   cmin(block_len, max_event_size),
 
1672
                                   min(block_len, max_event_size),
1663
1673
                                   lf_info->log_delayed);
1664
1674
      mysql_bin_log.write(&b);
1665
1675
      lf_info->wrote_create_file= 1;
1699
1709
static void fix_slave_net_timeout(THD *thd,
1700
1710
                                  enum_var_type type __attribute__((unused)))
1701
1711
{
 
1712
#ifdef HAVE_REPLICATION
1702
1713
  pthread_mutex_lock(&LOCK_active_mi);
1703
1714
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1704
 
    push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1715
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
1705
1716
                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1706
1717
                        "The currect value for master_heartbeat_period"
1707
1718
                        " exceeds the new value of `slave_net_timeout' sec."
1708
1719
                        " A sensible value for the period should be"
1709
1720
                        " less than the timeout.");
1710
1721
  pthread_mutex_unlock(&LOCK_active_mi);
 
1722
#endif
1711
1723
  return;
1712
1724
}
1713
1725
 
1758
1770
    if (var->value != buff)
1759
1771
      buff--;                           // Remove last ','
1760
1772
    if (i < MAX_SLAVE_ERROR)
1761
 
      buff= my_stpcpy(buff, "...");  // Couldn't show all errors
 
1773
      buff= strmov(buff, "...");  // Couldn't show all errors
1762
1774
    *buff=0;
1763
1775
  }
1764
1776
  return 0;
1821
1833
bool sys_var_sync_binlog_period::update(THD *thd __attribute__((unused)),
1822
1834
                                        set_var *var)
1823
1835
{
1824
 
  sync_binlog_period= (uint32_t) var->save_result.uint64_t_value;
 
1836
  sync_binlog_period= (ulong) var->save_result.uint64_t_value;
1825
1837
  return 0;
1826
1838
}
1827
1839
 
1837
1849
  }
1838
1850
  return 0;
1839
1851
}
 
1852
 
 
1853
#endif /* HAVE_REPLICATION */
 
1854
 
 
1855