~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/slave.cc

  • Committer: Monty Taylor
  • Date: 2008-10-13 09:29:43 UTC
  • mfrom: (509 drizzle)
  • mto: (509.1.4 codestyle)
  • mto: This revision was merged to the branch mainline in revision 511.
  • Revision ID: monty@inaugust.com-20081013092943-rwvx4a6d85b5l2dh
Merged in trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
239
239
    goto err;
240
240
  }
241
241
 
242
 
  if (init_master_info(active_mi,master_info_file,relay_log_info_file,
243
 
                       1, (SLAVE_IO | SLAVE_SQL)))
 
242
  if (active_mi->init_master_info(master_info_file, relay_log_info_file, (SLAVE_IO | SLAVE_SQL)))
244
243
  {
245
244
    sql_print_error(_("Failed to initialize the master info structure"));
246
245
    goto err;
585
584
      once multi-master code is ready.
586
585
    */
587
586
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
588
 
    end_master_info(active_mi);
 
587
    active_mi->end_master_info();
589
588
    delete active_mi;
590
589
    active_mi= 0;
591
590
  }
1084
1083
                     " to the relay log, SHOW SLAVE STATUS may be"
1085
1084
                     " inaccurate"));
1086
1085
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1087
 
      if (flush_master_info(mi, 1))
 
1086
      if (mi->flush())
1088
1087
        sql_print_error(_("Failed to flush master info file"));
1089
1088
      delete ev;
1090
1089
    }
1124
1123
  pos= net_store_data(pos, (unsigned char*) report_user, report_user_len);
1125
1124
  pos= net_store_data(pos, (unsigned char*) report_password, report_password_len);
1126
1125
  int2store(pos, (uint16_t) report_port); pos+= 2;
1127
 
  int4store(pos, rpl_recovery_rank);    pos+= 4;
 
1126
  int4store(pos, 0);    pos+= 4;
1128
1127
  /* The master will fill in master_id */
1129
1128
  int4store(pos, 0);                    pos+= 4;
1130
1129
 
1221
1220
 
1222
1221
    pthread_mutex_lock(&mi->data_lock);
1223
1222
    pthread_mutex_lock(&mi->rli.data_lock);
1224
 
    protocol->store(mi->host, &my_charset_bin);
1225
 
    protocol->store(mi->user, &my_charset_bin);
1226
 
    protocol->store((uint32_t) mi->port);
1227
 
    protocol->store((uint32_t) mi->connect_retry);
1228
 
    protocol->store(mi->master_log_name, &my_charset_bin);
1229
 
    protocol->store((uint64_t) mi->master_log_pos);
1230
 
    protocol->store(mi->rli.group_relay_log_name +
1231
 
                    dirname_length(mi->rli.group_relay_log_name),
 
1223
    protocol->store(mi->getHostname(), &my_charset_bin);
 
1224
    protocol->store(mi->getUsername(), &my_charset_bin);
 
1225
    protocol->store((uint32_t) mi->getPort());
 
1226
    protocol->store(mi->getConnectionRetry());
 
1227
    protocol->store(mi->getLogName(), &my_charset_bin);
 
1228
    protocol->store((uint64_t) mi->getLogPosition());
 
1229
    protocol->store(mi->rli.group_relay_log_name.c_str() +
 
1230
                    dirname_length(mi->rli.group_relay_log_name.c_str()),
1232
1231
                    &my_charset_bin);
1233
1232
    protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1234
 
    protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
 
1233
    protocol->store(mi->rli.group_master_log_name.c_str(), &my_charset_bin);
1235
1234
    protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1236
1235
                    "Yes" : "No", &my_charset_bin);
1237
1236
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1359
1358
  thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1360
1359
    + MAX_LOG_EVENT_HEADER;  /* note, incr over the global not session var */
1361
1360
  thd->slave_thread = 1;
1362
 
  thd->enable_slow_log= opt_log_slow_slave_statements;
1363
1361
  set_slave_thread_options(thd);
1364
1362
  thd->client_capabilities = CLIENT_LOCAL_FILES;
1365
1363
  pthread_mutex_lock(&LOCK_thread_count);
1376
1374
  lex_start(thd);
1377
1375
 
1378
1376
  if (thd_type == SLAVE_THD_SQL)
1379
 
    thd_proc_info(thd, "Waiting for the next event in relay log");
 
1377
    thd->set_proc_info("Waiting for the next event in relay log");
1380
1378
  else
1381
 
    thd_proc_info(thd, "Waiting for master update");
 
1379
    thd->set_proc_info("Waiting for master update");
1382
1380
  thd->version=refresh_version;
1383
1381
  thd->set_time();
1384
1382
  return(0);
1421
1419
  unsigned char buf[FN_REFLEN + 10];
1422
1420
  int32_t len;
1423
1421
  int32_t binlog_flags = 0; // for now
1424
 
  char* logname = mi->master_log_name;
 
1422
  const char* logname = mi->getLogName();
1425
1423
  
1426
1424
  *suppress_warnings= false;
1427
1425
 
1428
1426
  // TODO if big log files: Change next to int8store()
1429
 
  int4store(buf, (uint32_t) mi->master_log_pos);
 
1427
  int4store(buf, (uint32_t) mi->getLogPosition());
1430
1428
  int2store(buf + 4, binlog_flags);
1431
1429
  int4store(buf + 6, server_id);
1432
1430
  len = (uint32_t) strlen(logname);
1660
1658
                  " of the relay log information: the slave may"
1661
1659
                  " be in an inconsistent state."
1662
1660
                  " Stopped in %s position %s"),
1663
 
                  rli->group_relay_log_name,
 
1661
                  rli->group_relay_log_name.c_str(),
1664
1662
                  llstr(rli->group_relay_log_pos, buf));
1665
1663
      return(2);
1666
1664
    }
1784
1782
        */
1785
1783
        if (rli->trans_retries < slave_trans_retries)
1786
1784
        {
1787
 
          if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
 
1785
          if (rli->mi->init_master_info(0, 0, SLAVE_SQL))
1788
1786
            sql_print_error(_("Failed to initialize the master info structure"));
1789
1787
          else if (init_relay_log_pos(rli,
1790
 
                                      rli->group_relay_log_name,
 
1788
                                      rli->group_relay_log_name.c_str(),
1791
1789
                                      rli->group_relay_log_pos,
1792
1790
                                      1, &errmsg, 1))
1793
1791
            sql_print_error(_("Error initializing relay log position: %s"),
1890
1888
  {
1891
1889
    char buf[256], llbuff[22];
1892
1890
    snprintf(buf, sizeof(buf), _(messages[SLAVE_RECON_MSG_FAILED]),
1893
 
             IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
 
1891
             IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
1894
1892
    /*
1895
1893
      Raise a warining during registering on master/requesting dump.
1896
1894
      Log a message reading event.
1968
1966
    goto err;
1969
1967
  }
1970
1968
 
1971
 
  thd_proc_info(thd, "Connecting to master");
 
1969
  thd->set_proc_info("Connecting to master");
1972
1970
  // we can get killed during safe_connect
1973
1971
  if (!safe_connect(thd, drizzle, mi))
1974
1972
  {
1975
1973
    sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
1976
1974
                            "replication started in log '%s' at position %s"),
1977
 
                          mi->user, mi->host, mi->port,
 
1975
                          mi->getUsername(), mi->getHostname(), mi->getPort(),
1978
1976
                          IO_RPL_LOG_NAME,
1979
 
                          llstr(mi->master_log_pos,llbuff));
 
1977
                          llstr(mi->getLogPosition(), llbuff));
1980
1978
  /*
1981
1979
    Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
1982
1980
    thread, since a replication event can become this much larger than
1995
1993
  // TODO: the assignment below should be under mutex (5.0)
1996
1994
  mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
1997
1995
  thd->slave_net = &drizzle->net;
1998
 
  thd_proc_info(thd, "Checking master version");
 
1996
  thd->set_proc_info("Checking master version");
1999
1997
  if (get_master_version_and_clock(drizzle, mi))
2000
1998
    goto err;
2001
1999
  
2004
2002
    /*
2005
2003
      Register ourselves with the master.
2006
2004
    */
2007
 
    thd_proc_info(thd, "Registering slave on master");
 
2005
    thd->set_proc_info("Registering slave on master");
2008
2006
    if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2009
2007
    {
2010
2008
      if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
2032
2030
 
2033
2031
  while (!io_slave_killed(thd,mi))
2034
2032
  {
2035
 
    thd_proc_info(thd, "Requesting binlog dump");
 
2033
    thd->set_proc_info("Requesting binlog dump");
2036
2034
    if (request_dump(drizzle, mi, &suppress_warnings))
2037
2035
    {
2038
2036
      sql_print_error(_("Failed on request_dump()"));
2062
2060
        important thing is to not confuse users by saying "reading" whereas
2063
2061
        we're in fact receiving nothing.
2064
2062
      */
2065
 
      thd_proc_info(thd, _("Waiting for master to send event"));
 
2063
      thd->set_proc_info(_("Waiting for master to send event"));
2066
2064
      event_len= read_event(drizzle, mi, &suppress_warnings);
2067
2065
      if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while "
2068
2066
                                           "reading event")))
2106
2104
      } // if (event_len == packet_error)
2107
2105
 
2108
2106
      retry_count=0;                    // ok event, reset retry counter
2109
 
      thd_proc_info(thd, _("Queueing master event to the relay log"));
 
2107
      thd->set_proc_info(_("Queueing master event to the relay log"));
2110
2108
      if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2111
2109
      {
2112
2110
        goto err;
2113
2111
      }
2114
 
      if (flush_master_info(mi, 1))
 
2112
      if (mi->flush())
2115
2113
      {
2116
2114
        sql_print_error(_("Failed to flush master info file"));
2117
2115
        goto err;
2145
2143
// print the current replication position
2146
2144
  sql_print_information(_("Slave I/O thread exiting, read up to log '%s', "
2147
2145
                          "position %s"),
2148
 
                        IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
 
2146
                        IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
2149
2147
  pthread_mutex_lock(&LOCK_thread_count);
2150
2148
  thd->query = thd->db = 0; // extra safety
2151
2149
  thd->query_length= thd->db_length= 0;
2164
2162
    mi->drizzle=0;
2165
2163
  }
2166
2164
  write_ignored_events_info_to_relay_log(thd, mi);
2167
 
  thd_proc_info(thd, _("Waiting for slave mutex on exit"));
 
2165
  thd->set_proc_info(_("Waiting for slave mutex on exit"));
2168
2166
  pthread_mutex_lock(&mi->run_lock);
2169
2167
 
2170
2168
  /* Forget the relay log's format */
2267
2265
  rli->trans_retries= 0; // start from "no error"
2268
2266
 
2269
2267
  if (init_relay_log_pos(rli,
2270
 
                         rli->group_relay_log_name,
 
2268
                         rli->group_relay_log_name.c_str(),
2271
2269
                         rli->group_relay_log_pos,
2272
2270
                         1 /*need data lock*/, &errmsg,
2273
2271
                         1 /*look for a description_event*/))
2299
2297
                            "position %s, relay log '%s' position: %s"),
2300
2298
                            RPL_LOG_NAME,
2301
2299
                          llstr(rli->group_master_log_pos,llbuff),
2302
 
                          rli->group_relay_log_name,
 
2300
                          rli->group_relay_log_name.c_str(),
2303
2301
                          llstr(rli->group_relay_log_pos,llbuff1));
2304
2302
 
2305
2303
  /* execute init_slave variable */
2334
2332
 
2335
2333
  while (!sql_slave_killed(thd,rli))
2336
2334
  {
2337
 
    thd_proc_info(thd, _("Reading event from the relay log"));
 
2335
    thd->set_proc_info(_("Reading event from the relay log"));
2338
2336
    assert(rli->sql_thd == thd);
2339
2337
    THD_CHECK_SENTRY(thd);
2340
2338
    if (exec_relay_log_event(thd,rli))
2422
2420
  thd->query= thd->db= thd->catalog= 0;
2423
2421
  thd->query_length= thd->db_length= 0;
2424
2422
  pthread_mutex_unlock(&LOCK_thread_count);
2425
 
  thd_proc_info(thd, "Waiting for slave mutex on exit");
 
2423
  thd->set_proc_info("Waiting for slave mutex on exit");
2426
2424
  pthread_mutex_lock(&rli->run_lock);
2427
2425
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
2428
2426
  pthread_mutex_lock(&rli->data_lock);
2604
2602
    return(1);
2605
2603
 
2606
2604
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2607
 
  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
2608
 
  mi->master_log_pos= rev->pos;
 
2605
  mi->setLogName(rev->new_log_ident);
 
2606
  mi->setLogPosition(rev->pos);
2609
2607
  /*
2610
2608
    If we do not do this, we will be getting the first
2611
2609
    rotate event forever, so we need to not disconnect after one.
2691
2689
  }
2692
2690
 
2693
2691
  pthread_mutex_lock(&mi->data_lock);
2694
 
  ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
 
2692
  ev->log_pos= mi->getLogPosition(); /* 3.23 events don't contain log_pos */
2695
2693
  switch (ev->get_type_code()) {
2696
2694
  case STOP_EVENT:
2697
2695
    ignore_event= 1;
2720
2718
    ev->log_pos+= inc_pos;
2721
2719
    int32_t error = process_io_create_file(mi,(Create_file_log_event*)ev);
2722
2720
    delete ev;
2723
 
    mi->master_log_pos += inc_pos;
 
2721
    mi->incrementLogPosition(inc_pos);
2724
2722
    pthread_mutex_unlock(&mi->data_lock);
2725
2723
    free((char*)tmp_buf);
2726
2724
    return(error);
2746
2744
    rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2747
2745
  }
2748
2746
  delete ev;
2749
 
  mi->master_log_pos+= inc_pos;
 
2747
  mi->incrementLogPosition(inc_pos);
2750
2748
  pthread_mutex_unlock(&mi->data_lock);
2751
2749
  return(0);
2752
2750
}
2800
2798
  }
2801
2799
  rli->relay_log.harvest_bytes_written(&rli->log_space_total);
2802
2800
  delete ev;
2803
 
  mi->master_log_pos+= inc_pos;
 
2801
  mi->incrementLogPosition(inc_pos);
2804
2802
err:
2805
2803
  pthread_mutex_unlock(&mi->data_lock);
2806
2804
  return(0);
2952
2950
 
2953
2951
       TODO: handling `when' for SHOW SLAVE STATUS' snds behind
2954
2952
    */
2955
 
    if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
2956
 
         && mi->master_log_name != NULL)
2957
 
        || mi->master_log_pos != hb.log_pos)
 
2953
    if ((mi->setLogName(hb.get_log_ident()) && mi->getLogName() != NULL)
 
2954
        || mi->getLogPosition() != hb.log_pos)
2958
2955
    {
2959
2956
      /* missed events of heartbeat from the past */
2960
2957
      error= ER_SLAVE_HEARTBEAT_FAILURE;
3011
3008
        buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3012
3009
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3013
3010
    {
3014
 
      mi->master_log_pos+= inc_pos;
3015
 
      memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
 
3011
      mi->incrementLogPosition(inc_pos);
 
3012
      memcpy(rli->ign_master_log_name_end, mi->getLogName(), FN_REFLEN);
3016
3013
      assert(rli->ign_master_log_name_end[0]);
3017
 
      rli->ign_master_log_pos_end= mi->master_log_pos;
 
3014
      rli->ign_master_log_pos_end= mi->getLogPosition();
3018
3015
    }
3019
3016
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3020
3017
  }
3023
3020
    /* write the event to the relay log */
3024
3021
    if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3025
3022
    {
3026
 
      mi->master_log_pos+= inc_pos;
 
3023
      mi->incrementLogPosition(inc_pos);
3027
3024
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3028
3025
    }
3029
3026
    else
3122
3119
 
3123
3120
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3124
3121
         (reconnect ? drizzle_reconnect(drizzle) != 0 :
3125
 
          drizzle_connect(drizzle, mi->host, mi->user, mi->password, 0,
3126
 
                             mi->port, 0, client_flag) == 0))
 
3122
          drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
 
3123
                             mi->getPort(), 0, client_flag) == 0))
3127
3124
  {
3128
3125
    /* Don't repeat last error */
3129
3126
    if ((int32_t)drizzle_errno(drizzle) != last_errno)
3134
3131
                 _("error %s to master '%s@%s:%d'"
3135
3132
                   " - retry-time: %d  retries: %u"),
3136
3133
                 (reconnect ? _("reconnecting") : _("connecting")),
3137
 
                 mi->user, mi->host, mi->port,
3138
 
                 mi->connect_retry, master_retry_count);
 
3134
                 mi->getUsername(), mi->getHostname(), mi->getPort(),
 
3135
                 mi->getConnectionRetry(), master_retry_count);
3139
3136
    }
3140
3137
    /*
3141
3138
      By default we try forever. The reason is that failure will trigger
3159
3156
      if (!suppress_warnings && global_system_variables.log_warnings)
3160
3157
        sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3161
3158
                                "replication resumed in log '%s' at "
3162
 
                                "position %s"), mi->user,
3163
 
                                mi->host, mi->port,
 
3159
                                "position %s"), mi->getUsername(),
 
3160
                                mi->getHostname(), mi->getPort(),
3164
3161
                                IO_RPL_LOG_NAME,
3165
 
                                llstr(mi->master_log_pos,llbuff));
 
3162
                                llstr(mi->getLogPosition(),llbuff));
3166
3163
    }
3167
3164
  }
3168
3165
  drizzle->reconnect= 1;
3216
3213
 
3217
3214
bool flush_relay_log_info(Relay_log_info* rli)
3218
3215
{
3219
 
  bool error=0;
 
3216
  bool error= 0;
3220
3217
 
3221
3218
  if (unlikely(rli->no_storage))
3222
3219
    return(0);
3223
3220
 
3224
 
  IO_CACHE *file = &rli->info_file;
3225
 
  char buff[FN_REFLEN*2+22*2+4], *pos;
3226
 
 
3227
 
  my_b_seek(file, 0L);
3228
 
  pos=my_stpcpy(buff, rli->group_relay_log_name);
3229
 
  *pos++='\n';
3230
 
  pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
3231
 
  *pos++='\n';
3232
 
  pos=my_stpcpy(pos, rli->group_master_log_name);
3233
 
  *pos++='\n';
3234
 
  pos=int64_t2str(rli->group_master_log_pos, pos, 10);
3235
 
  *pos='\n';
3236
 
  if (my_b_write(file, (unsigned char*) buff, (size_t) (pos-buff)+1))
3237
 
    error=1;
3238
 
  if (flush_io_cache(file))
3239
 
    error=1;
3240
 
 
3241
 
  /* Flushing the relay log is done by the slave I/O thread */
3242
3221
  return(error);
3243
3222
}
3244
3223
 
3253
3232
  assert(rli->cur_log_fd == -1);
3254
3233
 
3255
3234
  IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3256
 
  if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
3257
 
                                   errmsg)) <0)
 
3235
  if ((rli->cur_log_fd=open_binlog(cur_log, rli->event_relay_log_name.c_str(), errmsg)) < 0)
3258
3236
    return(0);
3259
3237
  /*
3260
3238
    We want to start exactly where we was before:
3496
3474
        if (rli->relay_log.purge_first_log
3497
3475
            (rli,
3498
3476
             rli->group_relay_log_pos == rli->event_relay_log_pos
3499
 
             && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
 
3477
             && !strcmp(rli->group_relay_log_name.c_str(), rli->event_relay_log_name.c_str())))
3500
3478
        {
3501
3479
          errmsg = "Error purging processed logs";
3502
3480
          goto err;
3517
3495
          goto err;
3518
3496
        }
3519
3497
        rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3520
 
        strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
3521
 
                sizeof(rli->event_relay_log_name)-1);
 
3498
        rli->event_relay_log_name.assign(rli->linfo.log_file_name);
3522
3499
        flush_relay_log_info(rli);
3523
3500
      }
3524
3501