~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/slave.cc

  • Committer: Brian Aker
  • Date: 2008-10-08 02:16:25 UTC
  • Revision ID: brian@tangent.org-20081008021625-3756823nqs1phvsn
More effort around master.info (and relay.info)

Show diffs side-by-side

added added

removed removed

Lines of Context:
239
239
    goto err;
240
240
  }
241
241
 
242
 
  if (init_master_info(active_mi,master_info_file,relay_log_info_file,
243
 
                       1, (SLAVE_IO | SLAVE_SQL)))
 
242
  if (active_mi->init_master_info(master_info_file, relay_log_info_file, 1, (SLAVE_IO | SLAVE_SQL)))
244
243
  {
245
244
    sql_print_error(_("Failed to initialize the master info structure"));
246
245
    goto err;
585
584
      once multi-master code is ready.
586
585
    */
587
586
    terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
588
 
    end_master_info(active_mi);
 
587
    active_mi->end_master_info();
589
588
    delete active_mi;
590
589
    active_mi= 0;
591
590
  }
1084
1083
                     " to the relay log, SHOW SLAVE STATUS may be"
1085
1084
                     " inaccurate"));
1086
1085
      rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1087
 
      if (flush_master_info(mi, 1))
 
1086
      if (mi->flush_master_info(1))
1088
1087
        sql_print_error(_("Failed to flush master info file"));
1089
1088
      delete ev;
1090
1089
    }
1221
1220
 
1222
1221
    pthread_mutex_lock(&mi->data_lock);
1223
1222
    pthread_mutex_lock(&mi->rli.data_lock);
1224
 
    protocol->store(mi->host, &my_charset_bin);
1225
 
    protocol->store(mi->user, &my_charset_bin);
1226
 
    protocol->store((uint32_t) mi->port);
1227
 
    protocol->store((uint32_t) mi->connect_retry);
1228
 
    protocol->store(mi->master_log_name, &my_charset_bin);
 
1223
    protocol->store(mi->getHostname(), &my_charset_bin);
 
1224
    protocol->store(mi->getUsername(), &my_charset_bin);
 
1225
    protocol->store((uint32_t) mi->getPort());
 
1226
    protocol->store(mi->getConnectionRetry());
 
1227
    protocol->store(mi->getLogName(), &my_charset_bin);
1229
1228
    protocol->store((uint64_t) mi->master_log_pos);
1230
 
    protocol->store(mi->rli.group_relay_log_name +
1231
 
                    dirname_length(mi->rli.group_relay_log_name),
 
1229
    protocol->store(mi->rli.group_relay_log_name.c_str() +
 
1230
                    dirname_length(mi->rli.group_relay_log_name.c_str()),
1232
1231
                    &my_charset_bin);
1233
1232
    protocol->store((uint64_t) mi->rli.group_relay_log_pos);
1234
 
    protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
 
1233
    protocol->store(mi->rli.group_master_log_name.c_str(), &my_charset_bin);
1235
1234
    protocol->store(mi->slave_running == DRIZZLE_SLAVE_RUN_CONNECT ?
1236
1235
                    "Yes" : "No", &my_charset_bin);
1237
1236
    protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1421
1420
  unsigned char buf[FN_REFLEN + 10];
1422
1421
  int32_t len;
1423
1422
  int32_t binlog_flags = 0; // for now
1424
 
  char* logname = mi->master_log_name;
 
1423
  const char* logname = mi->getLogName();
1425
1424
  
1426
1425
  *suppress_warnings= false;
1427
1426
 
1660
1659
                  " of the relay log information: the slave may"
1661
1660
                  " be in an inconsistent state."
1662
1661
                  " Stopped in %s position %s"),
1663
 
                  rli->group_relay_log_name,
 
1662
                  rli->group_relay_log_name.c_str(),
1664
1663
                  llstr(rli->group_relay_log_pos, buf));
1665
1664
      return(2);
1666
1665
    }
1784
1783
        */
1785
1784
        if (rli->trans_retries < slave_trans_retries)
1786
1785
        {
1787
 
          if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
 
1786
          if (rli->mi->init_master_info(0, 0, 0, SLAVE_SQL))
1788
1787
            sql_print_error(_("Failed to initialize the master info structure"));
1789
1788
          else if (init_relay_log_pos(rli,
1790
 
                                      rli->group_relay_log_name,
 
1789
                                      rli->group_relay_log_name.c_str(),
1791
1790
                                      rli->group_relay_log_pos,
1792
1791
                                      1, &errmsg, 1))
1793
1792
            sql_print_error(_("Error initializing relay log position: %s"),
1974
1973
  {
1975
1974
    sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
1976
1975
                            "replication started in log '%s' at position %s"),
1977
 
                          mi->user, mi->host, mi->port,
 
1976
                          mi->getUsername(), mi->getHostname(), mi->port,
1978
1977
                          IO_RPL_LOG_NAME,
1979
1978
                          llstr(mi->master_log_pos,llbuff));
1980
1979
  /*
2111
2110
      {
2112
2111
        goto err;
2113
2112
      }
2114
 
      if (flush_master_info(mi, 1))
 
2113
      if (mi->flush_master_info(1))
2115
2114
      {
2116
2115
        sql_print_error(_("Failed to flush master info file"));
2117
2116
        goto err;
2267
2266
  rli->trans_retries= 0; // start from "no error"
2268
2267
 
2269
2268
  if (init_relay_log_pos(rli,
2270
 
                         rli->group_relay_log_name,
 
2269
                         rli->group_relay_log_name.c_str(),
2271
2270
                         rli->group_relay_log_pos,
2272
2271
                         1 /*need data lock*/, &errmsg,
2273
2272
                         1 /*look for a description_event*/))
2299
2298
                            "position %s, relay log '%s' position: %s"),
2300
2299
                            RPL_LOG_NAME,
2301
2300
                          llstr(rli->group_master_log_pos,llbuff),
2302
 
                          rli->group_relay_log_name,
 
2301
                          rli->group_relay_log_name.c_str(),
2303
2302
                          llstr(rli->group_relay_log_pos,llbuff1));
2304
2303
 
2305
2304
  /* execute init_slave variable */
2604
2603
    return(1);
2605
2604
 
2606
2605
  /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
2607
 
  memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
2608
 
  mi->master_log_pos= rev->pos;
 
2606
  mi->setLogName(rev->new_log_ident);
 
2607
  mi->setLogPosition(rev->pos);
2609
2608
  /*
2610
2609
    If we do not do this, we will be getting the first
2611
2610
    rotate event forever, so we need to not disconnect after one.
2952
2951
 
2953
2952
       TODO: handling `when' for SHOW SLAVE STATUS' snds behind
2954
2953
    */
2955
 
    if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
2956
 
         && mi->master_log_name != NULL)
 
2954
    if ((mi->setLogName(hb.get_log_ident()) && mi->getLogName() != NULL)
2957
2955
        || mi->master_log_pos != hb.log_pos)
2958
2956
    {
2959
2957
      /* missed events of heartbeat from the past */
3012
3010
        buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3013
3011
    {
3014
3012
      mi->master_log_pos+= inc_pos;
3015
 
      memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
 
3013
      memcpy(rli->ign_master_log_name_end, mi->getLogName(), FN_REFLEN);
3016
3014
      assert(rli->ign_master_log_name_end[0]);
3017
 
      rli->ign_master_log_pos_end= mi->master_log_pos;
 
3015
      rli->ign_master_log_pos_end= mi->getLogPosition();
3018
3016
    }
3019
3017
    rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3020
3018
  }
3122
3120
 
3123
3121
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3124
3122
         (reconnect ? drizzle_reconnect(drizzle) != 0 :
3125
 
          drizzle_connect(drizzle, mi->host, mi->user, mi->password, 0,
3126
 
                             mi->port, 0, client_flag) == 0))
 
3123
          drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
 
3124
                             mi->getPort(), 0, client_flag) == 0))
3127
3125
  {
3128
3126
    /* Don't repeat last error */
3129
3127
    if ((int32_t)drizzle_errno(drizzle) != last_errno)
3134
3132
                 _("error %s to master '%s@%s:%d'"
3135
3133
                   " - retry-time: %d  retries: %u"),
3136
3134
                 (reconnect ? _("reconnecting") : _("connecting")),
3137
 
                 mi->user, mi->host, mi->port,
3138
 
                 mi->connect_retry, master_retry_count);
 
3135
                 mi->getUsername(), mi->getHostname(), mi->getPort(),
 
3136
                 mi->getConnectionRetry(), master_retry_count);
3139
3137
    }
3140
3138
    /*
3141
3139
      By default we try forever. The reason is that failure will trigger
3159
3157
      if (!suppress_warnings && global_system_variables.log_warnings)
3160
3158
        sql_print_information(_("Slave: connected to master '%s@%s:%d', "
3161
3159
                                "replication resumed in log '%s' at "
3162
 
                                "position %s"), mi->user,
3163
 
                                mi->host, mi->port,
 
3160
                                "position %s"), mi->getUsername(),
 
3161
                                mi->getHostname(), mi->getPort(),
3164
3162
                                IO_RPL_LOG_NAME,
3165
 
                                llstr(mi->master_log_pos,llbuff));
 
3163
                                llstr(mi->getLogPosition(),llbuff));
3166
3164
    }
3167
3165
  }
3168
3166
  drizzle->reconnect= 1;
3216
3214
 
3217
3215
bool flush_relay_log_info(Relay_log_info* rli)
3218
3216
{
3219
 
  bool error=0;
 
3217
  bool error= 0;
3220
3218
 
3221
3219
  if (unlikely(rli->no_storage))
3222
3220
    return(0);
3223
3221
 
3224
 
  IO_CACHE *file = &rli->info_file;
3225
 
  char buff[FN_REFLEN*2+22*2+4], *pos;
3226
 
 
3227
 
  my_b_seek(file, 0L);
3228
 
  pos=my_stpcpy(buff, rli->group_relay_log_name);
3229
 
  *pos++='\n';
3230
 
  pos=int64_t2str(rli->group_relay_log_pos, pos, 10);
3231
 
  *pos++='\n';
3232
 
  pos=my_stpcpy(pos, rli->group_master_log_name);
3233
 
  *pos++='\n';
3234
 
  pos=int64_t2str(rli->group_master_log_pos, pos, 10);
3235
 
  *pos='\n';
3236
 
  if (my_b_write(file, (unsigned char*) buff, (size_t) (pos-buff)+1))
3237
 
    error=1;
3238
 
  if (flush_io_cache(file))
3239
 
    error=1;
3240
 
 
3241
 
  /* Flushing the relay log is done by the slave I/O thread */
3242
3222
  return(error);
3243
3223
}
3244
3224
 
3253
3233
  assert(rli->cur_log_fd == -1);
3254
3234
 
3255
3235
  IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
3256
 
  if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
3257
 
                                   errmsg)) <0)
 
3236
  if ((rli->cur_log_fd=open_binlog(cur_log, rli->event_relay_log_name.c_str(), errmsg)) < 0)
3258
3237
    return(0);
3259
3238
  /*
3260
3239
    We want to start exactly where we was before:
3496
3475
        if (rli->relay_log.purge_first_log
3497
3476
            (rli,
3498
3477
             rli->group_relay_log_pos == rli->event_relay_log_pos
3499
 
             && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
 
3478
             && !strcmp(rli->group_relay_log_name.c_str(), rli->event_relay_log_name.c_str())))
3500
3479
        {
3501
3480
          errmsg = "Error purging processed logs";
3502
3481
          goto err;
3517
3496
          goto err;
3518
3497
        }
3519
3498
        rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
3520
 
        strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
3521
 
                sizeof(rli->event_relay_log_name)-1);
 
3499
        rli->event_relay_log_name.assign(rli->linfo.log_file_name);
3522
3500
        flush_relay_log_info(rli);
3523
3501
      }
3524
3502