~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to server/sql_repl.cc

MergedĀ fromĀ trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
#include <my_dir.h>
24
24
 
25
25
int max_binlog_dump_events = 0; // unlimited
26
 
my_bool opt_sporadic_binlog_dump_fail = 0;
27
 
#ifndef DBUG_OFF
28
 
static int binlog_dump_count = 0;
29
 
#endif
30
26
 
31
27
/*
32
28
    fake_rotate_event() builds a fake (=which does not exist physically in any
45
41
    well-placed zeros was not possible as Rotate events have a variable-length
46
42
    part.
47
43
*/
48
 
 
49
44
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
50
45
                             ulonglong position, const char** errmsg)
51
46
{
52
 
  DBUG_ENTER("fake_rotate_event");
53
47
  char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
54
48
  /*
55
49
    'when' (the timestamp) is set to 0 so that slave could distinguish between
75
69
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
76
70
  {
77
71
    *errmsg = "failed on my_net_write()";
78
 
    DBUG_RETURN(-1);
 
72
    return(-1);
79
73
  }
80
 
  DBUG_RETURN(0);
 
74
  return(0);
81
75
}
82
76
 
83
77
static int send_file(THD *thd)
90
84
  int old_timeout;
91
85
  unsigned long packet_len;
92
86
  uchar buf[IO_SIZE];                           // It's safe to alloc this
93
 
  DBUG_ENTER("send_file");
94
87
 
95
88
  /*
96
89
    The client might be slow loading the data, give him wait_timeout to do
147
140
  if (errmsg)
148
141
  {
149
142
    sql_print_error("Failed in send_file() %s", errmsg);
150
 
    DBUG_PRINT("error", (errmsg));
151
143
  }
152
 
  DBUG_RETURN(error);
 
144
  return(error);
153
145
}
154
146
 
155
147
 
250
242
  if (errmsg)
251
243
  {
252
244
    my_message(errmsg, ER(errmsg), MYF(0));
253
 
    return TRUE;
 
245
    return true;
254
246
  }
255
247
  my_ok(thd);
256
 
  return FALSE;
 
248
  return false;
257
249
}
258
250
 
259
251
 
263
255
  if (!mysql_bin_log.is_open())
264
256
  {
265
257
    my_ok(thd);
266
 
    return FALSE;
 
258
    return false;
267
259
  }
268
260
 
269
261
  mysql_bin_log.make_log_name(search_file_name, to_log);
350
342
static int send_heartbeat_event(NET* net, String* packet,
351
343
                                const struct event_coordinates *coord)
352
344
{
353
 
  DBUG_ENTER("send_heartbeat_event");
354
345
  char header[LOG_EVENT_HEADER_LEN];
355
346
  /*
356
347
    'when' (the timestamp) is set to 0 so that slave could distinguish between
376
367
  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
377
368
      net_flush(net))
378
369
  {
379
 
    DBUG_RETURN(-1);
 
370
    return(-1);
380
371
  }
381
372
  packet->set("\0", 1, &my_charset_bin);
382
 
  DBUG_RETURN(0);
 
373
  return(0);
383
374
}
384
375
 
385
376
/*
399
390
  const char *errmsg = "Unknown error";
400
391
  NET* net = &thd->net;
401
392
  pthread_mutex_t *log_lock;
402
 
  bool binlog_can_be_corrupted= FALSE;
403
 
#ifndef DBUG_OFF
404
 
  int left_events = max_binlog_dump_events;
405
 
#endif
406
 
  DBUG_ENTER("mysql_binlog_send");
407
 
  DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));
 
393
  bool binlog_can_be_corrupted= false;
408
394
 
409
395
  bzero((char*) &log,sizeof(log));
410
396
  /* 
423
409
    coord->file_name= log_file_name; // initialization basing on what slave remembers
424
410
    coord->pos= pos;
425
411
  }
426
 
#ifndef DBUG_OFF
427
 
  if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
428
 
  {
429
 
    errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover";
430
 
    my_errno= ER_UNKNOWN_ERROR;
431
 
    goto err;
432
 
  }
433
 
#endif
434
412
 
435
413
  if (!mysql_bin_log.is_open())
436
414
  {
604
582
  {
605
583
    while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
606
584
    {
607
 
#ifndef DBUG_OFF
608
 
      if (max_binlog_dump_events && !left_events--)
609
 
      {
610
 
        net_flush(net);
611
 
        errmsg = "Debugging binlog dump abort";
612
 
        my_errno= ER_UNKNOWN_ERROR;
613
 
        goto err;
614
 
      }
615
 
#endif
616
585
      /*
617
586
        log's filename does not change while it's active
618
587
      */
626
595
        (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
627
596
      }
628
597
      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
629
 
        binlog_can_be_corrupted= FALSE;
 
598
        binlog_can_be_corrupted= false;
630
599
 
631
600
      if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
632
601
      {
686
655
        log.error=0;
687
656
        bool read_packet = 0, fatal_error = 0;
688
657
 
689
 
#ifndef DBUG_OFF
690
 
        if (max_binlog_dump_events && !left_events--)
691
 
        {
692
 
          errmsg = "Debugging binlog dump abort";
693
 
          my_errno= ER_UNKNOWN_ERROR;
694
 
          goto err;
695
 
        }
696
 
#endif
697
 
 
698
658
        /*
699
659
          No one will update the log while we are reading
700
660
          now, but we'll be quick and just read one record
718
678
        case LOG_READ_EOF:
719
679
        {
720
680
          int ret;
721
 
          DBUG_PRINT("wait",("waiting for data in binary log"));
722
681
          if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
723
682
          {
724
683
            pthread_mutex_unlock(log_lock);
725
684
            goto end;
726
685
          }
727
686
 
728
 
#ifndef DBUG_OFF
729
 
          ulong hb_info_counter= 0;
730
 
#endif
731
687
          do 
732
688
          {
733
689
            if (coord)
734
690
            {
735
 
              DBUG_ASSERT(heartbeat_ts && heartbeat_period != 0LL);
 
691
              assert(heartbeat_ts && heartbeat_period != 0LL);
736
692
              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
737
693
            }
738
694
            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
739
 
            DBUG_ASSERT(ret == 0 || heartbeat_period != 0LL && coord != NULL);
 
695
            assert(ret == 0 || heartbeat_period != 0LL && coord != NULL);
740
696
            if (ret == ETIMEDOUT || ret == ETIME)
741
697
            {
742
 
#ifndef DBUG_OFF
743
 
              if (hb_info_counter < 3)
744
 
              {
745
 
                sql_print_information("master sends heartbeat message");
746
 
                hb_info_counter++;
747
 
                if (hb_info_counter == 3)
748
 
                  sql_print_information("the rest of heartbeat info skipped ...");
749
 
              }
750
 
#endif
751
698
              if (send_heartbeat_event(net, packet, coord))
752
699
              {
753
700
                errmsg = "Failed on my_net_write()";
758
705
            }
759
706
            else
760
707
            {
761
 
              DBUG_ASSERT(ret == 0);
762
 
              DBUG_PRINT("wait",("binary log received update"));
 
708
              assert(ret == 0);
763
709
            }
764
710
          } while (ret != 0 && coord != NULL && !thd->killed);
765
711
          pthread_mutex_unlock(log_lock);
864
810
  pthread_mutex_lock(&LOCK_thread_count);
865
811
  thd->current_linfo = 0;
866
812
  pthread_mutex_unlock(&LOCK_thread_count);
867
 
  DBUG_VOID_RETURN;
 
813
  return;
868
814
 
869
815
err:
870
816
  thd_proc_info(thd, "Waiting to finalize termination");
883
829
    (void) my_close(file, MYF(MY_WME));
884
830
 
885
831
  my_message(my_errno, errmsg, MYF(0));
886
 
  DBUG_VOID_RETURN;
 
832
  return;
887
833
}
888
834
 
889
835
int start_slave(THD* thd , Master_info* mi,  bool net_report)
890
836
{
891
837
  int slave_errno= 0;
892
838
  int thread_mask;
893
 
  DBUG_ENTER("start_slave");
894
839
 
895
840
  lock_slave_threads(mi);  // this allows us to cleanly read slave_running
896
841
  // Get a mask of _stopped_ threads
1000
945
  {
1001
946
    if (net_report)
1002
947
      my_message(slave_errno, ER(slave_errno), MYF(0));
1003
 
    DBUG_RETURN(1);
 
948
    return(1);
1004
949
  }
1005
950
  else if (net_report)
1006
951
    my_ok(thd);
1007
952
 
1008
 
  DBUG_RETURN(0);
 
953
  return(0);
1009
954
}
1010
955
 
1011
956
 
1012
957
int stop_slave(THD* thd, Master_info* mi, bool net_report )
1013
958
{
1014
 
  DBUG_ENTER("stop_slave");
1015
 
  
1016
959
  int slave_errno;
1017
960
  if (!thd)
1018
961
    thd = current_thd;
1050
993
  {
1051
994
    if (net_report)
1052
995
      my_message(slave_errno, ER(slave_errno), MYF(0));
1053
 
    DBUG_RETURN(1);
 
996
    return(1);
1054
997
  }
1055
998
  else if (net_report)
1056
999
    my_ok(thd);
1057
1000
 
1058
 
  DBUG_RETURN(0);
 
1001
  return(0);
1059
1002
}
1060
1003
 
1061
1004
 
1080
1023
  int thread_mask= 0, error= 0;
1081
1024
  uint sql_errno=0;
1082
1025
  const char* errmsg=0;
1083
 
  DBUG_ENTER("reset_slave");
1084
1026
 
1085
1027
  lock_slave_threads(mi);
1086
1028
  init_thread_mask(&thread_mask,mi,0 /* not inverse */);
1129
1071
  unlock_slave_threads(mi);
1130
1072
  if (error)
1131
1073
    my_error(sql_errno, MYF(0), errmsg);
1132
 
  DBUG_RETURN(error);
 
1074
  return(error);
1133
1075
}
1134
1076
 
1135
1077
/*
1186
1128
  int thread_mask;
1187
1129
  const char* errmsg= 0;
1188
1130
  bool need_relay_log_purge= 1;
1189
 
  DBUG_ENTER("change_master");
1190
1131
 
1191
1132
  lock_slave_threads(mi);
1192
1133
  init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
1194
1135
  {
1195
1136
    my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
1196
1137
    unlock_slave_threads(mi);
1197
 
    DBUG_RETURN(TRUE);
 
1138
    return(true);
1198
1139
  }
1199
1140
 
1200
1141
  thd_proc_info(thd, "Changing master");
1205
1146
  {
1206
1147
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1207
1148
    unlock_slave_threads(mi);
1208
 
    DBUG_RETURN(TRUE);
 
1149
    return(true);
1209
1150
  }
1210
1151
 
1211
1152
  /*
1232
1173
  {
1233
1174
    mi->master_log_pos= lex_mi->pos;
1234
1175
  }
1235
 
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
1236
1176
 
1237
1177
  if (lex_mi->host)
1238
1178
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1326
1266
  {
1327
1267
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1328
1268
    unlock_slave_threads(mi);
1329
 
    DBUG_RETURN(TRUE);
 
1269
    return(true);
1330
1270
  }
1331
1271
  if (need_relay_log_purge)
1332
1272
  {
1338
1278
    {
1339
1279
      my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
1340
1280
      unlock_slave_threads(mi);
1341
 
      DBUG_RETURN(TRUE);
 
1281
      return(true);
1342
1282
    }
1343
1283
  }
1344
1284
  else
1354
1294
    {
1355
1295
      my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
1356
1296
      unlock_slave_threads(mi);
1357
 
      DBUG_RETURN(TRUE);
 
1297
      return(true);
1358
1298
    }
1359
1299
  }
1360
1300
  /*
1368
1308
    That's why we always save good coords in rli.
1369
1309
  */
1370
1310
  mi->rli.group_master_log_pos= mi->master_log_pos;
1371
 
  DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
1372
1311
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
1373
1312
          sizeof(mi->rli.group_master_log_name)-1);
1374
1313
 
1394
1333
  unlock_slave_threads(mi);
1395
1334
  thd_proc_info(thd, 0);
1396
1335
  my_ok(thd);
1397
 
  DBUG_RETURN(FALSE);
 
1336
  return(false);
1398
1337
}
1399
1338
 
1400
1339
int reset_master(THD* thd)
1430
1369
{
1431
1370
  Protocol *protocol= thd->protocol;
1432
1371
  List<Item> field_list;
1433
 
  const char *errmsg = 0;
1434
 
  bool ret = TRUE;
 
1372
  const char *errmsg= 0;
 
1373
  bool ret= true;
1435
1374
  IO_CACHE log;
1436
 
  File file = -1;
1437
 
  DBUG_ENTER("mysql_show_binlog_events");
 
1375
  File file= -1;
1438
1376
 
1439
1377
  Log_event::init_show_field_list(&field_list);
1440
1378
  if (protocol->send_fields(&field_list,
1441
1379
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1442
 
    DBUG_RETURN(TRUE);
 
1380
    return(true);
1443
1381
 
1444
1382
  Format_description_log_event *description_event= new
1445
1383
    Format_description_log_event(3); /* MySQL 4.0 by default */
1553
1491
    pthread_mutex_unlock(log_lock);
1554
1492
  }
1555
1493
 
1556
 
  ret= FALSE;
 
1494
  ret= false;
1557
1495
 
1558
1496
err:
1559
1497
  delete description_event;
1572
1510
  pthread_mutex_lock(&LOCK_thread_count);
1573
1511
  thd->current_linfo = 0;
1574
1512
  pthread_mutex_unlock(&LOCK_thread_count);
1575
 
  DBUG_RETURN(ret);
 
1513
  return(ret);
1576
1514
}
1577
1515
 
1578
1516
 
1579
1517
bool show_binlog_info(THD* thd)
1580
1518
{
1581
1519
  Protocol *protocol= thd->protocol;
1582
 
  DBUG_ENTER("show_binlog_info");
1583
1520
  List<Item> field_list;
1584
1521
  field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1585
1522
  field_list.push_back(new Item_return_int("Position",20,
1589
1526
 
1590
1527
  if (protocol->send_fields(&field_list,
1591
1528
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1592
 
    DBUG_RETURN(TRUE);
 
1529
    return(true);
1593
1530
  protocol->prepare_for_resend();
1594
1531
 
1595
1532
  if (mysql_bin_log.is_open())
1602
1539
    protocol->store(binlog_filter->get_do_db());
1603
1540
    protocol->store(binlog_filter->get_ignore_db());
1604
1541
    if (protocol->write())
1605
 
      DBUG_RETURN(TRUE);
 
1542
      return(true);
1606
1543
  }
1607
1544
  my_eof(thd);
1608
 
  DBUG_RETURN(FALSE);
 
1545
  return(false);
1609
1546
}
1610
1547
 
1611
1548
 
1617
1554
    thd         Thread specific variable
1618
1555
 
1619
1556
  RETURN VALUES
1620
 
    FALSE OK
1621
 
    TRUE  error
 
1557
    false OK
 
1558
    true  error
1622
1559
*/
1623
1560
 
1624
1561
bool show_binlogs(THD* thd)
1631
1568
  uint length;
1632
1569
  int cur_dir_len;
1633
1570
  Protocol *protocol= thd->protocol;
1634
 
  DBUG_ENTER("show_binlogs");
1635
1571
 
1636
1572
  if (!mysql_bin_log.is_open())
1637
1573
  {
1644
1580
                                           MYSQL_TYPE_LONGLONG));
1645
1581
  if (protocol->send_fields(&field_list,
1646
1582
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1647
 
    DBUG_RETURN(TRUE);
 
1583
    return(true);
1648
1584
  
1649
1585
  pthread_mutex_lock(mysql_bin_log.get_log_lock());
1650
1586
  mysql_bin_log.lock_index();
1687
1623
  }
1688
1624
  mysql_bin_log.unlock_index();
1689
1625
  my_eof(thd);
1690
 
  DBUG_RETURN(FALSE);
 
1626
  return(false);
1691
1627
 
1692
1628
err:
1693
1629
  mysql_bin_log.unlock_index();
1694
 
  DBUG_RETURN(TRUE);
 
1630
  return(true);
1695
1631
}
1696
1632
 
1697
1633
/**
1705
1641
*/
1706
1642
int log_loaded_block(IO_CACHE* file)
1707
1643
{
1708
 
  DBUG_ENTER("log_loaded_block");
1709
1644
  LOAD_FILE_INFO *lf_info;
1710
1645
  uint block_len;
1711
1646
  /* buffer contains position where we started last read */
1713
1648
  uint max_event_size= current_thd->variables.max_allowed_packet;
1714
1649
  lf_info= (LOAD_FILE_INFO*) file->arg;
1715
1650
  if (lf_info->thd->current_stmt_binlog_row_based)
1716
 
    DBUG_RETURN(0);
 
1651
    return(0);
1717
1652
  if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1718
1653
      lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1719
 
    DBUG_RETURN(0);
 
1654
    return(0);
1720
1655
  
1721
1656
  for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1722
1657
       buffer += min(block_len, max_event_size),
1738
1673
                                   lf_info->log_delayed);
1739
1674
      mysql_bin_log.write(&b);
1740
1675
      lf_info->wrote_create_file= 1;
1741
 
      DBUG_SYNC_POINT("debug_lock.created_file_event",10);
1742
1676
    }
1743
1677
  }
1744
 
  DBUG_RETURN(0);
 
1678
  return(0);
1745
1679
}
1746
1680
 
1747
1681
/*
1775
1709
static void fix_slave_net_timeout(THD *thd,
1776
1710
                                  enum_var_type type __attribute__((__unused__)))
1777
1711
{
1778
 
  DBUG_ENTER("fix_slave_net_timeout");
1779
1712
#ifdef HAVE_REPLICATION
1780
1713
  pthread_mutex_lock(&LOCK_active_mi);
1781
1714
  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1787
1720
                        " less than the timeout.");
1788
1721
  pthread_mutex_unlock(&LOCK_active_mi);
1789
1722
#endif
1790
 
  DBUG_VOID_RETURN;
 
1723
  return;
1791
1724
}
1792
1725
 
1793
1726
static sys_var_chain vars = { NULL, NULL };