~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/log_event.cc

  • Committer: Monty Taylor
  • Date: 2008-10-23 23:53:49 UTC
  • mto: This revision was merged to the branch mainline in revision 557.
  • Revision ID: monty@inaugust.com-20081023235349-317wgwqwgccuacmq
SplitĀ outĀ nested_join.h.

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
#include "rpl_utility.h"
22
22
#include "rpl_record.h"
23
23
#include <mysys/my_dir.h>
24
 
#include <drizzled/drizzled_error_messages.h>
 
24
#include <drizzled/error.h>
 
25
#include <libdrizzle/pack.h>
25
26
 
26
27
#include <algorithm>
27
28
 
28
29
#include <mysys/base64.h>
29
30
#include <mysys/my_bitmap.h>
30
31
 
31
 
#include <libdrizzle/gettext.h>
 
32
#include <drizzled/gettext.h>
32
33
#include <libdrizzle/libdrizzle.h>
 
34
#include <drizzled/error.h>
33
35
 
34
36
#define log_cs  &my_charset_utf8_general_ci
35
37
 
36
 
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
37
 
 
38
 
 
39
 
/*
40
 
  Size of buffer for printing a double in format %.<PREC>g
41
 
 
42
 
  optional '-' + optional zero + '.'  + PREC digits + 'e' + sign +
43
 
  exponent digits + '\0'
44
 
*/
45
 
#define FMT_G_BUFSIZE(PREC) (3 + (PREC) + 5 + 1)
46
 
 
47
38
 
48
39
static const char *HA_ERR(int i)
49
40
{
108
99
   @param level     error, warning or info
109
100
   @param ha_error  HA_ERR_ code
110
101
   @param rli       pointer to the active Relay_log_info instance
111
 
   @param thd       pointer to the slave thread's thd
 
102
   @param session       pointer to the slave thread's session
112
103
   @param table     pointer to the event's table object
113
104
   @param type      the type of the event
114
105
   @param log_name  the master binlog file name
116
107
 
117
108
*/
118
109
static void inline slave_rows_error_report(enum loglevel level, int ha_error,
119
 
                                           Relay_log_info const *rli, THD *thd,
 
110
                                           Relay_log_info const *rli, Session *session,
120
111
                                           Table *table, const char * type,
121
112
                                           const char *log_name, ulong pos)
122
113
{
124
115
  char buff[MAX_SLAVE_ERRMSG], *slider;
125
116
  const char *buff_end= buff + sizeof(buff);
126
117
  uint32_t len;
127
 
  List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
 
118
  List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
128
119
  DRIZZLE_ERROR *err;
129
120
  buff[0]= 0;
130
121
 
135
126
                  _(" %s, Error_code: %d;"), err->msg, err->code);
136
127
  }
137
128
  
138
 
  rli->report(level, thd->is_error()? thd->main_da.sql_errno() : 0,
 
129
  rli->report(level, session->is_error()? session->main_da.sql_errno() : 0,
139
130
              _("Could not execute %s event on table %s.%s;"
140
131
                "%s handler error %s; "
141
132
                "the event's master log %s, end_log_pos %lu"),
231
222
  pretty_print_str()
232
223
*/
233
224
 
234
 
static void clear_all_errors(THD *thd, Relay_log_info *rli)
 
225
static void clear_all_errors(Session *session, Relay_log_info *rli)
235
226
{
236
 
  thd->is_slave_error = 0;
237
 
  thd->clear_error();
 
227
  session->is_slave_error = 0;
 
228
  session->clear_error();
238
229
  rli->clear_error();
239
230
}
240
231
 
480
471
  Log_event::Log_event()
481
472
*/
482
473
 
483
 
Log_event::Log_event(THD* thd_arg, uint16_t flags_arg, bool using_trans)
484
 
  :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), thd(thd_arg)
 
474
Log_event::Log_event(Session* session_arg, uint16_t flags_arg, bool using_trans)
 
475
  :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), session(session_arg)
485
476
{
486
 
  server_id=    thd->server_id;
487
 
  when=         thd->start_time;
 
477
  server_id=    session->server_id;
 
478
  when=         session->start_time;
488
479
  cache_stmt=   using_trans;
489
480
}
490
481
 
491
482
 
492
483
/**
493
484
  This minimal constructor is for when you are not even sure that there
494
 
  is a valid THD. For example in the server when we are shutting down or
 
485
  is a valid Session. For example in the server when we are shutting down or
495
486
  flushing logs after receiving a SIGHUP (then we must write a Rotate to
496
 
  the binlog but we have no THD, so we need this minimal constructor).
 
487
  the binlog but we have no Session, so we need this minimal constructor).
497
488
*/
498
489
 
499
490
Log_event::Log_event()
500
491
  :temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
501
 
   thd(0)
 
492
   session(0)
502
493
{
503
494
  server_id=    ::server_id;
504
495
  /*
518
509
                     const Format_description_log_event* description_event)
519
510
  :temp_buf(0), cache_stmt(0)
520
511
{
521
 
  thd= 0;
 
512
  session= 0;
522
513
  when= uint4korr(buf);
523
514
  server_id= uint4korr(buf + SERVER_ID_OFFSET);
524
515
  data_written= uint4korr(buf + EVENT_LEN_OFFSET);
638
629
 
639
630
 
640
631
/**
641
 
  Only called by SHOW BINLOG EVENTS
642
 
*/
643
 
int Log_event::net_send(Protocol *protocol, const char* log_name, my_off_t pos)
644
 
{
645
 
  const char *p= strrchr(log_name, FN_LIBCHAR);
646
 
  const char *event_type;
647
 
  if (p)
648
 
    log_name = p + 1;
649
 
 
650
 
  protocol->prepare_for_resend();
651
 
  protocol->store(log_name, &my_charset_bin);
652
 
  protocol->store((uint64_t) pos);
653
 
  event_type = get_type_str();
654
 
  protocol->store(event_type, strlen(event_type), &my_charset_bin);
655
 
  protocol->store((uint32_t) server_id);
656
 
  protocol->store((uint64_t) log_pos);
657
 
  pack_info(protocol);
658
 
  return protocol->write();
659
 
}
660
 
 
661
 
 
662
 
/**
663
632
  init_show_field_list() prepares the column names and types for the
664
633
  output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
665
634
  EVENTS.
785
754
  }
786
755
  data_len= uint4korr(buf + EVENT_LEN_OFFSET);
787
756
  if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
788
 
      data_len > current_thd->variables.max_allowed_packet)
 
757
      data_len > current_session->variables.max_allowed_packet)
789
758
  {
790
759
    result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
791
760
             LOG_READ_TOO_LARGE);
869
838
  const char *error= 0;
870
839
  Log_event *res=  0;
871
840
#ifndef max_allowed_packet
872
 
  THD *thd=current_thd;
873
 
  uint32_t max_allowed_packet= thd ? thd->variables.max_allowed_packet : ~(ulong)0;
 
841
  Session *session=current_session;
 
842
  uint32_t max_allowed_packet= session ? session->variables.max_allowed_packet : ~(ulong)0;
874
843
#endif
875
844
 
876
845
  if (data_len > max_allowed_packet)
983
952
    case ROTATE_EVENT:
984
953
      ev = new Rotate_log_event(buf, event_len, description_event);
985
954
      break;
986
 
    case SLAVE_EVENT: /* can never happen (unused event) */
987
 
      ev = new Slave_log_event(buf, event_len);
988
 
      break;
989
955
    case CREATE_FILE_EVENT:
990
956
      ev = new Create_file_log_event(buf, event_len, description_event);
991
957
      break;
1317
1283
/*
1318
1284
  SYNOPSIS
1319
1285
    Query_log_event::Query_log_event()
1320
 
      thd_arg           - thread handle
 
1286
      session_arg           - thread handle
1321
1287
      query_arg         - array of char representing the query
1322
1288
      query_length      - size of the  `query_arg' array
1323
1289
      using_trans       - there is a modified transactional table
1324
1290
      suppress_use      - suppress the generation of 'USE' statements
1325
 
      killed_status_arg - an optional with default to THD::KILLED_NO_VALUE
 
1291
      killed_status_arg - an optional with default to Session::KILLED_NO_VALUE
1326
1292
                          if the value is different from the default, the arg
1327
 
                          is set to the current thd->killed value.
1328
 
                          A caller might need to masquerade thd->killed with
1329
 
                          THD::NOT_KILLED.
 
1293
                          is set to the current session->killed value.
 
1294
                          A caller might need to masquerade session->killed with
 
1295
                          Session::NOT_KILLED.
1330
1296
  DESCRIPTION
1331
1297
  Creates an event for binlogging
1332
1298
  The value for local `killed_status' can be supplied by caller.
1333
1299
*/
1334
 
Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
 
1300
Query_log_event::Query_log_event(Session* session_arg, const char* query_arg,
1335
1301
                                 ulong query_length, bool using_trans,
1336
1302
                                 bool suppress_use,
1337
 
                                 THD::killed_state killed_status_arg)
1338
 
  :Log_event(thd_arg,
1339
 
             (thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
 
1303
                                 Session::killed_state killed_status_arg)
 
1304
  :Log_event(session_arg,
 
1305
             (session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
1340
1306
              0) |
1341
1307
             (suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1342
1308
             using_trans),
1343
 
   data_buf(0), query(query_arg), catalog(thd_arg->catalog),
1344
 
   db(thd_arg->db), q_len((uint32_t) query_length),
1345
 
   thread_id(thd_arg->thread_id),
 
1309
   data_buf(0), query(query_arg), catalog(session_arg->catalog),
 
1310
   db(session_arg->db), q_len((uint32_t) query_length),
 
1311
   thread_id(session_arg->thread_id),
1346
1312
   /* save the original thread id; we already know the server id */
1347
 
   slave_proxy_id(thd_arg->variables.pseudo_thread_id),
 
1313
   slave_proxy_id(session_arg->variables.pseudo_thread_id),
1348
1314
   flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1349
1315
   sql_mode(0),
1350
 
   auto_increment_increment(thd_arg->variables.auto_increment_increment),
1351
 
   auto_increment_offset(thd_arg->variables.auto_increment_offset),
1352
 
   lc_time_names_number(thd_arg->variables.lc_time_names->number),
 
1316
   auto_increment_increment(session_arg->variables.auto_increment_increment),
 
1317
   auto_increment_offset(session_arg->variables.auto_increment_offset),
 
1318
   lc_time_names_number(session_arg->variables.lc_time_names->number),
1353
1319
   charset_database_number(0)
1354
1320
{
1355
1321
  time_t end_time;
1356
1322
 
1357
 
  if (killed_status_arg == THD::KILLED_NO_VALUE)
1358
 
    killed_status_arg= thd_arg->killed;
 
1323
  if (killed_status_arg == Session::KILLED_NO_VALUE)
 
1324
    killed_status_arg= session_arg->killed;
1359
1325
 
1360
1326
  error_code=
1361
 
    (killed_status_arg == THD::NOT_KILLED) ?
1362
 
    (thd_arg->is_error() ? thd_arg->main_da.sql_errno() : 0) :
1363
 
    (thd_arg->killed_errno());
 
1327
    (killed_status_arg == Session::NOT_KILLED) ?
 
1328
    (session_arg->is_error() ? session_arg->main_da.sql_errno() : 0) :
 
1329
    (session_arg->killed_errno());
1364
1330
  
1365
1331
  time(&end_time);
1366
 
  exec_time = (ulong) (end_time  - thd_arg->start_time);
 
1332
  exec_time = (ulong) (end_time  - session_arg->start_time);
1367
1333
  /**
1368
1334
    @todo this means that if we have no catalog, then it is replicated
1369
1335
    as an existing catalog of length zero. is that safe? /sven
1371
1337
  catalog_len = (catalog) ? (uint32_t) strlen(catalog) : 0;
1372
1338
  /* status_vars_len is set just before writing the event */
1373
1339
  db_len = (db) ? (uint32_t) strlen(db) : 0;
1374
 
  if (thd_arg->variables.collation_database != thd_arg->db_charset)
1375
 
    charset_database_number= thd_arg->variables.collation_database->number;
 
1340
  if (session_arg->variables.collation_database != session_arg->db_charset)
 
1341
    charset_database_number= session_arg->variables.collation_database->number;
1376
1342
  
1377
1343
  /*
1378
1344
    If we don't use flags2 for anything else than options contained in
1379
 
    thd_arg->options, it would be more efficient to flags2=thd_arg->options
 
1345
    session_arg->options, it would be more efficient to flags2=session_arg->options
1380
1346
    (OPTIONS_WRITTEN_TO_BIN_LOG would be used only at reading time).
1381
1347
    But it's likely that we don't want to use 32 bits for 3 bits; in the future
1382
1348
    we will probably want to reclaim the 29 bits. So we need the &.
1383
1349
  */
1384
 
  flags2= (uint32_t) (thd_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
1385
 
  assert(thd_arg->variables.character_set_client->number < 256*256);
1386
 
  assert(thd_arg->variables.collation_connection->number < 256*256);
1387
 
  assert(thd_arg->variables.collation_server->number < 256*256);
1388
 
  assert(thd_arg->variables.character_set_client->mbminlen == 1);
1389
 
  int2store(charset, thd_arg->variables.character_set_client->number);
1390
 
  int2store(charset+2, thd_arg->variables.collation_connection->number);
1391
 
  int2store(charset+4, thd_arg->variables.collation_server->number);
1392
 
  if (thd_arg->time_zone_used)
 
1350
  flags2= (uint32_t) (session_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
 
1351
  assert(session_arg->variables.character_set_client->number < 256*256);
 
1352
  assert(session_arg->variables.collation_connection->number < 256*256);
 
1353
  assert(session_arg->variables.collation_server->number < 256*256);
 
1354
  assert(session_arg->variables.character_set_client->mbminlen == 1);
 
1355
  int2store(charset, session_arg->variables.character_set_client->number);
 
1356
  int2store(charset+2, session_arg->variables.collation_connection->number);
 
1357
  int2store(charset+4, session_arg->variables.collation_server->number);
 
1358
  if (session_arg->time_zone_used)
1393
1359
  {
1394
1360
    /*
1395
1361
      Note that our event becomes dependent on the Time_zone object
1396
1362
      representing the time zone. Fortunately such objects are never deleted
1397
1363
      or changed during mysqld's lifetime.
1398
1364
    */
1399
 
    time_zone_len= thd_arg->variables.time_zone->get_name()->length();
1400
 
    time_zone_str= thd_arg->variables.time_zone->get_name()->ptr();
 
1365
    time_zone_len= session_arg->variables.time_zone->get_name()->length();
 
1366
    time_zone_str= session_arg->variables.time_zone->get_name()->ptr();
1401
1367
  }
1402
1368
  else
1403
1369
    time_zone_len= 0;
1579
1545
      auto_increment_offset=    uint2korr(pos+2);
1580
1546
      pos+= 4;
1581
1547
      break;
1582
 
    case Q_CHARSET_CODE:
1583
 
    {
1584
 
      CHECK_SPACE(pos, end, 6);
1585
 
      charset_inited= 1;
1586
 
      memcpy(charset, pos, 6);
1587
 
      pos+= 6;
1588
 
      break;
1589
 
    }
1590
1548
    case Q_TIME_ZONE_CODE:
1591
1549
    {
1592
1550
      if (get_str_len_and_pointer(&pos, &time_zone_str, &time_zone_len, end))
1680
1638
     sql_print_error("Slave: did not get the expected number of affected \
1681
1639
     rows running query from master - expected %d, got %d (this numbers \
1682
1640
     should have matched modulo 4294967296).", 0, ...);
1683
 
     thd->query_error = 1;
 
1641
     session->query_error = 1;
1684
1642
     }
1685
1643
  @endcode
1686
1644
  We may also want an option to tell the slave to ignore "affected"
1693
1651
  LEX_STRING new_db;
1694
1652
  int expected_error,actual_error= 0;
1695
1653
  /*
1696
 
    Colleagues: please never free(thd->catalog) in MySQL. This would
1697
 
    lead to bugs as here thd->catalog is a part of an alloced block,
 
1654
    Colleagues: please never free(session->catalog) in MySQL. This would
 
1655
    lead to bugs as here session->catalog is a part of an alloced block,
1698
1656
    not an entire alloced block (see
1699
 
    Query_log_event::do_apply_event()). Same for thd->db.  Thank
 
1657
    Query_log_event::do_apply_event()). Same for session->db.  Thank
1700
1658
    you.
1701
1659
  */
1702
 
  thd->catalog= catalog_len ? (char *) catalog : (char *)"";
 
1660
  session->catalog= catalog_len ? (char *) catalog : (char *)"";
1703
1661
  new_db.length= db_len;
1704
1662
  new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
1705
 
  thd->set_db(new_db.str, new_db.length);       /* allocates a copy of 'db' */
1706
 
  thd->variables.auto_increment_increment= auto_increment_increment;
1707
 
  thd->variables.auto_increment_offset=    auto_increment_offset;
 
1663
  session->set_db(new_db.str, new_db.length);       /* allocates a copy of 'db' */
 
1664
  session->variables.auto_increment_increment= auto_increment_increment;
 
1665
  session->variables.auto_increment_offset=    auto_increment_offset;
1708
1666
 
1709
1667
  /*
1710
1668
    InnoDB internally stores the master log position it has executed so far,
1718
1676
  */
1719
1677
  const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
1720
1678
 
1721
 
  clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
 
1679
  clear_all_errors(session, const_cast<Relay_log_info*>(rli));
1722
1680
  const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
1723
1681
 
1724
1682
  /*
1731
1689
            ::do_apply_event(), then the companion SET also have so
1732
1690
            we don't need to reset_one_shot_variables().
1733
1691
  */
1734
 
  if (rpl_filter->db_ok(thd->db))
 
1692
  if (rpl_filter->db_ok(session->db))
1735
1693
  {
1736
 
    thd->set_time((time_t)when);
1737
 
    thd->query_length= q_len_arg;
1738
 
    thd->query= (char*)query_arg;
 
1694
    session->set_time((time_t)when);
 
1695
    session->query_length= q_len_arg;
 
1696
    session->query= (char*)query_arg;
1739
1697
    pthread_mutex_lock(&LOCK_thread_count);
1740
 
    thd->query_id = next_query_id();
 
1698
    session->query_id = next_query_id();
1741
1699
    pthread_mutex_unlock(&LOCK_thread_count);
1742
 
    thd->variables.pseudo_thread_id= thread_id;         // for temp tables
 
1700
    session->variables.pseudo_thread_id= thread_id;             // for temp tables
1743
1701
 
1744
1702
    if (ignored_error_code((expected_error= error_code)) ||
1745
 
        !check_expected_error(thd,rli,expected_error))
 
1703
        !check_expected_error(session,rli,expected_error))
1746
1704
    {
1747
1705
      if (flags2_inited)
1748
1706
        /*
1749
 
          all bits of thd->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
 
1707
          all bits of session->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
1750
1708
          must take their value from flags2.
1751
1709
        */
1752
 
        thd->options= flags2|(thd->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
1753
 
      /*
1754
 
        else, we are in a 3.23/4.0 binlog; we previously received a
1755
 
        Rotate_log_event which reset thd->options and sql_mode etc, so
1756
 
        nothing to do.
1757
 
      */
1758
 
      if (charset_inited)
1759
 
      {
1760
 
        if (rli->cached_charset_compare(charset))
1761
 
        {
1762
 
          /* Verify that we support the charsets found in the event. */
1763
 
          if (!(thd->variables.character_set_client=
1764
 
                get_charset(uint2korr(charset), MYF(MY_WME))) ||
1765
 
              !(thd->variables.collation_connection=
1766
 
                get_charset(uint2korr(charset+2), MYF(MY_WME))) ||
1767
 
              !(thd->variables.collation_server=
1768
 
                get_charset(uint2korr(charset+4), MYF(MY_WME))))
1769
 
          {
1770
 
            /*
1771
 
              We updated the thd->variables with nonsensical values (0). Let's
1772
 
              set them to something safe (i.e. which avoids crash), and we'll
1773
 
              stop with EE_UNKNOWN_CHARSET in compare_errors (unless set to
1774
 
              ignore this error).
1775
 
            */
1776
 
            set_slave_thread_default_charset(thd, rli);
1777
 
            goto compare_errors;
1778
 
          }
1779
 
          thd->update_charset(); // for the charset change to take effect
1780
 
        }
1781
 
      }
 
1710
        session->options= flags2|(session->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
1782
1711
      if (time_zone_len)
1783
1712
      {
1784
1713
        String tmp(time_zone_str, time_zone_len, &my_charset_bin);
1785
 
        if (!(thd->variables.time_zone= my_tz_find(thd, &tmp)))
 
1714
        if (!(session->variables.time_zone= my_tz_find(session, &tmp)))
1786
1715
        {
1787
1716
          my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr());
1788
 
          thd->variables.time_zone= global_system_variables.time_zone;
 
1717
          session->variables.time_zone= global_system_variables.time_zone;
1789
1718
          goto compare_errors;
1790
1719
        }
1791
1720
      }
1792
1721
      if (lc_time_names_number)
1793
1722
      {
1794
 
        if (!(thd->variables.lc_time_names=
 
1723
        if (!(session->variables.lc_time_names=
1795
1724
              my_locale_by_number(lc_time_names_number)))
1796
1725
        {
1797
1726
          my_printf_error(ER_UNKNOWN_ERROR,
1798
1727
                      "Unknown locale: '%d'", MYF(0), lc_time_names_number);
1799
 
          thd->variables.lc_time_names= &my_locale_en_US;
 
1728
          session->variables.lc_time_names= &my_locale_en_US;
1800
1729
          goto compare_errors;
1801
1730
        }
1802
1731
      }
1803
1732
      else
1804
 
        thd->variables.lc_time_names= &my_locale_en_US;
 
1733
        session->variables.lc_time_names= &my_locale_en_US;
1805
1734
      if (charset_database_number)
1806
1735
      {
1807
1736
        const CHARSET_INFO *cs;
1812
1741
          my_error(ER_UNKNOWN_COLLATION, MYF(0), buf);
1813
1742
          goto compare_errors;
1814
1743
        }
1815
 
        thd->variables.collation_database= cs;
 
1744
        session->variables.collation_database= cs;
1816
1745
      }
1817
1746
      else
1818
 
        thd->variables.collation_database= thd->db_charset;
 
1747
        session->variables.collation_database= session->db_charset;
1819
1748
      
1820
1749
      /* Execute the query (note that we bypass dispatch_command()) */
1821
1750
      const char* found_semicolon= NULL;
1822
 
      mysql_parse(thd, thd->query, thd->query_length, &found_semicolon);
1823
 
      log_slow_statement(thd);
 
1751
      mysql_parse(session, session->query, session->query_length, &found_semicolon);
 
1752
      log_slow_statement(session);
1824
1753
    }
1825
1754
    else
1826
1755
    {
1831
1760
        we exit gracefully; otherwise we warn about the bad error and tell DBA
1832
1761
        to check/fix it.
1833
1762
      */
1834
 
      if (mysql_test_parse_for_slave(thd, thd->query, thd->query_length))
1835
 
        clear_all_errors(thd, const_cast<Relay_log_info*>(rli)); /* Can ignore query */
 
1763
      if (mysql_test_parse_for_slave(session, session->query, session->query_length))
 
1764
        clear_all_errors(session, const_cast<Relay_log_info*>(rli)); /* Can ignore query */
1836
1765
      else
1837
1766
      {
1838
1767
        rli->report(ERROR_LEVEL, expected_error,
1843
1772
                      "this query manually on the slave and then restart the "
1844
1773
                      "slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; "
1845
1774
                      "START SLAVE; . Query: '%s'"),
1846
 
                    expected_error, thd->query);
1847
 
        thd->is_slave_error= 1;
 
1775
                    expected_error, session->query);
 
1776
        session->is_slave_error= 1;
1848
1777
      }
1849
1778
      goto end;
1850
1779
    }
1855
1784
      If we expected a non-zero error code, and we don't get the same error
1856
1785
      code, and none of them should be ignored.
1857
1786
    */
1858
 
    actual_error= thd->is_error() ? thd->main_da.sql_errno() : 0;
 
1787
    actual_error= session->is_error() ? session->main_da.sql_errno() : 0;
1859
1788
    if ((expected_error != actual_error) &&
1860
1789
        expected_error &&
1861
1790
        !ignored_error_code(actual_error) &&
1865
1794
                  _("Query caused differenxt errors on master and slave.\n"
1866
1795
                    "Error on master: '%s' (%d), Error on slave: '%s' (%d).\n"
1867
1796
                    "Default database: '%s'. Query: '%s'"),
1868
 
                  ER_SAFE(expected_error),
 
1797
                  ER(expected_error),
1869
1798
                  expected_error,
1870
 
                  actual_error ? thd->main_da.message() : _("no error"),
 
1799
                  actual_error ? session->main_da.message() : _("no error"),
1871
1800
                  actual_error,
1872
1801
                  print_slave_db_safe(db), query_arg);
1873
 
      thd->is_slave_error= 1;
 
1802
      session->is_slave_error= 1;
1874
1803
    }
1875
1804
    /*
1876
1805
      If we get the same error code as expected, or they should be ignored. 
1878
1807
    else if (expected_error == actual_error ||
1879
1808
             ignored_error_code(actual_error))
1880
1809
    {
1881
 
      clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
1882
 
      thd->killed= THD::NOT_KILLED;
 
1810
      clear_all_errors(session, const_cast<Relay_log_info*>(rli));
 
1811
      session->killed= Session::NOT_KILLED;
1883
1812
    }
1884
1813
    /*
1885
1814
      Other cases: mostly we expected no error and get one.
1886
1815
    */
1887
 
    else if (thd->is_slave_error || thd->is_fatal_error)
 
1816
    else if (session->is_slave_error || session->is_fatal_error)
1888
1817
    {
1889
1818
      rli->report(ERROR_LEVEL, actual_error,
1890
1819
                  _("Error '%s' on query. Default database: '%s'. Query: '%s'"),
1891
 
                  (actual_error ? thd->main_da.message() :
 
1820
                  (actual_error ? session->main_da.message() :
1892
1821
                   _("unexpected success or fatal error")),
1893
 
                      print_slave_db_safe(thd->db), query_arg);
1894
 
      thd->is_slave_error= 1;
 
1822
                      print_slave_db_safe(session->db), query_arg);
 
1823
      session->is_slave_error= 1;
1895
1824
    }
1896
1825
 
1897
1826
    /*
1902
1831
      sql_print_error("Slave: did not get the expected number of affected \
1903
1832
      rows running query from master - expected %d, got %d (this numbers \
1904
1833
      should have matched modulo 4294967296).", 0, ...);
1905
 
      thd->is_slave_error = 1;
 
1834
      session->is_slave_error = 1;
1906
1835
      }
1907
1836
      We may also want an option to tell the slave to ignore "affected"
1908
1837
      mismatch. This mismatch could be implemented with a new ER_ code, and
1920
1849
end:
1921
1850
  pthread_mutex_lock(&LOCK_thread_count);
1922
1851
  /*
1923
 
    Probably we have set thd->query, thd->db, thd->catalog to point to places
 
1852
    Probably we have set session->query, session->db, session->catalog to point to places
1924
1853
    in the data_buf of this event. Now the event is going to be deleted
1925
 
    probably, so data_buf will be freed, so the thd->... listed above will be
 
1854
    probably, so data_buf will be freed, so the session->... listed above will be
1926
1855
    pointers to freed memory. 
1927
1856
    So we must set them to 0, so that those bad pointers values are not later
1928
1857
    used. Note that "cleanup" queries like automatic DROP TEMPORARY Table
1929
1858
    don't suffer from these assignments to 0 as DROP TEMPORARY
1930
1859
    Table uses the db.table syntax.
1931
1860
  */
1932
 
  thd->catalog= 0;
1933
 
  thd->set_db(NULL, 0);                 /* will free the current database */
1934
 
  thd->query= 0;                        // just to be sure
1935
 
  thd->query_length= 0;
 
1861
  session->catalog= 0;
 
1862
  session->set_db(NULL, 0);                 /* will free the current database */
 
1863
  session->query= 0;                    // just to be sure
 
1864
  session->query_length= 0;
1936
1865
  pthread_mutex_unlock(&LOCK_thread_count);
1937
 
  close_thread_tables(thd);      
 
1866
  close_thread_tables(session);      
1938
1867
  /*
1939
1868
    As a disk space optimization, future masters will not log an event for
1940
1869
    LAST_INSERT_ID() if that function returned 0 (and thus they will be able
1941
 
    to replace the THD::stmt_depends_on_first_successful_insert_id_in_prev_stmt
1942
 
    variable by (THD->first_successful_insert_id_in_prev_stmt > 0) ; with the
 
1870
    to replace the Session::stmt_depends_on_first_successful_insert_id_in_prev_stmt
 
1871
    variable by (Session->first_successful_insert_id_in_prev_stmt > 0) ; with the
1943
1872
    resetting below we are ready to support that.
1944
1873
  */
1945
 
  thd->first_successful_insert_id_in_prev_stmt_for_binlog= 0;
1946
 
  thd->first_successful_insert_id_in_prev_stmt= 0;
1947
 
  thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
1948
 
  free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
1949
 
  return thd->is_slave_error;
 
1874
  session->first_successful_insert_id_in_prev_stmt_for_binlog= 0;
 
1875
  session->first_successful_insert_id_in_prev_stmt= 0;
 
1876
  session->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
 
1877
  free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
 
1878
  return session->is_slave_error;
1950
1879
}
1951
1880
 
1952
1881
int Query_log_event::do_update_pos(Relay_log_info *rli)
1956
1885
    after a SET ONE_SHOT, because SET ONE_SHOT should not be separated
1957
1886
    from its following updating query.
1958
1887
  */
1959
 
  if (thd->one_shot_set)
 
1888
  if (session->one_shot_set)
1960
1889
  {
1961
1890
    rli->inc_event_relay_log_pos();
1962
1891
    return 0;
1975
1904
  {
1976
1905
    if (strcmp("BEGIN", query) == 0)
1977
1906
    {
1978
 
      thd->options|= OPTION_BEGIN;
 
1907
      session->options|= OPTION_BEGIN;
1979
1908
      return(Log_event::continue_group(rli));
1980
1909
    }
1981
1910
 
1982
1911
    if (strcmp("COMMIT", query) == 0 || strcmp("ROLLBACK", query) == 0)
1983
1912
    {
1984
 
      thd->options&= ~OPTION_BEGIN;
 
1913
      session->options&= ~OPTION_BEGIN;
1985
1914
      return(Log_event::EVENT_SKIP_COUNT);
1986
1915
    }
1987
1916
  }
2086
2015
    */
2087
2016
    if (created)
2088
2017
    {
2089
 
      close_temporary_tables(thd);
 
2018
      close_temporary_tables(session);
2090
2019
      cleanup_load_tmpdir();
2091
2020
    }
2092
2021
    break;
2103
2032
        Can distinguish, based on the value of 'created': this event was
2104
2033
        generated at master startup.
2105
2034
      */
2106
 
      close_temporary_tables(thd);
 
2035
      close_temporary_tables(session);
2107
2036
    }
2108
2037
    /*
2109
2038
      Otherwise, can't distinguish a Start_log_event generated at
2401
2330
    original place when it comes to us; we'll know this by checking
2402
2331
    log_pos ("artificial" events have log_pos == 0).
2403
2332
  */
2404
 
  if (!artificial_event && created && thd->transaction.all.ha_list)
 
2333
  if (!artificial_event && created && session->transaction.all.ha_list)
2405
2334
  {
2406
2335
    /* This is not an error (XA is safe), just an information */
2407
2336
    rli->report(INFORMATION_LEVEL, 0,
2409
2338
                  "or ROLLBACK in relay log). A probable cause is that "
2410
2339
                  "the master died while writing the transaction to "
2411
2340
                  "its binary log, thus rolled back too."));
2412
 
    const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 1);
 
2341
    const_cast<Relay_log_info*>(rli)->cleanup_context(session, 1);
2413
2342
  }
2414
2343
  /*
2415
2344
    If this event comes from ourselves, there is no cleaning task to
2673
2602
  Load_log_event::Load_log_event()
2674
2603
*/
2675
2604
 
2676
 
Load_log_event::Load_log_event(THD *thd_arg, sql_exchange *ex,
 
2605
Load_log_event::Load_log_event(Session *session_arg, sql_exchange *ex,
2677
2606
                               const char *db_arg, const char *table_name_arg,
2678
2607
                               List<Item> &fields_arg,
2679
2608
                               enum enum_duplicates handle_dup,
2680
2609
                               bool ignore, bool using_trans)
2681
 
  :Log_event(thd_arg,
2682
 
             thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
 
2610
  :Log_event(session_arg,
 
2611
             session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
2683
2612
             using_trans),
2684
 
   thread_id(thd_arg->thread_id),
2685
 
   slave_proxy_id(thd_arg->variables.pseudo_thread_id),
 
2613
   thread_id(session_arg->thread_id),
 
2614
   slave_proxy_id(session_arg->variables.pseudo_thread_id),
2686
2615
   num_fields(0),fields(0),
2687
2616
   field_lens(0),field_block_len(0),
2688
2617
   table_name(table_name_arg ? table_name_arg : ""),
2690
2619
{
2691
2620
  time_t end_time;
2692
2621
  time(&end_time);
2693
 
  exec_time = (ulong) (end_time  - thd_arg->start_time);
 
2622
  exec_time = (ulong) (end_time  - session_arg->start_time);
2694
2623
  /* db can never be a zero pointer in 4.0 */
2695
2624
  db_len = (uint32_t) strlen(db);
2696
2625
  table_name_len = (uint32_t) strlen(table_name);
2891
2820
  LEX_STRING new_db;
2892
2821
  new_db.length= db_len;
2893
2822
  new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
2894
 
  thd->set_db(new_db.str, new_db.length);
2895
 
  assert(thd->query == 0);
2896
 
  thd->query_length= 0;                         // Should not be needed
2897
 
  thd->is_slave_error= 0;
2898
 
  clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
 
2823
  session->set_db(new_db.str, new_db.length);
 
2824
  assert(session->query == 0);
 
2825
  session->query_length= 0;                         // Should not be needed
 
2826
  session->is_slave_error= 0;
 
2827
  clear_all_errors(session, const_cast<Relay_log_info*>(rli));
2899
2828
 
2900
2829
  /* see Query_log_event::do_apply_event() and BUG#13360 */
2901
2830
  assert(!rli->m_table_map.count());
2903
2832
    Usually lex_start() is called by mysql_parse(), but we need it here
2904
2833
    as the present method does not call mysql_parse().
2905
2834
  */
2906
 
  lex_start(thd);
2907
 
  mysql_reset_thd_for_next_command(thd);
 
2835
  lex_start(session);
 
2836
  mysql_reset_session_for_next_command(session);
2908
2837
 
2909
2838
  if (!use_rli_only_for_errors)
2910
2839
  {
2938
2867
            ::do_apply_event(), then the companion SET also have so
2939
2868
            we don't need to reset_one_shot_variables().
2940
2869
  */
2941
 
  if (rpl_filter->db_ok(thd->db))
 
2870
  if (rpl_filter->db_ok(session->db))
2942
2871
  {
2943
 
    thd->set_time((time_t)when);
 
2872
    session->set_time((time_t)when);
2944
2873
    pthread_mutex_lock(&LOCK_thread_count);
2945
 
    thd->query_id = next_query_id();
 
2874
    session->query_id = next_query_id();
2946
2875
    pthread_mutex_unlock(&LOCK_thread_count);
2947
2876
    /*
2948
 
      Initing thd->row_count is not necessary in theory as this variable has no
 
2877
      Initing session->row_count is not necessary in theory as this variable has no
2949
2878
      influence in the case of the slave SQL thread (it is used to generate a
2950
2879
      "data truncated" warning but which is absorbed and never gets to the
2951
2880
      error log); still we init it to avoid a Valgrind message.
2952
2881
    */
2953
 
    drizzle_reset_errors(thd, 0);
 
2882
    drizzle_reset_errors(session, 0);
2954
2883
 
2955
2884
    TableList tables;
2956
2885
    memset(&tables, 0, sizeof(tables));
2957
 
    tables.db= thd->strmake(thd->db, thd->db_length);
 
2886
    tables.db= session->strmake(session->db, session->db_length);
2958
2887
    tables.alias = tables.table_name = (char*) table_name;
2959
2888
    tables.lock_type = TL_WRITE;
2960
2889
    tables.updating= 1;
2961
2890
 
2962
2891
    // the table will be opened in mysql_load    
2963
 
    if (rpl_filter->is_on() && !rpl_filter->tables_ok(thd->db, &tables))
 
2892
    if (rpl_filter->is_on() && !rpl_filter->tables_ok(session->db, &tables))
2964
2893
    {
2965
2894
      // TODO: this is a bug - this needs to be moved to the I/O thread
2966
2895
      if (net)
2978
2907
        Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST
2979
2908
        and written to slave's binlog if binlogging is on.
2980
2909
      */
2981
 
      if (!(load_data_query= (char *)thd->alloc(get_query_buffer_length() + 1)))
 
2910
      if (!(load_data_query= (char *)session->alloc(get_query_buffer_length() + 1)))
2982
2911
      {
2983
2912
        /*
2984
 
          This will set thd->fatal_error in case of OOM. So we surely will notice
 
2913
          This will set session->fatal_error in case of OOM. So we surely will notice
2985
2914
          that something is wrong.
2986
2915
        */
2987
2916
        goto error;
2988
2917
      }
2989
2918
 
2990
 
      print_query(false, load_data_query, &end, (char **)&thd->lex->fname_start,
2991
 
                  (char **)&thd->lex->fname_end);
 
2919
      print_query(false, load_data_query, &end, (char **)&session->lex->fname_start,
 
2920
                  (char **)&session->lex->fname_end);
2992
2921
      *end= 0;
2993
 
      thd->query_length= end - load_data_query;
2994
 
      thd->query= load_data_query;
 
2922
      session->query_length= end - load_data_query;
 
2923
      session->query= load_data_query;
2995
2924
 
2996
2925
      if (sql_ex.opt_flags & REPLACE_FLAG)
2997
2926
      {
3020
2949
        handle_dup= DUP_ERROR;
3021
2950
      }
3022
2951
      /*
3023
 
        We need to set thd->lex->sql_command and thd->lex->duplicates
 
2952
        We need to set session->lex->sql_command and session->lex->duplicates
3024
2953
        since InnoDB tests these variables to decide if this is a LOAD
3025
2954
        DATA ... REPLACE INTO ... statement even though mysql_parse()
3026
2955
        is not called.  This is not needed in 5.0 since there the LOAD
3027
2956
        DATA ... statement is replicated using mysql_parse(), which
3028
 
        sets the thd->lex fields correctly.
 
2957
        sets the session->lex fields correctly.
3029
2958
      */
3030
 
      thd->lex->sql_command= SQLCOM_LOAD;
3031
 
      thd->lex->duplicates= handle_dup;
 
2959
      session->lex->sql_command= SQLCOM_LOAD;
 
2960
      session->lex->duplicates= handle_dup;
3032
2961
 
3033
2962
      sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
3034
2963
      String field_term(sql_ex.field_term,sql_ex.field_term_len,log_cs);
3048
2977
 
3049
2978
      ex.skip_lines = skip_lines;
3050
2979
      List<Item> field_list;
3051
 
      thd->lex->select_lex.context.resolve_in_table_list_only(&tables);
3052
 
      set_fields(tables.db, field_list, &thd->lex->select_lex.context);
3053
 
      thd->variables.pseudo_thread_id= thread_id;
 
2980
      session->lex->select_lex.context.resolve_in_table_list_only(&tables);
 
2981
      set_fields(tables.db, field_list, &session->lex->select_lex.context);
 
2982
      session->variables.pseudo_thread_id= thread_id;
3054
2983
      if (net)
3055
2984
      {
3056
 
        // mysql_load will use thd->net to read the file
3057
 
        thd->net.vio = net->vio;
 
2985
        // mysql_load will use session->net to read the file
 
2986
        session->net.vio = net->vio;
3058
2987
        /*
3059
2988
          Make sure the client does not get confused about the packet sequence
3060
2989
        */
3061
 
        thd->net.pkt_nr = net->pkt_nr;
 
2990
        session->net.pkt_nr = net->pkt_nr;
3062
2991
      }
3063
2992
      /*
3064
2993
        It is safe to use tmp_list twice because we are not going to
3065
2994
        update it inside mysql_load().
3066
2995
      */
3067
2996
      List<Item> tmp_list;
3068
 
      if (mysql_load(thd, &ex, &tables, field_list, tmp_list, tmp_list,
 
2997
      if (mysql_load(session, &ex, &tables, field_list, tmp_list, tmp_list,
3069
2998
                     handle_dup, ignore, net != 0))
3070
 
        thd->is_slave_error= 1;
3071
 
      if (thd->cuted_fields)
 
2999
        session->is_slave_error= 1;
 
3000
      if (session->cuted_fields)
3072
3001
      {
3073
3002
        /* log_pos is the position of the LOAD event in the master log */
3074
3003
        sql_print_warning(_("Slave: load data infile on table '%s' at "
3076
3005
                          "warning(s). Default database: '%s'"),
3077
3006
                          (char*) table_name,
3078
3007
                          llstr(log_pos,llbuff), RPL_LOG_NAME, 
3079
 
                          (ulong) thd->cuted_fields,
3080
 
                          print_slave_db_safe(thd->db));
 
3008
                          (ulong) session->cuted_fields,
 
3009
                          print_slave_db_safe(session->db));
3081
3010
      }
3082
3011
      if (net)
3083
 
        net->pkt_nr= thd->net.pkt_nr;
 
3012
        net->pkt_nr= session->net.pkt_nr;
3084
3013
    }
3085
3014
  }
3086
3015
  else
3095
3024
  }
3096
3025
 
3097
3026
error:
3098
 
  thd->net.vio = 0; 
3099
 
  const char *remember_db= thd->db;
 
3027
  session->net.vio = 0; 
 
3028
  const char *remember_db= session->db;
3100
3029
  pthread_mutex_lock(&LOCK_thread_count);
3101
 
  thd->catalog= 0;
3102
 
  thd->set_db(NULL, 0);                   /* will free the current database */
3103
 
  thd->query= 0;
3104
 
  thd->query_length= 0;
 
3030
  session->catalog= 0;
 
3031
  session->set_db(NULL, 0);                   /* will free the current database */
 
3032
  session->query= 0;
 
3033
  session->query_length= 0;
3105
3034
  pthread_mutex_unlock(&LOCK_thread_count);
3106
 
  close_thread_tables(thd);
 
3035
  close_thread_tables(session);
3107
3036
 
3108
 
  if (thd->is_slave_error)
 
3037
  if (session->is_slave_error)
3109
3038
  {
3110
3039
    /* this err/sql_errno code is copy-paste from net_send_error() */
3111
3040
    const char *err;
3112
3041
    int sql_errno;
3113
 
    if (thd->is_error())
 
3042
    if (session->is_error())
3114
3043
    {
3115
 
      err= thd->main_da.message();
3116
 
      sql_errno= thd->main_da.sql_errno();
 
3044
      err= session->main_da.message();
 
3045
      sql_errno= session->main_da.sql_errno();
3117
3046
    }
3118
3047
    else
3119
3048
    {
3124
3053
                _("Error '%s' running LOAD DATA INFILE on table '%s'. "
3125
3054
                  "Default database: '%s'"),
3126
3055
                err, (char*)table_name, print_slave_db_safe(remember_db));
3127
 
    free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
 
3056
    free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
3128
3057
    return 1;
3129
3058
  }
3130
 
  free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
 
3059
  free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
3131
3060
 
3132
 
  if (thd->is_fatal_error)
 
3061
  if (session->is_fatal_error)
3133
3062
  {
3134
3063
    char buf[256];
3135
3064
    snprintf(buf, sizeof(buf),
3256
3185
  if ((server_id != ::server_id || rli->replicate_same_server_id) &&
3257
3186
      !rli->is_in_group())
3258
3187
  {
3259
 
    memcpy(rli->group_master_log_name, new_log_ident, ident_len+1);
 
3188
    rli->group_master_log_name.assign(new_log_ident, ident_len+1);
3260
3189
    rli->notify_group_master_log_name_update();
3261
3190
    rli->group_master_log_pos= pos;
3262
 
    strmake(rli->group_relay_log_name, rli->event_relay_log_name,
3263
 
            sizeof(rli->group_relay_log_name) - 1);
 
3191
    rli->group_relay_log_name.assign(rli->event_relay_log_name);
3264
3192
    rli->notify_group_relay_log_name_update();
3265
3193
    rli->group_relay_log_pos= rli->event_relay_log_pos;
3266
3194
    /*
3267
 
      Reset thd->options and sql_mode etc, because this could be the signal of
 
3195
      Reset session->options and sql_mode etc, because this could be the signal of
3268
3196
      a master's downgrade from 5.0 to 4.0.
3269
3197
      However, no need to reset description_event_for_exec: indeed, if the next
3270
3198
      master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
3271
3199
      master is 4.0 then the events are in the slave's format (conversion).
3272
3200
    */
3273
 
    set_slave_thread_options(thd);
3274
 
    set_slave_thread_default_charset(thd, rli);
3275
 
    thd->variables.auto_increment_increment=
3276
 
      thd->variables.auto_increment_offset= 1;
 
3201
    set_slave_thread_options(session);
 
3202
    session->variables.auto_increment_increment=
 
3203
      session->variables.auto_increment_offset= 1;
3277
3204
  }
3278
3205
  pthread_mutex_unlock(&rli->data_lock);
3279
3206
  pthread_cond_broadcast(&rli->data_cond);
3379
3306
 
3380
3307
  switch (type) {
3381
3308
  case LAST_INSERT_ID_EVENT:
3382
 
    thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1;
3383
 
    thd->first_successful_insert_id_in_prev_stmt= val;
 
3309
    session->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1;
 
3310
    session->first_successful_insert_id_in_prev_stmt= val;
3384
3311
    break;
3385
3312
  case INSERT_ID_EVENT:
3386
 
    thd->force_one_auto_inc_interval(val);
 
3313
    session->force_one_auto_inc_interval(val);
3387
3314
    break;
3388
3315
  }
3389
3316
  return 0;
3454
3381
   */
3455
3382
  const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3456
3383
 
3457
 
  thd->rand.seed1= (ulong) seed1;
3458
 
  thd->rand.seed2= (ulong) seed2;
 
3384
  session->rand.seed1= (ulong) seed1;
 
3385
  session->rand.seed2= (ulong) seed2;
3459
3386
  return 0;
3460
3387
}
3461
3388
 
3522
3449
 
3523
3450
int Xid_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
3524
3451
{
3525
 
  return end_trans(thd, COMMIT);
 
3452
  return end_trans(session, COMMIT);
3526
3453
}
3527
3454
 
3528
3455
Log_event::enum_skip_reason
3529
3456
Xid_log_event::do_shall_skip(Relay_log_info *rli)
3530
3457
{
3531
3458
  if (rli->slave_skip_counter > 0) {
3532
 
    thd->options&= ~OPTION_BEGIN;
 
3459
    session->options&= ~OPTION_BEGIN;
3533
3460
    return(Log_event::EVENT_SKIP_COUNT);
3534
3461
  }
3535
3462
  return(Log_event::do_shall_skip(rli));
3773
3700
    Item_func_set_user_var can't substitute something else on its place =>
3774
3701
    0 can be passed as last argument (reference on item)
3775
3702
  */
3776
 
  e.fix_fields(thd, 0);
 
3703
  e.fix_fields(session, 0);
3777
3704
  /*
3778
3705
    A variable can just be considered as a table with
3779
3706
    a single record and with a single column. Thus, like
3780
3707
    a column value, it could always have IMPLICIT derivation.
3781
3708
   */
3782
3709
  e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0);
3783
 
  free_root(thd->mem_root,0);
 
3710
  free_root(session->mem_root,0);
3784
3711
 
3785
3712
  return 0;
3786
3713
}
3814
3741
{
3815
3742
  char buf[256+HOSTNAME_LENGTH], *pos;
3816
3743
  pos= my_stpcpy(buf, "host=");
3817
 
  pos= my_stpncpy(pos, master_host, HOSTNAME_LENGTH);
 
3744
  pos= my_stpncpy(pos, master_host.c_str(), HOSTNAME_LENGTH);
3818
3745
  pos= my_stpcpy(pos, ",port=");
3819
3746
  pos= int10_to_str((long) master_port, pos, 10);
3820
3747
  pos= my_stpcpy(pos, ",log=");
3821
 
  pos= my_stpcpy(pos, master_log);
 
3748
  pos= my_stpcpy(pos, master_log.c_str());
3822
3749
  pos= my_stpcpy(pos, ",pos=");
3823
3750
  pos= int64_t10_to_str(master_pos, pos, 10);
3824
3751
  protocol->store(buf, pos-buf, &my_charset_bin);
3829
3756
  @todo
3830
3757
  re-write this better without holding both locks at the same time
3831
3758
*/
3832
 
Slave_log_event::Slave_log_event(THD* thd_arg,
 
3759
Slave_log_event::Slave_log_event(Session* session_arg,
3833
3760
                                 Relay_log_info* rli)
3834
 
  :Log_event(thd_arg, 0, 0) , mem_pool(0), master_host(0)
 
3761
  :Log_event(session_arg, 0, 0) , mem_pool(0), master_host(0)
3835
3762
{
3836
3763
  if (!rli->inited)                             // QQ When can this happen ?
3837
3764
    return;
3840
3767
  // TODO: re-write this better without holding both locks at the same time
3841
3768
  pthread_mutex_lock(&mi->data_lock);
3842
3769
  pthread_mutex_lock(&rli->data_lock);
3843
 
  master_host_len = strlen(mi->host);
3844
 
  master_log_len = strlen(rli->group_master_log_name);
3845
3770
  // on OOM, just do not initialize the structure and print the error
3846
3771
  if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
3847
3772
                                   MYF(MY_WME))))
3848
3773
  {
3849
 
    master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
3850
 
    memcpy(master_host, mi->host, master_host_len + 1);
3851
 
    master_log = master_host + master_host_len + 1;
3852
 
    memcpy(master_log, rli->group_master_log_name, master_log_len + 1);
3853
 
    master_port = mi->port;
 
3774
    master_host.assign(mi->getHostname());
 
3775
    master_log.assign(rli->group_master_log_name);
 
3776
    master_port = mi->getPort();
3854
3777
    master_pos = rli->group_master_log_pos;
3855
3778
  }
3856
3779
  else
3869
3792
 
3870
3793
int Slave_log_event::get_data_size()
3871
3794
{
3872
 
  return master_host_len + master_log_len + 1 + SL_MASTER_HOST_OFFSET;
 
3795
  return master_host.length() + master_log.length() + 1 + SL_MASTER_HOST_OFFSET;
3873
3796
}
3874
3797
 
3875
3798
 
3885
3808
}
3886
3809
 
3887
3810
 
3888
 
void Slave_log_event::init_from_mem_pool(int data_size)
 
3811
void Slave_log_event::init_from_mem_pool()
3889
3812
{
3890
3813
  master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
3891
3814
  master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
3892
 
  master_host = mem_pool + SL_MASTER_HOST_OFFSET;
3893
 
  master_host_len = strlen(master_host);
3894
 
  // safety
3895
 
  master_log = master_host + master_host_len + 1;
3896
 
  if (master_log > mem_pool + data_size)
3897
 
  {
3898
 
    master_host = 0;
3899
 
    return;
3900
 
  }
3901
 
  master_log_len = strlen(master_log);
3902
 
}
3903
 
 
3904
 
 
3905
 
/** This code is not used, so has not been updated to be format-tolerant. */
3906
 
Slave_log_event::Slave_log_event(const char* buf, uint32_t event_len)
3907
 
  :Log_event(buf,0) /*unused event*/ ,mem_pool(0),master_host(0)
3908
 
{
3909
 
  if (event_len < LOG_EVENT_HEADER_LEN)
3910
 
    return;
3911
 
  event_len -= LOG_EVENT_HEADER_LEN;
3912
 
  if (!(mem_pool = (char*) my_malloc(event_len + 1, MYF(MY_WME))))
3913
 
    return;
3914
 
  memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len);
3915
 
  mem_pool[event_len] = 0;
3916
 
  init_from_mem_pool(event_len);
 
3815
#ifdef FIXME
 
3816
  /* Assign these correctly */
 
3817
  master_host.assign(mem_pool + SL_MASTER_HOST_OFFSET);
 
3818
  master_log.assign();
 
3819
#endif
3917
3820
}
3918
3821
 
3919
3822
 
3949
3852
    could give false triggers in MASTER_POS_WAIT() that we have reached
3950
3853
    the target position when in fact we have not.
3951
3854
  */
3952
 
  if (thd->options & OPTION_BEGIN)
 
3855
  if (session->options & OPTION_BEGIN)
3953
3856
    rli->inc_event_relay_log_pos();
3954
3857
  else
3955
3858
  {
3969
3872
*/
3970
3873
 
3971
3874
Create_file_log_event::
3972
 
Create_file_log_event(THD* thd_arg, sql_exchange* ex,
 
3875
Create_file_log_event(Session* session_arg, sql_exchange* ex,
3973
3876
                      const char* db_arg, const char* table_name_arg,
3974
3877
                      List<Item>& fields_arg, enum enum_duplicates handle_dup,
3975
3878
                      bool ignore,
3976
3879
                      unsigned char* block_arg, uint32_t block_len_arg, bool using_trans)
3977
 
  :Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
 
3880
  :Load_log_event(session_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
3978
3881
                  using_trans),
3979
3882
   fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg),
3980
 
   file_id(thd_arg->file_id = mysql_bin_log.next_file_id())
 
3883
   file_id(session_arg->file_id = mysql_bin_log.next_file_id())
3981
3884
{
3982
3885
  sql_ex.force_new_format();
3983
3886
  return;
4114
4017
  memset(&file, 0, sizeof(file));
4115
4018
  fname_buf= my_stpcpy(proc_info, "Making temp file ");
4116
4019
  ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
4117
 
  thd_proc_info(thd, proc_info);
 
4020
  session->set_proc_info(proc_info);
4118
4021
  my_delete(fname_buf, MYF(0)); // old copy may exist already
4119
4022
  if ((fd= my_create(fname_buf, CREATE_MODE,
4120
 
                     O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
 
4023
                     O_WRONLY | O_EXCL,
4121
4024
                     MYF(MY_WME))) < 0 ||
4122
4025
      init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
4123
4026
                    MYF(MY_WME|MY_NABP)))
4145
4048
  // fname_buf now already has .data, not .info, because we did our trick
4146
4049
  my_delete(fname_buf, MYF(0)); // old copy may exist already
4147
4050
  if ((fd= my_create(fname_buf, CREATE_MODE,
4148
 
                     O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
 
4051
                     O_WRONLY | O_EXCL,
4149
4052
                     MYF(MY_WME))) < 0)
4150
4053
  {
4151
4054
    rli->report(ERROR_LEVEL, my_errno,
4167
4070
    end_io_cache(&file);
4168
4071
  if (fd >= 0)
4169
4072
    my_close(fd, MYF(0));
4170
 
  thd_proc_info(thd, 0);
 
4073
  session->set_proc_info(0);
4171
4074
  return error == 0;
4172
4075
}
4173
4076
 
4180
4083
  Append_block_log_event ctor
4181
4084
*/
4182
4085
 
4183
 
Append_block_log_event::Append_block_log_event(THD *thd_arg,
 
4086
Append_block_log_event::Append_block_log_event(Session *session_arg,
4184
4087
                                               const char *db_arg,
4185
4088
                                               unsigned char *block_arg,
4186
4089
                                               uint32_t block_len_arg,
4187
4090
                                               bool using_trans)
4188
 
  :Log_event(thd_arg,0, using_trans), block(block_arg),
4189
 
   block_len(block_len_arg), file_id(thd_arg->file_id), db(db_arg)
 
4091
  :Log_event(session_arg,0, using_trans), block(block_arg),
 
4092
   block_len(block_len_arg), file_id(session_arg->file_id), db(db_arg)
4190
4093
{
4191
4094
}
4192
4095
 
4261
4164
 
4262
4165
  fname= my_stpcpy(proc_info, "Making temp file ");
4263
4166
  slave_load_file_stem(fname, file_id, server_id, ".data");
4264
 
  thd_proc_info(thd, proc_info);
 
4167
  session->set_proc_info(proc_info);
4265
4168
  if (get_create_or_append())
4266
4169
  {
4267
4170
    my_delete(fname, MYF(0)); // old copy may exist already
4268
4171
    if ((fd= my_create(fname, CREATE_MODE,
4269
 
                       O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
 
4172
                       O_WRONLY | O_EXCL,
4270
4173
                       MYF(MY_WME))) < 0)
4271
4174
    {
4272
4175
      rli->report(ERROR_LEVEL, my_errno,
4275
4178
      goto err;
4276
4179
    }
4277
4180
  }
4278
 
  else if ((fd = my_open(fname, O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW,
 
4181
  else if ((fd = my_open(fname, O_WRONLY | O_APPEND,
4279
4182
                         MYF(MY_WME))) < 0)
4280
4183
  {
4281
4184
    rli->report(ERROR_LEVEL, my_errno,
4295
4198
err:
4296
4199
  if (fd >= 0)
4297
4200
    my_close(fd, MYF(0));
4298
 
  thd_proc_info(thd, 0);
 
4201
  session->set_proc_info(0);
4299
4202
  return(error);
4300
4203
}
4301
4204
 
4308
4211
  Delete_file_log_event ctor
4309
4212
*/
4310
4213
 
4311
 
Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg,
 
4214
Delete_file_log_event::Delete_file_log_event(Session *session_arg, const char* db_arg,
4312
4215
                                             bool using_trans)
4313
 
  :Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
 
4216
  :Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
4314
4217
{
4315
4218
}
4316
4219
 
4378
4281
  Execute_load_log_event ctor
4379
4282
*/
4380
4283
 
4381
 
Execute_load_log_event::Execute_load_log_event(THD *thd_arg,
 
4284
Execute_load_log_event::Execute_load_log_event(Session *session_arg,
4382
4285
                                               const char* db_arg,
4383
4286
                                               bool using_trans)
4384
 
  :Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
 
4287
  :Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
4385
4288
{
4386
4289
}
4387
4290
  
4442
4345
  Load_log_event *lev= 0;
4443
4346
 
4444
4347
  ext= slave_load_file_stem(fname, file_id, server_id, ".info");
4445
 
  if ((fd = my_open(fname, O_RDONLY | O_BINARY | O_NOFOLLOW,
 
4348
  if ((fd = my_open(fname, O_RDONLY,
4446
4349
                    MYF(MY_WME))) < 0 ||
4447
4350
      init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
4448
4351
                    MYF(MY_WME|MY_NABP)))
4464
4367
    goto err;
4465
4368
  }
4466
4369
 
4467
 
  lev->thd = thd;
 
4370
  lev->session = session;
4468
4371
  /*
4469
4372
    lev->do_apply_event should use rli only for errors i.e. should
4470
4373
    not advance rli's position.
4525
4428
**************************************************************************/
4526
4429
 
4527
4430
Begin_load_query_log_event::
4528
 
Begin_load_query_log_event(THD* thd_arg, const char* db_arg, unsigned char* block_arg,
 
4431
Begin_load_query_log_event(Session* session_arg, const char* db_arg, unsigned char* block_arg,
4529
4432
                           uint32_t block_len_arg, bool using_trans)
4530
 
  :Append_block_log_event(thd_arg, db_arg, block_arg, block_len_arg,
 
4433
  :Append_block_log_event(session_arg, db_arg, block_arg, block_len_arg,
4531
4434
                          using_trans)
4532
4435
{
4533
 
   file_id= thd_arg->file_id= mysql_bin_log.next_file_id();
 
4436
   file_id= session_arg->file_id= mysql_bin_log.next_file_id();
4534
4437
}
4535
4438
 
4536
4439
 
4565
4468
 
4566
4469
 
4567
4470
Execute_load_query_log_event::
4568
 
Execute_load_query_log_event(THD *thd_arg, const char* query_arg,
 
4471
Execute_load_query_log_event(Session *session_arg, const char* query_arg,
4569
4472
                             ulong query_length_arg, uint32_t fn_pos_start_arg,
4570
4473
                             uint32_t fn_pos_end_arg,
4571
4474
                             enum_load_dup_handling dup_handling_arg,
4572
4475
                             bool using_trans, bool suppress_use,
4573
 
                             THD::killed_state killed_err_arg):
4574
 
  Query_log_event(thd_arg, query_arg, query_length_arg, using_trans,
 
4476
                             Session::killed_state killed_err_arg):
 
4477
  Query_log_event(session_arg, query_arg, query_length_arg, using_trans,
4575
4478
                  suppress_use, killed_err_arg),
4576
 
  file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg),
 
4479
  file_id(session_arg->file_id), fn_pos_start(fn_pos_start_arg),
4577
4480
  fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
4578
4481
{
4579
4482
}
4796
4699
        Rows_log_event member functions
4797
4700
**************************************************************************/
4798
4701
 
4799
 
Rows_log_event::Rows_log_event(THD *thd_arg, Table *tbl_arg, ulong tid,
 
4702
Rows_log_event::Rows_log_event(Session *session_arg, Table *tbl_arg, ulong tid,
4800
4703
                               MY_BITMAP const *cols, bool is_transactional)
4801
 
  : Log_event(thd_arg, 0, is_transactional),
 
4704
  : Log_event(session_arg, 0, is_transactional),
4802
4705
    m_row_count(0),
4803
4706
    m_table(tbl_arg),
4804
4707
    m_table_id(tid),
4814
4717
   */
4815
4718
  assert((tbl_arg && tbl_arg->s && tid != UINT32_MAX) || (!tbl_arg && !cols && tid == UINT32_MAX));
4816
4719
 
4817
 
  if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
 
4720
  if (session_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
4818
4721
      set_flags(NO_FOREIGN_KEY_CHECKS_F);
4819
 
  if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
 
4722
  if (session_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
4820
4723
      set_flags(RELAXED_UNIQUE_CHECKS_F);
4821
4724
  /* if bitmap_init fails, caught in is_valid() */
4822
4725
  if (likely(!bitmap_init(&m_cols,
5032
4935
    assert(get_flags(STMT_END_F));
5033
4936
 
5034
4937
    const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5035
 
    close_thread_tables(thd);
5036
 
    thd->clear_error();
 
4938
    close_thread_tables(session);
 
4939
    session->clear_error();
5037
4940
    return(0);
5038
4941
  }
5039
4942
 
5040
4943
  /*
5041
 
    'thd' has been set by exec_relay_log_event(), just before calling
 
4944
    'session' has been set by exec_relay_log_event(), just before calling
5042
4945
    do_apply_event(). We still check here to prevent future coding
5043
4946
    errors.
5044
4947
  */
5045
 
  assert(rli->sql_thd == thd);
 
4948
  assert(rli->sql_session == session);
5046
4949
 
5047
4950
  /*
5048
4951
    If there is no locks taken, this is the first binrow event seen
5050
4953
    used in the transaction and proceed with execution of the actual
5051
4954
    event.
5052
4955
  */
5053
 
  if (!thd->lock)
 
4956
  if (!session->lock)
5054
4957
  {
5055
4958
    bool need_reopen= 1; /* To execute the first lap of the loop below */
5056
4959
 
5057
4960
    /*
5058
 
      lock_tables() reads the contents of thd->lex, so they must be
 
4961
      lock_tables() reads the contents of session->lex, so they must be
5059
4962
      initialized. Contrary to in
5060
4963
      Table_map_log_event::do_apply_event() we don't call
5061
4964
      mysql_init_query() as that may reset the binlog format.
5062
4965
    */
5063
 
    lex_start(thd);
 
4966
    lex_start(session);
5064
4967
 
5065
4968
    /*
5066
4969
      There are a few flags that are replicated with each row event.
5068
4971
      the event.
5069
4972
    */
5070
4973
    if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
5071
 
        thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
 
4974
        session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
5072
4975
    else
5073
 
        thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
 
4976
        session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
5074
4977
 
5075
4978
    if (get_flags(RELAXED_UNIQUE_CHECKS_F))
5076
 
        thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
 
4979
        session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
5077
4980
    else
5078
 
        thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
 
4981
        session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
5079
4982
    /* A small test to verify that objects have consistent types */
5080
 
    assert(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
5081
 
 
5082
 
 
5083
 
    while ((error= lock_tables(thd, rli->tables_to_lock,
 
4983
    assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
 
4984
 
 
4985
 
 
4986
    while ((error= lock_tables(session, rli->tables_to_lock,
5084
4987
                               rli->tables_to_lock_count, &need_reopen)))
5085
4988
    {
5086
4989
      if (!need_reopen)
5087
4990
      {
5088
 
        if (thd->is_slave_error || thd->is_fatal_error)
 
4991
        if (session->is_slave_error || session->is_fatal_error)
5089
4992
        {
5090
4993
          /*
5091
4994
            Error reporting borrowed from Query_log_event with many excessive
5092
4995
            simplifications (we don't honour --slave-skip-errors)
5093
4996
          */
5094
 
          uint32_t actual_error= thd->main_da.sql_errno();
 
4997
          uint32_t actual_error= session->main_da.sql_errno();
5095
4998
          rli->report(ERROR_LEVEL, actual_error,
5096
4999
                      _("Error '%s' in %s event: when locking tables"),
5097
5000
                      (actual_error
5098
 
                       ? thd->main_da.message()
 
5001
                       ? session->main_da.message()
5099
5002
                       : _("unexpected success or fatal error")),
5100
5003
                      get_type_str());
5101
 
          thd->is_fatal_error= 1;
 
5004
          session->is_fatal_error= 1;
5102
5005
        }
5103
5006
        else
5104
5007
        {
5123
5026
        NOTE: For this new scheme there should be no pending event:
5124
5027
        need to add code to assert that is the case.
5125
5028
       */
5126
 
      thd->binlog_flush_pending_rows_event(false);
 
5029
      session->binlog_flush_pending_rows_event(false);
5127
5030
      TableList *tables= rli->tables_to_lock;
5128
 
      close_tables_for_reopen(thd, &tables);
 
5031
      close_tables_for_reopen(session, &tables);
5129
5032
 
5130
5033
      uint32_t tables_count= rli->tables_to_lock_count;
5131
 
      if ((error= open_tables(thd, &tables, &tables_count, 0)))
 
5034
      if ((error= open_tables(session, &tables, &tables_count, 0)))
5132
5035
      {
5133
 
        if (thd->is_slave_error || thd->is_fatal_error)
 
5036
        if (session->is_slave_error || session->is_fatal_error)
5134
5037
        {
5135
5038
          /*
5136
5039
            Error reporting borrowed from Query_log_event with many excessive
5137
5040
            simplifications (we don't honour --slave-skip-errors)
5138
5041
          */
5139
 
          uint32_t actual_error= thd->main_da.sql_errno();
 
5042
          uint32_t actual_error= session->main_da.sql_errno();
5140
5043
          rli->report(ERROR_LEVEL, actual_error,
5141
5044
                      _("Error '%s' on reopening tables"),
5142
5045
                      (actual_error
5143
 
                       ? thd->main_da.message()
 
5046
                       ? session->main_da.message()
5144
5047
                       : _("unexpected success or fatal error")));
5145
 
          thd->is_slave_error= 1;
 
5048
          session->is_slave_error= 1;
5146
5049
        }
5147
5050
        const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5148
5051
        return(error);
5163
5066
      {
5164
5067
        if (ptr->m_tabledef.compatible_with(rli, ptr->table))
5165
5068
        {
5166
 
          mysql_unlock_tables(thd, thd->lock);
5167
 
          thd->lock= 0;
5168
 
          thd->is_slave_error= 1;
 
5069
          mysql_unlock_tables(session, session->lock);
 
5070
          session->lock= 0;
 
5071
          session->is_slave_error= 1;
5169
5072
          const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5170
5073
          return(ERR_BAD_TABLE_DEF);
5171
5074
        }
5212
5115
      TIMESTAMP column to a table with one.
5213
5116
      So we call set_time(), like in SBR. Presently it changes nothing.
5214
5117
    */
5215
 
    thd->set_time((time_t)when);
 
5118
    session->set_time((time_t)when);
5216
5119
    /*
5217
5120
      There are a few flags that are replicated with each row event.
5218
5121
      Make sure to set/clear them before executing the main body of
5219
5122
      the event.
5220
5123
    */
5221
5124
    if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
5222
 
        thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
 
5125
        session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
5223
5126
    else
5224
 
        thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
 
5127
        session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
5225
5128
 
5226
5129
    if (get_flags(RELAXED_UNIQUE_CHECKS_F))
5227
 
        thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
 
5130
        session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
5228
5131
    else
5229
 
        thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
 
5132
        session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
5230
5133
    
5231
5134
    if (slave_allow_batching)
5232
 
      thd->options|= OPTION_ALLOW_BATCH;
 
5135
      session->options|= OPTION_ALLOW_BATCH;
5233
5136
    else
5234
 
      thd->options&= ~OPTION_ALLOW_BATCH;
 
5137
      session->options&= ~OPTION_ALLOW_BATCH;
5235
5138
    
5236
5139
    /* A small test to verify that objects have consistent types */
5237
 
    assert(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
 
5140
    assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
5238
5141
 
5239
5142
    /*
5240
5143
      Now we are in a statement and will stay in a statement until we
5275
5178
    while (error == 0 && m_curr_row < m_rows_end)
5276
5179
    {
5277
5180
      /* in_use can have been set to NULL in close_tables_for_reopen */
5278
 
      THD* old_thd= table->in_use;
 
5181
      Session* old_session= table->in_use;
5279
5182
      if (!table->in_use)
5280
 
        table->in_use= thd;
 
5183
        table->in_use= session;
5281
5184
 
5282
5185
      error= do_exec_row(rli);
5283
5186
 
5284
 
      table->in_use = old_thd;
 
5187
      table->in_use = old_session;
5285
5188
      switch (error)
5286
5189
      {
5287
5190
      case 0:
5311
5214
        if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5312
5215
        {
5313
5216
          if (global_system_variables.log_warnings)
5314
 
            slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
 
5217
            slave_rows_error_report(WARNING_LEVEL, error, rli, session, table,
5315
5218
                                    get_type_str(),
5316
5219
                                    RPL_LOG_NAME, (ulong) log_pos);
5317
5220
          error= 0;
5319
5222
        break;
5320
5223
        
5321
5224
      default:
5322
 
        thd->is_slave_error= 1;
 
5225
        session->is_slave_error= 1;
5323
5226
        break;
5324
5227
      }
5325
5228
 
5344
5247
    error= do_after_row_operations(rli, error);
5345
5248
    if (!cache_stmt)
5346
5249
    {
5347
 
      thd->options|= OPTION_KEEP_LOG;
 
5250
      session->options|= OPTION_KEEP_LOG;
5348
5251
    }
5349
5252
  } // if (table)
5350
5253
 
5355
5258
  if (rli->tables_to_lock && get_flags(STMT_END_F))
5356
5259
    const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5357
5260
  /* reset OPTION_ALLOW_BATCH as not affect later events */
5358
 
  thd->options&= ~OPTION_ALLOW_BATCH;
 
5261
  session->options&= ~OPTION_ALLOW_BATCH;
5359
5262
  
5360
5263
  if (error)
5361
5264
  {                     /* error has occured during the transaction */
5362
 
    slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
 
5265
    slave_rows_error_report(ERROR_LEVEL, error, rli, session, table,
5363
5266
                            get_type_str(), RPL_LOG_NAME, (ulong) log_pos);
5364
5267
  }
5365
5268
  if (error)
5375
5278
      thread is certainly going to stop.
5376
5279
      rollback at the caller along with sbr.
5377
5280
    */
5378
 
    thd->reset_current_stmt_binlog_row_based();
5379
 
    const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
5380
 
    thd->is_slave_error= 1;
 
5281
    session->reset_current_stmt_binlog_row_based();
 
5282
    const_cast<Relay_log_info*>(rli)->cleanup_context(session, error);
 
5283
    session->is_slave_error= 1;
5381
5284
    return(error);
5382
5285
  }
5383
5286
 
5447
5350
      (assume the last master's transaction is ignored by the slave because of
5448
5351
      replicate-ignore rules).
5449
5352
    */
5450
 
    thd->binlog_flush_pending_rows_event(true);
 
5353
    session->binlog_flush_pending_rows_event(true);
5451
5354
 
5452
5355
    /*
5453
5356
      If this event is not in a transaction, the call below will, if some
5458
5361
      are involved, commit the transaction and flush the pending event to the
5459
5362
      binlog.
5460
5363
    */
5461
 
    error= ha_autocommit_or_rollback(thd, 0);
 
5364
    error= ha_autocommit_or_rollback(session, 0);
5462
5365
 
5463
5366
    /*
5464
5367
      Now what if this is not a transactional engine? we still need to
5465
5368
      flush the pending event to the binlog; we did it with
5466
 
      thd->binlog_flush_pending_rows_event(). Note that we imitate
 
5369
      session->binlog_flush_pending_rows_event(). Note that we imitate
5467
5370
      what is done for real queries: a call to
5468
5371
      ha_autocommit_or_rollback() (sometimes only if involves a
5469
5372
      transactional engine), and a call to be sure to have the pending
5470
5373
      event flushed.
5471
5374
    */
5472
5375
 
5473
 
    thd->reset_current_stmt_binlog_row_based();
 
5376
    session->reset_current_stmt_binlog_row_based();
5474
5377
 
5475
 
    rli->cleanup_context(thd, 0);
 
5378
    rli->cleanup_context(session, 0);
5476
5379
    if (error == 0)
5477
5380
    {
5478
5381
      /*
5483
5386
      rli->stmt_done(log_pos, when);
5484
5387
 
5485
5388
      /*
5486
 
        Clear any errors pushed in thd->net.last_err* if for example "no key
 
5389
        Clear any errors pushed in session->net.last_err* if for example "no key
5487
5390
        found" (as this is allowed). This is a safety measure; apparently
5488
5391
        those errors (e.g. when executing a Delete_rows_log_event of a
5489
5392
        non-existing row, like in rpl_row_mystery22.test,
5490
 
        thd->net.last_error = "Can't find record in 't1'" and last_errno=1032)
 
5393
        session->net.last_error = "Can't find record in 't1'" and last_errno=1032)
5491
5394
        do not become visible. We still prefer to wipe them out.
5492
5395
      */
5493
 
      thd->clear_error();
 
5396
      session->clear_error();
5494
5397
    }
5495
5398
    else
5496
5399
      rli->report(ERROR_LEVEL, error,
5631
5534
  Mats says tbl->s lives longer than this event so it's ok to copy pointers
5632
5535
  (tbl->s->db etc) and not pointer content.
5633
5536
 */
5634
 
Table_map_log_event::Table_map_log_event(THD *thd, Table *tbl, ulong tid,
 
5537
Table_map_log_event::Table_map_log_event(Session *session, Table *tbl, ulong tid,
5635
5538
                                         bool is_transactional __attribute__((unused)),
5636
5539
                                         uint16_t flags)
5637
 
  : Log_event(thd, 0, true),
 
5540
  : Log_event(session, 0, true),
5638
5541
    m_table(tbl),
5639
5542
    m_dbnam(tbl->s->db.str),
5640
5543
    m_dblen(m_dbnam ? tbl->s->db.length : 0),
5827
5730
  char *db_mem, *tname_mem;
5828
5731
  size_t dummy_len;
5829
5732
  void *memory;
5830
 
  assert(rli->sql_thd == thd);
 
5733
  assert(rli->sql_session == session);
5831
5734
 
5832
5735
  /* Step the query id to mark what columns that are actually used. */
5833
5736
  pthread_mutex_lock(&LOCK_thread_count);
5834
 
  thd->query_id= next_query_id();
 
5737
  session->query_id= next_query_id();
5835
5738
  pthread_mutex_unlock(&LOCK_thread_count);
5836
5739
 
5837
5740
  if (!(memory= my_multi_malloc(MYF(MY_WME),
5861
5764
  else
5862
5765
  {
5863
5766
    /*
5864
 
      open_tables() reads the contents of thd->lex, so they must be
 
5767
      open_tables() reads the contents of session->lex, so they must be
5865
5768
      initialized, so we should call lex_start(); to be even safer, we
5866
5769
      call mysql_init_query() which does a more complete set of inits.
5867
5770
    */
5868
 
    lex_start(thd);
5869
 
    mysql_reset_thd_for_next_command(thd);
 
5771
    lex_start(session);
 
5772
    mysql_reset_session_for_next_command(session);
5870
5773
    /*
5871
5774
      Check if the slave is set to use SBR.  If so, it should switch
5872
5775
      to using RBR until the end of the "statement", i.e., next
5873
5776
      STMT_END_F or next error.
5874
5777
    */
5875
 
    if (!thd->current_stmt_binlog_row_based &&
5876
 
        mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG))
 
5778
    if (!session->current_stmt_binlog_row_based &&
 
5779
        mysql_bin_log.is_open() && (session->options & OPTION_BIN_LOG))
5877
5780
    {
5878
 
      thd->set_current_stmt_binlog_row_based();
 
5781
      session->set_current_stmt_binlog_row_based();
5879
5782
    }
5880
5783
 
5881
5784
    /*
5886
5789
      The creation of a new TableList is used to up-cast the
5887
5790
      table_list consisting of RPL_TableList items. This will work
5888
5791
      since the only case where the argument to open_tables() is
5889
 
      changed, is when thd->lex->query_tables == table_list, i.e.,
 
5792
      changed, is when session->lex->query_tables == table_list, i.e.,
5890
5793
      when the statement requires prelocking. Since this is not
5891
5794
      executed when a statement is executed, this case will not occur.
5892
5795
      As a precaution, an assertion is added to ensure that the bad
5897
5800
      of the pointer to make sure that it's not lost.
5898
5801
    */
5899
5802
    uint32_t count;
5900
 
    assert(thd->lex->query_tables != table_list);
 
5803
    assert(session->lex->query_tables != table_list);
5901
5804
    TableList *tmp_table_list= table_list;
5902
 
    if ((error= open_tables(thd, &tmp_table_list, &count, 0)))
 
5805
    if ((error= open_tables(session, &tmp_table_list, &count, 0)))
5903
5806
    {
5904
 
      if (thd->is_slave_error || thd->is_fatal_error)
 
5807
      if (session->is_slave_error || session->is_fatal_error)
5905
5808
      {
5906
5809
        /*
5907
5810
          Error reporting borrowed from Query_log_event with many excessive
5908
5811
          simplifications (we don't honour --slave-skip-errors)
5909
5812
        */
5910
 
        uint32_t actual_error= thd->main_da.sql_errno();
 
5813
        uint32_t actual_error= session->main_da.sql_errno();
5911
5814
        rli->report(ERROR_LEVEL, actual_error,
5912
5815
                    _("Error '%s' on opening table `%s`.`%s`"),
5913
5816
                    (actual_error
5914
 
                     ? thd->main_da.message()
 
5817
                     ? session->main_da.message()
5915
5818
                     : _("unexpected success or fatal error")),
5916
5819
                    table_list->db, table_list->table_name);
5917
 
        thd->is_slave_error= 1;
 
5820
        session->is_slave_error= 1;
5918
5821
      }
5919
5822
      goto err;
5920
5823
    }
6038
5941
/*
6039
5942
  Constructor used to build an event for writing to the binary log.
6040
5943
 */
6041
 
Write_rows_log_event::Write_rows_log_event(THD *thd_arg, Table *tbl_arg,
 
5944
Write_rows_log_event::Write_rows_log_event(Session *session_arg, Table *tbl_arg,
6042
5945
                                           ulong tid_arg,
6043
5946
                                           bool is_transactional)
6044
 
  : Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
 
5947
  : Rows_log_event(session_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
6045
5948
{
6046
5949
}
6047
5950
 
6073
5976
    */
6074
5977
    
6075
5978
    /* Tell the storage engine that we are using REPLACE semantics. */
6076
 
    thd->lex->duplicates= DUP_REPLACE;
 
5979
    session->lex->duplicates= DUP_REPLACE;
6077
5980
    
6078
5981
    /*
6079
5982
      Pretend we're executing a REPLACE command: this is needed for
6080
5983
      InnoDB since it is not (properly) checking the
6081
5984
      lex->duplicates flag.
6082
5985
    */
6083
 
    thd->lex->sql_command= SQLCOM_REPLACE;
 
5986
    session->lex->sql_command= SQLCOM_REPLACE;
6084
5987
    /* 
6085
5988
       Do not raise the error flag in case of hitting to an unique attribute
6086
5989
    */
6209
6112
Rows_log_event::write_row(const Relay_log_info *const rli,
6210
6113
                          const bool overwrite)
6211
6114
{
6212
 
  assert(m_table != NULL && thd != NULL);
 
6115
  assert(m_table != NULL && session != NULL);
6213
6116
 
6214
6117
  Table *table= m_table;  // pointer to event's table
6215
6118
  int error;
6381
6284
}
6382
6285
 
6383
6286
 
 
6287
int Rows_log_event::unpack_current_row(const Relay_log_info *const rli,
 
6288
                                         MY_BITMAP const *cols)
 
6289
{
 
6290
  assert(m_table);
 
6291
  ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT);
 
6292
  int const result= ::unpack_row(rli, m_table, m_width, m_curr_row, cols,
 
6293
                                 &m_curr_row_end, &m_master_reclength);
 
6294
  if (m_curr_row_end > m_rows_end)
 
6295
    my_error(ER_SLAVE_CORRUPT_EVENT, MYF(0));
 
6296
  ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT);
 
6297
  return result;
 
6298
}
 
6299
 
 
6300
 
6384
6301
int 
6385
6302
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6386
6303
{
6389
6306
    write_row(rli,        /* if 1 then overwrite */
6390
6307
              bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
6391
6308
    
6392
 
  if (error && !thd->is_error())
 
6309
  if (error && !session->is_error())
6393
6310
  {
6394
6311
    assert(0);
6395
6312
    my_error(ER_UNKNOWN_ERROR, MYF(0));
6704
6621
  Constructor used to build an event for writing to the binary log.
6705
6622
 */
6706
6623
 
6707
 
Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, Table *tbl_arg,
 
6624
Delete_rows_log_event::Delete_rows_log_event(Session *session_arg, Table *tbl_arg,
6708
6625
                                             ulong tid,
6709
6626
                                             bool is_transactional)
6710
 
  : Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
 
6627
  : Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
6711
6628
{
6712
6629
}
6713
6630
 
6780
6697
/*
6781
6698
  Constructor used to build an event for writing to the binary log.
6782
6699
 */
6783
 
Update_rows_log_event::Update_rows_log_event(THD *thd_arg, Table *tbl_arg,
 
6700
Update_rows_log_event::Update_rows_log_event(Session *session_arg, Table *tbl_arg,
6784
6701
                                             ulong tid,
6785
6702
                                             bool is_transactional)
6786
 
: Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
 
6703
: Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
6787
6704
{
6788
6705
  init(tbl_arg->write_set);
6789
6706
}