~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_repl.cc

  • Committer: Mark Atwood
  • Date: 2008-10-16 11:33:16 UTC
  • mto: (520.1.13 drizzle)
  • mto: This revision was merged to the branch mainline in revision 530.
  • Revision ID: mark@fallenpegasus.com-20081016113316-ff6jdt31ck90sjdh
an implemention of the errmsg plugin

Show diffs side-by-side

added added

removed removed

Lines of Context:
138
138
    (void) my_close(fd, MYF(0));
139
139
  if (errmsg)
140
140
  {
141
 
    sql_print_error(errmsg);
 
141
    sql_print_error("%s",errmsg);
142
142
  }
143
143
  return(error);
144
144
}
718
718
 
719
719
        if (read_packet)
720
720
        {
721
 
          thd_proc_info(thd, "Sending binlog event to slave");
 
721
          thd->set_proc_info("Sending binlog event to slave");
722
722
          if (my_net_write(net, (unsigned char*) packet->ptr(), packet->length()) )
723
723
          {
724
724
            errmsg = "Failed on my_net_write()";
756
756
      bool loop_breaker = 0;
757
757
      /* need this to break out of the for loop from switch */
758
758
 
759
 
      thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
 
759
      thd->set_proc_info("Finished reading one binlog; switching to next binlog");
760
760
      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
761
761
      case LOG_INFO_EOF:
762
762
        loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
804
804
  (void)my_close(file, MYF(MY_WME));
805
805
 
806
806
  my_eof(thd);
807
 
  thd_proc_info(thd, "Waiting to finalize termination");
 
807
  thd->set_proc_info("Waiting to finalize termination");
808
808
  pthread_mutex_lock(&LOCK_thread_count);
809
809
  thd->current_linfo = 0;
810
810
  pthread_mutex_unlock(&LOCK_thread_count);
811
811
  return;
812
812
 
813
813
err:
814
 
  thd_proc_info(thd, "Waiting to finalize termination");
 
814
  thd->set_proc_info("Waiting to finalize termination");
815
815
  end_io_cache(&log);
816
816
  /*
817
817
    Exclude  iteration through thread list
848
848
    thread_mask&= thd->lex->slave_thd_opt;
849
849
  if (thread_mask) //some threads are stopped, start them
850
850
  {
851
 
    if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
852
 
                         thread_mask))
 
851
    if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
853
852
      slave_errno=ER_MASTER_INFO;
854
 
    else if (server_id_supplied && *mi->host)
 
853
    else if (server_id_supplied && *mi->getHostname())
855
854
    {
856
855
      /*
857
856
        If we will start SQL thread we will care about UNTIL options If
958
957
  if (!thd)
959
958
    thd = current_thd;
960
959
 
961
 
  thd_proc_info(thd, "Killing slave");
 
960
  thd->set_proc_info("Killing slave");
962
961
  int thread_mask;
963
962
  lock_slave_threads(mi);
964
963
  // Get a mask of _running_ threads
985
984
                 ER(ER_SLAVE_WAS_NOT_RUNNING));
986
985
  }
987
986
  unlock_slave_threads(mi);
988
 
  thd_proc_info(thd, 0);
 
987
  thd->set_proc_info(0);
989
988
 
990
989
  if (slave_errno)
991
990
  {
1038
1037
    goto err;
1039
1038
 
1040
1039
  /* Clear master's log coordinates */
1041
 
  init_master_log_pos(mi);
 
1040
  mi->reset();
1042
1041
  /*
1043
1042
     Reset errors (the idea is that we forget about the
1044
1043
     old master).
1047
1046
  mi->rli.clear_until_condition();
1048
1047
 
1049
1048
  // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
1050
 
  end_master_info(mi);
 
1049
  mi->end_master_info();
1051
1050
  // and delete these two files
1052
1051
  fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
1053
1052
  if (!stat(fname, &stat_area) && my_delete(fname, MYF(MY_WME)))
1134
1133
    return(true);
1135
1134
  }
1136
1135
 
1137
 
  thd_proc_info(thd, "Changing master");
 
1136
  thd->set_proc_info("Changing master");
1138
1137
  LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1139
1138
  // TODO: see if needs re-write
1140
 
  if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
1141
 
                       thread_mask))
 
1139
  if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
1142
1140
  {
1143
1141
    my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1144
1142
    unlock_slave_threads(mi);
1157
1155
  */
1158
1156
 
1159
1157
  if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1160
 
  {
1161
 
    mi->master_log_name[0] = 0;
1162
 
    mi->master_log_pos= BIN_LOG_HEADER_SIZE;
1163
 
  }
 
1158
    mi->reset();
1164
1159
 
1165
1160
  if (lex_mi->log_file_name)
1166
 
    strmake(mi->master_log_name, lex_mi->log_file_name,
1167
 
            sizeof(mi->master_log_name)-1);
 
1161
    mi->setLogName(lex_mi->log_file_name);
1168
1162
  if (lex_mi->pos)
1169
1163
  {
1170
 
    mi->master_log_pos= lex_mi->pos;
 
1164
    mi->setLogPosition(lex_mi->pos);
1171
1165
  }
1172
1166
 
1173
1167
  if (lex_mi->host)
1174
 
    strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
 
1168
    mi->setHost(lex_mi->host, lex_mi->port);
1175
1169
  if (lex_mi->user)
1176
 
    strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
 
1170
    mi->setUsername(lex_mi->user);
1177
1171
  if (lex_mi->password)
1178
 
    strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
1179
 
  if (lex_mi->port)
1180
 
    mi->port = lex_mi->port;
 
1172
    mi->setPassword(lex_mi->password);
1181
1173
  if (lex_mi->connect_retry)
1182
1174
    mi->connect_retry = lex_mi->connect_retry;
1183
1175
  if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1186
1178
    mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
1187
1179
                                      (slave_net_timeout/2.0));
1188
1180
  mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
1189
 
  if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1190
 
    mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
1191
 
 
1192
 
  if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1193
 
    mi->ssl_verify_server_cert=
1194
 
      (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
1195
 
 
1196
 
  if (lex_mi->ssl_ca)
1197
 
    strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
1198
 
  if (lex_mi->ssl_capath)
1199
 
    strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
1200
 
  if (lex_mi->ssl_cert)
1201
 
    strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
1202
 
  if (lex_mi->ssl_cipher)
1203
 
    strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
1204
 
  if (lex_mi->ssl_key)
1205
 
    strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
1206
 
  if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
1207
 
      lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
1208
 
      lex_mi->ssl_verify_server_cert )
1209
 
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1210
 
                 ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1211
1181
 
1212
1182
  if (lex_mi->relay_log_name)
1213
1183
  {
1214
1184
    need_relay_log_purge= 0;
1215
 
    strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
1216
 
            sizeof(mi->rli.group_relay_log_name)-1);
1217
 
    strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
1218
 
            sizeof(mi->rli.event_relay_log_name)-1);
 
1185
    mi->rli.event_relay_log_name.assign(lex_mi->relay_log_name);
1219
1186
  }
1220
1187
 
1221
1188
  if (lex_mi->relay_log_pos)
1249
1216
       of replication is not 100% clear, so we guard against problems using
1250
1217
       cmax().
1251
1218
      */
1252
 
     mi->master_log_pos = ((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1253
 
                           ? BIN_LOG_HEADER_SIZE
1254
 
                           : mi->rli.group_master_log_pos);
1255
 
     strmake(mi->master_log_name, mi->rli.group_master_log_name,
1256
 
             sizeof(mi->master_log_name)-1);
 
1219
     mi->setLogPosition(((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
 
1220
                         ? BIN_LOG_HEADER_SIZE
 
1221
                         : mi->rli.group_master_log_pos));
 
1222
     mi->setLogName(mi->rli.group_master_log_name.c_str());
1257
1223
  }
1258
1224
  /*
1259
1225
    Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1260
1226
    a slave before).
1261
1227
  */
1262
 
  if (flush_master_info(mi, 0))
 
1228
  if (mi->flush())
1263
1229
  {
1264
1230
    my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1265
1231
    unlock_slave_threads(mi);
1268
1234
  if (need_relay_log_purge)
1269
1235
  {
1270
1236
    relay_log_purge= 1;
1271
 
    thd_proc_info(thd, "Purging old relay logs");
 
1237
    thd->set_proc_info("Purging old relay logs");
1272
1238
    if (purge_relay_logs(&mi->rli, thd,
1273
1239
                         0 /* not only reset, but also reinit */,
1274
1240
                         &errmsg))
1284
1250
    relay_log_purge= 0;
1285
1251
    /* Relay log is already initialized */
1286
1252
    if (init_relay_log_pos(&mi->rli,
1287
 
                           mi->rli.group_relay_log_name,
 
1253
                           mi->rli.group_relay_log_name.c_str(),
1288
1254
                           mi->rli.group_relay_log_pos,
1289
1255
                           0 /*no data lock*/,
1290
1256
                           &msg, 0))
1304
1270
    ''/0: we have lost all copies of the original good coordinates.
1305
1271
    That's why we always save good coords in rli.
1306
1272
  */
1307
 
  mi->rli.group_master_log_pos= mi->master_log_pos;
1308
 
  strmake(mi->rli.group_master_log_name,mi->master_log_name,
1309
 
          sizeof(mi->rli.group_master_log_name)-1);
 
1273
  mi->rli.group_master_log_pos= mi->getLogPosition();
 
1274
  mi->rli.group_master_log_name.assign(mi->getLogName());
1310
1275
 
1311
 
  if (!mi->rli.group_master_log_name[0]) // uninitialized case
1312
 
    mi->rli.group_master_log_pos=0;
 
1276
  if (mi->rli.group_master_log_name.size() == 0) // uninitialized case
 
1277
    mi->rli.group_master_log_pos= 0;
1313
1278
 
1314
1279
  pthread_mutex_lock(&mi->rli.data_lock);
1315
1280
  mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1328
1293
  pthread_mutex_unlock(&mi->rli.data_lock);
1329
1294
 
1330
1295
  unlock_slave_threads(mi);
1331
 
  thd_proc_info(thd, 0);
 
1296
  thd->set_proc_info(0);
1332
1297
  my_ok(thd);
1333
1298
  return(false);
1334
1299
}
1362
1327
}
1363
1328
 
1364
1329
 
1365
 
bool mysql_show_binlog_events(THD* thd)
1366
 
{
1367
 
  Protocol *protocol= thd->protocol;
1368
 
  List<Item> field_list;
1369
 
  const char *errmsg= 0;
1370
 
  bool ret= true;
1371
 
  IO_CACHE log;
1372
 
  File file= -1;
1373
 
 
1374
 
  Log_event::init_show_field_list(&field_list);
1375
 
  if (protocol->send_fields(&field_list,
1376
 
                            Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1377
 
    return(true);
1378
 
 
1379
 
  Format_description_log_event *description_event= new
1380
 
    Format_description_log_event(3); /* MySQL 4.0 by default */
1381
 
 
1382
 
  if (mysql_bin_log.is_open())
1383
 
  {
1384
 
    LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1385
 
    SELECT_LEX_UNIT *unit= &thd->lex->unit;
1386
 
    ha_rows event_count, limit_start, limit_end;
1387
 
    my_off_t pos = cmax((uint64_t)BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1388
 
    char search_file_name[FN_REFLEN], *name;
1389
 
    const char *log_file_name = lex_mi->log_file_name;
1390
 
    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1391
 
    LOG_INFO linfo;
1392
 
    Log_event* ev;
1393
 
 
1394
 
    unit->set_limit(thd->lex->current_select);
1395
 
    limit_start= unit->offset_limit_cnt;
1396
 
    limit_end= unit->select_limit_cnt;
1397
 
 
1398
 
    name= search_file_name;
1399
 
    if (log_file_name)
1400
 
      mysql_bin_log.make_log_name(search_file_name, log_file_name);
1401
 
    else
1402
 
      name=0;                                   // Find first log
1403
 
 
1404
 
    linfo.index_file_offset = 0;
1405
 
 
1406
 
    if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1407
 
    {
1408
 
      errmsg = "Could not find target log";
1409
 
      goto err;
1410
 
    }
1411
 
 
1412
 
    pthread_mutex_lock(&LOCK_thread_count);
1413
 
    thd->current_linfo = &linfo;
1414
 
    pthread_mutex_unlock(&LOCK_thread_count);
1415
 
 
1416
 
    if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
1417
 
      goto err;
1418
 
 
1419
 
    /*
1420
 
      to account binlog event header size
1421
 
    */
1422
 
    thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
1423
 
 
1424
 
    pthread_mutex_lock(log_lock);
1425
 
 
1426
 
    /*
1427
 
      open_binlog() sought to position 4.
1428
 
      Read the first event in case it's a Format_description_log_event, to
1429
 
      know the format. If there's no such event, we are 3.23 or 4.x. This
1430
 
      code, like before, can't read 3.23 binlogs.
1431
 
      This code will fail on a mixed relay log (one which has Format_desc then
1432
 
      Rotate then Format_desc).
1433
 
    */
1434
 
    ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
1435
 
    if (ev)
1436
 
    {
1437
 
      if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
1438
 
      {
1439
 
        delete description_event;
1440
 
        description_event= (Format_description_log_event*) ev;
1441
 
      }
1442
 
      else
1443
 
        delete ev;
1444
 
    }
1445
 
 
1446
 
    my_b_seek(&log, pos);
1447
 
 
1448
 
    if (!description_event->is_valid())
1449
 
    {
1450
 
      errmsg="Invalid Format_description event; could be out of memory";
1451
 
      goto err;
1452
 
    }
1453
 
 
1454
 
    for (event_count = 0;
1455
 
         (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
1456
 
                                         description_event)); )
1457
 
    {
1458
 
      if (event_count >= limit_start &&
1459
 
          ev->net_send(protocol, linfo.log_file_name, pos))
1460
 
      {
1461
 
        errmsg = "Net error";
1462
 
        delete ev;
1463
 
        pthread_mutex_unlock(log_lock);
1464
 
        goto err;
1465
 
      }
1466
 
 
1467
 
      pos = my_b_tell(&log);
1468
 
      delete ev;
1469
 
 
1470
 
      if (++event_count >= limit_end)
1471
 
        break;
1472
 
    }
1473
 
 
1474
 
    if (event_count < limit_end && log.error)
1475
 
    {
1476
 
      errmsg = "Wrong offset or I/O error";
1477
 
      pthread_mutex_unlock(log_lock);
1478
 
      goto err;
1479
 
    }
1480
 
 
1481
 
    pthread_mutex_unlock(log_lock);
1482
 
  }
1483
 
 
1484
 
  ret= false;
1485
 
 
1486
 
err:
1487
 
  delete description_event;
1488
 
  if (file >= 0)
1489
 
  {
1490
 
    end_io_cache(&log);
1491
 
    (void) my_close(file, MYF(MY_WME));
1492
 
  }
1493
 
 
1494
 
  if (errmsg)
1495
 
    my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
1496
 
             "SHOW BINLOG EVENTS", errmsg);
1497
 
  else
1498
 
    my_eof(thd);
1499
 
 
1500
 
  pthread_mutex_lock(&LOCK_thread_count);
1501
 
  thd->current_linfo = 0;
1502
 
  pthread_mutex_unlock(&LOCK_thread_count);
1503
 
  return(ret);
1504
 
}
1505
 
 
1506
 
 
1507
1330
bool show_binlog_info(THD* thd)
1508
1331
{
1509
1332
  Protocol *protocol= thd->protocol;
1600
1423
    else
1601
1424
    {
1602
1425
      /* this is an old log, open it and find the size */
1603
 
      if ((file= my_open(fname, O_RDONLY | O_SHARE | O_BINARY,
 
1426
      if ((file= my_open(fname, O_RDONLY,
1604
1427
                         MYF(0))) >= 0)
1605
1428
      {
1606
1429
        file_length= (uint64_t) my_seek(file, 0L, MY_SEEK_END, MYF(0));