~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/log_event.cc

  • Committer: Monty Taylor
  • Date: 2008-10-22 01:52:54 UTC
  • Revision ID: monty@inaugust.com-20081022015254-65qfk9f2v0b8jlk3
Moved drizzle_com to drizzled/drizzle_common. Started splitting it up.

Show diffs side-by-side

added added

removed removed

Lines of Context:
28
28
#include <mysys/base64.h>
29
29
#include <mysys/my_bitmap.h>
30
30
 
31
 
#include <libdrizzle/gettext.h>
 
31
#include <drizzled/gettext.h>
32
32
#include <libdrizzle/libdrizzle.h>
33
33
 
34
34
#define log_cs  &my_charset_utf8_general_ci
35
35
 
36
 
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
37
 
 
38
 
 
39
 
/*
40
 
  Size of buffer for printing a double in format %.<PREC>g
41
 
 
42
 
  optional '-' + optional zero + '.'  + PREC digits + 'e' + sign +
43
 
  exponent digits + '\0'
44
 
*/
45
 
#define FMT_G_BUFSIZE(PREC) (3 + (PREC) + 5 + 1)
46
 
 
47
36
 
48
37
static const char *HA_ERR(int i)
49
38
{
108
97
   @param level     error, warning or info
109
98
   @param ha_error  HA_ERR_ code
110
99
   @param rli       pointer to the active Relay_log_info instance
111
 
   @param thd       pointer to the slave thread's thd
 
100
   @param session       pointer to the slave thread's session
112
101
   @param table     pointer to the event's table object
113
102
   @param type      the type of the event
114
103
   @param log_name  the master binlog file name
116
105
 
117
106
*/
118
107
static void inline slave_rows_error_report(enum loglevel level, int ha_error,
119
 
                                           Relay_log_info const *rli, THD *thd,
 
108
                                           Relay_log_info const *rli, Session *session,
120
109
                                           Table *table, const char * type,
121
110
                                           const char *log_name, ulong pos)
122
111
{
124
113
  char buff[MAX_SLAVE_ERRMSG], *slider;
125
114
  const char *buff_end= buff + sizeof(buff);
126
115
  uint32_t len;
127
 
  List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
 
116
  List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
128
117
  DRIZZLE_ERROR *err;
129
118
  buff[0]= 0;
130
119
 
135
124
                  _(" %s, Error_code: %d;"), err->msg, err->code);
136
125
  }
137
126
  
138
 
  rli->report(level, thd->is_error()? thd->main_da.sql_errno() : 0,
 
127
  rli->report(level, session->is_error()? session->main_da.sql_errno() : 0,
139
128
              _("Could not execute %s event on table %s.%s;"
140
129
                "%s handler error %s; "
141
130
                "the event's master log %s, end_log_pos %lu"),
231
220
  pretty_print_str()
232
221
*/
233
222
 
234
 
static void clear_all_errors(THD *thd, Relay_log_info *rli)
 
223
static void clear_all_errors(Session *session, Relay_log_info *rli)
235
224
{
236
 
  thd->is_slave_error = 0;
237
 
  thd->clear_error();
 
225
  session->is_slave_error = 0;
 
226
  session->clear_error();
238
227
  rli->clear_error();
239
228
}
240
229
 
480
469
  Log_event::Log_event()
481
470
*/
482
471
 
483
 
Log_event::Log_event(THD* thd_arg, uint16_t flags_arg, bool using_trans)
484
 
  :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), thd(thd_arg)
 
472
Log_event::Log_event(Session* session_arg, uint16_t flags_arg, bool using_trans)
 
473
  :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), session(session_arg)
485
474
{
486
 
  server_id=    thd->server_id;
487
 
  when=         thd->start_time;
 
475
  server_id=    session->server_id;
 
476
  when=         session->start_time;
488
477
  cache_stmt=   using_trans;
489
478
}
490
479
 
491
480
 
492
481
/**
493
482
  This minimal constructor is for when you are not even sure that there
494
 
  is a valid THD. For example in the server when we are shutting down or
 
483
  is a valid Session. For example in the server when we are shutting down or
495
484
  flushing logs after receiving a SIGHUP (then we must write a Rotate to
496
 
  the binlog but we have no THD, so we need this minimal constructor).
 
485
  the binlog but we have no Session, so we need this minimal constructor).
497
486
*/
498
487
 
499
488
Log_event::Log_event()
500
489
  :temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
501
 
   thd(0)
 
490
   session(0)
502
491
{
503
492
  server_id=    ::server_id;
504
493
  /*
518
507
                     const Format_description_log_event* description_event)
519
508
  :temp_buf(0), cache_stmt(0)
520
509
{
521
 
  thd= 0;
 
510
  session= 0;
522
511
  when= uint4korr(buf);
523
512
  server_id= uint4korr(buf + SERVER_ID_OFFSET);
524
513
  data_written= uint4korr(buf + EVENT_LEN_OFFSET);
638
627
 
639
628
 
640
629
/**
641
 
  Only called by SHOW BINLOG EVENTS
642
 
*/
643
 
int Log_event::net_send(Protocol *protocol, const char* log_name, my_off_t pos)
644
 
{
645
 
  const char *p= strrchr(log_name, FN_LIBCHAR);
646
 
  const char *event_type;
647
 
  if (p)
648
 
    log_name = p + 1;
649
 
 
650
 
  protocol->prepare_for_resend();
651
 
  protocol->store(log_name, &my_charset_bin);
652
 
  protocol->store((uint64_t) pos);
653
 
  event_type = get_type_str();
654
 
  protocol->store(event_type, strlen(event_type), &my_charset_bin);
655
 
  protocol->store((uint32_t) server_id);
656
 
  protocol->store((uint64_t) log_pos);
657
 
  pack_info(protocol);
658
 
  return protocol->write();
659
 
}
660
 
 
661
 
 
662
 
/**
663
630
  init_show_field_list() prepares the column names and types for the
664
631
  output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
665
632
  EVENTS.
785
752
  }
786
753
  data_len= uint4korr(buf + EVENT_LEN_OFFSET);
787
754
  if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
788
 
      data_len > current_thd->variables.max_allowed_packet)
 
755
      data_len > current_session->variables.max_allowed_packet)
789
756
  {
790
757
    result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
791
758
             LOG_READ_TOO_LARGE);
869
836
  const char *error= 0;
870
837
  Log_event *res=  0;
871
838
#ifndef max_allowed_packet
872
 
  THD *thd=current_thd;
873
 
  uint32_t max_allowed_packet= thd ? thd->variables.max_allowed_packet : ~(ulong)0;
 
839
  Session *session=current_session;
 
840
  uint32_t max_allowed_packet= session ? session->variables.max_allowed_packet : ~(ulong)0;
874
841
#endif
875
842
 
876
843
  if (data_len > max_allowed_packet)
983
950
    case ROTATE_EVENT:
984
951
      ev = new Rotate_log_event(buf, event_len, description_event);
985
952
      break;
986
 
    case SLAVE_EVENT: /* can never happen (unused event) */
987
 
      ev = new Slave_log_event(buf, event_len);
988
 
      break;
989
953
    case CREATE_FILE_EVENT:
990
954
      ev = new Create_file_log_event(buf, event_len, description_event);
991
955
      break;
1317
1281
/*
1318
1282
  SYNOPSIS
1319
1283
    Query_log_event::Query_log_event()
1320
 
      thd_arg           - thread handle
 
1284
      session_arg           - thread handle
1321
1285
      query_arg         - array of char representing the query
1322
1286
      query_length      - size of the  `query_arg' array
1323
1287
      using_trans       - there is a modified transactional table
1324
1288
      suppress_use      - suppress the generation of 'USE' statements
1325
 
      killed_status_arg - an optional with default to THD::KILLED_NO_VALUE
 
1289
      killed_status_arg - an optional with default to Session::KILLED_NO_VALUE
1326
1290
                          if the value is different from the default, the arg
1327
 
                          is set to the current thd->killed value.
1328
 
                          A caller might need to masquerade thd->killed with
1329
 
                          THD::NOT_KILLED.
 
1291
                          is set to the current session->killed value.
 
1292
                          A caller might need to masquerade session->killed with
 
1293
                          Session::NOT_KILLED.
1330
1294
  DESCRIPTION
1331
1295
  Creates an event for binlogging
1332
1296
  The value for local `killed_status' can be supplied by caller.
1333
1297
*/
1334
 
Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
 
1298
Query_log_event::Query_log_event(Session* session_arg, const char* query_arg,
1335
1299
                                 ulong query_length, bool using_trans,
1336
1300
                                 bool suppress_use,
1337
 
                                 THD::killed_state killed_status_arg)
1338
 
  :Log_event(thd_arg,
1339
 
             (thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
 
1301
                                 Session::killed_state killed_status_arg)
 
1302
  :Log_event(session_arg,
 
1303
             (session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
1340
1304
              0) |
1341
1305
             (suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1342
1306
             using_trans),
1343
 
   data_buf(0), query(query_arg), catalog(thd_arg->catalog),
1344
 
   db(thd_arg->db), q_len((uint32_t) query_length),
1345
 
   thread_id(thd_arg->thread_id),
 
1307
   data_buf(0), query(query_arg), catalog(session_arg->catalog),
 
1308
   db(session_arg->db), q_len((uint32_t) query_length),
 
1309
   thread_id(session_arg->thread_id),
1346
1310
   /* save the original thread id; we already know the server id */
1347
 
   slave_proxy_id(thd_arg->variables.pseudo_thread_id),
 
1311
   slave_proxy_id(session_arg->variables.pseudo_thread_id),
1348
1312
   flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1349
1313
   sql_mode(0),
1350
 
   auto_increment_increment(thd_arg->variables.auto_increment_increment),
1351
 
   auto_increment_offset(thd_arg->variables.auto_increment_offset),
1352
 
   lc_time_names_number(thd_arg->variables.lc_time_names->number),
 
1314
   auto_increment_increment(session_arg->variables.auto_increment_increment),
 
1315
   auto_increment_offset(session_arg->variables.auto_increment_offset),
 
1316
   lc_time_names_number(session_arg->variables.lc_time_names->number),
1353
1317
   charset_database_number(0)
1354
1318
{
1355
1319
  time_t end_time;
1356
1320
 
1357
 
  if (killed_status_arg == THD::KILLED_NO_VALUE)
1358
 
    killed_status_arg= thd_arg->killed;
 
1321
  if (killed_status_arg == Session::KILLED_NO_VALUE)
 
1322
    killed_status_arg= session_arg->killed;
1359
1323
 
1360
1324
  error_code=
1361
 
    (killed_status_arg == THD::NOT_KILLED) ?
1362
 
    (thd_arg->is_error() ? thd_arg->main_da.sql_errno() : 0) :
1363
 
    (thd_arg->killed_errno());
 
1325
    (killed_status_arg == Session::NOT_KILLED) ?
 
1326
    (session_arg->is_error() ? session_arg->main_da.sql_errno() : 0) :
 
1327
    (session_arg->killed_errno());
1364
1328
  
1365
1329
  time(&end_time);
1366
 
  exec_time = (ulong) (end_time  - thd_arg->start_time);
 
1330
  exec_time = (ulong) (end_time  - session_arg->start_time);
1367
1331
  /**
1368
1332
    @todo this means that if we have no catalog, then it is replicated
1369
1333
    as an existing catalog of length zero. is that safe? /sven
1371
1335
  catalog_len = (catalog) ? (uint32_t) strlen(catalog) : 0;
1372
1336
  /* status_vars_len is set just before writing the event */
1373
1337
  db_len = (db) ? (uint32_t) strlen(db) : 0;
1374
 
  if (thd_arg->variables.collation_database != thd_arg->db_charset)
1375
 
    charset_database_number= thd_arg->variables.collation_database->number;
 
1338
  if (session_arg->variables.collation_database != session_arg->db_charset)
 
1339
    charset_database_number= session_arg->variables.collation_database->number;
1376
1340
  
1377
1341
  /*
1378
1342
    If we don't use flags2 for anything else than options contained in
1379
 
    thd_arg->options, it would be more efficient to flags2=thd_arg->options
 
1343
    session_arg->options, it would be more efficient to flags2=session_arg->options
1380
1344
    (OPTIONS_WRITTEN_TO_BIN_LOG would be used only at reading time).
1381
1345
    But it's likely that we don't want to use 32 bits for 3 bits; in the future
1382
1346
    we will probably want to reclaim the 29 bits. So we need the &.
1383
1347
  */
1384
 
  flags2= (uint32_t) (thd_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
1385
 
  assert(thd_arg->variables.character_set_client->number < 256*256);
1386
 
  assert(thd_arg->variables.collation_connection->number < 256*256);
1387
 
  assert(thd_arg->variables.collation_server->number < 256*256);
1388
 
  assert(thd_arg->variables.character_set_client->mbminlen == 1);
1389
 
  int2store(charset, thd_arg->variables.character_set_client->number);
1390
 
  int2store(charset+2, thd_arg->variables.collation_connection->number);
1391
 
  int2store(charset+4, thd_arg->variables.collation_server->number);
1392
 
  if (thd_arg->time_zone_used)
 
1348
  flags2= (uint32_t) (session_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
 
1349
  assert(session_arg->variables.character_set_client->number < 256*256);
 
1350
  assert(session_arg->variables.collation_connection->number < 256*256);
 
1351
  assert(session_arg->variables.collation_server->number < 256*256);
 
1352
  assert(session_arg->variables.character_set_client->mbminlen == 1);
 
1353
  int2store(charset, session_arg->variables.character_set_client->number);
 
1354
  int2store(charset+2, session_arg->variables.collation_connection->number);
 
1355
  int2store(charset+4, session_arg->variables.collation_server->number);
 
1356
  if (session_arg->time_zone_used)
1393
1357
  {
1394
1358
    /*
1395
1359
      Note that our event becomes dependent on the Time_zone object
1396
1360
      representing the time zone. Fortunately such objects are never deleted
1397
1361
      or changed during mysqld's lifetime.
1398
1362
    */
1399
 
    time_zone_len= thd_arg->variables.time_zone->get_name()->length();
1400
 
    time_zone_str= thd_arg->variables.time_zone->get_name()->ptr();
 
1363
    time_zone_len= session_arg->variables.time_zone->get_name()->length();
 
1364
    time_zone_str= session_arg->variables.time_zone->get_name()->ptr();
1401
1365
  }
1402
1366
  else
1403
1367
    time_zone_len= 0;
1579
1543
      auto_increment_offset=    uint2korr(pos+2);
1580
1544
      pos+= 4;
1581
1545
      break;
1582
 
    case Q_CHARSET_CODE:
1583
 
    {
1584
 
      CHECK_SPACE(pos, end, 6);
1585
 
      charset_inited= 1;
1586
 
      memcpy(charset, pos, 6);
1587
 
      pos+= 6;
1588
 
      break;
1589
 
    }
1590
1546
    case Q_TIME_ZONE_CODE:
1591
1547
    {
1592
1548
      if (get_str_len_and_pointer(&pos, &time_zone_str, &time_zone_len, end))
1680
1636
     sql_print_error("Slave: did not get the expected number of affected \
1681
1637
     rows running query from master - expected %d, got %d (this numbers \
1682
1638
     should have matched modulo 4294967296).", 0, ...);
1683
 
     thd->query_error = 1;
 
1639
     session->query_error = 1;
1684
1640
     }
1685
1641
  @endcode
1686
1642
  We may also want an option to tell the slave to ignore "affected"
1693
1649
  LEX_STRING new_db;
1694
1650
  int expected_error,actual_error= 0;
1695
1651
  /*
1696
 
    Colleagues: please never free(thd->catalog) in MySQL. This would
1697
 
    lead to bugs as here thd->catalog is a part of an alloced block,
 
1652
    Colleagues: please never free(session->catalog) in MySQL. This would
 
1653
    lead to bugs as here session->catalog is a part of an alloced block,
1698
1654
    not an entire alloced block (see
1699
 
    Query_log_event::do_apply_event()). Same for thd->db.  Thank
 
1655
    Query_log_event::do_apply_event()). Same for session->db.  Thank
1700
1656
    you.
1701
1657
  */
1702
 
  thd->catalog= catalog_len ? (char *) catalog : (char *)"";
 
1658
  session->catalog= catalog_len ? (char *) catalog : (char *)"";
1703
1659
  new_db.length= db_len;
1704
1660
  new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
1705
 
  thd->set_db(new_db.str, new_db.length);       /* allocates a copy of 'db' */
1706
 
  thd->variables.auto_increment_increment= auto_increment_increment;
1707
 
  thd->variables.auto_increment_offset=    auto_increment_offset;
 
1661
  session->set_db(new_db.str, new_db.length);       /* allocates a copy of 'db' */
 
1662
  session->variables.auto_increment_increment= auto_increment_increment;
 
1663
  session->variables.auto_increment_offset=    auto_increment_offset;
1708
1664
 
1709
1665
  /*
1710
1666
    InnoDB internally stores the master log position it has executed so far,
1718
1674
  */
1719
1675
  const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
1720
1676
 
1721
 
  clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
 
1677
  clear_all_errors(session, const_cast<Relay_log_info*>(rli));
1722
1678
  const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
1723
1679
 
1724
1680
  /*
1731
1687
            ::do_apply_event(), then the companion SET also have so
1732
1688
            we don't need to reset_one_shot_variables().
1733
1689
  */
1734
 
  if (rpl_filter->db_ok(thd->db))
 
1690
  if (rpl_filter->db_ok(session->db))
1735
1691
  {
1736
 
    thd->set_time((time_t)when);
1737
 
    thd->query_length= q_len_arg;
1738
 
    thd->query= (char*)query_arg;
 
1692
    session->set_time((time_t)when);
 
1693
    session->query_length= q_len_arg;
 
1694
    session->query= (char*)query_arg;
1739
1695
    pthread_mutex_lock(&LOCK_thread_count);
1740
 
    thd->query_id = next_query_id();
 
1696
    session->query_id = next_query_id();
1741
1697
    pthread_mutex_unlock(&LOCK_thread_count);
1742
 
    thd->variables.pseudo_thread_id= thread_id;         // for temp tables
 
1698
    session->variables.pseudo_thread_id= thread_id;             // for temp tables
1743
1699
 
1744
1700
    if (ignored_error_code((expected_error= error_code)) ||
1745
 
        !check_expected_error(thd,rli,expected_error))
 
1701
        !check_expected_error(session,rli,expected_error))
1746
1702
    {
1747
1703
      if (flags2_inited)
1748
1704
        /*
1749
 
          all bits of thd->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
 
1705
          all bits of session->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
1750
1706
          must take their value from flags2.
1751
1707
        */
1752
 
        thd->options= flags2|(thd->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
1753
 
      /*
1754
 
        else, we are in a 3.23/4.0 binlog; we previously received a
1755
 
        Rotate_log_event which reset thd->options and sql_mode etc, so
1756
 
        nothing to do.
1757
 
      */
1758
 
      if (charset_inited)
1759
 
      {
1760
 
        if (rli->cached_charset_compare(charset))
1761
 
        {
1762
 
          /* Verify that we support the charsets found in the event. */
1763
 
          if (!(thd->variables.character_set_client=
1764
 
                get_charset(uint2korr(charset), MYF(MY_WME))) ||
1765
 
              !(thd->variables.collation_connection=
1766
 
                get_charset(uint2korr(charset+2), MYF(MY_WME))) ||
1767
 
              !(thd->variables.collation_server=
1768
 
                get_charset(uint2korr(charset+4), MYF(MY_WME))))
1769
 
          {
1770
 
            /*
1771
 
              We updated the thd->variables with nonsensical values (0). Let's
1772
 
              set them to something safe (i.e. which avoids crash), and we'll
1773
 
              stop with EE_UNKNOWN_CHARSET in compare_errors (unless set to
1774
 
              ignore this error).
1775
 
            */
1776
 
            set_slave_thread_default_charset(thd, rli);
1777
 
            goto compare_errors;
1778
 
          }
1779
 
          thd->update_charset(); // for the charset change to take effect
1780
 
        }
1781
 
      }
 
1708
        session->options= flags2|(session->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
1782
1709
      if (time_zone_len)
1783
1710
      {
1784
1711
        String tmp(time_zone_str, time_zone_len, &my_charset_bin);
1785
 
        if (!(thd->variables.time_zone= my_tz_find(thd, &tmp)))
 
1712
        if (!(session->variables.time_zone= my_tz_find(session, &tmp)))
1786
1713
        {
1787
1714
          my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr());
1788
 
          thd->variables.time_zone= global_system_variables.time_zone;
 
1715
          session->variables.time_zone= global_system_variables.time_zone;
1789
1716
          goto compare_errors;
1790
1717
        }
1791
1718
      }
1792
1719
      if (lc_time_names_number)
1793
1720
      {
1794
 
        if (!(thd->variables.lc_time_names=
 
1721
        if (!(session->variables.lc_time_names=
1795
1722
              my_locale_by_number(lc_time_names_number)))
1796
1723
        {
1797
1724
          my_printf_error(ER_UNKNOWN_ERROR,
1798
1725
                      "Unknown locale: '%d'", MYF(0), lc_time_names_number);
1799
 
          thd->variables.lc_time_names= &my_locale_en_US;
 
1726
          session->variables.lc_time_names= &my_locale_en_US;
1800
1727
          goto compare_errors;
1801
1728
        }
1802
1729
      }
1803
1730
      else
1804
 
        thd->variables.lc_time_names= &my_locale_en_US;
 
1731
        session->variables.lc_time_names= &my_locale_en_US;
1805
1732
      if (charset_database_number)
1806
1733
      {
1807
1734
        const CHARSET_INFO *cs;
1812
1739
          my_error(ER_UNKNOWN_COLLATION, MYF(0), buf);
1813
1740
          goto compare_errors;
1814
1741
        }
1815
 
        thd->variables.collation_database= cs;
 
1742
        session->variables.collation_database= cs;
1816
1743
      }
1817
1744
      else
1818
 
        thd->variables.collation_database= thd->db_charset;
 
1745
        session->variables.collation_database= session->db_charset;
1819
1746
      
1820
1747
      /* Execute the query (note that we bypass dispatch_command()) */
1821
1748
      const char* found_semicolon= NULL;
1822
 
      mysql_parse(thd, thd->query, thd->query_length, &found_semicolon);
1823
 
      log_slow_statement(thd);
 
1749
      mysql_parse(session, session->query, session->query_length, &found_semicolon);
 
1750
      log_slow_statement(session);
1824
1751
    }
1825
1752
    else
1826
1753
    {
1831
1758
        we exit gracefully; otherwise we warn about the bad error and tell DBA
1832
1759
        to check/fix it.
1833
1760
      */
1834
 
      if (mysql_test_parse_for_slave(thd, thd->query, thd->query_length))
1835
 
        clear_all_errors(thd, const_cast<Relay_log_info*>(rli)); /* Can ignore query */
 
1761
      if (mysql_test_parse_for_slave(session, session->query, session->query_length))
 
1762
        clear_all_errors(session, const_cast<Relay_log_info*>(rli)); /* Can ignore query */
1836
1763
      else
1837
1764
      {
1838
1765
        rli->report(ERROR_LEVEL, expected_error,
1843
1770
                      "this query manually on the slave and then restart the "
1844
1771
                      "slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; "
1845
1772
                      "START SLAVE; . Query: '%s'"),
1846
 
                    expected_error, thd->query);
1847
 
        thd->is_slave_error= 1;
 
1773
                    expected_error, session->query);
 
1774
        session->is_slave_error= 1;
1848
1775
      }
1849
1776
      goto end;
1850
1777
    }
1855
1782
      If we expected a non-zero error code, and we don't get the same error
1856
1783
      code, and none of them should be ignored.
1857
1784
    */
1858
 
    actual_error= thd->is_error() ? thd->main_da.sql_errno() : 0;
 
1785
    actual_error= session->is_error() ? session->main_da.sql_errno() : 0;
1859
1786
    if ((expected_error != actual_error) &&
1860
1787
        expected_error &&
1861
1788
        !ignored_error_code(actual_error) &&
1867
1794
                    "Default database: '%s'. Query: '%s'"),
1868
1795
                  ER_SAFE(expected_error),
1869
1796
                  expected_error,
1870
 
                  actual_error ? thd->main_da.message() : _("no error"),
 
1797
                  actual_error ? session->main_da.message() : _("no error"),
1871
1798
                  actual_error,
1872
1799
                  print_slave_db_safe(db), query_arg);
1873
 
      thd->is_slave_error= 1;
 
1800
      session->is_slave_error= 1;
1874
1801
    }
1875
1802
    /*
1876
1803
      If we get the same error code as expected, or they should be ignored. 
1878
1805
    else if (expected_error == actual_error ||
1879
1806
             ignored_error_code(actual_error))
1880
1807
    {
1881
 
      clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
1882
 
      thd->killed= THD::NOT_KILLED;
 
1808
      clear_all_errors(session, const_cast<Relay_log_info*>(rli));
 
1809
      session->killed= Session::NOT_KILLED;
1883
1810
    }
1884
1811
    /*
1885
1812
      Other cases: mostly we expected no error and get one.
1886
1813
    */
1887
 
    else if (thd->is_slave_error || thd->is_fatal_error)
 
1814
    else if (session->is_slave_error || session->is_fatal_error)
1888
1815
    {
1889
1816
      rli->report(ERROR_LEVEL, actual_error,
1890
1817
                  _("Error '%s' on query. Default database: '%s'. Query: '%s'"),
1891
 
                  (actual_error ? thd->main_da.message() :
 
1818
                  (actual_error ? session->main_da.message() :
1892
1819
                   _("unexpected success or fatal error")),
1893
 
                      print_slave_db_safe(thd->db), query_arg);
1894
 
      thd->is_slave_error= 1;
 
1820
                      print_slave_db_safe(session->db), query_arg);
 
1821
      session->is_slave_error= 1;
1895
1822
    }
1896
1823
 
1897
1824
    /*
1902
1829
      sql_print_error("Slave: did not get the expected number of affected \
1903
1830
      rows running query from master - expected %d, got %d (this numbers \
1904
1831
      should have matched modulo 4294967296).", 0, ...);
1905
 
      thd->is_slave_error = 1;
 
1832
      session->is_slave_error = 1;
1906
1833
      }
1907
1834
      We may also want an option to tell the slave to ignore "affected"
1908
1835
      mismatch. This mismatch could be implemented with a new ER_ code, and
1920
1847
end:
1921
1848
  pthread_mutex_lock(&LOCK_thread_count);
1922
1849
  /*
1923
 
    Probably we have set thd->query, thd->db, thd->catalog to point to places
 
1850
    Probably we have set session->query, session->db, session->catalog to point to places
1924
1851
    in the data_buf of this event. Now the event is going to be deleted
1925
 
    probably, so data_buf will be freed, so the thd->... listed above will be
 
1852
    probably, so data_buf will be freed, so the session->... listed above will be
1926
1853
    pointers to freed memory. 
1927
1854
    So we must set them to 0, so that those bad pointers values are not later
1928
1855
    used. Note that "cleanup" queries like automatic DROP TEMPORARY Table
1929
1856
    don't suffer from these assignments to 0 as DROP TEMPORARY
1930
1857
    Table uses the db.table syntax.
1931
1858
  */
1932
 
  thd->catalog= 0;
1933
 
  thd->set_db(NULL, 0);                 /* will free the current database */
1934
 
  thd->query= 0;                        // just to be sure
1935
 
  thd->query_length= 0;
 
1859
  session->catalog= 0;
 
1860
  session->set_db(NULL, 0);                 /* will free the current database */
 
1861
  session->query= 0;                    // just to be sure
 
1862
  session->query_length= 0;
1936
1863
  pthread_mutex_unlock(&LOCK_thread_count);
1937
 
  close_thread_tables(thd);      
 
1864
  close_thread_tables(session);      
1938
1865
  /*
1939
1866
    As a disk space optimization, future masters will not log an event for
1940
1867
    LAST_INSERT_ID() if that function returned 0 (and thus they will be able
1941
 
    to replace the THD::stmt_depends_on_first_successful_insert_id_in_prev_stmt
1942
 
    variable by (THD->first_successful_insert_id_in_prev_stmt > 0) ; with the
 
1868
    to replace the Session::stmt_depends_on_first_successful_insert_id_in_prev_stmt
 
1869
    variable by (Session->first_successful_insert_id_in_prev_stmt > 0) ; with the
1943
1870
    resetting below we are ready to support that.
1944
1871
  */
1945
 
  thd->first_successful_insert_id_in_prev_stmt_for_binlog= 0;
1946
 
  thd->first_successful_insert_id_in_prev_stmt= 0;
1947
 
  thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
1948
 
  free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
1949
 
  return thd->is_slave_error;
 
1872
  session->first_successful_insert_id_in_prev_stmt_for_binlog= 0;
 
1873
  session->first_successful_insert_id_in_prev_stmt= 0;
 
1874
  session->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
 
1875
  free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
 
1876
  return session->is_slave_error;
1950
1877
}
1951
1878
 
1952
1879
int Query_log_event::do_update_pos(Relay_log_info *rli)
1956
1883
    after a SET ONE_SHOT, because SET ONE_SHOT should not be separated
1957
1884
    from its following updating query.
1958
1885
  */
1959
 
  if (thd->one_shot_set)
 
1886
  if (session->one_shot_set)
1960
1887
  {
1961
1888
    rli->inc_event_relay_log_pos();
1962
1889
    return 0;
1975
1902
  {
1976
1903
    if (strcmp("BEGIN", query) == 0)
1977
1904
    {
1978
 
      thd->options|= OPTION_BEGIN;
 
1905
      session->options|= OPTION_BEGIN;
1979
1906
      return(Log_event::continue_group(rli));
1980
1907
    }
1981
1908
 
1982
1909
    if (strcmp("COMMIT", query) == 0 || strcmp("ROLLBACK", query) == 0)
1983
1910
    {
1984
 
      thd->options&= ~OPTION_BEGIN;
 
1911
      session->options&= ~OPTION_BEGIN;
1985
1912
      return(Log_event::EVENT_SKIP_COUNT);
1986
1913
    }
1987
1914
  }
2086
2013
    */
2087
2014
    if (created)
2088
2015
    {
2089
 
      close_temporary_tables(thd);
 
2016
      close_temporary_tables(session);
2090
2017
      cleanup_load_tmpdir();
2091
2018
    }
2092
2019
    break;
2103
2030
        Can distinguish, based on the value of 'created': this event was
2104
2031
        generated at master startup.
2105
2032
      */
2106
 
      close_temporary_tables(thd);
 
2033
      close_temporary_tables(session);
2107
2034
    }
2108
2035
    /*
2109
2036
      Otherwise, can't distinguish a Start_log_event generated at
2401
2328
    original place when it comes to us; we'll know this by checking
2402
2329
    log_pos ("artificial" events have log_pos == 0).
2403
2330
  */
2404
 
  if (!artificial_event && created && thd->transaction.all.ha_list)
 
2331
  if (!artificial_event && created && session->transaction.all.ha_list)
2405
2332
  {
2406
2333
    /* This is not an error (XA is safe), just an information */
2407
2334
    rli->report(INFORMATION_LEVEL, 0,
2409
2336
                  "or ROLLBACK in relay log). A probable cause is that "
2410
2337
                  "the master died while writing the transaction to "
2411
2338
                  "its binary log, thus rolled back too."));
2412
 
    const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 1);
 
2339
    const_cast<Relay_log_info*>(rli)->cleanup_context(session, 1);
2413
2340
  }
2414
2341
  /*
2415
2342
    If this event comes from ourselves, there is no cleaning task to
2673
2600
  Load_log_event::Load_log_event()
2674
2601
*/
2675
2602
 
2676
 
Load_log_event::Load_log_event(THD *thd_arg, sql_exchange *ex,
 
2603
Load_log_event::Load_log_event(Session *session_arg, sql_exchange *ex,
2677
2604
                               const char *db_arg, const char *table_name_arg,
2678
2605
                               List<Item> &fields_arg,
2679
2606
                               enum enum_duplicates handle_dup,
2680
2607
                               bool ignore, bool using_trans)
2681
 
  :Log_event(thd_arg,
2682
 
             thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
 
2608
  :Log_event(session_arg,
 
2609
             session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
2683
2610
             using_trans),
2684
 
   thread_id(thd_arg->thread_id),
2685
 
   slave_proxy_id(thd_arg->variables.pseudo_thread_id),
 
2611
   thread_id(session_arg->thread_id),
 
2612
   slave_proxy_id(session_arg->variables.pseudo_thread_id),
2686
2613
   num_fields(0),fields(0),
2687
2614
   field_lens(0),field_block_len(0),
2688
2615
   table_name(table_name_arg ? table_name_arg : ""),
2690
2617
{
2691
2618
  time_t end_time;
2692
2619
  time(&end_time);
2693
 
  exec_time = (ulong) (end_time  - thd_arg->start_time);
 
2620
  exec_time = (ulong) (end_time  - session_arg->start_time);
2694
2621
  /* db can never be a zero pointer in 4.0 */
2695
2622
  db_len = (uint32_t) strlen(db);
2696
2623
  table_name_len = (uint32_t) strlen(table_name);
2891
2818
  LEX_STRING new_db;
2892
2819
  new_db.length= db_len;
2893
2820
  new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
2894
 
  thd->set_db(new_db.str, new_db.length);
2895
 
  assert(thd->query == 0);
2896
 
  thd->query_length= 0;                         // Should not be needed
2897
 
  thd->is_slave_error= 0;
2898
 
  clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
 
2821
  session->set_db(new_db.str, new_db.length);
 
2822
  assert(session->query == 0);
 
2823
  session->query_length= 0;                         // Should not be needed
 
2824
  session->is_slave_error= 0;
 
2825
  clear_all_errors(session, const_cast<Relay_log_info*>(rli));
2899
2826
 
2900
2827
  /* see Query_log_event::do_apply_event() and BUG#13360 */
2901
2828
  assert(!rli->m_table_map.count());
2903
2830
    Usually lex_start() is called by mysql_parse(), but we need it here
2904
2831
    as the present method does not call mysql_parse().
2905
2832
  */
2906
 
  lex_start(thd);
2907
 
  mysql_reset_thd_for_next_command(thd);
 
2833
  lex_start(session);
 
2834
  mysql_reset_session_for_next_command(session);
2908
2835
 
2909
2836
  if (!use_rli_only_for_errors)
2910
2837
  {
2938
2865
            ::do_apply_event(), then the companion SET also have so
2939
2866
            we don't need to reset_one_shot_variables().
2940
2867
  */
2941
 
  if (rpl_filter->db_ok(thd->db))
 
2868
  if (rpl_filter->db_ok(session->db))
2942
2869
  {
2943
 
    thd->set_time((time_t)when);
 
2870
    session->set_time((time_t)when);
2944
2871
    pthread_mutex_lock(&LOCK_thread_count);
2945
 
    thd->query_id = next_query_id();
 
2872
    session->query_id = next_query_id();
2946
2873
    pthread_mutex_unlock(&LOCK_thread_count);
2947
2874
    /*
2948
 
      Initing thd->row_count is not necessary in theory as this variable has no
 
2875
      Initing session->row_count is not necessary in theory as this variable has no
2949
2876
      influence in the case of the slave SQL thread (it is used to generate a
2950
2877
      "data truncated" warning but which is absorbed and never gets to the
2951
2878
      error log); still we init it to avoid a Valgrind message.
2952
2879
    */
2953
 
    drizzle_reset_errors(thd, 0);
 
2880
    drizzle_reset_errors(session, 0);
2954
2881
 
2955
2882
    TableList tables;
2956
2883
    memset(&tables, 0, sizeof(tables));
2957
 
    tables.db= thd->strmake(thd->db, thd->db_length);
 
2884
    tables.db= session->strmake(session->db, session->db_length);
2958
2885
    tables.alias = tables.table_name = (char*) table_name;
2959
2886
    tables.lock_type = TL_WRITE;
2960
2887
    tables.updating= 1;
2961
2888
 
2962
2889
    // the table will be opened in mysql_load    
2963
 
    if (rpl_filter->is_on() && !rpl_filter->tables_ok(thd->db, &tables))
 
2890
    if (rpl_filter->is_on() && !rpl_filter->tables_ok(session->db, &tables))
2964
2891
    {
2965
2892
      // TODO: this is a bug - this needs to be moved to the I/O thread
2966
2893
      if (net)
2978
2905
        Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST
2979
2906
        and written to slave's binlog if binlogging is on.
2980
2907
      */
2981
 
      if (!(load_data_query= (char *)thd->alloc(get_query_buffer_length() + 1)))
 
2908
      if (!(load_data_query= (char *)session->alloc(get_query_buffer_length() + 1)))
2982
2909
      {
2983
2910
        /*
2984
 
          This will set thd->fatal_error in case of OOM. So we surely will notice
 
2911
          This will set session->fatal_error in case of OOM. So we surely will notice
2985
2912
          that something is wrong.
2986
2913
        */
2987
2914
        goto error;
2988
2915
      }
2989
2916
 
2990
 
      print_query(false, load_data_query, &end, (char **)&thd->lex->fname_start,
2991
 
                  (char **)&thd->lex->fname_end);
 
2917
      print_query(false, load_data_query, &end, (char **)&session->lex->fname_start,
 
2918
                  (char **)&session->lex->fname_end);
2992
2919
      *end= 0;
2993
 
      thd->query_length= end - load_data_query;
2994
 
      thd->query= load_data_query;
 
2920
      session->query_length= end - load_data_query;
 
2921
      session->query= load_data_query;
2995
2922
 
2996
2923
      if (sql_ex.opt_flags & REPLACE_FLAG)
2997
2924
      {
3020
2947
        handle_dup= DUP_ERROR;
3021
2948
      }
3022
2949
      /*
3023
 
        We need to set thd->lex->sql_command and thd->lex->duplicates
 
2950
        We need to set session->lex->sql_command and session->lex->duplicates
3024
2951
        since InnoDB tests these variables to decide if this is a LOAD
3025
2952
        DATA ... REPLACE INTO ... statement even though mysql_parse()
3026
2953
        is not called.  This is not needed in 5.0 since there the LOAD
3027
2954
        DATA ... statement is replicated using mysql_parse(), which
3028
 
        sets the thd->lex fields correctly.
 
2955
        sets the session->lex fields correctly.
3029
2956
      */
3030
 
      thd->lex->sql_command= SQLCOM_LOAD;
3031
 
      thd->lex->duplicates= handle_dup;
 
2957
      session->lex->sql_command= SQLCOM_LOAD;
 
2958
      session->lex->duplicates= handle_dup;
3032
2959
 
3033
2960
      sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
3034
2961
      String field_term(sql_ex.field_term,sql_ex.field_term_len,log_cs);
3048
2975
 
3049
2976
      ex.skip_lines = skip_lines;
3050
2977
      List<Item> field_list;
3051
 
      thd->lex->select_lex.context.resolve_in_table_list_only(&tables);
3052
 
      set_fields(tables.db, field_list, &thd->lex->select_lex.context);
3053
 
      thd->variables.pseudo_thread_id= thread_id;
 
2978
      session->lex->select_lex.context.resolve_in_table_list_only(&tables);
 
2979
      set_fields(tables.db, field_list, &session->lex->select_lex.context);
 
2980
      session->variables.pseudo_thread_id= thread_id;
3054
2981
      if (net)
3055
2982
      {
3056
 
        // mysql_load will use thd->net to read the file
3057
 
        thd->net.vio = net->vio;
 
2983
        // mysql_load will use session->net to read the file
 
2984
        session->net.vio = net->vio;
3058
2985
        /*
3059
2986
          Make sure the client does not get confused about the packet sequence
3060
2987
        */
3061
 
        thd->net.pkt_nr = net->pkt_nr;
 
2988
        session->net.pkt_nr = net->pkt_nr;
3062
2989
      }
3063
2990
      /*
3064
2991
        It is safe to use tmp_list twice because we are not going to
3065
2992
        update it inside mysql_load().
3066
2993
      */
3067
2994
      List<Item> tmp_list;
3068
 
      if (mysql_load(thd, &ex, &tables, field_list, tmp_list, tmp_list,
 
2995
      if (mysql_load(session, &ex, &tables, field_list, tmp_list, tmp_list,
3069
2996
                     handle_dup, ignore, net != 0))
3070
 
        thd->is_slave_error= 1;
3071
 
      if (thd->cuted_fields)
 
2997
        session->is_slave_error= 1;
 
2998
      if (session->cuted_fields)
3072
2999
      {
3073
3000
        /* log_pos is the position of the LOAD event in the master log */
3074
3001
        sql_print_warning(_("Slave: load data infile on table '%s' at "
3076
3003
                          "warning(s). Default database: '%s'"),
3077
3004
                          (char*) table_name,
3078
3005
                          llstr(log_pos,llbuff), RPL_LOG_NAME, 
3079
 
                          (ulong) thd->cuted_fields,
3080
 
                          print_slave_db_safe(thd->db));
 
3006
                          (ulong) session->cuted_fields,
 
3007
                          print_slave_db_safe(session->db));
3081
3008
      }
3082
3009
      if (net)
3083
 
        net->pkt_nr= thd->net.pkt_nr;
 
3010
        net->pkt_nr= session->net.pkt_nr;
3084
3011
    }
3085
3012
  }
3086
3013
  else
3095
3022
  }
3096
3023
 
3097
3024
error:
3098
 
  thd->net.vio = 0; 
3099
 
  const char *remember_db= thd->db;
 
3025
  session->net.vio = 0; 
 
3026
  const char *remember_db= session->db;
3100
3027
  pthread_mutex_lock(&LOCK_thread_count);
3101
 
  thd->catalog= 0;
3102
 
  thd->set_db(NULL, 0);                   /* will free the current database */
3103
 
  thd->query= 0;
3104
 
  thd->query_length= 0;
 
3028
  session->catalog= 0;
 
3029
  session->set_db(NULL, 0);                   /* will free the current database */
 
3030
  session->query= 0;
 
3031
  session->query_length= 0;
3105
3032
  pthread_mutex_unlock(&LOCK_thread_count);
3106
 
  close_thread_tables(thd);
 
3033
  close_thread_tables(session);
3107
3034
 
3108
 
  if (thd->is_slave_error)
 
3035
  if (session->is_slave_error)
3109
3036
  {
3110
3037
    /* this err/sql_errno code is copy-paste from net_send_error() */
3111
3038
    const char *err;
3112
3039
    int sql_errno;
3113
 
    if (thd->is_error())
 
3040
    if (session->is_error())
3114
3041
    {
3115
 
      err= thd->main_da.message();
3116
 
      sql_errno= thd->main_da.sql_errno();
 
3042
      err= session->main_da.message();
 
3043
      sql_errno= session->main_da.sql_errno();
3117
3044
    }
3118
3045
    else
3119
3046
    {
3124
3051
                _("Error '%s' running LOAD DATA INFILE on table '%s'. "
3125
3052
                  "Default database: '%s'"),
3126
3053
                err, (char*)table_name, print_slave_db_safe(remember_db));
3127
 
    free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
 
3054
    free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
3128
3055
    return 1;
3129
3056
  }
3130
 
  free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
 
3057
  free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
3131
3058
 
3132
 
  if (thd->is_fatal_error)
 
3059
  if (session->is_fatal_error)
3133
3060
  {
3134
3061
    char buf[256];
3135
3062
    snprintf(buf, sizeof(buf),
3256
3183
  if ((server_id != ::server_id || rli->replicate_same_server_id) &&
3257
3184
      !rli->is_in_group())
3258
3185
  {
3259
 
    memcpy(rli->group_master_log_name, new_log_ident, ident_len+1);
 
3186
    rli->group_master_log_name.assign(new_log_ident, ident_len+1);
3260
3187
    rli->notify_group_master_log_name_update();
3261
3188
    rli->group_master_log_pos= pos;
3262
 
    strmake(rli->group_relay_log_name, rli->event_relay_log_name,
3263
 
            sizeof(rli->group_relay_log_name) - 1);
 
3189
    rli->group_relay_log_name.assign(rli->event_relay_log_name);
3264
3190
    rli->notify_group_relay_log_name_update();
3265
3191
    rli->group_relay_log_pos= rli->event_relay_log_pos;
3266
3192
    /*
3267
 
      Reset thd->options and sql_mode etc, because this could be the signal of
 
3193
      Reset session->options and sql_mode etc, because this could be the signal of
3268
3194
      a master's downgrade from 5.0 to 4.0.
3269
3195
      However, no need to reset description_event_for_exec: indeed, if the next
3270
3196
      master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
3271
3197
      master is 4.0 then the events are in the slave's format (conversion).
3272
3198
    */
3273
 
    set_slave_thread_options(thd);
3274
 
    set_slave_thread_default_charset(thd, rli);
3275
 
    thd->variables.auto_increment_increment=
3276
 
      thd->variables.auto_increment_offset= 1;
 
3199
    set_slave_thread_options(session);
 
3200
    session->variables.auto_increment_increment=
 
3201
      session->variables.auto_increment_offset= 1;
3277
3202
  }
3278
3203
  pthread_mutex_unlock(&rli->data_lock);
3279
3204
  pthread_cond_broadcast(&rli->data_cond);
3379
3304
 
3380
3305
  switch (type) {
3381
3306
  case LAST_INSERT_ID_EVENT:
3382
 
    thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1;
3383
 
    thd->first_successful_insert_id_in_prev_stmt= val;
 
3307
    session->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1;
 
3308
    session->first_successful_insert_id_in_prev_stmt= val;
3384
3309
    break;
3385
3310
  case INSERT_ID_EVENT:
3386
 
    thd->force_one_auto_inc_interval(val);
 
3311
    session->force_one_auto_inc_interval(val);
3387
3312
    break;
3388
3313
  }
3389
3314
  return 0;
3454
3379
   */
3455
3380
  const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3456
3381
 
3457
 
  thd->rand.seed1= (ulong) seed1;
3458
 
  thd->rand.seed2= (ulong) seed2;
 
3382
  session->rand.seed1= (ulong) seed1;
 
3383
  session->rand.seed2= (ulong) seed2;
3459
3384
  return 0;
3460
3385
}
3461
3386
 
3522
3447
 
3523
3448
int Xid_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
3524
3449
{
3525
 
  return end_trans(thd, COMMIT);
 
3450
  return end_trans(session, COMMIT);
3526
3451
}
3527
3452
 
3528
3453
Log_event::enum_skip_reason
3529
3454
Xid_log_event::do_shall_skip(Relay_log_info *rli)
3530
3455
{
3531
3456
  if (rli->slave_skip_counter > 0) {
3532
 
    thd->options&= ~OPTION_BEGIN;
 
3457
    session->options&= ~OPTION_BEGIN;
3533
3458
    return(Log_event::EVENT_SKIP_COUNT);
3534
3459
  }
3535
3460
  return(Log_event::do_shall_skip(rli));
3773
3698
    Item_func_set_user_var can't substitute something else on its place =>
3774
3699
    0 can be passed as last argument (reference on item)
3775
3700
  */
3776
 
  e.fix_fields(thd, 0);
 
3701
  e.fix_fields(session, 0);
3777
3702
  /*
3778
3703
    A variable can just be considered as a table with
3779
3704
    a single record and with a single column. Thus, like
3780
3705
    a column value, it could always have IMPLICIT derivation.
3781
3706
   */
3782
3707
  e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0);
3783
 
  free_root(thd->mem_root,0);
 
3708
  free_root(session->mem_root,0);
3784
3709
 
3785
3710
  return 0;
3786
3711
}
3814
3739
{
3815
3740
  char buf[256+HOSTNAME_LENGTH], *pos;
3816
3741
  pos= my_stpcpy(buf, "host=");
3817
 
  pos= my_stpncpy(pos, master_host, HOSTNAME_LENGTH);
 
3742
  pos= my_stpncpy(pos, master_host.c_str(), HOSTNAME_LENGTH);
3818
3743
  pos= my_stpcpy(pos, ",port=");
3819
3744
  pos= int10_to_str((long) master_port, pos, 10);
3820
3745
  pos= my_stpcpy(pos, ",log=");
3821
 
  pos= my_stpcpy(pos, master_log);
 
3746
  pos= my_stpcpy(pos, master_log.c_str());
3822
3747
  pos= my_stpcpy(pos, ",pos=");
3823
3748
  pos= int64_t10_to_str(master_pos, pos, 10);
3824
3749
  protocol->store(buf, pos-buf, &my_charset_bin);
3829
3754
  @todo
3830
3755
  re-write this better without holding both locks at the same time
3831
3756
*/
3832
 
Slave_log_event::Slave_log_event(THD* thd_arg,
 
3757
Slave_log_event::Slave_log_event(Session* session_arg,
3833
3758
                                 Relay_log_info* rli)
3834
 
  :Log_event(thd_arg, 0, 0) , mem_pool(0), master_host(0)
 
3759
  :Log_event(session_arg, 0, 0) , mem_pool(0), master_host(0)
3835
3760
{
3836
3761
  if (!rli->inited)                             // QQ When can this happen ?
3837
3762
    return;
3840
3765
  // TODO: re-write this better without holding both locks at the same time
3841
3766
  pthread_mutex_lock(&mi->data_lock);
3842
3767
  pthread_mutex_lock(&rli->data_lock);
3843
 
  master_host_len = strlen(mi->host);
3844
 
  master_log_len = strlen(rli->group_master_log_name);
3845
3768
  // on OOM, just do not initialize the structure and print the error
3846
3769
  if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
3847
3770
                                   MYF(MY_WME))))
3848
3771
  {
3849
 
    master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
3850
 
    memcpy(master_host, mi->host, master_host_len + 1);
3851
 
    master_log = master_host + master_host_len + 1;
3852
 
    memcpy(master_log, rli->group_master_log_name, master_log_len + 1);
3853
 
    master_port = mi->port;
 
3772
    master_host.assign(mi->getHostname());
 
3773
    master_log.assign(rli->group_master_log_name);
 
3774
    master_port = mi->getPort();
3854
3775
    master_pos = rli->group_master_log_pos;
3855
3776
  }
3856
3777
  else
3869
3790
 
3870
3791
int Slave_log_event::get_data_size()
3871
3792
{
3872
 
  return master_host_len + master_log_len + 1 + SL_MASTER_HOST_OFFSET;
 
3793
  return master_host.length() + master_log.length() + 1 + SL_MASTER_HOST_OFFSET;
3873
3794
}
3874
3795
 
3875
3796
 
3885
3806
}
3886
3807
 
3887
3808
 
3888
 
void Slave_log_event::init_from_mem_pool(int data_size)
 
3809
void Slave_log_event::init_from_mem_pool()
3889
3810
{
3890
3811
  master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
3891
3812
  master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
3892
 
  master_host = mem_pool + SL_MASTER_HOST_OFFSET;
3893
 
  master_host_len = strlen(master_host);
3894
 
  // safety
3895
 
  master_log = master_host + master_host_len + 1;
3896
 
  if (master_log > mem_pool + data_size)
3897
 
  {
3898
 
    master_host = 0;
3899
 
    return;
3900
 
  }
3901
 
  master_log_len = strlen(master_log);
3902
 
}
3903
 
 
3904
 
 
3905
 
/** This code is not used, so has not been updated to be format-tolerant. */
3906
 
Slave_log_event::Slave_log_event(const char* buf, uint32_t event_len)
3907
 
  :Log_event(buf,0) /*unused event*/ ,mem_pool(0),master_host(0)
3908
 
{
3909
 
  if (event_len < LOG_EVENT_HEADER_LEN)
3910
 
    return;
3911
 
  event_len -= LOG_EVENT_HEADER_LEN;
3912
 
  if (!(mem_pool = (char*) my_malloc(event_len + 1, MYF(MY_WME))))
3913
 
    return;
3914
 
  memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len);
3915
 
  mem_pool[event_len] = 0;
3916
 
  init_from_mem_pool(event_len);
 
3813
#ifdef FIXME
 
3814
  /* Assign these correctly */
 
3815
  master_host.assign(mem_pool + SL_MASTER_HOST_OFFSET);
 
3816
  master_log.assign();
 
3817
#endif
3917
3818
}
3918
3819
 
3919
3820
 
3949
3850
    could give false triggers in MASTER_POS_WAIT() that we have reached
3950
3851
    the target position when in fact we have not.
3951
3852
  */
3952
 
  if (thd->options & OPTION_BEGIN)
 
3853
  if (session->options & OPTION_BEGIN)
3953
3854
    rli->inc_event_relay_log_pos();
3954
3855
  else
3955
3856
  {
3969
3870
*/
3970
3871
 
3971
3872
Create_file_log_event::
3972
 
Create_file_log_event(THD* thd_arg, sql_exchange* ex,
 
3873
Create_file_log_event(Session* session_arg, sql_exchange* ex,
3973
3874
                      const char* db_arg, const char* table_name_arg,
3974
3875
                      List<Item>& fields_arg, enum enum_duplicates handle_dup,
3975
3876
                      bool ignore,
3976
3877
                      unsigned char* block_arg, uint32_t block_len_arg, bool using_trans)
3977
 
  :Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
 
3878
  :Load_log_event(session_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
3978
3879
                  using_trans),
3979
3880
   fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg),
3980
 
   file_id(thd_arg->file_id = mysql_bin_log.next_file_id())
 
3881
   file_id(session_arg->file_id = mysql_bin_log.next_file_id())
3981
3882
{
3982
3883
  sql_ex.force_new_format();
3983
3884
  return;
4114
4015
  memset(&file, 0, sizeof(file));
4115
4016
  fname_buf= my_stpcpy(proc_info, "Making temp file ");
4116
4017
  ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
4117
 
  thd_proc_info(thd, proc_info);
 
4018
  session->set_proc_info(proc_info);
4118
4019
  my_delete(fname_buf, MYF(0)); // old copy may exist already
4119
4020
  if ((fd= my_create(fname_buf, CREATE_MODE,
4120
 
                     O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
 
4021
                     O_WRONLY | O_EXCL,
4121
4022
                     MYF(MY_WME))) < 0 ||
4122
4023
      init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
4123
4024
                    MYF(MY_WME|MY_NABP)))
4145
4046
  // fname_buf now already has .data, not .info, because we did our trick
4146
4047
  my_delete(fname_buf, MYF(0)); // old copy may exist already
4147
4048
  if ((fd= my_create(fname_buf, CREATE_MODE,
4148
 
                     O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
 
4049
                     O_WRONLY | O_EXCL,
4149
4050
                     MYF(MY_WME))) < 0)
4150
4051
  {
4151
4052
    rli->report(ERROR_LEVEL, my_errno,
4167
4068
    end_io_cache(&file);
4168
4069
  if (fd >= 0)
4169
4070
    my_close(fd, MYF(0));
4170
 
  thd_proc_info(thd, 0);
 
4071
  session->set_proc_info(0);
4171
4072
  return error == 0;
4172
4073
}
4173
4074
 
4180
4081
  Append_block_log_event ctor
4181
4082
*/
4182
4083
 
4183
 
Append_block_log_event::Append_block_log_event(THD *thd_arg,
 
4084
Append_block_log_event::Append_block_log_event(Session *session_arg,
4184
4085
                                               const char *db_arg,
4185
4086
                                               unsigned char *block_arg,
4186
4087
                                               uint32_t block_len_arg,
4187
4088
                                               bool using_trans)
4188
 
  :Log_event(thd_arg,0, using_trans), block(block_arg),
4189
 
   block_len(block_len_arg), file_id(thd_arg->file_id), db(db_arg)
 
4089
  :Log_event(session_arg,0, using_trans), block(block_arg),
 
4090
   block_len(block_len_arg), file_id(session_arg->file_id), db(db_arg)
4190
4091
{
4191
4092
}
4192
4093
 
4261
4162
 
4262
4163
  fname= my_stpcpy(proc_info, "Making temp file ");
4263
4164
  slave_load_file_stem(fname, file_id, server_id, ".data");
4264
 
  thd_proc_info(thd, proc_info);
 
4165
  session->set_proc_info(proc_info);
4265
4166
  if (get_create_or_append())
4266
4167
  {
4267
4168
    my_delete(fname, MYF(0)); // old copy may exist already
4268
4169
    if ((fd= my_create(fname, CREATE_MODE,
4269
 
                       O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
 
4170
                       O_WRONLY | O_EXCL,
4270
4171
                       MYF(MY_WME))) < 0)
4271
4172
    {
4272
4173
      rli->report(ERROR_LEVEL, my_errno,
4275
4176
      goto err;
4276
4177
    }
4277
4178
  }
4278
 
  else if ((fd = my_open(fname, O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW,
 
4179
  else if ((fd = my_open(fname, O_WRONLY | O_APPEND,
4279
4180
                         MYF(MY_WME))) < 0)
4280
4181
  {
4281
4182
    rli->report(ERROR_LEVEL, my_errno,
4295
4196
err:
4296
4197
  if (fd >= 0)
4297
4198
    my_close(fd, MYF(0));
4298
 
  thd_proc_info(thd, 0);
 
4199
  session->set_proc_info(0);
4299
4200
  return(error);
4300
4201
}
4301
4202
 
4308
4209
  Delete_file_log_event ctor
4309
4210
*/
4310
4211
 
4311
 
Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg,
 
4212
Delete_file_log_event::Delete_file_log_event(Session *session_arg, const char* db_arg,
4312
4213
                                             bool using_trans)
4313
 
  :Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
 
4214
  :Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
4314
4215
{
4315
4216
}
4316
4217
 
4378
4279
  Execute_load_log_event ctor
4379
4280
*/
4380
4281
 
4381
 
Execute_load_log_event::Execute_load_log_event(THD *thd_arg,
 
4282
Execute_load_log_event::Execute_load_log_event(Session *session_arg,
4382
4283
                                               const char* db_arg,
4383
4284
                                               bool using_trans)
4384
 
  :Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
 
4285
  :Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
4385
4286
{
4386
4287
}
4387
4288
  
4442
4343
  Load_log_event *lev= 0;
4443
4344
 
4444
4345
  ext= slave_load_file_stem(fname, file_id, server_id, ".info");
4445
 
  if ((fd = my_open(fname, O_RDONLY | O_BINARY | O_NOFOLLOW,
 
4346
  if ((fd = my_open(fname, O_RDONLY,
4446
4347
                    MYF(MY_WME))) < 0 ||
4447
4348
      init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
4448
4349
                    MYF(MY_WME|MY_NABP)))
4464
4365
    goto err;
4465
4366
  }
4466
4367
 
4467
 
  lev->thd = thd;
 
4368
  lev->session = session;
4468
4369
  /*
4469
4370
    lev->do_apply_event should use rli only for errors i.e. should
4470
4371
    not advance rli's position.
4525
4426
**************************************************************************/
4526
4427
 
4527
4428
Begin_load_query_log_event::
4528
 
Begin_load_query_log_event(THD* thd_arg, const char* db_arg, unsigned char* block_arg,
 
4429
Begin_load_query_log_event(Session* session_arg, const char* db_arg, unsigned char* block_arg,
4529
4430
                           uint32_t block_len_arg, bool using_trans)
4530
 
  :Append_block_log_event(thd_arg, db_arg, block_arg, block_len_arg,
 
4431
  :Append_block_log_event(session_arg, db_arg, block_arg, block_len_arg,
4531
4432
                          using_trans)
4532
4433
{
4533
 
   file_id= thd_arg->file_id= mysql_bin_log.next_file_id();
 
4434
   file_id= session_arg->file_id= mysql_bin_log.next_file_id();
4534
4435
}
4535
4436
 
4536
4437
 
4565
4466
 
4566
4467
 
4567
4468
Execute_load_query_log_event::
4568
 
Execute_load_query_log_event(THD *thd_arg, const char* query_arg,
 
4469
Execute_load_query_log_event(Session *session_arg, const char* query_arg,
4569
4470
                             ulong query_length_arg, uint32_t fn_pos_start_arg,
4570
4471
                             uint32_t fn_pos_end_arg,
4571
4472
                             enum_load_dup_handling dup_handling_arg,
4572
4473
                             bool using_trans, bool suppress_use,
4573
 
                             THD::killed_state killed_err_arg):
4574
 
  Query_log_event(thd_arg, query_arg, query_length_arg, using_trans,
 
4474
                             Session::killed_state killed_err_arg):
 
4475
  Query_log_event(session_arg, query_arg, query_length_arg, using_trans,
4575
4476
                  suppress_use, killed_err_arg),
4576
 
  file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg),
 
4477
  file_id(session_arg->file_id), fn_pos_start(fn_pos_start_arg),
4577
4478
  fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
4578
4479
{
4579
4480
}
4796
4697
        Rows_log_event member functions
4797
4698
**************************************************************************/
4798
4699
 
4799
 
Rows_log_event::Rows_log_event(THD *thd_arg, Table *tbl_arg, ulong tid,
 
4700
Rows_log_event::Rows_log_event(Session *session_arg, Table *tbl_arg, ulong tid,
4800
4701
                               MY_BITMAP const *cols, bool is_transactional)
4801
 
  : Log_event(thd_arg, 0, is_transactional),
 
4702
  : Log_event(session_arg, 0, is_transactional),
4802
4703
    m_row_count(0),
4803
4704
    m_table(tbl_arg),
4804
4705
    m_table_id(tid),
4814
4715
   */
4815
4716
  assert((tbl_arg && tbl_arg->s && tid != UINT32_MAX) || (!tbl_arg && !cols && tid == UINT32_MAX));
4816
4717
 
4817
 
  if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
 
4718
  if (session_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
4818
4719
      set_flags(NO_FOREIGN_KEY_CHECKS_F);
4819
 
  if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
 
4720
  if (session_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
4820
4721
      set_flags(RELAXED_UNIQUE_CHECKS_F);
4821
4722
  /* if bitmap_init fails, caught in is_valid() */
4822
4723
  if (likely(!bitmap_init(&m_cols,
5032
4933
    assert(get_flags(STMT_END_F));
5033
4934
 
5034
4935
    const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5035
 
    close_thread_tables(thd);
5036
 
    thd->clear_error();
 
4936
    close_thread_tables(session);
 
4937
    session->clear_error();
5037
4938
    return(0);
5038
4939
  }
5039
4940
 
5040
4941
  /*
5041
 
    'thd' has been set by exec_relay_log_event(), just before calling
 
4942
    'session' has been set by exec_relay_log_event(), just before calling
5042
4943
    do_apply_event(). We still check here to prevent future coding
5043
4944
    errors.
5044
4945
  */
5045
 
  assert(rli->sql_thd == thd);
 
4946
  assert(rli->sql_session == session);
5046
4947
 
5047
4948
  /*
5048
4949
    If there is no locks taken, this is the first binrow event seen
5050
4951
    used in the transaction and proceed with execution of the actual
5051
4952
    event.
5052
4953
  */
5053
 
  if (!thd->lock)
 
4954
  if (!session->lock)
5054
4955
  {
5055
4956
    bool need_reopen= 1; /* To execute the first lap of the loop below */
5056
4957
 
5057
4958
    /*
5058
 
      lock_tables() reads the contents of thd->lex, so they must be
 
4959
      lock_tables() reads the contents of session->lex, so they must be
5059
4960
      initialized. Contrary to in
5060
4961
      Table_map_log_event::do_apply_event() we don't call
5061
4962
      mysql_init_query() as that may reset the binlog format.
5062
4963
    */
5063
 
    lex_start(thd);
 
4964
    lex_start(session);
5064
4965
 
5065
4966
    /*
5066
4967
      There are a few flags that are replicated with each row event.
5068
4969
      the event.
5069
4970
    */
5070
4971
    if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
5071
 
        thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
 
4972
        session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
5072
4973
    else
5073
 
        thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
 
4974
        session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
5074
4975
 
5075
4976
    if (get_flags(RELAXED_UNIQUE_CHECKS_F))
5076
 
        thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
 
4977
        session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
5077
4978
    else
5078
 
        thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
 
4979
        session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
5079
4980
    /* A small test to verify that objects have consistent types */
5080
 
    assert(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
5081
 
 
5082
 
 
5083
 
    while ((error= lock_tables(thd, rli->tables_to_lock,
 
4981
    assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
 
4982
 
 
4983
 
 
4984
    while ((error= lock_tables(session, rli->tables_to_lock,
5084
4985
                               rli->tables_to_lock_count, &need_reopen)))
5085
4986
    {
5086
4987
      if (!need_reopen)
5087
4988
      {
5088
 
        if (thd->is_slave_error || thd->is_fatal_error)
 
4989
        if (session->is_slave_error || session->is_fatal_error)
5089
4990
        {
5090
4991
          /*
5091
4992
            Error reporting borrowed from Query_log_event with many excessive
5092
4993
            simplifications (we don't honour --slave-skip-errors)
5093
4994
          */
5094
 
          uint32_t actual_error= thd->main_da.sql_errno();
 
4995
          uint32_t actual_error= session->main_da.sql_errno();
5095
4996
          rli->report(ERROR_LEVEL, actual_error,
5096
4997
                      _("Error '%s' in %s event: when locking tables"),
5097
4998
                      (actual_error
5098
 
                       ? thd->main_da.message()
 
4999
                       ? session->main_da.message()
5099
5000
                       : _("unexpected success or fatal error")),
5100
5001
                      get_type_str());
5101
 
          thd->is_fatal_error= 1;
 
5002
          session->is_fatal_error= 1;
5102
5003
        }
5103
5004
        else
5104
5005
        {
5123
5024
        NOTE: For this new scheme there should be no pending event:
5124
5025
        need to add code to assert that is the case.
5125
5026
       */
5126
 
      thd->binlog_flush_pending_rows_event(false);
 
5027
      session->binlog_flush_pending_rows_event(false);
5127
5028
      TableList *tables= rli->tables_to_lock;
5128
 
      close_tables_for_reopen(thd, &tables);
 
5029
      close_tables_for_reopen(session, &tables);
5129
5030
 
5130
5031
      uint32_t tables_count= rli->tables_to_lock_count;
5131
 
      if ((error= open_tables(thd, &tables, &tables_count, 0)))
 
5032
      if ((error= open_tables(session, &tables, &tables_count, 0)))
5132
5033
      {
5133
 
        if (thd->is_slave_error || thd->is_fatal_error)
 
5034
        if (session->is_slave_error || session->is_fatal_error)
5134
5035
        {
5135
5036
          /*
5136
5037
            Error reporting borrowed from Query_log_event with many excessive
5137
5038
            simplifications (we don't honour --slave-skip-errors)
5138
5039
          */
5139
 
          uint32_t actual_error= thd->main_da.sql_errno();
 
5040
          uint32_t actual_error= session->main_da.sql_errno();
5140
5041
          rli->report(ERROR_LEVEL, actual_error,
5141
5042
                      _("Error '%s' on reopening tables"),
5142
5043
                      (actual_error
5143
 
                       ? thd->main_da.message()
 
5044
                       ? session->main_da.message()
5144
5045
                       : _("unexpected success or fatal error")));
5145
 
          thd->is_slave_error= 1;
 
5046
          session->is_slave_error= 1;
5146
5047
        }
5147
5048
        const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5148
5049
        return(error);
5163
5064
      {
5164
5065
        if (ptr->m_tabledef.compatible_with(rli, ptr->table))
5165
5066
        {
5166
 
          mysql_unlock_tables(thd, thd->lock);
5167
 
          thd->lock= 0;
5168
 
          thd->is_slave_error= 1;
 
5067
          mysql_unlock_tables(session, session->lock);
 
5068
          session->lock= 0;
 
5069
          session->is_slave_error= 1;
5169
5070
          const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5170
5071
          return(ERR_BAD_TABLE_DEF);
5171
5072
        }
5212
5113
      TIMESTAMP column to a table with one.
5213
5114
      So we call set_time(), like in SBR. Presently it changes nothing.
5214
5115
    */
5215
 
    thd->set_time((time_t)when);
 
5116
    session->set_time((time_t)when);
5216
5117
    /*
5217
5118
      There are a few flags that are replicated with each row event.
5218
5119
      Make sure to set/clear them before executing the main body of
5219
5120
      the event.
5220
5121
    */
5221
5122
    if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
5222
 
        thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
 
5123
        session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
5223
5124
    else
5224
 
        thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
 
5125
        session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
5225
5126
 
5226
5127
    if (get_flags(RELAXED_UNIQUE_CHECKS_F))
5227
 
        thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
 
5128
        session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
5228
5129
    else
5229
 
        thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
 
5130
        session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
5230
5131
    
5231
5132
    if (slave_allow_batching)
5232
 
      thd->options|= OPTION_ALLOW_BATCH;
 
5133
      session->options|= OPTION_ALLOW_BATCH;
5233
5134
    else
5234
 
      thd->options&= ~OPTION_ALLOW_BATCH;
 
5135
      session->options&= ~OPTION_ALLOW_BATCH;
5235
5136
    
5236
5137
    /* A small test to verify that objects have consistent types */
5237
 
    assert(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
 
5138
    assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
5238
5139
 
5239
5140
    /*
5240
5141
      Now we are in a statement and will stay in a statement until we
5275
5176
    while (error == 0 && m_curr_row < m_rows_end)
5276
5177
    {
5277
5178
      /* in_use can have been set to NULL in close_tables_for_reopen */
5278
 
      THD* old_thd= table->in_use;
 
5179
      Session* old_session= table->in_use;
5279
5180
      if (!table->in_use)
5280
 
        table->in_use= thd;
 
5181
        table->in_use= session;
5281
5182
 
5282
5183
      error= do_exec_row(rli);
5283
5184
 
5284
 
      table->in_use = old_thd;
 
5185
      table->in_use = old_session;
5285
5186
      switch (error)
5286
5187
      {
5287
5188
      case 0:
5311
5212
        if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5312
5213
        {
5313
5214
          if (global_system_variables.log_warnings)
5314
 
            slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
 
5215
            slave_rows_error_report(WARNING_LEVEL, error, rli, session, table,
5315
5216
                                    get_type_str(),
5316
5217
                                    RPL_LOG_NAME, (ulong) log_pos);
5317
5218
          error= 0;
5319
5220
        break;
5320
5221
        
5321
5222
      default:
5322
 
        thd->is_slave_error= 1;
 
5223
        session->is_slave_error= 1;
5323
5224
        break;
5324
5225
      }
5325
5226
 
5344
5245
    error= do_after_row_operations(rli, error);
5345
5246
    if (!cache_stmt)
5346
5247
    {
5347
 
      thd->options|= OPTION_KEEP_LOG;
 
5248
      session->options|= OPTION_KEEP_LOG;
5348
5249
    }
5349
5250
  } // if (table)
5350
5251
 
5355
5256
  if (rli->tables_to_lock && get_flags(STMT_END_F))
5356
5257
    const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5357
5258
  /* reset OPTION_ALLOW_BATCH as not affect later events */
5358
 
  thd->options&= ~OPTION_ALLOW_BATCH;
 
5259
  session->options&= ~OPTION_ALLOW_BATCH;
5359
5260
  
5360
5261
  if (error)
5361
5262
  {                     /* error has occured during the transaction */
5362
 
    slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
 
5263
    slave_rows_error_report(ERROR_LEVEL, error, rli, session, table,
5363
5264
                            get_type_str(), RPL_LOG_NAME, (ulong) log_pos);
5364
5265
  }
5365
5266
  if (error)
5375
5276
      thread is certainly going to stop.
5376
5277
      rollback at the caller along with sbr.
5377
5278
    */
5378
 
    thd->reset_current_stmt_binlog_row_based();
5379
 
    const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
5380
 
    thd->is_slave_error= 1;
 
5279
    session->reset_current_stmt_binlog_row_based();
 
5280
    const_cast<Relay_log_info*>(rli)->cleanup_context(session, error);
 
5281
    session->is_slave_error= 1;
5381
5282
    return(error);
5382
5283
  }
5383
5284
 
5447
5348
      (assume the last master's transaction is ignored by the slave because of
5448
5349
      replicate-ignore rules).
5449
5350
    */
5450
 
    thd->binlog_flush_pending_rows_event(true);
 
5351
    session->binlog_flush_pending_rows_event(true);
5451
5352
 
5452
5353
    /*
5453
5354
      If this event is not in a transaction, the call below will, if some
5458
5359
      are involved, commit the transaction and flush the pending event to the
5459
5360
      binlog.
5460
5361
    */
5461
 
    error= ha_autocommit_or_rollback(thd, 0);
 
5362
    error= ha_autocommit_or_rollback(session, 0);
5462
5363
 
5463
5364
    /*
5464
5365
      Now what if this is not a transactional engine? we still need to
5465
5366
      flush the pending event to the binlog; we did it with
5466
 
      thd->binlog_flush_pending_rows_event(). Note that we imitate
 
5367
      session->binlog_flush_pending_rows_event(). Note that we imitate
5467
5368
      what is done for real queries: a call to
5468
5369
      ha_autocommit_or_rollback() (sometimes only if involves a
5469
5370
      transactional engine), and a call to be sure to have the pending
5470
5371
      event flushed.
5471
5372
    */
5472
5373
 
5473
 
    thd->reset_current_stmt_binlog_row_based();
 
5374
    session->reset_current_stmt_binlog_row_based();
5474
5375
 
5475
 
    rli->cleanup_context(thd, 0);
 
5376
    rli->cleanup_context(session, 0);
5476
5377
    if (error == 0)
5477
5378
    {
5478
5379
      /*
5483
5384
      rli->stmt_done(log_pos, when);
5484
5385
 
5485
5386
      /*
5486
 
        Clear any errors pushed in thd->net.last_err* if for example "no key
 
5387
        Clear any errors pushed in session->net.last_err* if for example "no key
5487
5388
        found" (as this is allowed). This is a safety measure; apparently
5488
5389
        those errors (e.g. when executing a Delete_rows_log_event of a
5489
5390
        non-existing row, like in rpl_row_mystery22.test,
5490
 
        thd->net.last_error = "Can't find record in 't1'" and last_errno=1032)
 
5391
        session->net.last_error = "Can't find record in 't1'" and last_errno=1032)
5491
5392
        do not become visible. We still prefer to wipe them out.
5492
5393
      */
5493
 
      thd->clear_error();
 
5394
      session->clear_error();
5494
5395
    }
5495
5396
    else
5496
5397
      rli->report(ERROR_LEVEL, error,
5631
5532
  Mats says tbl->s lives longer than this event so it's ok to copy pointers
5632
5533
  (tbl->s->db etc) and not pointer content.
5633
5534
 */
5634
 
Table_map_log_event::Table_map_log_event(THD *thd, Table *tbl, ulong tid,
 
5535
Table_map_log_event::Table_map_log_event(Session *session, Table *tbl, ulong tid,
5635
5536
                                         bool is_transactional __attribute__((unused)),
5636
5537
                                         uint16_t flags)
5637
 
  : Log_event(thd, 0, true),
 
5538
  : Log_event(session, 0, true),
5638
5539
    m_table(tbl),
5639
5540
    m_dbnam(tbl->s->db.str),
5640
5541
    m_dblen(m_dbnam ? tbl->s->db.length : 0),
5827
5728
  char *db_mem, *tname_mem;
5828
5729
  size_t dummy_len;
5829
5730
  void *memory;
5830
 
  assert(rli->sql_thd == thd);
 
5731
  assert(rli->sql_session == session);
5831
5732
 
5832
5733
  /* Step the query id to mark what columns that are actually used. */
5833
5734
  pthread_mutex_lock(&LOCK_thread_count);
5834
 
  thd->query_id= next_query_id();
 
5735
  session->query_id= next_query_id();
5835
5736
  pthread_mutex_unlock(&LOCK_thread_count);
5836
5737
 
5837
5738
  if (!(memory= my_multi_malloc(MYF(MY_WME),
5861
5762
  else
5862
5763
  {
5863
5764
    /*
5864
 
      open_tables() reads the contents of thd->lex, so they must be
 
5765
      open_tables() reads the contents of session->lex, so they must be
5865
5766
      initialized, so we should call lex_start(); to be even safer, we
5866
5767
      call mysql_init_query() which does a more complete set of inits.
5867
5768
    */
5868
 
    lex_start(thd);
5869
 
    mysql_reset_thd_for_next_command(thd);
 
5769
    lex_start(session);
 
5770
    mysql_reset_session_for_next_command(session);
5870
5771
    /*
5871
5772
      Check if the slave is set to use SBR.  If so, it should switch
5872
5773
      to using RBR until the end of the "statement", i.e., next
5873
5774
      STMT_END_F or next error.
5874
5775
    */
5875
 
    if (!thd->current_stmt_binlog_row_based &&
5876
 
        mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG))
 
5776
    if (!session->current_stmt_binlog_row_based &&
 
5777
        mysql_bin_log.is_open() && (session->options & OPTION_BIN_LOG))
5877
5778
    {
5878
 
      thd->set_current_stmt_binlog_row_based();
 
5779
      session->set_current_stmt_binlog_row_based();
5879
5780
    }
5880
5781
 
5881
5782
    /*
5886
5787
      The creation of a new TableList is used to up-cast the
5887
5788
      table_list consisting of RPL_TableList items. This will work
5888
5789
      since the only case where the argument to open_tables() is
5889
 
      changed, is when thd->lex->query_tables == table_list, i.e.,
 
5790
      changed, is when session->lex->query_tables == table_list, i.e.,
5890
5791
      when the statement requires prelocking. Since this is not
5891
5792
      executed when a statement is executed, this case will not occur.
5892
5793
      As a precaution, an assertion is added to ensure that the bad
5897
5798
      of the pointer to make sure that it's not lost.
5898
5799
    */
5899
5800
    uint32_t count;
5900
 
    assert(thd->lex->query_tables != table_list);
 
5801
    assert(session->lex->query_tables != table_list);
5901
5802
    TableList *tmp_table_list= table_list;
5902
 
    if ((error= open_tables(thd, &tmp_table_list, &count, 0)))
 
5803
    if ((error= open_tables(session, &tmp_table_list, &count, 0)))
5903
5804
    {
5904
 
      if (thd->is_slave_error || thd->is_fatal_error)
 
5805
      if (session->is_slave_error || session->is_fatal_error)
5905
5806
      {
5906
5807
        /*
5907
5808
          Error reporting borrowed from Query_log_event with many excessive
5908
5809
          simplifications (we don't honour --slave-skip-errors)
5909
5810
        */
5910
 
        uint32_t actual_error= thd->main_da.sql_errno();
 
5811
        uint32_t actual_error= session->main_da.sql_errno();
5911
5812
        rli->report(ERROR_LEVEL, actual_error,
5912
5813
                    _("Error '%s' on opening table `%s`.`%s`"),
5913
5814
                    (actual_error
5914
 
                     ? thd->main_da.message()
 
5815
                     ? session->main_da.message()
5915
5816
                     : _("unexpected success or fatal error")),
5916
5817
                    table_list->db, table_list->table_name);
5917
 
        thd->is_slave_error= 1;
 
5818
        session->is_slave_error= 1;
5918
5819
      }
5919
5820
      goto err;
5920
5821
    }
6038
5939
/*
6039
5940
  Constructor used to build an event for writing to the binary log.
6040
5941
 */
6041
 
Write_rows_log_event::Write_rows_log_event(THD *thd_arg, Table *tbl_arg,
 
5942
Write_rows_log_event::Write_rows_log_event(Session *session_arg, Table *tbl_arg,
6042
5943
                                           ulong tid_arg,
6043
5944
                                           bool is_transactional)
6044
 
  : Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
 
5945
  : Rows_log_event(session_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
6045
5946
{
6046
5947
}
6047
5948
 
6073
5974
    */
6074
5975
    
6075
5976
    /* Tell the storage engine that we are using REPLACE semantics. */
6076
 
    thd->lex->duplicates= DUP_REPLACE;
 
5977
    session->lex->duplicates= DUP_REPLACE;
6077
5978
    
6078
5979
    /*
6079
5980
      Pretend we're executing a REPLACE command: this is needed for
6080
5981
      InnoDB since it is not (properly) checking the
6081
5982
      lex->duplicates flag.
6082
5983
    */
6083
 
    thd->lex->sql_command= SQLCOM_REPLACE;
 
5984
    session->lex->sql_command= SQLCOM_REPLACE;
6084
5985
    /* 
6085
5986
       Do not raise the error flag in case of hitting to an unique attribute
6086
5987
    */
6209
6110
Rows_log_event::write_row(const Relay_log_info *const rli,
6210
6111
                          const bool overwrite)
6211
6112
{
6212
 
  assert(m_table != NULL && thd != NULL);
 
6113
  assert(m_table != NULL && session != NULL);
6213
6114
 
6214
6115
  Table *table= m_table;  // pointer to event's table
6215
6116
  int error;
6389
6290
    write_row(rli,        /* if 1 then overwrite */
6390
6291
              bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
6391
6292
    
6392
 
  if (error && !thd->is_error())
 
6293
  if (error && !session->is_error())
6393
6294
  {
6394
6295
    assert(0);
6395
6296
    my_error(ER_UNKNOWN_ERROR, MYF(0));
6704
6605
  Constructor used to build an event for writing to the binary log.
6705
6606
 */
6706
6607
 
6707
 
Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, Table *tbl_arg,
 
6608
Delete_rows_log_event::Delete_rows_log_event(Session *session_arg, Table *tbl_arg,
6708
6609
                                             ulong tid,
6709
6610
                                             bool is_transactional)
6710
 
  : Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
 
6611
  : Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
6711
6612
{
6712
6613
}
6713
6614
 
6780
6681
/*
6781
6682
  Constructor used to build an event for writing to the binary log.
6782
6683
 */
6783
 
Update_rows_log_event::Update_rows_log_event(THD *thd_arg, Table *tbl_arg,
 
6684
Update_rows_log_event::Update_rows_log_event(Session *session_arg, Table *tbl_arg,
6784
6685
                                             ulong tid,
6785
6686
                                             bool is_transactional)
6786
 
: Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
 
6687
: Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
6787
6688
{
6788
6689
  init(tbl_arg->write_set);
6789
6690
}