~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/slave.cc

  • Committer: Brian Aker
  • Date: 2008-10-20 04:28:21 UTC
  • mto: (492.3.21 drizzle-clean-code)
  • mto: This revision was merged to the branch mainline in revision 530.
  • Revision ID: brian@tangent.org-20081020042821-rqqdrccuu8195k3y
Second pass of thd cleanup

Show diffs side-by-side

added added

removed removed

Lines of Context:
130
130
static int32_t process_io_rotate(Master_info* mi, Rotate_log_event* rev);
131
131
static int32_t process_io_create_file(Master_info* mi, Create_file_log_event* cev);
132
132
static bool wait_for_relay_log_space(Relay_log_info* rli);
133
 
static inline bool io_slave_killed(Session* thd,Master_info* mi);
134
 
static inline bool sql_slave_killed(Session* thd,Relay_log_info* rli);
135
 
static int32_t init_slave_thread(Session* thd, SLAVE_Session_TYPE thd_type);
136
 
static int32_t safe_connect(Session* thd, DRIZZLE *drizzle, Master_info* mi);
137
 
static int32_t safe_reconnect(Session* thd, DRIZZLE *drizzle, Master_info* mi,
 
133
static inline bool io_slave_killed(Session* session,Master_info* mi);
 
134
static inline bool sql_slave_killed(Session* session,Relay_log_info* rli);
 
135
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type);
 
136
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi);
 
137
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
138
138
                          bool suppress_warnings);
139
 
static int32_t connect_to_master(Session* thd, DRIZZLE *drizzle, Master_info* mi,
 
139
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
140
140
                             bool reconnect, bool suppress_warnings);
141
 
static int32_t safe_sleep(Session* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
141
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
142
142
                      void* thread_killed_arg);
143
143
static int32_t get_master_version_and_clock(DRIZZLE *drizzle, Master_info* mi);
144
144
static Log_event* next_event(Relay_log_info* rli);
145
145
static int32_t queue_event(Master_info* mi,const char* buf,uint32_t event_len);
146
 
static int32_t terminate_slave_thread(Session *thd,
 
146
static int32_t terminate_slave_thread(Session *session,
147
147
                                  pthread_mutex_t* term_lock,
148
148
                                  pthread_cond_t* term_cond,
149
149
                                  volatile uint32_t *slave_running,
150
150
                                  bool skip_lock);
151
 
static bool check_io_slave_killed(Session *thd, Master_info *mi, const char *info);
 
151
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info);
152
152
 
153
153
/*
154
154
  Find out which replications threads are running
319
319
  if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
320
320
  {
321
321
    mi->abort_slave=1;
322
 
    if ((error=terminate_slave_thread(mi->io_thd,io_lock,
 
322
    if ((error=terminate_slave_thread(mi->io_session,io_lock,
323
323
                                      &mi->stop_cond,
324
324
                                      &mi->slave_running,
325
325
                                      skip_lock)) &&
329
329
  if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
330
330
  {
331
331
    mi->rli.abort_slave=1;
332
 
    if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
 
332
    if ((error=terminate_slave_thread(mi->rli.sql_session,sql_lock,
333
333
                                      &mi->rli.stop_cond,
334
334
                                      &mi->rli.slave_running,
335
335
                                      skip_lock)) &&
370
370
   @retval 0 All OK
371
371
 */
372
372
static int32_t
373
 
terminate_slave_thread(Session *thd,
 
373
terminate_slave_thread(Session *session,
374
374
                       pthread_mutex_t* term_lock,
375
375
                       pthread_cond_t* term_cond,
376
376
                       volatile uint32_t *slave_running,
389
389
      pthread_mutex_unlock(term_lock);
390
390
    return(ER_SLAVE_NOT_RUNNING);
391
391
  }
392
 
  assert(thd != 0);
393
 
  Session_CHECK_SENTRY(thd);
 
392
  assert(session != 0);
 
393
  Session_CHECK_SENTRY(session);
394
394
 
395
395
  /*
396
396
    Is is critical to test if the slave is running. Otherwise, we might
399
399
 
400
400
  while (*slave_running)                        // Should always be true
401
401
  {
402
 
    pthread_mutex_lock(&thd->LOCK_delete);
 
402
    pthread_mutex_lock(&session->LOCK_delete);
403
403
#ifndef DONT_USE_THR_ALARM
404
404
    /*
405
405
      Error codes from pthread_kill are:
406
406
      EINVAL: invalid signal number (can't happen)
407
407
      ESRCH: thread already killed (can happen, should be ignored)
408
408
    */
409
 
    int32_t err= pthread_kill(thd->real_id, thr_client_alarm);
 
409
    int32_t err= pthread_kill(session->real_id, thr_client_alarm);
410
410
    assert(err != EINVAL);
411
411
#endif
412
 
    thd->awake(Session::NOT_KILLED);
413
 
    pthread_mutex_unlock(&thd->LOCK_delete);
 
412
    session->awake(Session::NOT_KILLED);
 
413
    pthread_mutex_unlock(&session->LOCK_delete);
414
414
 
415
415
    /*
416
416
      There is a small chance that slave thread might miss the first
480
480
  }
481
481
  if (start_cond && cond_lock) // caller has cond_lock
482
482
  {
483
 
    Session* thd = current_thd;
 
483
    Session* session = current_session;
484
484
    while (start_id == *slave_run_id)
485
485
    {
486
 
      const char* old_msg = thd->enter_cond(start_cond,cond_lock,
 
486
      const char* old_msg = session->enter_cond(start_cond,cond_lock,
487
487
                                            "Waiting for slave thread to start");
488
488
      pthread_cond_wait(start_cond,cond_lock);
489
 
      thd->exit_cond(old_msg);
 
489
      session->exit_cond(old_msg);
490
490
      pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
491
 
      if (thd->killed)
492
 
        return(thd->killed_errno());
 
491
      if (session->killed)
 
492
        return(session->killed_errno());
493
493
    }
494
494
  }
495
495
  if (start_lock)
591
591
}
592
592
 
593
593
 
594
 
static bool io_slave_killed(Session* thd, Master_info* mi)
 
594
static bool io_slave_killed(Session* session, Master_info* mi)
595
595
{
596
 
  assert(mi->io_thd == thd);
 
596
  assert(mi->io_session == session);
597
597
  assert(mi->slave_running); // tracking buffer overrun
598
 
  return(mi->abort_slave || abort_loop || thd->killed);
 
598
  return(mi->abort_slave || abort_loop || session->killed);
599
599
}
600
600
 
601
601
 
602
 
static bool sql_slave_killed(Session* thd, Relay_log_info* rli)
 
602
static bool sql_slave_killed(Session* session, Relay_log_info* rli)
603
603
{
604
 
  assert(rli->sql_thd == thd);
 
604
  assert(rli->sql_session == session);
605
605
  assert(rli->slave_running == 1);// tracking buffer overrun
606
 
  if (abort_loop || thd->killed || rli->abort_slave)
 
606
  if (abort_loop || session->killed || rli->abort_slave)
607
607
  {
608
608
    /*
609
609
      If we are in an unsafe situation (stopping could corrupt replication),
734
734
  return(1);
735
735
}
736
736
 
737
 
static bool check_io_slave_killed(Session *thd, Master_info *mi, const char *info)
 
737
static bool check_io_slave_killed(Session *session, Master_info *mi, const char *info)
738
738
{
739
 
  if (io_slave_killed(thd, mi))
 
739
  if (io_slave_killed(session, mi))
740
740
  {
741
741
    if (info && global_system_variables.log_warnings)
742
742
      sql_print_information("%s",info);
855
855
    mi->clock_diff_with_master=
856
856
      (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
857
857
  }
858
 
  else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
 
858
  else if (!check_io_slave_killed(mi->io_session, mi, NULL))
859
859
  {
860
860
    mi->clock_diff_with_master= 0; /* The "most sensible" value */
861
861
    sql_print_warning(_("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
991
991
    sprintf(query, query_format, llbuf);
992
992
 
993
993
    if (drizzle_real_query(drizzle, query, strlen(query))
994
 
        && !check_io_slave_killed(mi->io_thd, mi, NULL))
 
994
        && !check_io_slave_killed(mi->io_session, mi, NULL))
995
995
    {
996
996
      err_msg.append("The slave I/O thread stops because querying master with '");
997
997
      err_msg.append(query);
1026
1026
  bool slave_killed=0;
1027
1027
  Master_info* mi = rli->mi;
1028
1028
  const char *save_proc_info;
1029
 
  Session* thd = mi->io_thd;
 
1029
  Session* session = mi->io_session;
1030
1030
 
1031
1031
  pthread_mutex_lock(&rli->log_space_lock);
1032
 
  save_proc_info= thd->enter_cond(&rli->log_space_cond,
 
1032
  save_proc_info= session->enter_cond(&rli->log_space_cond,
1033
1033
                                  &rli->log_space_lock,
1034
1034
                                  _("Waiting for the slave SQL thread "
1035
1035
                                    "to free enough relay log space"));
1036
1036
  while (rli->log_space_limit < rli->log_space_total &&
1037
 
         !(slave_killed=io_slave_killed(thd,mi)) &&
 
1037
         !(slave_killed=io_slave_killed(session,mi)) &&
1038
1038
         !rli->ignore_log_space_limit)
1039
1039
    pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1040
 
  thd->exit_cond(save_proc_info);
 
1040
  session->exit_cond(save_proc_info);
1041
1041
  return(slave_killed);
1042
1042
}
1043
1043
 
1047
1047
 
1048
1048
  SYNOPSIS
1049
1049
  write_ignored_events_info_to_relay_log()
1050
 
    thd             pointer to I/O thread's thd
 
1050
    session             pointer to I/O thread's session
1051
1051
    mi
1052
1052
 
1053
1053
  DESCRIPTION
1055
1055
    ignored events' end position for the use of the slave SQL thread, by
1056
1056
    calling this function. Only that thread can call it (see assertion).
1057
1057
 */
1058
 
static void write_ignored_events_info_to_relay_log(Session *thd __attribute__((unused)),
 
1058
static void write_ignored_events_info_to_relay_log(Session *session __attribute__((unused)),
1059
1059
                                                   Master_info *mi)
1060
1060
{
1061
1061
  Relay_log_info *rli= &mi->rli;
1062
1062
  pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1063
1063
 
1064
 
  assert(thd == mi->io_thd);
 
1064
  assert(session == mi->io_session);
1065
1065
  pthread_mutex_lock(log_lock);
1066
1066
  if (rli->ign_master_log_name_end[0])
1067
1067
  {
1069
1069
                                               0, rli->ign_master_log_pos_end,
1070
1070
                                               Rotate_log_event::DUP_NAME);
1071
1071
    rli->ign_master_log_name_end[0]= 0;
1072
 
    /* can unlock before writing as slave SQL thd will soon see our Rotate */
 
1072
    /* can unlock before writing as slave SQL session will soon see our Rotate */
1073
1073
    pthread_mutex_unlock(log_lock);
1074
1074
    if (likely((bool)ev))
1075
1075
    {
1131
1131
    {
1132
1132
      *suppress_warnings= true;                 // Suppress reconnect warning
1133
1133
    }
1134
 
    else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
 
1134
    else if (!check_io_slave_killed(mi->io_session, mi, NULL))
1135
1135
    {
1136
1136
      char buf[256];
1137
1137
      snprintf(buf, sizeof(buf), "%s (Errno: %d)", drizzle_error(drizzle), 
1145
1145
}
1146
1146
 
1147
1147
 
1148
 
bool show_master_info(Session* thd, Master_info* mi)
 
1148
bool show_master_info(Session* session, Master_info* mi)
1149
1149
{
1150
1150
  // TODO: fix this for multi-master
1151
1151
  List<Item> field_list;
1152
 
  Protocol *protocol= thd->protocol;
 
1152
  Protocol *protocol= session->protocol;
1153
1153
 
1154
1154
  field_list.push_back(new Item_empty_string("Slave_IO_State",
1155
1155
                                                     14));
1205
1205
 
1206
1206
  if (mi->host[0])
1207
1207
  {
1208
 
    String *packet= &thd->packet;
 
1208
    String *packet= &session->packet;
1209
1209
    protocol->prepare_for_resend();
1210
1210
 
1211
1211
    /*
1212
1212
      slave_running can be accessed without run_lock but not other
1213
 
      non-volotile members like mi->io_thd, which is guarded by the mutex.
 
1213
      non-volotile members like mi->io_session, which is guarded by the mutex.
1214
1214
    */
1215
1215
    pthread_mutex_lock(&mi->run_lock);
1216
 
    protocol->store(mi->io_thd ? mi->io_thd->get_proc_info() : "", &my_charset_bin);
 
1216
    protocol->store(mi->io_session ? mi->io_session->get_proc_info() : "", &my_charset_bin);
1217
1217
    pthread_mutex_unlock(&mi->run_lock);
1218
1218
 
1219
1219
    pthread_mutex_lock(&mi->data_lock);
1308
1308
    pthread_mutex_unlock(&mi->rli.data_lock);
1309
1309
    pthread_mutex_unlock(&mi->data_lock);
1310
1310
 
1311
 
    if (my_net_write(&thd->net, (unsigned char*) thd->packet.ptr(), packet->length()))
 
1311
    if (my_net_write(&session->net, (unsigned char*) session->packet.ptr(), packet->length()))
1312
1312
      return(true);
1313
1313
  }
1314
 
  my_eof(thd);
 
1314
  my_eof(session);
1315
1315
  return(false);
1316
1316
}
1317
1317
 
1318
1318
 
1319
 
void set_slave_thread_options(Session* thd)
 
1319
void set_slave_thread_options(Session* session)
1320
1320
{
1321
1321
  /*
1322
1322
     It's nonsense to constrain the slave threads with max_join_size; if a
1327
1327
     when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1328
1328
     only for client threads.
1329
1329
  */
1330
 
  uint64_t options= thd->options | OPTION_BIG_SELECTS;
 
1330
  uint64_t options= session->options | OPTION_BIG_SELECTS;
1331
1331
  if (opt_log_slave_updates)
1332
1332
    options|= OPTION_BIN_LOG;
1333
1333
  else
1334
1334
    options&= ~OPTION_BIN_LOG;
1335
 
  thd->options= options;
1336
 
  thd->variables.completion_type= 0;
 
1335
  session->options= options;
 
1336
  session->variables.completion_type= 0;
1337
1337
  return;
1338
1338
}
1339
1339
 
1341
1341
  init_slave_thread()
1342
1342
*/
1343
1343
 
1344
 
static int32_t init_slave_thread(Session* thd, SLAVE_Session_TYPE thd_type)
 
1344
static int32_t init_slave_thread(Session* session, SLAVE_Session_TYPE session_type)
1345
1345
{
1346
1346
  int32_t simulate_error= 0;
1347
 
  thd->system_thread = (thd_type == SLAVE_Session_SQL) ?
 
1347
  session->system_thread = (session_type == SLAVE_Session_SQL) ?
1348
1348
    SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1349
 
  thd->security_ctx->skip_grants();
1350
 
  my_net_init(&thd->net, 0);
 
1349
  session->security_ctx->skip_grants();
 
1350
  my_net_init(&session->net, 0);
1351
1351
/*
1352
1352
  Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1353
1353
  slave threads, since a replication event can become this much larger
1354
1354
  than the corresponding packet (query) sent from client to master.
1355
1355
*/
1356
 
  thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet
 
1356
  session->variables.max_allowed_packet= global_system_variables.max_allowed_packet
1357
1357
    + MAX_LOG_EVENT_HEADER;  /* note, incr over the global not session var */
1358
 
  thd->slave_thread = 1;
1359
 
  set_slave_thread_options(thd);
1360
 
  thd->client_capabilities = CLIENT_LOCAL_FILES;
 
1358
  session->slave_thread = 1;
 
1359
  set_slave_thread_options(session);
 
1360
  session->client_capabilities = CLIENT_LOCAL_FILES;
1361
1361
  pthread_mutex_lock(&LOCK_thread_count);
1362
 
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
 
1362
  session->thread_id= session->variables.pseudo_thread_id= thread_id++;
1363
1363
  pthread_mutex_unlock(&LOCK_thread_count);
1364
1364
 
1365
1365
 simulate_error|= (1 << SLAVE_Session_IO);
1366
1366
 simulate_error|= (1 << SLAVE_Session_SQL);
1367
 
  if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
 
1367
  if (init_thr_lock() || session->store_globals() || simulate_error & (1<< session_type))
1368
1368
  {
1369
 
    thd->cleanup();
 
1369
    session->cleanup();
1370
1370
    return(-1);
1371
1371
  }
1372
 
  lex_start(thd);
 
1372
  lex_start(session);
1373
1373
 
1374
 
  if (thd_type == SLAVE_Session_SQL)
1375
 
    thd->set_proc_info("Waiting for the next event in relay log");
 
1374
  if (session_type == SLAVE_Session_SQL)
 
1375
    session->set_proc_info("Waiting for the next event in relay log");
1376
1376
  else
1377
 
    thd->set_proc_info("Waiting for master update");
1378
 
  thd->version=refresh_version;
1379
 
  thd->set_time();
 
1377
    session->set_proc_info("Waiting for master update");
 
1378
  session->version=refresh_version;
 
1379
  session->set_time();
1380
1380
  return(0);
1381
1381
}
1382
1382
 
1383
1383
 
1384
 
static int32_t safe_sleep(Session* thd, int32_t sec, CHECK_KILLED_FUNC thread_killed,
 
1384
static int32_t safe_sleep(Session* session, int32_t sec, CHECK_KILLED_FUNC thread_killed,
1385
1385
                      void* thread_killed_arg)
1386
1386
{
1387
1387
  int32_t nap_time;
1403
1403
    sleep(nap_time);
1404
1404
    thr_end_alarm(&alarmed);
1405
1405
 
1406
 
    if ((*thread_killed)(thd,thread_killed_arg))
 
1406
    if ((*thread_killed)(session,thread_killed_arg))
1407
1407
      return(1);
1408
1408
    start_time= my_time(0);
1409
1409
  }
1508
1508
}
1509
1509
 
1510
1510
 
1511
 
int32_t check_expected_error(Session* thd __attribute__((unused)),
 
1511
int32_t check_expected_error(Session* session __attribute__((unused)),
1512
1512
                         Relay_log_info const *rli __attribute__((unused)),
1513
1513
                         int32_t expected_error)
1514
1514
{
1532
1532
  that the error is temporary by pushing a warning with the error code
1533
1533
  ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
1534
1534
*/
1535
 
static int32_t has_temporary_error(Session *thd)
 
1535
static int32_t has_temporary_error(Session *session)
1536
1536
{
1537
 
  if (thd->is_fatal_error)
 
1537
  if (session->is_fatal_error)
1538
1538
    return(0);
1539
1539
 
1540
 
  if (thd->main_da.is_error())
 
1540
  if (session->main_da.is_error())
1541
1541
  {
1542
 
    thd->clear_error();
 
1542
    session->clear_error();
1543
1543
    my_error(ER_LOCK_DEADLOCK, MYF(0));
1544
1544
  }
1545
1545
 
1548
1548
    error or not. This is currently the case for Incident_log_event,
1549
1549
    which sets no message. Return FALSE.
1550
1550
  */
1551
 
  if (!thd->is_error())
 
1551
  if (!session->is_error())
1552
1552
    return(0);
1553
1553
 
1554
1554
  /*
1556
1556
    currently, InnoDB deadlock detected by InnoDB or lock
1557
1557
    wait timeout (innodb_lock_wait_timeout exceeded
1558
1558
  */
1559
 
  if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
1560
 
      thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
 
1559
  if (session->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
 
1560
      session->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
1561
1561
    return(1);
1562
1562
 
1563
1563
  return(0);
1590
1590
  @retval 2 No error calling ev->apply_event(), but error calling
1591
1591
  ev->update_pos().
1592
1592
*/
1593
 
int32_t apply_event_and_update_pos(Log_event* ev, Session* thd, Relay_log_info* rli,
 
1593
int32_t apply_event_and_update_pos(Log_event* ev, Session* session, Relay_log_info* rli,
1594
1594
                               bool skip)
1595
1595
{
1596
1596
  int32_t exec_res= 0;
1619
1619
    has a Rotate etc).
1620
1620
  */
1621
1621
 
1622
 
  thd->server_id = ev->server_id; // use the original server id for logging
1623
 
  thd->set_time();                            // time the query
1624
 
  thd->lex->current_select= 0;
 
1622
  session->server_id = ev->server_id; // use the original server id for logging
 
1623
  session->set_time();                            // time the query
 
1624
  session->lex->current_select= 0;
1625
1625
  if (!ev->when)
1626
1626
    ev->when= my_time(0);
1627
 
  ev->thd = thd; // because up to this point, ev->thd == 0
 
1627
  ev->session = session; // because up to this point, ev->session == 0
1628
1628
 
1629
1629
  if (skip)
1630
1630
  {
1694
1694
 
1695
1695
  @retval 1 The event was not applied.
1696
1696
*/
1697
 
static int32_t exec_relay_log_event(Session* thd, Relay_log_info* rli)
 
1697
static int32_t exec_relay_log_event(Session* session, Relay_log_info* rli)
1698
1698
{
1699
1699
  /*
1700
1700
     We acquire this mutex since we need it for all operations except
1705
1705
 
1706
1706
  Log_event * ev = next_event(rli);
1707
1707
 
1708
 
  assert(rli->sql_thd==thd);
 
1708
  assert(rli->sql_session==session);
1709
1709
 
1710
 
  if (sql_slave_killed(thd,rli))
 
1710
  if (sql_slave_killed(session,rli))
1711
1711
  {
1712
1712
    pthread_mutex_unlock(&rli->data_lock);
1713
1713
    delete ev;
1739
1739
      delete ev;
1740
1740
      return(1);
1741
1741
    }
1742
 
    exec_res= apply_event_and_update_pos(ev, thd, rli, true);
 
1742
    exec_res= apply_event_and_update_pos(ev, session, rli, true);
1743
1743
 
1744
1744
    /*
1745
1745
      Format_description_log_event should not be deleted because it will be
1761
1761
    if (slave_trans_retries)
1762
1762
    {
1763
1763
      int32_t temp_err= 0;
1764
 
      if (exec_res && (temp_err= has_temporary_error(thd)))
 
1764
      if (exec_res && (temp_err= has_temporary_error(session)))
1765
1765
      {
1766
1766
        const char *errmsg;
1767
1767
        /*
1791
1791
          else
1792
1792
          {
1793
1793
            exec_res= 0;
1794
 
            end_trans(thd, ROLLBACK);
 
1794
            end_trans(session, ROLLBACK);
1795
1795
            /* chance for concurrent connection to get more locks */
1796
 
            safe_sleep(thd, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
 
1796
            safe_sleep(session, cmin(rli->trans_retries, (uint32_t)MAX_SLAVE_RETRY_PAUSE),
1797
1797
                       (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
1798
1798
            pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
1799
1799
            rli->trans_retries++;
1851
1851
  are taken from @c messages array. In case @c master_retry_count is exceeded,
1852
1852
  no messages are added to the log.
1853
1853
 
1854
 
  @param[in]     thd                 Thread context.
 
1854
  @param[in]     session                 Thread context.
1855
1855
  @param[in]     DRIZZLE               DRIZZLE connection.
1856
1856
  @param[in]     mi                  Master connection information.
1857
1857
  @param[in,out] retry_count         Number of attempts to reconnect.
1864
1864
  @retval        1                   There was an error.
1865
1865
*/
1866
1866
 
1867
 
static int32_t try_to_reconnect(Session *thd, DRIZZLE *drizzle, Master_info *mi,
 
1867
static int32_t try_to_reconnect(Session *session, DRIZZLE *drizzle, Master_info *mi,
1868
1868
                                uint32_t *retry_count, bool suppress_warnings,
1869
1869
                                const char *messages[SLAVE_RECON_MSG_MAX])
1870
1870
{
1871
1871
  mi->slave_running= DRIZZLE_SLAVE_RUN_NOT_CONNECT;
1872
 
  thd->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
 
1872
  session->set_proc_info(_(messages[SLAVE_RECON_MSG_WAIT]));
1873
1873
  drizzle_disconnect(drizzle);
1874
1874
  if ((*retry_count)++)
1875
1875
  {
1876
1876
    if (*retry_count > master_retry_count)
1877
1877
      return 1;                             // Don't retry forever
1878
 
    safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
 
1878
    safe_sleep(session, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
1879
1879
               (void *) mi);
1880
1880
  }
1881
 
  if (check_io_slave_killed(thd, mi,
 
1881
  if (check_io_slave_killed(session, mi,
1882
1882
                            _(messages[SLAVE_RECON_MSG_KILLED_WAITING])))
1883
1883
    return 1;
1884
 
  thd->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
 
1884
  session->set_proc_info(_(messages[SLAVE_RECON_MSG_AFTER]));
1885
1885
  if (!suppress_warnings)
1886
1886
  {
1887
1887
    char buf[256], llbuff[22];
1902
1902
      sql_print_information("%s",buf);
1903
1903
    }
1904
1904
  }
1905
 
  if (safe_reconnect(thd, drizzle, mi, 1) || io_slave_killed(thd, mi))
 
1905
  if (safe_reconnect(session, drizzle, mi, 1) || io_slave_killed(session, mi))
1906
1906
  {
1907
1907
    if (global_system_variables.log_warnings)
1908
1908
      sql_print_information("%s",_(messages[SLAVE_RECON_MSG_KILLED_AFTER]));
1916
1916
 
1917
1917
pthread_handler_t handle_slave_io(void *arg)
1918
1918
{
1919
 
  Session *thd; // needs to be first for thread_stack
 
1919
  Session *session; // needs to be first for thread_stack
1920
1920
  DRIZZLE *drizzle;
1921
1921
  Master_info *mi = (Master_info*)arg;
1922
1922
  Relay_log_info *rli= &mi->rli;
1936
1936
 
1937
1937
  mi->events_till_disconnect = disconnect_slave_event_count;
1938
1938
 
1939
 
  thd= new Session;
1940
 
  Session_CHECK_SENTRY(thd);
1941
 
  mi->io_thd = thd;
 
1939
  session= new Session;
 
1940
  Session_CHECK_SENTRY(session);
 
1941
  mi->io_session = session;
1942
1942
 
1943
1943
  pthread_detach_this_thread();
1944
 
  thd->thread_stack= (char*) &thd; // remember where our stack is
1945
 
  if (init_slave_thread(thd, SLAVE_Session_IO))
 
1944
  session->thread_stack= (char*) &session; // remember where our stack is
 
1945
  if (init_slave_thread(session, SLAVE_Session_IO))
1946
1946
  {
1947
1947
    pthread_cond_broadcast(&mi->start_cond);
1948
1948
    pthread_mutex_unlock(&mi->run_lock);
1950
1950
    goto err;
1951
1951
  }
1952
1952
  pthread_mutex_lock(&LOCK_thread_count);
1953
 
  threads.append(thd);
 
1953
  threads.append(session);
1954
1954
  pthread_mutex_unlock(&LOCK_thread_count);
1955
1955
  mi->slave_running = 1;
1956
1956
  mi->abort_slave = 0;
1964
1964
    goto err;
1965
1965
  }
1966
1966
 
1967
 
  thd->set_proc_info("Connecting to master");
 
1967
  session->set_proc_info("Connecting to master");
1968
1968
  // we can get killed during safe_connect
1969
 
  if (!safe_connect(thd, drizzle, mi))
 
1969
  if (!safe_connect(session, drizzle, mi))
1970
1970
  {
1971
1971
    sql_print_information(_("Slave I/O thread: connected to master '%s@%s:%d',"
1972
1972
                            "replication started in log '%s' at position %s"),
1978
1978
    thread, since a replication event can become this much larger than
1979
1979
    the corresponding packet (query) sent from client to master.
1980
1980
  */
1981
 
    drizzle->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
 
1981
    drizzle->net.max_packet_size= session->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
1982
1982
  }
1983
1983
  else
1984
1984
  {
1990
1990
 
1991
1991
  // TODO: the assignment below should be under mutex (5.0)
1992
1992
  mi->slave_running= DRIZZLE_SLAVE_RUN_CONNECT;
1993
 
  thd->slave_net = &drizzle->net;
1994
 
  thd->set_proc_info("Checking master version");
 
1993
  session->slave_net = &drizzle->net;
 
1994
  session->set_proc_info("Checking master version");
1995
1995
  if (get_master_version_and_clock(drizzle, mi))
1996
1996
    goto err;
1997
1997
  
2000
2000
    /*
2001
2001
      Register ourselves with the master.
2002
2002
    */
2003
 
    thd->set_proc_info("Registering slave on master");
 
2003
    session->set_proc_info("Registering slave on master");
2004
2004
    if (register_slave_on_master(drizzle, mi, &suppress_warnings))
2005
2005
    {
2006
 
      if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
 
2006
      if (!check_io_slave_killed(session, mi, "Slave I/O thread killed "
2007
2007
                                 "while registering slave on master"))
2008
2008
      {
2009
2009
        sql_print_error(_("Slave I/O thread couldn't register on master"));
2010
 
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2010
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2011
2011
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
2012
2012
          goto err;
2013
2013
      }
2019
2019
    {
2020
2020
      retry_count_reg++;
2021
2021
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
2022
 
      if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2022
      if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2023
2023
                           reconnect_messages[SLAVE_RECON_ACT_REG]))
2024
2024
        goto err;
2025
2025
      goto connected;
2026
2026
    }
2027
2027
  }
2028
2028
 
2029
 
  while (!io_slave_killed(thd,mi))
 
2029
  while (!io_slave_killed(session,mi))
2030
2030
  {
2031
 
    thd->set_proc_info("Requesting binlog dump");
 
2031
    session->set_proc_info("Requesting binlog dump");
2032
2032
    if (request_dump(drizzle, mi, &suppress_warnings))
2033
2033
    {
2034
2034
      sql_print_error(_("Failed on request_dump()"));
2035
 
      if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while \
 
2035
      if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while \
2036
2036
requesting master dump")) ||
2037
 
          try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2037
          try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2038
2038
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2039
2039
        goto err;
2040
2040
      goto connected;
2043
2043
    {
2044
2044
      retry_count_dump++;
2045
2045
      sql_print_information(_("Forcing to reconnect slave I/O thread"));
2046
 
      if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2046
      if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2047
2047
                           reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2048
2048
        goto err;
2049
2049
      goto connected;
2050
2050
    }
2051
2051
 
2052
 
    while (!io_slave_killed(thd,mi))
 
2052
    while (!io_slave_killed(session,mi))
2053
2053
    {
2054
2054
      uint32_t event_len;
2055
2055
      /*
2058
2058
        important thing is to not confuse users by saying "reading" whereas
2059
2059
        we're in fact receiving nothing.
2060
2060
      */
2061
 
      thd->set_proc_info(_("Waiting for master to send event"));
 
2061
      session->set_proc_info(_("Waiting for master to send event"));
2062
2062
      event_len= read_event(drizzle, mi, &suppress_warnings);
2063
 
      if (check_io_slave_killed(thd, mi, _("Slave I/O thread killed while "
 
2063
      if (check_io_slave_killed(session, mi, _("Slave I/O thread killed while "
2064
2064
                                           "reading event")))
2065
2065
        goto err;
2066
2066
      if (!retry_count_event)
2067
2067
      {
2068
2068
        retry_count_event++;
2069
2069
        sql_print_information(_("Forcing to reconnect slave I/O thread"));
2070
 
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2070
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2071
2071
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2072
2072
          goto err;
2073
2073
        goto connected;
2083
2083
                            "slave. If the entry is correct, restart the "
2084
2084
                            "server with a higher value of "
2085
2085
                            "max_allowed_packet"),
2086
 
                          thd->variables.max_allowed_packet);
 
2086
                          session->variables.max_allowed_packet);
2087
2087
          goto err;
2088
2088
        case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2089
2089
          sql_print_error(ER(drizzle_error_number), drizzle_error_number,
2095
2095
       _("Stopping slave I/O thread due to out-of-memory error from master"));
2096
2096
          goto err;
2097
2097
        }
2098
 
        if (try_to_reconnect(thd, drizzle, mi, &retry_count, suppress_warnings,
 
2098
        if (try_to_reconnect(session, drizzle, mi, &retry_count, suppress_warnings,
2099
2099
                             reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2100
2100
          goto err;
2101
2101
        goto connected;
2102
2102
      } // if (event_len == packet_error)
2103
2103
 
2104
2104
      retry_count=0;                    // ok event, reset retry counter
2105
 
      thd->set_proc_info(_("Queueing master event to the relay log"));
 
2105
      session->set_proc_info(_("Queueing master event to the relay log"));
2106
2106
      if (queue_event(mi,(const char*)drizzle->net.read_pos + 1, event_len))
2107
2107
      {
2108
2108
        goto err;
2143
2143
                          "position %s"),
2144
2144
                        IO_RPL_LOG_NAME, llstr(mi->getLogPosition(), llbuff));
2145
2145
  pthread_mutex_lock(&LOCK_thread_count);
2146
 
  thd->query = thd->db = 0; // extra safety
2147
 
  thd->query_length= thd->db_length= 0;
 
2146
  session->query = session->db = 0; // extra safety
 
2147
  session->query_length= session->db_length= 0;
2148
2148
  pthread_mutex_unlock(&LOCK_thread_count);
2149
2149
  if (drizzle)
2150
2150
  {
2159
2159
    drizzle_close(drizzle);
2160
2160
    mi->drizzle=0;
2161
2161
  }
2162
 
  write_ignored_events_info_to_relay_log(thd, mi);
2163
 
  thd->set_proc_info(_("Waiting for slave mutex on exit"));
 
2162
  write_ignored_events_info_to_relay_log(session, mi);
 
2163
  session->set_proc_info(_("Waiting for slave mutex on exit"));
2164
2164
  pthread_mutex_lock(&mi->run_lock);
2165
2165
 
2166
2166
  /* Forget the relay log's format */
2167
2167
  delete mi->rli.relay_log.description_event_for_queue;
2168
2168
  mi->rli.relay_log.description_event_for_queue= 0;
2169
 
  assert(thd->net.buff != 0);
2170
 
  net_end(&thd->net); // destructor will not free it, because net.vio is 0
2171
 
  close_thread_tables(thd);
 
2169
  assert(session->net.buff != 0);
 
2170
  net_end(&session->net); // destructor will not free it, because net.vio is 0
 
2171
  close_thread_tables(session);
2172
2172
  pthread_mutex_lock(&LOCK_thread_count);
2173
 
  Session_CHECK_SENTRY(thd);
2174
 
  delete thd;
 
2173
  Session_CHECK_SENTRY(session);
 
2174
  delete session;
2175
2175
  pthread_mutex_unlock(&LOCK_thread_count);
2176
2176
  mi->abort_slave= 0;
2177
2177
  mi->slave_running= 0;
2178
 
  mi->io_thd= 0;
 
2178
  mi->io_session= 0;
2179
2179
  /*
2180
2180
    Note: the order of the two following calls (first broadcast, then unlock)
2181
2181
    is important. Otherwise a killer_thread can execute between the calls and
2193
2193
 
2194
2194
pthread_handler_t handle_slave_sql(void *arg)
2195
2195
{
2196
 
  Session *thd;                     /* needs to be first for thread_stack */
 
2196
  Session *session;                     /* needs to be first for thread_stack */
2197
2197
  char llbuff[22],llbuff1[22];
2198
2198
 
2199
2199
  Relay_log_info* rli = &((Master_info*)arg)->rli;
2207
2207
  errmsg= 0;
2208
2208
  rli->events_till_abort = abort_slave_event_count;
2209
2209
 
2210
 
  thd = new Session;
2211
 
  thd->thread_stack = (char*)&thd; // remember where our stack is
2212
 
  rli->sql_thd= thd;
 
2210
  session = new Session;
 
2211
  session->thread_stack = (char*)&session; // remember where our stack is
 
2212
  rli->sql_session= session;
2213
2213
  
2214
2214
  /* Inform waiting threads that slave has started */
2215
2215
  rli->slave_run_id++;
2216
2216
  rli->slave_running = 1;
2217
2217
 
2218
2218
  pthread_detach_this_thread();
2219
 
  if (init_slave_thread(thd, SLAVE_Session_SQL))
 
2219
  if (init_slave_thread(session, SLAVE_Session_SQL))
2220
2220
  {
2221
2221
    /*
2222
2222
      TODO: this is currently broken - slave start and change master
2227
2227
    sql_print_error(_("Failed during slave thread initialization"));
2228
2228
    goto err;
2229
2229
  }
2230
 
  thd->init_for_queries();
2231
 
  thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
 
2230
  session->init_for_queries();
 
2231
  session->temporary_tables = rli->save_temporary_tables; // restore temp tables
2232
2232
  pthread_mutex_lock(&LOCK_thread_count);
2233
 
  threads.append(thd);
 
2233
  threads.append(session);
2234
2234
  pthread_mutex_unlock(&LOCK_thread_count);
2235
2235
  /*
2236
2236
    We are going to set slave_running to 1. Assuming slave I/O thread is
2272
2272
                    errmsg);
2273
2273
    goto err;
2274
2274
  }
2275
 
  Session_CHECK_SENTRY(thd);
 
2275
  Session_CHECK_SENTRY(session);
2276
2276
  assert(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
2277
2277
  /*
2278
2278
    Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
2287
2287
    assert().
2288
2288
  */
2289
2289
  assert(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
2290
 
  assert(rli->sql_thd == thd);
 
2290
  assert(rli->sql_session == session);
2291
2291
 
2292
2292
  if (global_system_variables.log_warnings)
2293
2293
    sql_print_information(_("Slave SQL thread initialized, "
2301
2301
  /* execute init_slave variable */
2302
2302
  if (sys_init_slave.value_length)
2303
2303
  {
2304
 
    execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
2305
 
    if (thd->is_slave_error)
 
2304
    execute_init_command(session, &sys_init_slave, &LOCK_sys_init_slave);
 
2305
    if (session->is_slave_error)
2306
2306
    {
2307
2307
      sql_print_error(_("Slave SQL thread aborted. "
2308
2308
                        "Can't execute init_slave query"));
2328
2328
 
2329
2329
  /* Read queries from the IO/THREAD until this thread is killed */
2330
2330
 
2331
 
  while (!sql_slave_killed(thd,rli))
 
2331
  while (!sql_slave_killed(session,rli))
2332
2332
  {
2333
 
    thd->set_proc_info(_("Reading event from the relay log"));
2334
 
    assert(rli->sql_thd == thd);
2335
 
    Session_CHECK_SENTRY(thd);
2336
 
    if (exec_relay_log_event(thd,rli))
 
2333
    session->set_proc_info(_("Reading event from the relay log"));
 
2334
    assert(rli->sql_session == session);
 
2335
    Session_CHECK_SENTRY(session);
 
2336
    if (exec_relay_log_event(session,rli))
2337
2337
    {
2338
2338
      // do not scare the user if SQL thread was simply killed or stopped
2339
 
      if (!sql_slave_killed(thd,rli))
 
2339
      if (!sql_slave_killed(session,rli))
2340
2340
      {
2341
2341
        /*
2342
 
          retrieve as much info as possible from the thd and, error
 
2342
          retrieve as much info as possible from the session and, error
2343
2343
          codes and warnings and print this to the error log as to
2344
2344
          allow the user to locate the error
2345
2345
        */
2346
2346
        uint32_t const last_errno= rli->last_error().number;
2347
2347
 
2348
 
        if (thd->is_error())
 
2348
        if (session->is_error())
2349
2349
        {
2350
 
          char const *const errmsg= thd->main_da.message();
 
2350
          char const *const errmsg= session->main_da.message();
2351
2351
 
2352
2352
          if (last_errno == 0)
2353
2353
          {
2354
 
            rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), "%s", errmsg);
 
2354
            rli->report(ERROR_LEVEL, session->main_da.sql_errno(), "%s", errmsg);
2355
2355
          }
2356
 
          else if (last_errno != thd->main_da.sql_errno())
 
2356
          else if (last_errno != session->main_da.sql_errno())
2357
2357
          {
2358
2358
            sql_print_error(_("Slave (additional info): %s Error_code: %d"),
2359
 
                            errmsg, thd->main_da.sql_errno());
 
2359
                            errmsg, session->main_da.sql_errno());
2360
2360
          }
2361
2361
        }
2362
2362
 
2363
2363
        /* Print any warnings issued */
2364
 
        List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
 
2364
        List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
2365
2365
        DRIZZLE_ERROR *err;
2366
2366
        /*
2367
2367
          Added controlled slave thread cancel for replication
2408
2408
    request is detected only by the present function, not by events), so we
2409
2409
    must "proactively" clear playgrounds:
2410
2410
  */
2411
 
  rli->cleanup_context(thd, 1);
 
2411
  rli->cleanup_context(session, 1);
2412
2412
  pthread_mutex_lock(&LOCK_thread_count);
2413
2413
  /*
2414
2414
    Some extra safety, which should not been needed (normally, event deletion
2415
2415
    should already have done these assignments (each event which sets these
2416
2416
    variables is supposed to set them to 0 before terminating)).
2417
2417
  */
2418
 
  thd->query= thd->db= thd->catalog= 0;
2419
 
  thd->query_length= thd->db_length= 0;
 
2418
  session->query= session->db= session->catalog= 0;
 
2419
  session->query_length= session->db_length= 0;
2420
2420
  pthread_mutex_unlock(&LOCK_thread_count);
2421
 
  thd->set_proc_info("Waiting for slave mutex on exit");
 
2421
  session->set_proc_info("Waiting for slave mutex on exit");
2422
2422
  pthread_mutex_lock(&rli->run_lock);
2423
2423
  /* We need data_lock, at least to wake up any waiting master_pos_wait() */
2424
2424
  pthread_mutex_lock(&rli->data_lock);
2432
2432
  pthread_mutex_unlock(&rli->data_lock);
2433
2433
  pthread_cond_broadcast(&rli->data_cond);
2434
2434
  rli->ignore_log_space_limit= 0; /* don't need any lock */
2435
 
  rli->save_temporary_tables = thd->temporary_tables;
 
2435
  rli->save_temporary_tables = session->temporary_tables;
2436
2436
 
2437
2437
  /*
2438
2438
    TODO: see if we can do this conditionally in next_event() instead
2439
2439
    to avoid unneeded position re-init
2440
2440
  */
2441
 
  thd->temporary_tables = 0; // remove tempation from destructor to close them
2442
 
  assert(thd->net.buff != 0);
2443
 
  net_end(&thd->net); // destructor will not free it, because we are weird
2444
 
  assert(rli->sql_thd == thd);
2445
 
  Session_CHECK_SENTRY(thd);
2446
 
  rli->sql_thd= 0;
 
2441
  session->temporary_tables = 0; // remove tempation from destructor to close them
 
2442
  assert(session->net.buff != 0);
 
2443
  net_end(&session->net); // destructor will not free it, because we are weird
 
2444
  assert(rli->sql_session == session);
 
2445
  Session_CHECK_SENTRY(session);
 
2446
  rli->sql_session= 0;
2447
2447
  pthread_mutex_lock(&LOCK_thread_count);
2448
 
  Session_CHECK_SENTRY(thd);
2449
 
  delete thd;
 
2448
  Session_CHECK_SENTRY(session);
 
2449
  delete session;
2450
2450
  pthread_mutex_unlock(&LOCK_thread_count);
2451
2451
 /*
2452
2452
  Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
2471
2471
  int32_t error = 1;
2472
2472
  uint32_t num_bytes;
2473
2473
  bool cev_not_written;
2474
 
  Session *thd = mi->io_thd;
 
2474
  Session *session = mi->io_session;
2475
2475
  NET *net = &mi->drizzle->net;
2476
2476
 
2477
2477
  if (unlikely(!cev->is_valid()))
2483
2483
    return(0);
2484
2484
  }
2485
2485
  assert(cev->inited_from_old);
2486
 
  thd->file_id = cev->file_id = mi->file_id++;
2487
 
  thd->server_id = cev->server_id;
 
2486
  session->file_id = cev->file_id = mi->file_id++;
 
2487
  session->server_id = cev->server_id;
2488
2488
  cev_not_written = 1;
2489
2489
 
2490
2490
  if (unlikely(net_request_file(net,cev->fname)))
2500
2500
    in the loop
2501
2501
  */
2502
2502
  {
2503
 
    Append_block_log_event aev(thd,0,0,0,0);
 
2503
    Append_block_log_event aev(session,0,0,0,0);
2504
2504
 
2505
2505
    for (;;)
2506
2506
    {
2522
2522
        */
2523
2523
        if (unlikely(cev_not_written))
2524
2524
          break;
2525
 
        Execute_load_log_event xev(thd,0,0);
 
2525
        Execute_load_log_event xev(session,0,0);
2526
2526
        xev.log_pos = cev->log_pos;
2527
2527
        if (unlikely(mi->rli.relay_log.append(&xev)))
2528
2528
        {
3075
3075
 
3076
3076
  SYNPOSIS
3077
3077
    safe_connect()
3078
 
    thd                 Thread handler for slave
 
3078
    session                 Thread handler for slave
3079
3079
    DRIZZLE               DRIZZLE connection handle
3080
3080
    mi                  Replication handle
3081
3081
 
3084
3084
    #   Error
3085
3085
*/
3086
3086
 
3087
 
static int32_t safe_connect(Session* thd, DRIZZLE *drizzle, Master_info* mi)
 
3087
static int32_t safe_connect(Session* session, DRIZZLE *drizzle, Master_info* mi)
3088
3088
{
3089
 
  return(connect_to_master(thd, drizzle, mi, 0, 0));
 
3089
  return(connect_to_master(session, drizzle, mi, 0, 0));
3090
3090
}
3091
3091
 
3092
3092
 
3099
3099
    master_retry_count times
3100
3100
*/
3101
3101
 
3102
 
static int32_t connect_to_master(Session* thd, DRIZZLE *drizzle, Master_info* mi,
 
3102
static int32_t connect_to_master(Session* session, DRIZZLE *drizzle, Master_info* mi,
3103
3103
                                 bool reconnect, bool suppress_warnings)
3104
3104
{
3105
3105
  int32_t slave_was_killed;
3115
3115
  drizzle_options(drizzle, DRIZZLE_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3116
3116
  drizzle_options(drizzle, DRIZZLE_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3117
3117
 
3118
 
  while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
 
3118
  while (!(slave_was_killed = io_slave_killed(session,mi)) &&
3119
3119
         (reconnect ? drizzle_reconnect(drizzle) != 0 :
3120
3120
          drizzle_connect(drizzle, mi->getHostname(), mi->getUsername(), mi->getPassword(), 0,
3121
3121
                             mi->getPort(), 0, client_flag) == 0))
3143
3143
      slave_was_killed=1;
3144
3144
      break;
3145
3145
    }
3146
 
    safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
 
3146
    safe_sleep(session,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3147
3147
               (void*)mi);
3148
3148
  }
3149
3149
 
3173
3173
    master_retry_count times
3174
3174
*/
3175
3175
 
3176
 
static int32_t safe_reconnect(Session* thd, DRIZZLE *drizzle, Master_info* mi,
 
3176
static int32_t safe_reconnect(Session* session, DRIZZLE *drizzle, Master_info* mi,
3177
3177
                          bool suppress_warnings)
3178
3178
{
3179
 
  return(connect_to_master(thd, drizzle, mi, 1, suppress_warnings));
 
3179
  return(connect_to_master(session, drizzle, mi, 1, suppress_warnings));
3180
3180
}
3181
3181
 
3182
3182
 
3249
3249
  IO_CACHE* cur_log = rli->cur_log;
3250
3250
  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
3251
3251
  const char* errmsg=0;
3252
 
  Session* thd = rli->sql_thd;
 
3252
  Session* session = rli->sql_session;
3253
3253
 
3254
 
  assert(thd != 0);
 
3254
  assert(session != 0);
3255
3255
 
3256
3256
  if (abort_slave_event_count && !rli->events_till_abort--)
3257
3257
    return(0);
3265
3265
  */
3266
3266
  safe_mutex_assert_owner(&rli->data_lock);
3267
3267
 
3268
 
  while (!sql_slave_killed(thd,rli))
 
3268
  while (!sql_slave_killed(session,rli))
3269
3269
  {
3270
3270
    /*
3271
3271
      We can have two kinds of log reading:
3324
3324
                                      rli->relay_log.description_event_for_exec)))
3325
3325
 
3326
3326
    {
3327
 
      assert(thd==rli->sql_thd);
 
3327
      assert(session==rli->sql_session);
3328
3328
      /*
3329
3329
        read it while we have a lock, to avoid a mutex lock in
3330
3330
        inc_event_relay_log_pos()
3334
3334
        pthread_mutex_unlock(log_lock);
3335
3335
      return(ev);
3336
3336
    }
3337
 
    assert(thd==rli->sql_thd);
 
3337
    assert(session==rli->sql_session);
3338
3338
    if (opt_reckless_slave)                     // For mysql-test
3339
3339
      cur_log->error = 0;
3340
3340
    if (cur_log->error < 0)
3440
3440
        pthread_mutex_unlock(&rli->log_space_lock);
3441
3441
        pthread_cond_broadcast(&rli->log_space_cond);
3442
3442
        // Note that wait_for_update_relay_log unlocks lock_log !
3443
 
        rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
 
3443
        rli->relay_log.wait_for_update_relay_log(rli->sql_session);
3444
3444
        // re-acquire data lock since we released it earlier
3445
3445
        pthread_mutex_lock(&rli->data_lock);
3446
3446
        rli->last_master_timestamp= save_timestamp;
3710
3710
 
3711
3711
   Detect buggy master to work around.
3712
3712
 */
3713
 
bool rpl_master_erroneous_autoinc(Session *thd)
 
3713
bool rpl_master_erroneous_autoinc(Session *session)
3714
3714
{
3715
 
  if (active_mi && active_mi->rli.sql_thd == thd)
 
3715
  if (active_mi && active_mi->rli.sql_session == session)
3716
3716
  {
3717
3717
    Relay_log_info *rli= &active_mi->rli;
3718
3718
    return rpl_master_has_bug(rli, 33029, false);