~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_repl.cc

  • Committer: Monty Taylor
  • Date: 2008-10-13 09:29:43 UTC
  • mfrom: (509 drizzle)
  • mto: (509.1.4 codestyle)
  • mto: This revision was merged to the branch mainline in revision 511.
  • Revision ID: monty@inaugust.com-20081013092943-rwvx4a6d85b5l2dh
MergedĀ inĀ trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
718
718
 
719
719
        if (read_packet)
720
720
        {
721
 
          thd_proc_info(thd, "Sending binlog event to slave");
 
721
          thd->set_proc_info("Sending binlog event to slave");
722
722
          if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) )
723
723
          {
724
724
            errmsg = "Failed on my_net_write()";
756
756
      bool loop_breaker = 0;
757
757
      /* need this to break out of the for loop from switch */
758
758
 
759
 
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
 
759
      thd->set_proc_info("Finished reading one binlog; switching to next binlog");
760
760
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
761
761
      case LOG_INFO_EOF:
762
762
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
804
804
  (void)my_close(file, MYF(MY_WME));
805
805
 
806
806
  my_eof(thd);
807
 
  thd_proc_info(thd, "Waiting to finalize termination");
 
807
  thd->set_proc_info("Waiting to finalize termination");
808
808
  pthread_mutex_lock(&LOCK_thread_count);
809
809
  thd->current_linfo = 0;
810
810
  pthread_mutex_unlock(&LOCK_thread_count);
811
811
  return;
812
812
 
813
813
err:
814
 
  thd_proc_info(thd, "Waiting to finalize termination");
 
814
  thd->set_proc_info("Waiting to finalize termination");
815
815
  end_io_cache(&log);
816
816
  /*
817
817
    Exclude  iteration through thread list
848
848
    thread_mask&= thd->lex->slave_thd_opt;
849
849
  if (thread_mask) //some threads are stopped, start them
850
850
  {
851
 
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
852
 
                         thread_mask))
 
851
    if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
853
852
      slave_errno=ER_MASTER_INFO;
854
 
    else if (server_id_supplied && *mi->host)
 
853
    else if (server_id_supplied && *mi->getHostname())
855
854
    {
856
855
      /*
857
856
        If we will start SQL thread we will care about UNTIL options If
958
957
  if (!thd)
959
958
    thd = current_thd;
960
959
 
961
 
  thd_proc_info(thd, "Killing slave");
 
960
  thd->set_proc_info("Killing slave");
962
961
  int thread_mask;
963
962
  lock_slave_threads(mi);
964
963
  // Get a mask of _running_ threads
985
984
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
986
985
  }
987
986
  unlock_slave_threads(mi);
988
 
  thd_proc_info(thd, 0);
 
987
  thd->set_proc_info(0);
989
988
 
990
989
  if (slave_errno)
991
990
  {
1038
1037
    goto err;
1039
1038
 
1040
1039
  /* Clear master's log coordinates */
1041
 
  init_master_log_pos(mi);
 
1040
  mi->reset();
1042
1041
  /*
1043
1042
     Reset errors (the idea is that we forget about the
1044
1043
     old master).
1047
1046
  mi->rli.clear_until_condition();
1048
1047
 
1049
1048
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1050
 
  end_master_info(mi);
 
1049
  mi->end_master_info();
1051
1050
  // and delete these two files
1052
1051
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
1053
1052
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1134
1133
    return(true);
1135
1134
  }
1136
1135
 
1137
 
  thd_proc_info(thd, "Changing master");
 
1136
  thd->set_proc_info("Changing master");
1138
1137
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1139
1138
  // TODO: see if needs re-write
1140
 
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
1141
 
                       thread_mask))
 
1139
  if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
1142
1140
  {
1143
1141
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1144
1142
    unlock_slave_threads(mi);
1157
1155
  */
1158
1156
 
1159
1157
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1160
 
  {
1161
 
    mi->master_log_name[0] = 0;
1162
 
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
1163
 
  }
 
1158
    mi->reset();
1164
1159
 
1165
1160
  if (lex_mi->log_file_name)
1166
 
    strmake(mi->master_log_name, lex_mi->log_file_name,
1167
 
            sizeof(mi->master_log_name)-1);
 
1161
    mi->setLogName(lex_mi->log_file_name);
1168
1162
  if (lex_mi->pos)
1169
1163
  {
1170
 
    mi->master_log_pos= lex_mi->pos;
 
1164
    mi->setLogPosition(lex_mi->pos);
1171
1165
  }
1172
1166
 
1173
1167
  if (lex_mi->host)
1174
 
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
 
1168
    mi->setHost(lex_mi->host, lex_mi->port);
1175
1169
  if (lex_mi->user)
1176
 
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
 
1170
    mi->setUsername(lex_mi->user);
1177
1171
  if (lex_mi->password)
1178
 
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
1179
 
  if (lex_mi->port)
1180
 
    mi->port = lex_mi->port;
 
1172
    mi->setPassword(lex_mi->password);
1181
1173
  if (lex_mi->connect_retry)
1182
1174
    mi->connect_retry = lex_mi->connect_retry;
1183
1175
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1190
1182
  if (lex_mi->relay_log_name)
1191
1183
  {
1192
1184
    need_relay_log_purge= 0;
1193
 
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
1194
 
            sizeof(mi->rli.group_relay_log_name)-1);
1195
 
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
1196
 
            sizeof(mi->rli.event_relay_log_name)-1);
 
1185
    mi->rli.event_relay_log_name.assign(lex_mi->relay_log_name);
1197
1186
  }
1198
1187
 
1199
1188
  if (lex_mi->relay_log_pos)
1227
1216
       of replication is not 100% clear, so we guard against problems using
1228
1217
       cmax().
1229
1218
      */
1230
 
     mi->master_log_pos = ((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1231
 
                           ? BIN_LOG_HEADER_SIZE
1232
 
                           : mi->rli.group_master_log_pos);
1233
 
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
1234
 
             sizeof(mi->master_log_name)-1);
 
1219
     mi->setLogPosition(((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
 
1220
                         ? BIN_LOG_HEADER_SIZE
 
1221
                         : mi->rli.group_master_log_pos));
 
1222
     mi->setLogName(mi->rli.group_master_log_name.c_str());
1235
1223
  }
1236
1224
  /*
1237
1225
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1238
1226
    a slave before).
1239
1227
  */
1240
 
  if (flush_master_info(mi, 0))
 
1228
  if (mi->flush())
1241
1229
  {
1242
1230
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1243
1231
    unlock_slave_threads(mi);
1246
1234
  if (need_relay_log_purge)
1247
1235
  {
1248
1236
    relay_log_purge= 1;
1249
 
    thd_proc_info(thd, "Purging old relay logs");
 
1237
    thd->set_proc_info("Purging old relay logs");
1250
1238
    if (purge_relay_logs(&mi->rli, thd,
1251
1239
                         0 /* not only reset, but also reinit */,
1252
1240
                         &errmsg))
1262
1250
    relay_log_purge= 0;
1263
1251
    /* Relay log is already initialized */
1264
1252
    if (init_relay_log_pos(&mi->rli,
1265
 
                           mi->rli.group_relay_log_name,
 
1253
                           mi->rli.group_relay_log_name.c_str(),
1266
1254
                           mi->rli.group_relay_log_pos,
1267
1255
                           0 /*no data lock*/,
1268
1256
                           &msg, 0))
1282
1270
    ''/0: we have lost all copies of the original good coordinates.
1283
1271
    That's why we always save good coords in rli.
1284
1272
  */
1285
 
  mi->rli.group_master_log_pos= mi->master_log_pos;
1286
 
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
1287
 
          sizeof(mi->rli.group_master_log_name)-1);
 
1273
  mi->rli.group_master_log_pos= mi->getLogPosition();
 
1274
  mi->rli.group_master_log_name.assign(mi->getLogName());
1288
1275
 
1289
 
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
1290
 
    mi->rli.group_master_log_pos=0;
 
1276
  if (mi->rli.group_master_log_name.size() == 0) // uninitialized case
 
1277
    mi->rli.group_master_log_pos= 0;
1291
1278
 
1292
1279
  pthread_mutex_lock(&mi->rli.data_lock);
1293
1280
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1306
1293
  pthread_mutex_unlock(&mi->rli.data_lock);
1307
1294
 
1308
1295
  unlock_slave_threads(mi);
1309
 
  thd_proc_info(thd, 0);
 
1296
  thd->set_proc_info(0);
1310
1297
  my_ok(thd);
1311
1298
  return(false);
1312
1299
}
1340
1327
}
1341
1328
 
1342
1329
 
1343
 
bool mysql_show_binlog_events(THD* thd)
1344
 
{
1345
 
  Protocol *protocol= thd->protocol;
1346
 
  List<Item> field_list;
1347
 
  const char *errmsg= 0;
1348
 
  bool ret= true;
1349
 
  IO_CACHE log;
1350
 
  File file= -1;
1351
 
 
1352
 
  Log_event::init_show_field_list(&field_list);
1353
 
  if (protocol->send_fields(&field_list,
1354
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1355
 
    return(true);
1356
 
 
1357
 
  Format_description_log_event *description_event= new
1358
 
    Format_description_log_event(3); /* MySQL 4.0 by default */
1359
 
 
1360
 
  if (mysql_bin_log.is_open())
1361
 
  {
1362
 
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1363
 
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
1364
 
    ha_rows event_count, limit_start, limit_end;
1365
 
    my_off_t pos = cmax((uint64_t)BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1366
 
    char search_file_name[FN_REFLEN], *name;
1367
 
    const char *log_file_name = lex_mi->log_file_name;
1368
 
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1369
 
    LOG_INFO linfo;
1370
 
    Log_event* ev;
1371
 
 
1372
 
    unit->set_limit(thd->lex->current_select);
1373
 
    limit_start= unit->offset_limit_cnt;
1374
 
    limit_end= unit->select_limit_cnt;
1375
 
 
1376
 
    name= search_file_name;
1377
 
    if (log_file_name)
1378
 
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
1379
 
    else
1380
 
      name=0;                                   // Find first log
1381
 
 
1382
 
    linfo.index_file_offset = 0;
1383
 
 
1384
 
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1385
 
    {
1386
 
      errmsg = "Could not find target log";
1387
 
      goto err;
1388
 
    }
1389
 
 
1390
 
    pthread_mutex_lock(&LOCK_thread_count);
1391
 
    thd->current_linfo = &linfo;
1392
 
    pthread_mutex_unlock(&LOCK_thread_count);
1393
 
 
1394
 
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
1395
 
      goto err;
1396
 
 
1397
 
    /*
1398
 
      to account binlog event header size
1399
 
    */
1400
 
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
1401
 
 
1402
 
    pthread_mutex_lock(log_lock);
1403
 
 
1404
 
    /*
1405
 
      open_binlog() sought to position 4.
1406
 
      Read the first event in case it's a Format_description_log_event, to
1407
 
      know the format. If there's no such event, we are 3.23 or 4.x. This
1408
 
      code, like before, can't read 3.23 binlogs.
1409
 
      This code will fail on a mixed relay log (one which has Format_desc then
1410
 
      Rotate then Format_desc).
1411
 
    */
1412
 
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
1413
 
    if (ev)
1414
 
    {
1415
 
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
1416
 
      {
1417
 
        delete description_event;
1418
 
        description_event= (Format_description_log_event*) ev;
1419
 
      }
1420
 
      else
1421
 
        delete ev;
1422
 
    }
1423
 
 
1424
 
    my_b_seek(&log, pos);
1425
 
 
1426
 
    if (!description_event->is_valid())
1427
 
    {
1428
 
      errmsg="Invalid Format_description event; could be out of memory";
1429
 
      goto err;
1430
 
    }
1431
 
 
1432
 
    for (event_count = 0;
1433
 
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
1434
 
                                         description_event)); )
1435
 
    {
1436
 
      if (event_count >= limit_start &&
1437
 
          ev->net_send(protocol, linfo.log_file_name, pos))
1438
 
      {
1439
 
        errmsg = "Net error";
1440
 
        delete ev;
1441
 
        pthread_mutex_unlock(log_lock);
1442
 
        goto err;
1443
 
      }
1444
 
 
1445
 
      pos = my_b_tell(&log);
1446
 
      delete ev;
1447
 
 
1448
 
      if (++event_count >= limit_end)
1449
 
        break;
1450
 
    }
1451
 
 
1452
 
    if (event_count < limit_end && log.error)
1453
 
    {
1454
 
      errmsg = "Wrong offset or I/O error";
1455
 
      pthread_mutex_unlock(log_lock);
1456
 
      goto err;
1457
 
    }
1458
 
 
1459
 
    pthread_mutex_unlock(log_lock);
1460
 
  }
1461
 
 
1462
 
  ret= false;
1463
 
 
1464
 
err:
1465
 
  delete description_event;
1466
 
  if (file >= 0)
1467
 
  {
1468
 
    end_io_cache(&log);
1469
 
    (void) my_close(file, MYF(MY_WME));
1470
 
  }
1471
 
 
1472
 
  if (errmsg)
1473
 
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
1474
 
             "SHOW BINLOG EVENTS", errmsg);
1475
 
  else
1476
 
    my_eof(thd);
1477
 
 
1478
 
  pthread_mutex_lock(&LOCK_thread_count);
1479
 
  thd->current_linfo = 0;
1480
 
  pthread_mutex_unlock(&LOCK_thread_count);
1481
 
  return(ret);
1482
 
}
1483
 
 
1484
 
 
1485
1330
bool show_binlog_info(THD* thd)
1486
1331
{
1487
1332
  Protocol *protocol= thd->protocol;
1578
1423
    else
1579
1424
    {
1580
1425
      /* this is an old log, open it and find the size */
1581
 
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
 
1426
      if ((file= my_open(fname, O_RDONLY,
1582
1427
                         MYF(0))) >= 0)
1583
1428
      {
1584
1429
        file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));