~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/slave.cc

  • Committer: Monty Taylor
  • Date: 2008-08-04 22:01:39 UTC
  • mto: (261.1.4 drizzle)
  • mto: This revision was merged to the branch mainline in revision 262.
  • Revision ID: monty@inaugust.com-20080804220139-fy862jc9lykayvka
Moved libdrizzle.ver.in to libdrizzle.ver.

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
  @brief Code to run the io thread and the sql thread on the
24
24
  replication slave.
25
25
*/
26
 
#include <drizzled/server_includes.h>
 
26
 
 
27
#include "mysql_priv.h"
27
28
 
28
29
#include <storage/myisam/myisam.h>
 
30
#include "slave.h"
29
31
#include "rpl_mi.h"
30
32
#include "rpl_rli.h"
31
33
#include "sql_repl.h"
37
39
#include <mysys/mysys_err.h>
38
40
#include <drizzled/drizzled_error_messages.h>
39
41
 
 
42
#ifdef HAVE_REPLICATION
 
43
 
40
44
#include "rpl_tblmap.h"
41
45
 
42
46
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
282
286
  use_slave_mask = 1;
283
287
  for (;my_isspace(system_charset_info,*arg);++arg)
284
288
    /* empty */;
285
 
  if (!my_strnncoll(system_charset_info,(unsigned char*)arg,4,(const unsigned char*)"all",4))
 
289
  if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
286
290
  {
287
291
    bitmap_set_all(&slave_error_mask);
288
292
    return;
541
545
 
542
546
 
543
547
#ifdef NOT_USED_YET
544
 
static int32_t end_slave_on_walk(Master_info* mi, unsigned char* /*unused*/)
 
548
static int32_t end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
545
549
{
546
550
  end_master_info(mi);
547
551
  return(0);
635
639
{
636
640
  (void)net_request_file(net, "/dev/null");
637
641
  (void)my_net_read(net);                               // discard response
638
 
  (void)net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0); // ok
 
642
  (void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok
639
643
  return;
640
644
}
641
645
 
642
646
 
643
647
bool net_request_file(NET* net, const char* fname)
644
648
{
645
 
  return(net_write_command(net, 251, (unsigned char*) fname, strlen(fname),
646
 
                                (unsigned char*) "", 0));
 
649
  return(net_write_command(net, 251, (uchar*) fname, strlen(fname),
 
650
                                (uchar*) "", 0));
647
651
}
648
652
 
649
653
/*
1092
1096
int32_t register_slave_on_master(DRIZZLE *drizzle, Master_info *mi,
1093
1097
                             bool *suppress_warnings)
1094
1098
{
1095
 
  unsigned char buf[1024], *pos= buf;
 
1099
  uchar buf[1024], *pos= buf;
1096
1100
  uint32_t report_host_len, report_user_len=0, report_password_len=0;
1097
1101
 
1098
1102
  *suppress_warnings= false;
1109
1113
    return(0);                                     // safety
1110
1114
 
1111
1115
  int4store(pos, server_id); pos+= 4;
1112
 
  pos= net_store_data(pos, (unsigned char*) report_host, report_host_len);
1113
 
  pos= net_store_data(pos, (unsigned char*) report_user, report_user_len);
1114
 
  pos= net_store_data(pos, (unsigned char*) report_password, report_password_len);
 
1116
  pos= net_store_data(pos, (uchar*) report_host, report_host_len);
 
1117
  pos= net_store_data(pos, (uchar*) report_user, report_user_len);
 
1118
  pos= net_store_data(pos, (uchar*) report_password, report_password_len);
1115
1119
  int2store(pos, (uint16_t) report_port); pos+= 2;
1116
1120
  int4store(pos, rpl_recovery_rank);    pos+= 4;
1117
1121
  /* The master will fill in master_id */
1218
1222
      non-volotile members like mi->io_thd, which is guarded by the mutex.
1219
1223
    */
1220
1224
    pthread_mutex_lock(&mi->run_lock);
1221
 
    protocol->store(mi->io_thd ? mi->io_thd->get_proc_info() : "", &my_charset_bin);
 
1225
    protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
1222
1226
    pthread_mutex_unlock(&mi->run_lock);
1223
1227
 
1224
1228
    pthread_mutex_lock(&mi->data_lock);
1234
1238
                    &my_charset_bin);
1235
1239
    protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1236
1240
    protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
1237
 
    protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
 
1241
    protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
1238
1242
                    "Yes" : "No", &my_charset_bin);
1239
1243
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1240
1244
    protocol->store(rpl_filter->get_do_db());
1275
1279
      Seconds_Behind_Master: if SQL thread is running and I/O thread is
1276
1280
      connected, we can compute it otherwise show NULL (i.e. unknown).
1277
1281
    */
1278
 
    if ((mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT) &&
 
1282
    if ((mi->slave_running == MYSQL_SLAVE_RUN_CONNECT) &&
1279
1283
        mi->rli.slave_running)
1280
1284
    {
1281
1285
      long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
1295
1299
        slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1296
1300
        between timestamp of slave and rli->last_master_timestamp is 0
1297
1301
        (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1298
 
        This confuses users, so we don't go below 0: hence the cmax().
 
1302
        This confuses users, so we don't go below 0: hence the max().
1299
1303
 
1300
1304
        last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1301
1305
        special marker to say "consider we have caught up".
1302
1306
      */
1303
1307
      protocol->store((int64_t)(mi->rli.last_master_timestamp ?
1304
 
                                 cmax((long)0, time_diff) : 0));
 
1308
                                 max(0, time_diff) : 0));
1305
1309
    }
1306
1310
    else
1307
1311
    {
1321
1325
    pthread_mutex_unlock(&mi->rli.data_lock);
1322
1326
    pthread_mutex_unlock(&mi->data_lock);
1323
1327
 
1324
 
    if (my_net_write(&thd->net, (unsigned char*) thd->packet.ptr(), packet->length()))
 
1328
    if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
1325
1329
      return(true);
1326
1330
  }
1327
1331
  my_eof(thd);
1448
1452
static int32_t request_dump(DRIZZLE *drizzle, Master_info* mi,
1449
1453
                        bool *suppress_warnings)
1450
1454
{
1451
 
  unsigned char buf[FN_REFLEN + 10];
 
1455
  uchar buf[FN_REFLEN + 10];
1452
1456
  int32_t len;
1453
1457
  int32_t binlog_flags = 0; // for now
1454
1458
  char* logname = mi->master_log_name;
1827
1831
            exec_res= 0;
1828
1832
            end_trans(thd, ROLLBACK);
1829
1833
            /* chance for concurrent connection to get more locks */
1830
 
            safe_sleep(thd, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
 
1834
            safe_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
1831
1835
                       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1832
1836
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1833
1837
            rli->trans_retries++;
1899
1903
*/
1900
1904
 
1901
1905
static int32_t try_to_reconnect(THD *thd, DRIZZLE *drizzle, Master_info *mi,
1902
 
                                uint32_t *retry_count, bool suppress_warnings,
1903
 
                                const char *messages[SLAVE_RECON_MSG_MAX])
 
1906
                            uint32_t *retry_count, bool suppress_warnings,
 
1907
                            const char *messages[SLAVE_RECON_MSG_MAX])
1904
1908
{
1905
 
  mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1906
 
  thd->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
1907
 
  drizzle_disconnect(drizzle);
 
1909
  mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
 
1910
  thd->proc_info= _(messages[SLAVE_RECON_MSG_WAIT]);
 
1911
#ifdef SIGNAL_WITH_VIO_CLOSE
 
1912
  thd->clear_active_vio();
 
1913
#endif
 
1914
  end_server(drizzle);
1908
1915
  if ((*retry_count)++)
1909
1916
  {
1910
1917
    if (*retry_count > master_retry_count)
1915
1922
  if (check_io_slave_killed(thd, mi,
1916
1923
                            _(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1917
1924
    return 1;
1918
 
  thd->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
 
1925
  thd->proc_info = _(messages[SLAVE_RECON_MSG_AFTER]);
1919
1926
  if (!suppress_warnings)
1920
1927
  {
1921
1928
    char buf[256], llbuff[22];
2023
2030
connected:
2024
2031
 
2025
2032
  // TODO: the assignment below should be under mutex (5.0)
2026
 
  mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
 
2033
  mi->slave_running= MYSQL_SLAVE_RUN_CONNECT;
2027
2034
  thd->slave_net = &drizzle->net;
2028
2035
  thd_proc_info(thd, "Checking master version");
2029
2036
  if (get_master_version_and_clock(drizzle, mi))
2176
2183
  sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2177
2184
                          "position %s"),
2178
2185
                        IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
2179
 
  pthread_mutex_lock(&LOCK_thread_count);
 
2186
  VOID(pthread_mutex_lock(&LOCK_thread_count));
2180
2187
  thd->query = thd->db = 0; // extra safety
2181
2188
  thd->query_length= thd->db_length= 0;
2182
 
  pthread_mutex_unlock(&LOCK_thread_count);
 
2189
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
2183
2190
  if (drizzle)
2184
2191
  {
2185
2192
    /*
2190
2197
      can be called in the middle of closing the VIO associated with
2191
2198
      the 'mysql' object, causing a crash.
2192
2199
    */
 
2200
#ifdef SIGNAL_WITH_VIO_CLOSE
 
2201
    thd->clear_active_vio();
 
2202
#endif
2193
2203
    drizzle_close(drizzle);
2194
2204
    mi->drizzle=0;
2195
2205
  }
2397
2407
        }
2398
2408
 
2399
2409
        /* Print any warnings issued */
2400
 
        List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
2401
 
        DRIZZLE_ERROR *err;
 
2410
        List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
 
2411
        MYSQL_ERROR *err;
2402
2412
        /*
2403
2413
          Added controlled slave thread cancel for replication
2404
2414
          of user-defined variables.
2445
2455
    must "proactively" clear playgrounds:
2446
2456
  */
2447
2457
  rli->cleanup_context(thd, 1);
2448
 
  pthread_mutex_lock(&LOCK_thread_count);
 
2458
  VOID(pthread_mutex_lock(&LOCK_thread_count));
2449
2459
  /*
2450
2460
    Some extra safety, which should not been needed (normally, event deletion
2451
2461
    should already have done these assignments (each event which sets these
2453
2463
  */
2454
2464
  thd->query= thd->db= thd->catalog= 0;
2455
2465
  thd->query_length= thd->db_length= 0;
2456
 
  pthread_mutex_unlock(&LOCK_thread_count);
 
2466
  VOID(pthread_mutex_unlock(&LOCK_thread_count));
2457
2467
  thd_proc_info(thd, "Waiting for slave mutex on exit");
2458
2468
  pthread_mutex_lock(&rli->run_lock);
2459
2469
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
2551
2561
      if (unlikely(!num_bytes)) /* eof */
2552
2562
      {
2553
2563
        /* 3.23 master wants it */
2554
 
        net_write_command(net, 0, (unsigned char*) "", 0, (unsigned char*) "", 0);
 
2564
        net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0);
2555
2565
        /*
2556
2566
          If we wrote Create_file_log_event, then we need to write
2557
2567
          Execute_load_log_event. If we did not write Create_file_log_event,
2720
2730
                      "master could be corrupt but a more likely cause "
2721
2731
                      "of this is a bug"),
2722
2732
                    errmsg);
2723
 
    free((char*) tmp_buf);
 
2733
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
2724
2734
    return(1);
2725
2735
  }
2726
2736
 
2756
2766
    delete ev;
2757
2767
    mi->master_log_pos += inc_pos;
2758
2768
    pthread_mutex_unlock(&mi->data_lock);
2759
 
    free((char*)tmp_buf);
 
2769
    my_free((char*)tmp_buf, MYF(0));
2760
2770
    return(error);
2761
2771
  }
2762
2772
  default:
2806
2816
                      "master could be corrupt but a more likely cause of "
2807
2817
                      "this is a bug"),
2808
2818
                    errmsg);
2809
 
    free((char*) tmp_buf);
 
2819
    my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
2810
2820
    return(1);
2811
2821
  }
2812
2822
  pthread_mutex_lock(&mi->data_lock);
3139
3149
*/
3140
3150
 
3141
3151
static int32_t connect_to_master(THD* thd, DRIZZLE *drizzle, Master_info* mi,
3142
 
                                 bool reconnect, bool suppress_warnings)
 
3152
                             bool reconnect, bool suppress_warnings)
3143
3153
{
3144
3154
  int32_t slave_was_killed;
3145
3155
  int32_t last_errno= -2;                           // impossible error
3154
3164
  drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3155
3165
  drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3156
3166
 
 
3167
  drizzle_options(drizzle, DRIZZLE_SET_CHARSET_NAME, default_charset_info->csname);
 
3168
  /* This one is not strictly needed but we have it here for completeness */
 
3169
  drizzle_options(drizzle, DRIZZLE_SET_CHARSET_DIR, (char *) charsets_dir);
 
3170
 
3157
3171
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3158
3172
         (reconnect ? drizzle_reconnect(drizzle) != 0 :
3159
3173
          drizzle_connect(drizzle, mi->host, mi->user, mi->password, 0,
3203
3217
    else
3204
3218
    {
3205
3219
      change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
 
3220
      general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d",
 
3221
                        mi->user, mi->host, mi->port);
3206
3222
    }
 
3223
#ifdef SIGNAL_WITH_VIO_CLOSE
 
3224
    thd->set_active_vio(drizzle->net.vio);
 
3225
#endif
3207
3226
  }
3208
3227
  drizzle->reconnect= 1;
3209
3228
  return(slave_was_killed);
3265
3284
  char buff[FN_REFLEN*2+22*2+4], *pos;
3266
3285
 
3267
3286
  my_b_seek(file, 0L);
3268
 
  pos=my_stpcpy(buff, rli->group_relay_log_name);
 
3287
  pos=strmov(buff, rli->group_relay_log_name);
3269
3288
  *pos++='\n';
3270
3289
  pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
3271
3290
  *pos++='\n';
3272
 
  pos=my_stpcpy(pos, rli->group_master_log_name);
 
3291
  pos=strmov(pos, rli->group_master_log_name);
3273
3292
  *pos++='\n';
3274
3293
  pos=int64_t2str(rli->group_master_log_pos, pos, 10);
3275
3294
  *pos='\n';
3276
 
  if (my_b_write(file, (unsigned char*) buff, (size_t) (pos-buff)+1))
 
3295
  if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
3277
3296
    error=1;
3278
3297
  if (flush_io_cache(file))
3279
3298
    error=1;
3301
3320
    relay_log_pos       Current log pos
3302
3321
    pending             Number of bytes already processed from the event
3303
3322
  */
3304
 
  rli->event_relay_log_pos= cmax(rli->event_relay_log_pos, (uint64_t)BIN_LOG_HEADER_SIZE);
 
3323
  rli->event_relay_log_pos= max(rli->event_relay_log_pos, BIN_LOG_HEADER_SIZE);
3305
3324
  my_b_seek(cur_log,rli->event_relay_log_pos);
3306
3325
  return(cur_log);
3307
3326
}
3382
3401
      When the relay log is created when the I/O thread starts, easy: the
3383
3402
      master will send the description event and we will queue it.
3384
3403
      But if the relay log is created by new_file(): then the solution is:
3385
 
      DRIZZLE_BIN_LOG::open() will write the buffered description event.
 
3404
      MYSQL_BIN_LOG::open() will write the buffered description event.
3386
3405
    */
3387
3406
    if ((ev=Log_event::read_log_event(cur_log,0,
3388
3407
                                      rli->relay_log.description_event_for_exec)))
3651
3670
  Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
3652
3671
  because of size is simpler because when we do it we already have all relevant
3653
3672
  locks; here we don't, so this function is mainly taking locks).
3654
 
  Returns nothing as we cannot catch any error (DRIZZLE_BIN_LOG::new_file()
 
3673
  Returns nothing as we cannot catch any error (MYSQL_BIN_LOG::new_file()
3655
3674
  is void).
3656
3675
*/
3657
3676
 
3706
3725
{
3707
3726
  struct st_version_range_for_one_bug {
3708
3727
    uint32_t        bug_id;
3709
 
    const unsigned char introduced_in[3]; // first version with bug
3710
 
    const unsigned char fixed_in[3];      // first version with fix
 
3728
    const uchar introduced_in[3]; // first version with bug
 
3729
    const uchar fixed_in[3];      // first version with fix
3711
3730
  };
3712
3731
  static struct st_version_range_for_one_bug versions_for_all_bugs[]=
3713
3732
  {
3716
3735
    {33029, { 5, 0,  0 }, { 5, 0, 58 } },
3717
3736
    {33029, { 5, 1,  0 }, { 5, 1, 12 } },
3718
3737
  };
3719
 
  const unsigned char *master_ver=
 
3738
  const uchar *master_ver=
3720
3739
    rli->relay_log.description_event_for_exec->server_version_split;
3721
3740
 
3722
3741
  assert(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
3724
3743
  for (uint32_t i= 0;
3725
3744
       i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
3726
3745
  {
3727
 
    const unsigned char *introduced_in= versions_for_all_bugs[i].introduced_in,
 
3746
    const uchar *introduced_in= versions_for_all_bugs[i].introduced_in,
3728
3747
      *fixed_in= versions_for_all_bugs[i].fixed_in;
3729
3748
    if ((versions_for_all_bugs[i].bug_id == bug_id) &&
3730
3749
        (memcmp(introduced_in, master_ver, 3) <= 0) &&
3793
3812
/**
3794
3813
  @} (end of group Replication)
3795
3814
*/
 
3815
 
 
3816
#endif /* HAVE_REPLICATION */