~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/log_event.cc

  • Committer: Monty Taylor
  • Date: 2008-10-27 23:19:48 UTC
  • mto: (520.4.12 merge-innodb-plugin)
  • mto: This revision was merged to the branch mainline in revision 563.
  • Revision ID: monty@inaugust.com-20081027231948-3kl6ss04plbakqcr
Split some more things out of common_includes.h.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems
5
 
 *
6
 
 *  This program is free software; you can redistribute it and/or modify
7
 
 *  it under the terms of the GNU General Public License as published by
8
 
 *  the Free Software Foundation; version 2 of the License.
9
 
 *
10
 
 *  This program is distributed in the hope that it will be useful,
11
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 *  GNU General Public License for more details.
14
 
 *
15
 
 *  You should have received a copy of the GNU General Public License
16
 
 *  along with this program; if not, write to the Free Software
17
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
 
 */
 
1
/* Copyright (C) 2000-2004 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
19
16
 
20
17
#include <drizzled/server_includes.h>
21
 
#include <drizzled/log_event.h>
22
 
#include <drizzled/replication/rli.h>
23
 
#include <drizzled/replication/mi.h>
24
 
#include <libdrizzle/libdrizzle.h>
25
 
#include <mysys/hash.h>
26
 
#include <drizzled/replication/utility.h>
27
 
#include <drizzled/replication/record.h>
 
18
#include "rpl_rli.h"
 
19
#include "rpl_mi.h"
 
20
#include "rpl_filter.h"
 
21
#include "rpl_utility.h"
 
22
#include "rpl_record.h"
28
23
#include <mysys/my_dir.h>
29
24
#include <drizzled/error.h>
30
25
#include <libdrizzle/pack.h>
31
 
#include <drizzled/sql_parse.h>
32
 
#include <drizzled/sql_base.h>
33
 
#include <drizzled/sql_load.h>
34
 
#include <drizzled/item/return_int.h>
35
 
#include <drizzled/item/empty_string.h>
36
26
 
37
27
#include <algorithm>
38
 
#include <string>
39
28
 
40
29
#include <mysys/base64.h>
41
30
#include <mysys/my_bitmap.h>
44
33
#include <libdrizzle/libdrizzle.h>
45
34
#include <drizzled/error.h>
46
35
#include <drizzled/query_id.h>
47
 
#include <drizzled/tztime.h>
48
 
#include <drizzled/slave.h>
49
 
#include <drizzled/lock.h>
50
 
 
51
 
using namespace std;
 
36
 
 
37
#define log_cs  &my_charset_utf8_general_ci
 
38
 
52
39
 
53
40
static const char *HA_ERR(int i)
54
41
{
139
126
    len= snprintf(slider, buff_end - slider,
140
127
                  _(" %s, Error_code: %d;"), err->msg, err->code);
141
128
  }
142
 
 
 
129
  
143
130
  rli->report(level, session->is_error()? session->main_da.sql_errno() : 0,
144
131
              _("Could not execute %s event on table %s.%s;"
145
132
                "%s handler error %s; "
310
297
  buf= int10_to_str(event_server_id, buf, 10);
311
298
  *buf++ = '-';
312
299
  res= int10_to_str(file_id, buf, 10);
313
 
  strcpy(res, ext);                             // Add extension last
 
300
  my_stpcpy(res, ext);                             // Add extension last
314
301
  return res;                                   // Pointer to extension
315
302
}
316
303
 
329
316
  if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
330
317
    return;
331
318
 
332
 
  /*
 
319
  /* 
333
320
     When we are deleting temporary files, we should only remove
334
321
     the files associated with the server id of our server.
335
322
     We don't use event_server_id here because since we've disabled
336
323
     direct binlogging of Create_file/Append_file/Exec_load events
337
 
     we cannot meet Start_log event in the middle of events from one
 
324
     we cannot meet Start_log event in the middle of events from one 
338
325
     LOAD DATA.
339
326
  */
340
 
  p= strncpy(prefbuf, STRING_WITH_LEN("SQL_LOAD-")) + 9;
 
327
  p= strmake(prefbuf, STRING_WITH_LEN("SQL_LOAD-"));
341
328
  p= int10_to_str(::server_id, p, 10);
342
329
  *(p++)= '-';
343
330
  *p= 0;
398
385
    to= octet2hex(to, from, len);
399
386
  }
400
387
  else
401
 
    to= strcpy(to, "\"\"")+2;
 
388
    to= my_stpcpy(to, "\"\"");
402
389
  return to;                               // pointer to end 0 of 'to'
403
390
}
404
391
 
449
436
  case STOP_EVENT:   return "Stop";
450
437
  case QUERY_EVENT:  return "Query";
451
438
  case ROTATE_EVENT: return "Rotate";
 
439
  case INTVAR_EVENT: return "Intvar";
452
440
  case LOAD_EVENT:   return "Load";
453
441
  case NEW_LOAD_EVENT:   return "New_load";
454
442
  case SLAVE_EVENT:  return "Slave";
456
444
  case APPEND_BLOCK_EVENT: return "Append_block";
457
445
  case DELETE_FILE_EVENT: return "Delete_file";
458
446
  case EXEC_LOAD_EVENT: return "Exec_load";
 
447
  case RAND_EVENT: return "RAND";
459
448
  case XID_EVENT: return "Xid";
 
449
  case USER_VAR_EVENT: return "User var";
460
450
  case FORMAT_DESCRIPTION_EVENT: return "Format_desc";
461
451
  case TABLE_MAP_EVENT: return "Table_map";
 
452
  case PRE_GA_WRITE_ROWS_EVENT: return "Write_rows_event_old";
 
453
  case PRE_GA_UPDATE_ROWS_EVENT: return "Update_rows_event_old";
 
454
  case PRE_GA_DELETE_ROWS_EVENT: return "Delete_rows_event_old";
462
455
  case WRITE_ROWS_EVENT: return "Write_rows";
463
456
  case UPDATE_ROWS_EVENT: return "Update_rows";
464
457
  case DELETE_ROWS_EVENT: return "Delete_rows";
499
492
  :temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
500
493
   session(0)
501
494
{
502
 
  server_id= ::server_id;
 
495
  server_id=    ::server_id;
503
496
  /*
504
 
    We can't call time() here as this would cause a call before
 
497
    We can't call my_time() here as this would cause a call before
505
498
    my_init() is called
506
499
  */
507
500
  when=         0;
604
597
    if (debug_not_change_ts_if_art_event == 1
605
598
        && is_artificial_event())
606
599
      debug_not_change_ts_if_art_event= 0;
607
 
    rli->stmt_done(log_pos,
 
600
    rli->stmt_done(log_pos, 
608
601
                   is_artificial_event() &&
609
602
                   debug_not_change_ts_if_art_event > 0 ? 0 : when);
610
603
    if (debug_not_change_ts_if_art_event == 0)
636
629
}
637
630
 
638
631
 
639
 
const char* Log_event::get_db()
640
 
{
641
 
  return session ? session->db : 0;
642
 
}
643
 
 
644
 
 
645
632
/**
646
633
  init_show_field_list() prepares the column names and types for the
647
634
  output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
652
639
{
653
640
  field_list->push_back(new Item_empty_string("Log_name", 20));
654
641
  field_list->push_back(new Item_return_int("Pos", MY_INT32_NUM_DECIMAL_DIGITS,
655
 
                                            DRIZZLE_TYPE_LONGLONG));
 
642
                                            DRIZZLE_TYPE_LONGLONG));
656
643
  field_list->push_back(new Item_empty_string("Event_type", 20));
657
644
  field_list->push_back(new Item_return_int("Server_id", 10,
658
 
                                            DRIZZLE_TYPE_LONG));
 
645
                                            DRIZZLE_TYPE_LONG));
659
646
  field_list->push_back(new Item_return_int("End_log_pos",
660
647
                                            MY_INT32_NUM_DECIMAL_DIGITS,
661
 
                                            DRIZZLE_TYPE_LONGLONG));
 
648
                                            DRIZZLE_TYPE_LONGLONG));
662
649
  field_list->push_back(new Item_empty_string("Info", 20));
663
650
}
664
651
 
739
726
}
740
727
 
741
728
 
742
 
time_t Log_event::get_time()
743
 
{
744
 
  Session *tmp_session;
745
 
  if (when)
746
 
    return when;
747
 
  if (session)
748
 
    return session->start_time;
749
 
  if ((tmp_session= current_session))
750
 
    return tmp_session->start_time;
751
 
  return time(0);
752
 
}
753
 
 
754
 
 
755
729
/**
756
730
  This needn't be format-tolerant, because we only read
757
731
  LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length).
758
732
*/
759
733
 
760
734
int Log_event::read_log_event(IO_CACHE* file, String* packet,
761
 
                              pthread_mutex_t* log_lock)
 
735
                              pthread_mutex_t* log_lock)
762
736
{
763
737
  ulong data_len;
764
738
  int result=0;
882
856
  }
883
857
 
884
858
  // some events use the extra byte to null-terminate strings
885
 
  if (!(buf = (char*) malloc(data_len+1)))
 
859
  if (!(buf = (char*) my_malloc(data_len+1, MYF(MY_WME))))
886
860
  {
887
861
    error = "Out of memory";
888
862
    goto err;
902
876
  if (!res)
903
877
  {
904
878
    assert(error != 0);
905
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Error in Log_event::read_log_event(): "
 
879
    sql_print_error(_("Error in Log_event::read_log_event(): "
906
880
                    "'%s', data_len: %d, event_type: %d"),
907
881
                    error,data_len,head[EVENT_TYPE_OFFSET]);
908
882
    free(buf);
997
971
    case STOP_EVENT:
998
972
      ev = new Stop_log_event(buf, description_event);
999
973
      break;
 
974
    case INTVAR_EVENT:
 
975
      ev = new Intvar_log_event(buf, description_event);
 
976
      break;
1000
977
    case XID_EVENT:
1001
978
      ev = new Xid_log_event(buf, description_event);
1002
979
      break;
 
980
    case RAND_EVENT:
 
981
      ev = new Rand_log_event(buf, description_event);
 
982
      break;
 
983
    case USER_VAR_EVENT:
 
984
      ev = new User_var_log_event(buf, description_event);
 
985
      break;
1003
986
    case FORMAT_DESCRIPTION_EVENT:
1004
987
      ev = new Format_description_log_event(buf, event_len, description_event);
1005
988
      break;
1032
1015
 
1033
1016
  /*
1034
1017
    is_valid() are small event-specific sanity tests which are
1035
 
    important; for example there are some malloc() in constructors
 
1018
    important; for example there are some my_malloc() in constructors
1036
1019
    (e.g. Query_log_event::Query_log_event(char*...)); when these
1037
 
    malloc() fail we can't return an error out of the constructor
 
1020
    my_malloc() fail we can't return an error out of the constructor
1038
1021
    (because constructor is "void") ; so instead we leave the pointer we
1039
1022
    wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
1040
1023
    Same for Format_description_log_event, member 'post_header_len'.
1045
1028
    *error= "Found invalid event in binary log";
1046
1029
    return(0);
1047
1030
  }
1048
 
  return(ev);
 
1031
  return(ev);  
1049
1032
}
1050
1033
 
1051
1034
inline Log_event::enum_skip_reason
1073
1056
{
1074
1057
  // TODO: show the catalog ??
1075
1058
  char *buf, *pos;
1076
 
  if (!(buf= (char*) malloc(9 + db_len + q_len)))
 
1059
  if (!(buf= (char*) my_malloc(9 + db_len + q_len, MYF(MY_WME))))
1077
1060
    return;
1078
1061
  pos= buf;
1079
1062
  if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
1080
1063
      && db && db_len)
1081
1064
  {
1082
 
    pos= strcpy(buf, "use `")+5;
 
1065
    pos= my_stpcpy(buf, "use `");
1083
1066
    memcpy(pos, db, db_len);
1084
 
    pos= strcpy(pos+db_len, "`; ")+3;
 
1067
    pos= my_stpcpy(pos+db_len, "`; ");
1085
1068
  }
1086
1069
  if (query && q_len)
1087
1070
  {
1094
1077
 
1095
1078
 
1096
1079
/**
 
1080
  Utility function for the next method (Query_log_event::write()) .
 
1081
*/
 
1082
static void write_str_with_code_and_len(char **dst, const char *src,
 
1083
                                        int len, uint32_t code)
 
1084
{
 
1085
  assert(src);
 
1086
  *((*dst)++)= code;
 
1087
  *((*dst)++)= (unsigned char) len;
 
1088
  memcpy(*dst, src, len);
 
1089
  (*dst)+= len;
 
1090
}
 
1091
 
 
1092
 
 
1093
/**
1097
1094
  Query_log_event::write().
1098
1095
 
1099
1096
  @note
1178
1175
    int4store(start, flags2);
1179
1176
    start+= 4;
1180
1177
  }
 
1178
  if (sql_mode_inited)
 
1179
  {
 
1180
    *start++= Q_SQL_MODE_CODE;
 
1181
    int8store(start, (uint64_t)sql_mode);
 
1182
    start+= 8;
 
1183
  }
 
1184
  if (catalog_len) // i.e. this var is inited (false for 4.0 events)
 
1185
  {
 
1186
    write_str_with_code_and_len((char **)(&start),
 
1187
                                catalog, catalog_len, Q_CATALOG_NZ_CODE);
 
1188
    /*
 
1189
      In 5.0.x where x<4 masters we used to store the end zero here. This was
 
1190
      a waste of one byte so we don't do it in x>=4 masters. We change code to
 
1191
      Q_CATALOG_NZ_CODE, because re-using the old code would make x<4 slaves
 
1192
      of this x>=4 master segfault (expecting a zero when there is
 
1193
      none). Remaining compatibility problems are: the older slave will not
 
1194
      find the catalog; but it is will not crash, and it's not an issue
 
1195
      that it does not find the catalog as catalogs were not used in these
 
1196
      older MySQL versions (we store it in binlog and read it from relay log
 
1197
      but do nothing useful with it). What is an issue is that the older slave
 
1198
      will stop processing the Q_* blocks (and jumps to the db/query) as soon
 
1199
      as it sees unknown Q_CATALOG_NZ_CODE; so it will not be able to read
 
1200
      Q_AUTO_INCREMENT*, Q_CHARSET and so replication will fail silently in
 
1201
      various ways. Documented that you should not mix alpha/beta versions if
 
1202
      they are not exactly the same version, with example of 5.0.3->5.0.2 and
 
1203
      5.0.4->5.0.3. If replication is from older to new, the new will
 
1204
      recognize Q_CATALOG_CODE and have no problem.
 
1205
    */
 
1206
  }
 
1207
  if (auto_increment_increment != 1 || auto_increment_offset != 1)
 
1208
  {
 
1209
    *start++= Q_AUTO_INCREMENT;
 
1210
    int2store(start, auto_increment_increment);
 
1211
    int2store(start+2, auto_increment_offset);
 
1212
    start+= 4;
 
1213
  }
 
1214
  if (charset_inited)
 
1215
  {
 
1216
    *start++= Q_CHARSET_CODE;
 
1217
    memcpy(start, charset, 6);
 
1218
    start+= 6;
 
1219
  }
 
1220
  if (time_zone_len)
 
1221
  {
 
1222
    /* In the TZ sys table, column Name is of length 64 so this should be ok */
 
1223
    assert(time_zone_len <= MAX_TIME_ZONE_NAME_LENGTH);
 
1224
    *start++= Q_TIME_ZONE_CODE;
 
1225
    *start++= time_zone_len;
 
1226
    memcpy(start, time_zone_str, time_zone_len);
 
1227
    start+= time_zone_len;
 
1228
  }
1181
1229
  if (lc_time_names_number)
1182
1230
  {
1183
1231
    assert(lc_time_names_number <= 0xFFFF);
1201
1249
    start+= 4;
1202
1250
    }
1203
1251
  */
1204
 
 
 
1252
  
1205
1253
  /* Store length of status variables */
1206
1254
  status_vars_len= (uint) (start-start_of_status);
1207
1255
  assert(status_vars_len <= MAX_SIZE_LOG_EVENT_STATUS);
1225
1273
/**
1226
1274
  The simplest constructor that could possibly work.  This is used for
1227
1275
  creating static objects that have a special meaning and are invisible
1228
 
  to the log.
 
1276
  to the log.  
1229
1277
*/
1230
1278
Query_log_event::Query_log_event()
1231
1279
  :Log_event(), data_buf(0)
1251
1299
  The value for local `killed_status' can be supplied by caller.
1252
1300
*/
1253
1301
Query_log_event::Query_log_event(Session* session_arg, const char* query_arg,
1254
 
                                 ulong query_length, bool using_trans,
1255
 
                                 bool suppress_use,
 
1302
                                 ulong query_length, bool using_trans,
 
1303
                                 bool suppress_use,
1256
1304
                                 Session::killed_state killed_status_arg)
1257
 
:Log_event(session_arg,
1258
 
           (session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0) |
1259
 
           (suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1260
 
           using_trans),
1261
 
  data_buf(0), query(query_arg), catalog(session_arg->catalog),
1262
 
  db(session_arg->db), q_len((uint32_t) query_length),
1263
 
  thread_id(session_arg->thread_id),
1264
 
  /* save the original thread id; we already know the server id */
1265
 
  slave_proxy_id(session_arg->variables.pseudo_thread_id),
1266
 
  flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1267
 
  sql_mode(0),
1268
 
  auto_increment_increment(session_arg->variables.auto_increment_increment),
1269
 
  auto_increment_offset(session_arg->variables.auto_increment_offset),
1270
 
  lc_time_names_number(session_arg->variables.lc_time_names->number),
 
1305
  :Log_event(session_arg,
 
1306
             (session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
 
1307
              0) |
 
1308
             (suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
 
1309
             using_trans),
 
1310
   data_buf(0), query(query_arg), catalog(session_arg->catalog),
 
1311
   db(session_arg->db), q_len((uint32_t) query_length),
 
1312
   thread_id(session_arg->thread_id),
 
1313
   /* save the original thread id; we already know the server id */
 
1314
   slave_proxy_id(session_arg->variables.pseudo_thread_id),
 
1315
   flags2_inited(1), sql_mode_inited(1), charset_inited(1),
 
1316
   sql_mode(0),
 
1317
   auto_increment_increment(session_arg->variables.auto_increment_increment),
 
1318
   auto_increment_offset(session_arg->variables.auto_increment_offset),
 
1319
   lc_time_names_number(session_arg->variables.lc_time_names->number),
1271
1320
   charset_database_number(0)
1272
1321
{
1273
1322
  time_t end_time;
1279
1328
    (killed_status_arg == Session::NOT_KILLED) ?
1280
1329
    (session_arg->is_error() ? session_arg->main_da.sql_errno() : 0) :
1281
1330
    (session_arg->killed_errno());
1282
 
 
 
1331
  
1283
1332
  time(&end_time);
1284
1333
  exec_time = (ulong) (end_time  - session_arg->start_time);
1285
1334
  /**
1291
1340
  db_len = (db) ? (uint32_t) strlen(db) : 0;
1292
1341
  if (session_arg->variables.collation_database != session_arg->db_charset)
1293
1342
    charset_database_number= session_arg->variables.collation_database->number;
1294
 
 
 
1343
  
1295
1344
  /*
1296
1345
    If we don't use flags2 for anything else than options contained in
1297
1346
    session_arg->options, it would be more efficient to flags2=session_arg->options
1300
1349
    we will probably want to reclaim the 29 bits. So we need the &.
1301
1350
  */
1302
1351
  flags2= (uint32_t) (session_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
 
1352
  assert(session_arg->variables.character_set_client->number < 256*256);
 
1353
  assert(session_arg->variables.collation_connection->number < 256*256);
1303
1354
  assert(session_arg->variables.collation_server->number < 256*256);
 
1355
  assert(session_arg->variables.character_set_client->mbminlen == 1);
 
1356
  int2store(charset, session_arg->variables.character_set_client->number);
 
1357
  int2store(charset+2, session_arg->variables.collation_connection->number);
1304
1358
  int2store(charset+4, session_arg->variables.collation_server->number);
1305
 
  time_zone_len= 0;
1306
 
}
1307
 
 
1308
 
static void copy_str_and_move(const char **src,
1309
 
                              Log_event::Byte **dst,
 
1359
  if (session_arg->time_zone_used)
 
1360
  {
 
1361
    /*
 
1362
      Note that our event becomes dependent on the Time_zone object
 
1363
      representing the time zone. Fortunately such objects are never deleted
 
1364
      or changed during mysqld's lifetime.
 
1365
    */
 
1366
    time_zone_len= session_arg->variables.time_zone->get_name()->length();
 
1367
    time_zone_str= session_arg->variables.time_zone->get_name()->ptr();
 
1368
  }
 
1369
  else
 
1370
    time_zone_len= 0;
 
1371
}
 
1372
 
 
1373
 
 
1374
/* 2 utility functions for the next method */
 
1375
 
 
1376
/**
 
1377
   Read a string with length from memory.
 
1378
 
 
1379
   This function reads the string-with-length stored at
 
1380
   <code>src</code> and extract the length into <code>*len</code> and
 
1381
   a pointer to the start of the string into <code>*dst</code>. The
 
1382
   string can then be copied using <code>memcpy()</code> with the
 
1383
   number of bytes given in <code>*len</code>.
 
1384
 
 
1385
   @param src Pointer to variable holding a pointer to the memory to
 
1386
              read the string from.
 
1387
   @param dst Pointer to variable holding a pointer where the actual
 
1388
              string starts. Starting from this position, the string
 
1389
              can be copied using @c memcpy().
 
1390
   @param len Pointer to variable where the length will be stored.
 
1391
   @param end One-past-the-end of the memory where the string is
 
1392
              stored.
 
1393
 
 
1394
   @return    Zero if the entire string can be copied successfully,
 
1395
              @c UINT_MAX if the length could not be read from memory
 
1396
              (that is, if <code>*src >= end</code>), otherwise the
 
1397
              number of bytes that are missing to read the full
 
1398
              string, which happends <code>*dst + *len >= end</code>.
 
1399
*/
 
1400
static int
 
1401
get_str_len_and_pointer(const Log_event::Byte **src,
 
1402
                        const char **dst,
 
1403
                        uint32_t *len,
 
1404
                        const Log_event::Byte *end)
 
1405
{
 
1406
  if (*src >= end)
 
1407
    return -1;       // Will be UINT_MAX in two-complement arithmetics
 
1408
  uint32_t length= **src;
 
1409
  if (length > 0)
 
1410
  {
 
1411
    if (*src + length >= end)
 
1412
      return *src + length - end + 1;       // Number of bytes missing
 
1413
    *dst= (char *)*src + 1;                    // Will be copied later
 
1414
  }
 
1415
  *len= length;
 
1416
  *src+= length + 1;
 
1417
  return 0;
 
1418
}
 
1419
 
 
1420
static void copy_str_and_move(const char **src, 
 
1421
                              Log_event::Byte **dst, 
1310
1422
                              uint32_t len)
1311
1423
{
1312
1424
  memcpy(*dst, *src, len);
1355
1467
 
1356
1468
  common_header_len= description_event->common_header_len;
1357
1469
  post_header_len= description_event->post_header_len[event_type-1];
1358
 
 
 
1470
  
1359
1471
  /*
1360
1472
    We test if the event's length is sensible, and if so we compute data_len.
1361
1473
    We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
1362
1474
    We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
1363
1475
  */
1364
1476
  if (event_len < (uint)(common_header_len + post_header_len))
1365
 
    return;
 
1477
    return;                             
1366
1478
  data_len = event_len - (common_header_len + post_header_len);
1367
1479
  buf+= common_header_len;
1368
 
 
 
1480
  
1369
1481
  slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
1370
1482
  exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
1371
1483
  db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
1376
1488
    Depending on the format, we may or not have affected/warnings etc
1377
1489
    The remnent post-header to be parsed has length:
1378
1490
  */
1379
 
  tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
 
1491
  tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN; 
1380
1492
  if (tmp)
1381
1493
  {
1382
1494
    status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
1401
1513
  */
1402
1514
 
1403
1515
  /* variable-part: the status vars; only in MySQL 5.0  */
1404
 
 
 
1516
  
1405
1517
  start= (Log_event::Byte*) (buf+post_header_len);
1406
1518
  end= (const Log_event::Byte*) (start+status_vars_len);
1407
1519
  for (const Log_event::Byte* pos= start; pos < end;)
1413
1525
      flags2= uint4korr(pos);
1414
1526
      pos+= 4;
1415
1527
      break;
 
1528
    case Q_SQL_MODE_CODE:
 
1529
    {
 
1530
      CHECK_SPACE(pos, end, 8);
 
1531
      sql_mode_inited= 1;
 
1532
      sql_mode= (ulong) uint8korr(pos); // QQ: Fix when sql_mode is uint64_t
 
1533
      pos+= 8;
 
1534
      break;
 
1535
    }
 
1536
    case Q_CATALOG_NZ_CODE:
 
1537
      if (get_str_len_and_pointer(&pos, &catalog, &catalog_len, end))
 
1538
      {
 
1539
        query= 0;
 
1540
        return;
 
1541
      }
 
1542
      break;
 
1543
    case Q_AUTO_INCREMENT:
 
1544
      CHECK_SPACE(pos, end, 4);
 
1545
      auto_increment_increment= uint2korr(pos);
 
1546
      auto_increment_offset=    uint2korr(pos+2);
 
1547
      pos+= 4;
 
1548
      break;
 
1549
    case Q_TIME_ZONE_CODE:
 
1550
    {
 
1551
      if (get_str_len_and_pointer(&pos, &time_zone_str, &time_zone_len, end))
 
1552
      {
 
1553
        query= 0;
 
1554
        return;
 
1555
      }
 
1556
      break;
 
1557
    }
 
1558
    case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
 
1559
      CHECK_SPACE(pos, end, 1);
 
1560
      if ((catalog_len= *pos))
 
1561
        catalog= (char*) pos+1;                           // Will be copied later
 
1562
      CHECK_SPACE(pos, end, catalog_len + 2);
 
1563
      pos+= catalog_len+2; // leap over end 0
 
1564
      catalog_nz= 0; // catalog has end 0 in event
 
1565
      break;
1416
1566
    case Q_LC_TIME_NAMES_CODE:
1417
1567
      CHECK_SPACE(pos, end, 2);
1418
1568
      lc_time_names_number= uint2korr(pos);
1428
1578
      pos= (const unsigned char*) end;                         // Break loop
1429
1579
    }
1430
1580
  }
1431
 
 
1432
 
  if (!(start= data_buf = (Log_event::Byte*) malloc(catalog_len + 1 +
 
1581
  
 
1582
  if (!(start= data_buf = (Log_event::Byte*) my_malloc(catalog_len + 1 +
1433
1583
                                             time_zone_len + 1 +
1434
 
                                             data_len + 1)))
 
1584
                                             data_len + 1,
 
1585
                                             MYF(MY_WME))))
1435
1586
      return;
1436
1587
  if (catalog_len)                                  // If catalog is given
1437
1588
  {
1459
1610
    my_alloc call above? /sven
1460
1611
  */
1461
1612
 
1462
 
  /* A 2nd variable part; this is common to all versions */
 
1613
  /* A 2nd variable part; this is common to all versions */ 
1463
1614
  memcpy(start, end, data_len);          // Copy db and query
1464
1615
  start[data_len]= '\0';              // End query with \0 (For safetly)
1465
1616
  db= (char *)start;
1485
1636
  @code
1486
1637
     if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1487
1638
     {
1488
 
     errmsg_printf(ERRMSG_LVL_ERROR, "Slave: did not get the expected number of affected \
 
1639
     sql_print_error("Slave: did not get the expected number of affected \
1489
1640
     rows running query from master - expected %d, got %d (this numbers \
1490
1641
     should have matched modulo 4294967296).", 0, ...);
1491
1642
     session->query_error = 1;
1498
1649
int Query_log_event::do_apply_event(Relay_log_info const *rli,
1499
1650
                                      const char *query_arg, uint32_t q_len_arg)
1500
1651
{
 
1652
  LEX_STRING new_db;
1501
1653
  int expected_error,actual_error= 0;
1502
1654
  Query_id &query_id= Query_id::get_query_id();
1503
1655
  /*
1508
1660
    you.
1509
1661
  */
1510
1662
  session->catalog= catalog_len ? (char *) catalog : (char *)"";
1511
 
  session->set_db(db, strlen(db));       /* allocates a copy of 'db' */
 
1663
  new_db.length= db_len;
 
1664
  new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
 
1665
  session->set_db(new_db.str, new_db.length);       /* allocates a copy of 'db' */
1512
1666
  session->variables.auto_increment_increment= auto_increment_increment;
1513
1667
  session->variables.auto_increment_offset=    auto_increment_offset;
1514
1668
 
1537
1691
            ::do_apply_event(), then the companion SET also have so
1538
1692
            we don't need to reset_one_shot_variables().
1539
1693
  */
1540
 
  if (1)
 
1694
  if (rpl_filter->db_ok(session->db))
1541
1695
  {
1542
1696
    session->set_time((time_t)when);
1543
1697
    session->query_length= q_len_arg;
1591
1745
      }
1592
1746
      else
1593
1747
        session->variables.collation_database= session->db_charset;
1594
 
 
 
1748
      
1595
1749
      /* Execute the query (note that we bypass dispatch_command()) */
1596
1750
      const char* found_semicolon= NULL;
1597
1751
      mysql_parse(session, session->query, session->query_length, &found_semicolon);
1648
1802
      session->is_slave_error= 1;
1649
1803
    }
1650
1804
    /*
1651
 
      If we get the same error code as expected, or they should be ignored.
 
1805
      If we get the same error code as expected, or they should be ignored. 
1652
1806
    */
1653
1807
    else if (expected_error == actual_error ||
1654
1808
             ignored_error_code(actual_error))
1674
1828
      like:
1675
1829
      if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1676
1830
      {
1677
 
      errmsg_printf(ERRMSG_LVL_ERROR, "Slave: did not get the expected number of affected \
 
1831
      sql_print_error("Slave: did not get the expected number of affected \
1678
1832
      rows running query from master - expected %d, got %d (this numbers \
1679
1833
      should have matched modulo 4294967296).", 0, ...);
1680
1834
      session->is_slave_error = 1;
1698
1852
    Probably we have set session->query, session->db, session->catalog to point to places
1699
1853
    in the data_buf of this event. Now the event is going to be deleted
1700
1854
    probably, so data_buf will be freed, so the session->... listed above will be
1701
 
    pointers to freed memory.
 
1855
    pointers to freed memory. 
1702
1856
    So we must set them to 0, so that those bad pointers values are not later
1703
1857
    used. Note that "cleanup" queries like automatic DROP TEMPORARY Table
1704
1858
    don't suffer from these assignments to 0 as DROP TEMPORARY
1709
1863
  session->query= 0;                    // just to be sure
1710
1864
  session->query_length= 0;
1711
1865
  pthread_mutex_unlock(&LOCK_thread_count);
1712
 
  close_thread_tables(session);
 
1866
  close_thread_tables(session);      
 
1867
  /*
 
1868
    As a disk space optimization, future masters will not log an event for
 
1869
    LAST_INSERT_ID() if that function returned 0 (and thus they will be able
 
1870
    to replace the Session::stmt_depends_on_first_successful_insert_id_in_prev_stmt
 
1871
    variable by (Session->first_successful_insert_id_in_prev_stmt > 0) ; with the
 
1872
    resetting below we are ready to support that.
 
1873
  */
 
1874
  session->first_successful_insert_id_in_prev_stmt_for_binlog= 0;
1713
1875
  session->first_successful_insert_id_in_prev_stmt= 0;
 
1876
  session->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
1714
1877
  free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
1715
1878
  return session->is_slave_error;
1716
1879
}
1717
1880
 
1718
1881
int Query_log_event::do_update_pos(Relay_log_info *rli)
1719
1882
{
1720
 
  return Log_event::do_update_pos(rli);
 
1883
  /*
 
1884
    Note that we will not increment group* positions if we are just
 
1885
    after a SET ONE_SHOT, because SET ONE_SHOT should not be separated
 
1886
    from its following updating query.
 
1887
  */
 
1888
  if (session->one_shot_set)
 
1889
  {
 
1890
    rli->inc_event_relay_log_pos();
 
1891
    return 0;
 
1892
  }
 
1893
  else
 
1894
    return Log_event::do_update_pos(rli);
1721
1895
}
1722
1896
 
1723
1897
 
1762
1936
void Start_log_event_v3::pack_info(Protocol *protocol)
1763
1937
{
1764
1938
  char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos;
1765
 
  pos= strcpy(buf, "Server ver: ")+12;
1766
 
  pos= strcpy(pos, server_version)+strlen(server_version);
1767
 
  pos= strcpy(pos, ", Binlog ver: ")+14;
 
1939
  pos= my_stpcpy(buf, "Server ver: ");
 
1940
  pos= my_stpcpy(pos, server_version);
 
1941
  pos= my_stpcpy(pos, ", Binlog ver: ");
1768
1942
  pos= int10_to_str(binlog_version, pos, 10);
1769
1943
  protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
1770
1944
}
1894
2068
*/
1895
2069
 
1896
2070
Format_description_log_event::
1897
 
Format_description_log_event(uint8_t binlog_ver, const char*)
 
2071
Format_description_log_event(uint8_t binlog_ver, const char* server_ver)
1898
2072
  :Start_log_event_v3(), event_type_permutation(0)
1899
2073
{
1900
2074
  binlog_version= binlog_ver;
1903
2077
    memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
1904
2078
    common_header_len= LOG_EVENT_HEADER_LEN;
1905
2079
    number_of_event_types= LOG_EVENT_TYPES;
1906
 
    /* we'll catch malloc() error in is_valid() */
1907
 
    post_header_len=(uint8_t*) malloc(number_of_event_types*sizeof(uint8_t));
1908
 
    memset(post_header_len, 0, number_of_event_types*sizeof(uint8_t));
 
2080
    /* we'll catch my_malloc() error in is_valid() */
 
2081
    post_header_len=(uint8_t*) my_malloc(number_of_event_types*sizeof(uint8_t),
 
2082
                                       MYF(MY_ZEROFILL));
1909
2083
    /*
1910
2084
      This long list of assignments is not beautiful, but I see no way to
1911
2085
      make it nicer, as the right members are #defines, not array members, so
1934
2108
    }
1935
2109
    break;
1936
2110
 
 
2111
  case 1: /* 3.23 */
 
2112
  case 3: /* 4.0.x x>=2 */
 
2113
    /*
 
2114
      We build an artificial (i.e. not sent by the master) event, which
 
2115
      describes what those old master versions send.
 
2116
    */
 
2117
    if (binlog_ver==1)
 
2118
      my_stpcpy(server_version, server_ver ? server_ver : "3.23");
 
2119
    else
 
2120
      my_stpcpy(server_version, server_ver ? server_ver : "4.0");
 
2121
    common_header_len= binlog_ver==1 ? OLD_HEADER_LEN :
 
2122
      LOG_EVENT_MINIMAL_HEADER_LEN;
 
2123
    /*
 
2124
      The first new event in binlog version 4 is Format_desc. So any event type
 
2125
      after that does not exist in older versions. We use the events known by
 
2126
      version 3, even if version 1 had only a subset of them (this is not a
 
2127
      problem: it uses a few bytes for nothing but unifies code; it does not
 
2128
      make the slave detect less corruptions).
 
2129
    */
 
2130
    number_of_event_types= FORMAT_DESCRIPTION_EVENT - 1;
 
2131
    post_header_len=(uint8_t*) my_malloc(number_of_event_types*sizeof(uint8_t),
 
2132
                                       MYF(0));
 
2133
    if (post_header_len)
 
2134
    {
 
2135
      post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
 
2136
      post_header_len[QUERY_EVENT-1]= QUERY_HEADER_MINIMAL_LEN;
 
2137
      post_header_len[STOP_EVENT-1]= 0;
 
2138
      post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN;
 
2139
      post_header_len[INTVAR_EVENT-1]= 0;
 
2140
      post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
 
2141
      post_header_len[SLAVE_EVENT-1]= 0;
 
2142
      post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
 
2143
      post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
 
2144
      post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
 
2145
      post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
 
2146
      post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
 
2147
      post_header_len[RAND_EVENT-1]= 0;
 
2148
      post_header_len[USER_VAR_EVENT-1]= 0;
 
2149
    }
 
2150
    break;
1937
2151
  default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
1938
 
    assert(0);
 
2152
    post_header_len= 0; /* will make is_valid() fail */
 
2153
    break;
1939
2154
  }
1940
2155
  calc_server_version_split();
1941
2156
}
1972
2187
    return; /* sanity check */
1973
2188
  number_of_event_types=
1974
2189
    event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1);
1975
 
  post_header_len= (uint8_t*) malloc(number_of_event_types*
1976
 
                                     sizeof(*post_header_len));
1977
2190
  /* If alloc fails, we'll detect it in is_valid() */
1978
 
  if (post_header_len != NULL)
1979
 
    memcpy(post_header_len, buf+ST_COMMON_HEADER_LEN_OFFSET+1,
1980
 
           number_of_event_types* sizeof(*post_header_len));
 
2191
  post_header_len= (uint8_t*) my_memdup((unsigned char*)buf+ST_COMMON_HEADER_LEN_OFFSET+1,
 
2192
                                      number_of_event_types*
 
2193
                                      sizeof(*post_header_len), MYF(0));
1981
2194
  calc_server_version_split();
1982
2195
 
1983
2196
  /*
2057
2270
    static const uint8_t perm[23]=
2058
2271
      {
2059
2272
        UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
2060
 
        LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
 
2273
        INTVAR_EVENT, LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
2061
2274
        APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
2062
2275
        NEW_LOAD_EVENT,
 
2276
        RAND_EVENT, USER_VAR_EVENT,
2063
2277
        FORMAT_DESCRIPTION_EVENT,
2064
2278
        TABLE_MAP_EVENT,
 
2279
        PRE_GA_WRITE_ROWS_EVENT,
 
2280
        PRE_GA_UPDATE_ROWS_EVENT,
 
2281
        PRE_GA_DELETE_ROWS_EVENT,
2065
2282
        XID_EVENT,
2066
2283
        BEGIN_LOAD_QUERY_EVENT,
2067
2284
        EXECUTE_LOAD_QUERY_EVENT,
2128
2345
    perform, we don't call Start_log_event_v3::do_apply_event()
2129
2346
    (this was just to update the log's description event).
2130
2347
  */
2131
 
  if (server_id != ::server_id)
 
2348
  if (server_id != (uint32_t) ::server_id)
2132
2349
  {
2133
2350
    /*
2134
2351
      If the event was not requested by the slave i.e. the master sent
2150
2367
  delete rli->relay_log.description_event_for_exec;
2151
2368
  rli->relay_log.description_event_for_exec= this;
2152
2369
 
2153
 
  if (server_id == ::server_id)
 
2370
  if (server_id == (uint32_t) ::server_id)
2154
2371
  {
2155
2372
    /*
2156
2373
      We only increase the relay log position if we are skipping
2175
2392
}
2176
2393
 
2177
2394
Log_event::enum_skip_reason
2178
 
Format_description_log_event::do_shall_skip(Relay_log_info *)
 
2395
Format_description_log_event::do_shall_skip(Relay_log_info *rli __attribute__((unused)))
2179
2396
{
2180
2397
  return Log_event::EVENT_SKIP_NOT;
2181
2398
}
2253
2470
 
2254
2471
  if (need_db && db && db_len)
2255
2472
  {
2256
 
    pos= strcpy(pos, "use `")+5;
 
2473
    pos= my_stpcpy(pos, "use `");
2257
2474
    memcpy(pos, db, db_len);
2258
 
    pos= strcpy(pos+db_len, "`; ")+3;
 
2475
    pos= my_stpcpy(pos+db_len, "`; ");
2259
2476
  }
2260
2477
 
2261
 
  pos= strcpy(pos, "LOAD DATA ")+10;
 
2478
  pos= my_stpcpy(pos, "LOAD DATA ");
2262
2479
 
2263
2480
  if (fn_start)
2264
2481
    *fn_start= pos;
2265
2482
 
2266
2483
  if (check_fname_outside_temp_buf())
2267
 
    pos= strcpy(pos, "LOCAL ")+6;
2268
 
  pos= strcpy(pos, "INFILE '")+8;
 
2484
    pos= my_stpcpy(pos, "LOCAL ");
 
2485
  pos= my_stpcpy(pos, "INFILE '");
2269
2486
  memcpy(pos, fname, fname_len);
2270
 
  pos= strcpy(pos+fname_len, "' ")+2;
 
2487
  pos= my_stpcpy(pos+fname_len, "' ");
2271
2488
 
2272
2489
  if (sql_ex.opt_flags & REPLACE_FLAG)
2273
 
    pos= strcpy(pos, " REPLACE ")+9;
 
2490
    pos= my_stpcpy(pos, " REPLACE ");
2274
2491
  else if (sql_ex.opt_flags & IGNORE_FLAG)
2275
 
    pos= strcpy(pos, " IGNORE ")+8;
 
2492
    pos= my_stpcpy(pos, " IGNORE ");
2276
2493
 
2277
 
  pos= strcpy(pos ,"INTO")+4;
 
2494
  pos= my_stpcpy(pos ,"INTO");
2278
2495
 
2279
2496
  if (fn_end)
2280
2497
    *fn_end= pos;
2281
2498
 
2282
 
  pos= strcpy(pos ," Table `")+8;
 
2499
  pos= my_stpcpy(pos ," Table `");
2283
2500
  memcpy(pos, table_name, table_name_len);
2284
2501
  pos+= table_name_len;
2285
2502
 
2286
2503
  /* We have to create all optinal fields as the default is not empty */
2287
 
  pos= strcpy(pos, "` FIELDS TERMINATED BY ")+23;
 
2504
  pos= my_stpcpy(pos, "` FIELDS TERMINATED BY ");
2288
2505
  pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len);
2289
2506
  if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
2290
 
    pos= strcpy(pos, " OPTIONALLY ")+12;
2291
 
  pos= strcpy(pos, " ENCLOSED BY ")+13;
 
2507
    pos= my_stpcpy(pos, " OPTIONALLY ");
 
2508
  pos= my_stpcpy(pos, " ENCLOSED BY ");
2292
2509
  pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len);
2293
2510
 
2294
 
  pos= strcpy(pos, " ESCAPED BY ")+12;
 
2511
  pos= my_stpcpy(pos, " ESCAPED BY ");
2295
2512
  pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
2296
2513
 
2297
 
  pos= strcpy(pos, " LINES TERMINATED BY ")+21;
 
2514
  pos= my_stpcpy(pos, " LINES TERMINATED BY ");
2298
2515
  pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
2299
2516
  if (sql_ex.line_start_len)
2300
2517
  {
2301
 
    pos= strcpy(pos, " STARTING BY ")+13;
 
2518
    pos= my_stpcpy(pos, " STARTING BY ");
2302
2519
    pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
2303
2520
  }
2304
2521
 
2305
2522
  if ((long) skip_lines > 0)
2306
2523
  {
2307
 
    pos= strcpy(pos, " IGNORE ")+8;
 
2524
    pos= my_stpcpy(pos, " IGNORE ");
2308
2525
    pos= int64_t10_to_str((int64_t) skip_lines, pos, 10);
2309
 
    pos= strcpy(pos," LINES ")+7;
 
2526
    pos= my_stpcpy(pos," LINES ");    
2310
2527
  }
2311
2528
 
2312
2529
  if (num_fields)
2313
2530
  {
2314
2531
    uint32_t i;
2315
2532
    const char *field= fields;
2316
 
    pos= strcpy(pos, " (")+2;
 
2533
    pos= my_stpcpy(pos, " (");
2317
2534
    for (i = 0; i < num_fields; i++)
2318
2535
    {
2319
2536
      if (i)
2336
2553
{
2337
2554
  char *buf, *end;
2338
2555
 
2339
 
  if (!(buf= (char*) malloc(get_query_buffer_length())))
 
2556
  if (!(buf= (char*) my_malloc(get_query_buffer_length(), MYF(MY_WME))))
2340
2557
    return;
2341
2558
  print_query(true, buf, &end, 0, 0);
2342
2559
  protocol->store(buf, end-buf, &my_charset_bin);
2419
2636
  sql_ex.escaped_len = (uint8_t) ex->escaped->length();
2420
2637
  sql_ex.opt_flags = 0;
2421
2638
  sql_ex.cached_new_format = -1;
2422
 
 
 
2639
    
2423
2640
  if (ex->dumpfile)
2424
2641
    sql_ex.opt_flags|= DUMPFILE_FLAG;
2425
2642
  if (ex->opt_enclosed)
2433
2650
    break;
2434
2651
  case DUP_UPDATE:                              // Impossible here
2435
2652
  case DUP_ERROR:
2436
 
    break;
 
2653
    break;      
2437
2654
  }
2438
2655
  if (ignore)
2439
2656
    sql_ex.opt_flags|= IGNORE_FLAG;
2448
2665
    sql_ex.empty_flags |= LINE_START_EMPTY;
2449
2666
  if (!ex->escaped->length())
2450
2667
    sql_ex.empty_flags |= ESCAPED_EMPTY;
2451
 
 
 
2668
    
2452
2669
  skip_lines = ex->skip_lines;
2453
2670
 
2454
2671
  List_iterator<Item> li(fields_arg);
2487
2704
  if (event_len)
2488
2705
    copy_log_event(buf, event_len,
2489
2706
                   ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
2490
 
                    LOAD_HEADER_LEN +
 
2707
                    LOAD_HEADER_LEN + 
2491
2708
                    description_event->common_header_len :
2492
2709
                    LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
2493
2710
                   description_event);
2514
2731
  table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
2515
2732
  db_len = (uint)data_head[L_DB_LEN_OFFSET];
2516
2733
  num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
2517
 
 
 
2734
          
2518
2735
  if ((int) event_len < body_offset)
2519
2736
    return(1);
2520
2737
  /*
2525
2742
                                        buf_end,
2526
2743
                                        buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
2527
2744
    return(1);
2528
 
 
 
2745
  
2529
2746
  data_len = event_len - body_offset;
2530
2747
  if (num_fields > data_len) // simple sanity check against corruption
2531
2748
    return(1);
2547
2764
  Load_log_event::set_fields()
2548
2765
 
2549
2766
  @note
2550
 
    This function can not use the member variable
 
2767
    This function can not use the member variable 
2551
2768
    for the database, since LOAD DATA INFILE on the slave
2552
2769
    can be for a different database than the current one.
2553
2770
    This is the reason for the affected_db argument to this method.
2554
2771
*/
2555
2772
 
2556
 
void Load_log_event::set_fields(const char* affected_db,
 
2773
void Load_log_event::set_fields(const char* affected_db, 
2557
2774
                                List<Item> &field_list,
2558
2775
                                Name_resolution_context *context)
2559
2776
{
2600
2817
int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
2601
2818
                                   bool use_rli_only_for_errors)
2602
2819
{
 
2820
  LEX_STRING new_db;
2603
2821
  Query_id &query_id= Query_id::get_query_id();
2604
 
  session->set_db(db, strlen(db));
 
2822
  new_db.length= db_len;
 
2823
  new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
 
2824
  session->set_db(new_db.str, new_db.length);
2605
2825
  assert(session->query == 0);
2606
2826
  session->query_length= 0;                         // Should not be needed
2607
2827
  session->is_slave_error= 0;
2614
2834
    as the present method does not call mysql_parse().
2615
2835
  */
2616
2836
  lex_start(session);
2617
 
  session->reset_for_next_command();
 
2837
  mysql_reset_session_for_next_command(session);
2618
2838
 
2619
2839
  if (!use_rli_only_for_errors)
2620
2840
  {
2624
2844
    */
2625
2845
    const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
2626
2846
  }
2627
 
 
 
2847
 
2628
2848
   /*
2629
2849
    We test replicate_*_db rules. Note that we have already prepared
2630
2850
    the file to load, even if we are going to ignore and delete it
2648
2868
            ::do_apply_event(), then the companion SET also have so
2649
2869
            we don't need to reset_one_shot_variables().
2650
2870
  */
2651
 
  if (1)
 
2871
  if (rpl_filter->db_ok(session->db))
2652
2872
  {
2653
2873
    session->set_time((time_t)when);
2654
2874
    session->query_id = query_id.next();
2667
2887
    tables.lock_type = TL_WRITE;
2668
2888
    tables.updating= 1;
2669
2889
 
2670
 
    // the table will be opened in mysql_load
 
2890
    // the table will be opened in mysql_load    
 
2891
    if (rpl_filter->is_on() && !rpl_filter->tables_ok(session->db, &tables))
 
2892
    {
 
2893
      // TODO: this is a bug - this needs to be moved to the I/O thread
 
2894
      if (net)
 
2895
        skip_load_data_infile(net);
 
2896
    }
 
2897
    else
2671
2898
    {
2672
2899
      char llbuff[22];
2673
2900
      char *end;
2696
2923
 
2697
2924
      if (sql_ex.opt_flags & REPLACE_FLAG)
2698
2925
      {
2699
 
        handle_dup= DUP_REPLACE;
 
2926
        handle_dup= DUP_REPLACE;
2700
2927
      }
2701
2928
      else if (sql_ex.opt_flags & IGNORE_FLAG)
2702
2929
      {
2706
2933
      else
2707
2934
      {
2708
2935
        /*
2709
 
          When replication is running fine, if it was DUP_ERROR on the
 
2936
          When replication is running fine, if it was DUP_ERROR on the
2710
2937
          master then we could choose IGNORE here, because if DUP_ERROR
2711
2938
          suceeded on master, and data is identical on the master and slave,
2712
2939
          then there should be no uniqueness errors on slave, so IGNORE is
2713
2940
          the same as DUP_ERROR. But in the unlikely case of uniqueness errors
2714
2941
          (because the data on the master and slave happen to be different
2715
 
          (user error or bug), we want LOAD DATA to print an error message on
2716
 
          the slave to discover the problem.
 
2942
          (user error or bug), we want LOAD DATA to print an error message on
 
2943
          the slave to discover the problem.
2717
2944
 
2718
2945
          If reading from net (a 3.23 master), mysql_load() will change this
2719
2946
          to IGNORE.
2732
2959
      session->lex->duplicates= handle_dup;
2733
2960
 
2734
2961
      sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
2735
 
      String field_term(sql_ex.field_term,sql_ex.field_term_len,&my_charset_utf8_general_ci);
2736
 
      String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,&my_charset_utf8_general_ci);
2737
 
      String line_term(sql_ex.line_term,sql_ex.line_term_len,&my_charset_utf8_general_ci);
2738
 
      String line_start(sql_ex.line_start,sql_ex.line_start_len,&my_charset_utf8_general_ci);
2739
 
      String escaped(sql_ex.escaped,sql_ex.escaped_len, &my_charset_utf8_general_ci);
 
2962
      String field_term(sql_ex.field_term,sql_ex.field_term_len,log_cs);
 
2963
      String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,log_cs);
 
2964
      String line_term(sql_ex.line_term,sql_ex.line_term_len,log_cs);
 
2965
      String line_start(sql_ex.line_start,sql_ex.line_start_len,log_cs);
 
2966
      String escaped(sql_ex.escaped,sql_ex.escaped_len, log_cs);
2740
2967
      ex.field_term= &field_term;
2741
2968
      ex.enclosed= &enclosed;
2742
2969
      ex.line_term= &line_term;
2745
2972
 
2746
2973
      ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
2747
2974
      if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
2748
 
        ex.field_term->length(0);
 
2975
        ex.field_term->length(0);
2749
2976
 
2750
2977
      ex.skip_lines = skip_lines;
2751
2978
      List<Item> field_list;
2754
2981
      session->variables.pseudo_thread_id= thread_id;
2755
2982
      if (net)
2756
2983
      {
2757
 
        // mysql_load will use session->net to read the file
2758
 
        session->net.vio = net->vio;
2759
 
        /*
2760
 
          Make sure the client does not get confused about the packet sequence
2761
 
        */
2762
 
        session->net.pkt_nr = net->pkt_nr;
 
2984
        // mysql_load will use session->net to read the file
 
2985
        session->net.vio = net->vio;
 
2986
        /*
 
2987
          Make sure the client does not get confused about the packet sequence
 
2988
        */
 
2989
        session->net.pkt_nr = net->pkt_nr;
2763
2990
      }
2764
2991
      /*
2765
2992
        It is safe to use tmp_list twice because we are not going to
2771
2998
        session->is_slave_error= 1;
2772
2999
      if (session->cuted_fields)
2773
3000
      {
2774
 
        /* log_pos is the position of the LOAD event in the master log */
2775
 
        errmsg_printf(ERRMSG_LVL_WARN, _("Slave: load data infile on table '%s' at "
2776
 
                            "log position %s in log '%s' produced %ld "
2777
 
                            "warning(s). Default database: '%s'"),
 
3001
        /* log_pos is the position of the LOAD event in the master log */
 
3002
        sql_print_warning(_("Slave: load data infile on table '%s' at "
 
3003
                          "log position %s in log '%s' produced %ld "
 
3004
                          "warning(s). Default database: '%s'"),
2778
3005
                          (char*) table_name,
2779
 
                          llstr(log_pos,llbuff), RPL_LOG_NAME,
 
3006
                          llstr(log_pos,llbuff), RPL_LOG_NAME, 
2780
3007
                          (ulong) session->cuted_fields,
2781
3008
                          print_slave_db_safe(session->db));
2782
3009
      }
2796
3023
  }
2797
3024
 
2798
3025
error:
2799
 
  session->net.vio = 0;
 
3026
  session->net.vio = 0; 
2800
3027
  const char *remember_db= session->db;
2801
3028
  pthread_mutex_lock(&LOCK_thread_count);
2802
3029
  session->catalog= 0;
2859
3086
void Rotate_log_event::pack_info(Protocol *protocol)
2860
3087
{
2861
3088
  char buf1[256], buf[22];
2862
 
  String tmp(buf1, sizeof(buf1), &my_charset_utf8_general_ci);
 
3089
  String tmp(buf1, sizeof(buf1), log_cs);
2863
3090
  tmp.length(0);
2864
 
  tmp.append(new_log_ident.c_str(), ident_len);
 
3091
  tmp.append(new_log_ident, ident_len);
2865
3092
  tmp.append(STRING_WITH_LEN(";pos="));
2866
3093
  tmp.append(llstr(pos,buf));
2867
3094
  protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
2876
3103
Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
2877
3104
                                   uint32_t ident_len_arg, uint64_t pos_arg,
2878
3105
                                   uint32_t flags_arg)
2879
 
  :Log_event(), pos(pos_arg),
2880
 
   ident_len(ident_len_arg
2881
 
               ? ident_len_arg
2882
 
               : strlen(new_log_ident_arg)),
2883
 
   flags(flags_arg)
 
3106
  :Log_event(), new_log_ident(new_log_ident_arg),
 
3107
   pos(pos_arg),ident_len(ident_len_arg ? ident_len_arg :
 
3108
                          (uint) strlen(new_log_ident_arg)), flags(flags_arg)
2884
3109
{
2885
 
  new_log_ident.assign(new_log_ident_arg, ident_len);
 
3110
  if (flags & DUP_NAME)
 
3111
    new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME));
2886
3112
  return;
2887
3113
}
2888
3114
 
2889
3115
 
2890
3116
Rotate_log_event::Rotate_log_event(const char* buf, uint32_t event_len,
2891
3117
                                   const Format_description_log_event* description_event)
2892
 
  :Log_event(buf, description_event), flags(DUP_NAME)
 
3118
  :Log_event(buf, description_event) ,new_log_ident(0), flags(DUP_NAME)
2893
3119
{
2894
3120
  // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
2895
3121
  uint8_t header_size= description_event->common_header_len;
2900
3126
  buf += header_size;
2901
3127
  pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
2902
3128
  ident_len = (uint)(event_len -
2903
 
                     (header_size+post_header_len));
2904
 
  ident_offset = post_header_len;
 
3129
                     (header_size+post_header_len)); 
 
3130
  ident_offset = post_header_len; 
2905
3131
  set_if_smaller(ident_len,FN_REFLEN-1);
2906
 
  new_log_ident.assign(buf + ident_offset, ident_len);
 
3132
  new_log_ident= my_strndup(buf + ident_offset, (uint) ident_len, MYF(MY_WME));
2907
3133
  return;
2908
3134
}
2909
3135
 
2918
3144
  int8store(buf + R_POS_OFFSET, pos);
2919
3145
  return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
2920
3146
          my_b_safe_write(file, (unsigned char*)buf, ROTATE_HEADER_LEN) ||
2921
 
          my_b_safe_write(file, (const unsigned char*)new_log_ident.c_str(),
2922
 
                          (uint) ident_len));
 
3147
          my_b_safe_write(file, (unsigned char*)new_log_ident, (uint) ident_len));
2923
3148
}
2924
3149
 
2925
3150
 
2959
3184
  if ((server_id != ::server_id || rli->replicate_same_server_id) &&
2960
3185
      !rli->is_in_group())
2961
3186
  {
2962
 
    rli->group_master_log_name.assign(new_log_ident);
 
3187
    rli->group_master_log_name.assign(new_log_ident, ident_len+1);
2963
3188
    rli->notify_group_master_log_name_update();
2964
3189
    rli->group_master_log_pos= pos;
2965
3190
    rli->group_relay_log_name.assign(rli->event_relay_log_name);
3003
3228
 
3004
3229
 
3005
3230
/**************************************************************************
 
3231
        Intvar_log_event methods
 
3232
**************************************************************************/
 
3233
 
 
3234
/*
 
3235
  Intvar_log_event::pack_info()
 
3236
*/
 
3237
 
 
3238
void Intvar_log_event::pack_info(Protocol *protocol)
 
3239
{
 
3240
  char buf[256], *pos;
 
3241
  pos= strmake(buf, get_var_type_name(), sizeof(buf)-23);
 
3242
  *pos++= '=';
 
3243
  pos= int64_t10_to_str(val, pos, -10);
 
3244
  protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
 
3245
}
 
3246
 
 
3247
 
 
3248
/*
 
3249
  Intvar_log_event::Intvar_log_event()
 
3250
*/
 
3251
 
 
3252
Intvar_log_event::Intvar_log_event(const char* buf,
 
3253
                                   const Format_description_log_event* description_event)
 
3254
  :Log_event(buf, description_event)
 
3255
{
 
3256
  buf+= description_event->common_header_len;
 
3257
  type= buf[I_TYPE_OFFSET];
 
3258
  val= uint8korr(buf+I_VAL_OFFSET);
 
3259
}
 
3260
 
 
3261
 
 
3262
/*
 
3263
  Intvar_log_event::get_var_type_name()
 
3264
*/
 
3265
 
 
3266
const char* Intvar_log_event::get_var_type_name()
 
3267
{
 
3268
  switch(type) {
 
3269
  case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
 
3270
  case INSERT_ID_EVENT: return "INSERT_ID";
 
3271
  default: /* impossible */ return "UNKNOWN";
 
3272
  }
 
3273
}
 
3274
 
 
3275
 
 
3276
/*
 
3277
  Intvar_log_event::write()
 
3278
*/
 
3279
 
 
3280
bool Intvar_log_event::write(IO_CACHE* file)
 
3281
{
 
3282
  unsigned char buf[9];
 
3283
  buf[I_TYPE_OFFSET]= (unsigned char) type;
 
3284
  int8store(buf + I_VAL_OFFSET, val);
 
3285
  return (write_header(file, sizeof(buf)) ||
 
3286
          my_b_safe_write(file, buf, sizeof(buf)));
 
3287
}
 
3288
 
 
3289
 
 
3290
/*
 
3291
  Intvar_log_event::print()
 
3292
*/
 
3293
 
 
3294
/*
 
3295
  Intvar_log_event::do_apply_event()
 
3296
*/
 
3297
 
 
3298
int Intvar_log_event::do_apply_event(Relay_log_info const *rli)
 
3299
{
 
3300
  /*
 
3301
    We are now in a statement until the associated query log event has
 
3302
    been processed.
 
3303
   */
 
3304
  const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
 
3305
 
 
3306
  switch (type) {
 
3307
  case LAST_INSERT_ID_EVENT:
 
3308
    session->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1;
 
3309
    session->first_successful_insert_id_in_prev_stmt= val;
 
3310
    break;
 
3311
  case INSERT_ID_EVENT:
 
3312
    session->force_one_auto_inc_interval(val);
 
3313
    break;
 
3314
  }
 
3315
  return 0;
 
3316
}
 
3317
 
 
3318
int Intvar_log_event::do_update_pos(Relay_log_info *rli)
 
3319
{
 
3320
  rli->inc_event_relay_log_pos();
 
3321
  return 0;
 
3322
}
 
3323
 
 
3324
 
 
3325
Log_event::enum_skip_reason
 
3326
Intvar_log_event::do_shall_skip(Relay_log_info *rli)
 
3327
{
 
3328
  /*
 
3329
    It is a common error to set the slave skip counter to 1 instead of
 
3330
    2 when recovering from an insert which used a auto increment,
 
3331
    rand, or user var.  Therefore, if the slave skip counter is 1, we
 
3332
    just say that this event should be skipped by ignoring it, meaning
 
3333
    that we do not change the value of the slave skip counter since it
 
3334
    will be decreased by the following insert event.
 
3335
  */
 
3336
  return continue_group(rli);
 
3337
}
 
3338
 
 
3339
 
 
3340
/**************************************************************************
 
3341
  Rand_log_event methods
 
3342
**************************************************************************/
 
3343
 
 
3344
void Rand_log_event::pack_info(Protocol *protocol)
 
3345
{
 
3346
  char buf1[256], *pos;
 
3347
  pos= my_stpcpy(buf1,"rand_seed1=");
 
3348
  pos= int10_to_str((long) seed1, pos, 10);
 
3349
  pos= my_stpcpy(pos, ",rand_seed2=");
 
3350
  pos= int10_to_str((long) seed2, pos, 10);
 
3351
  protocol->store(buf1, (uint) (pos-buf1), &my_charset_bin);
 
3352
}
 
3353
 
 
3354
 
 
3355
Rand_log_event::Rand_log_event(const char* buf,
 
3356
                               const Format_description_log_event* description_event)
 
3357
  :Log_event(buf, description_event)
 
3358
{
 
3359
  buf+= description_event->common_header_len;
 
3360
  seed1= uint8korr(buf+RAND_SEED1_OFFSET);
 
3361
  seed2= uint8korr(buf+RAND_SEED2_OFFSET);
 
3362
}
 
3363
 
 
3364
 
 
3365
bool Rand_log_event::write(IO_CACHE* file)
 
3366
{
 
3367
  unsigned char buf[16];
 
3368
  int8store(buf + RAND_SEED1_OFFSET, seed1);
 
3369
  int8store(buf + RAND_SEED2_OFFSET, seed2);
 
3370
  return (write_header(file, sizeof(buf)) ||
 
3371
          my_b_safe_write(file, buf, sizeof(buf)));
 
3372
}
 
3373
 
 
3374
 
 
3375
int Rand_log_event::do_apply_event(Relay_log_info const *rli)
 
3376
{
 
3377
  /*
 
3378
    We are now in a statement until the associated query log event has
 
3379
    been processed.
 
3380
   */
 
3381
  const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
 
3382
 
 
3383
  session->rand.seed1= (ulong) seed1;
 
3384
  session->rand.seed2= (ulong) seed2;
 
3385
  return 0;
 
3386
}
 
3387
 
 
3388
int Rand_log_event::do_update_pos(Relay_log_info *rli)
 
3389
{
 
3390
  rli->inc_event_relay_log_pos();
 
3391
  return 0;
 
3392
}
 
3393
 
 
3394
 
 
3395
Log_event::enum_skip_reason
 
3396
Rand_log_event::do_shall_skip(Relay_log_info *rli)
 
3397
{
 
3398
  /*
 
3399
    It is a common error to set the slave skip counter to 1 instead of
 
3400
    2 when recovering from an insert which used a auto increment,
 
3401
    rand, or user var.  Therefore, if the slave skip counter is 1, we
 
3402
    just say that this event should be skipped by ignoring it, meaning
 
3403
    that we do not change the value of the slave skip counter since it
 
3404
    will be decreased by the following insert event.
 
3405
  */
 
3406
  return continue_group(rli);
 
3407
}
 
3408
 
 
3409
 
 
3410
/**************************************************************************
3006
3411
  Xid_log_event methods
3007
3412
**************************************************************************/
3008
3413
 
3009
3414
void Xid_log_event::pack_info(Protocol *protocol)
3010
3415
{
3011
3416
  char buf[128], *pos;
3012
 
  pos= strcpy(buf, "COMMIT /* xid=")+14;
 
3417
  pos= my_stpcpy(buf, "COMMIT /* xid=");
3013
3418
  pos= int64_t10_to_str(xid, pos, 10);
3014
 
  pos= strcpy(pos, " */")+3;
 
3419
  pos= my_stpcpy(pos, " */");
3015
3420
  protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3016
3421
}
3017
3422
 
3018
3423
/**
3019
3424
  @note
3020
3425
  It's ok not to use int8store here,
3021
 
  as long as XID::set(uint64_t) and
3022
 
  XID::get_my_xid doesn't do it either.
 
3426
  as long as xid_t::set(uint64_t) and
 
3427
  xid_t::get_my_xid doesn't do it either.
3023
3428
  We don't care about actual values of xids as long as
3024
3429
  identical numbers compare identically
3025
3430
*/
3041
3446
}
3042
3447
 
3043
3448
 
3044
 
int Xid_log_event::do_apply_event(const Relay_log_info *)
 
3449
int Xid_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
3045
3450
{
3046
3451
  return end_trans(session, COMMIT);
3047
3452
}
3058
3463
 
3059
3464
 
3060
3465
/**************************************************************************
 
3466
  User_var_log_event methods
 
3467
**************************************************************************/
 
3468
 
 
3469
void User_var_log_event::pack_info(Protocol* protocol)
 
3470
{
 
3471
  char *buf= 0;
 
3472
  uint32_t val_offset= 4 + name_len;
 
3473
  uint32_t event_len= val_offset;
 
3474
 
 
3475
  if (is_null)
 
3476
  {
 
3477
    if (!(buf= (char*) my_malloc(val_offset + 5, MYF(MY_WME))))
 
3478
      return;
 
3479
    my_stpcpy(buf + val_offset, "NULL");
 
3480
    event_len= val_offset + 4;
 
3481
  }
 
3482
  else
 
3483
  {
 
3484
    switch (type) {
 
3485
    case REAL_RESULT:
 
3486
      double real_val;
 
3487
      float8get(real_val, val);
 
3488
      if (!(buf= (char*) my_malloc(val_offset + MY_GCVT_MAX_FIELD_WIDTH + 1,
 
3489
                                   MYF(MY_WME))))
 
3490
        return;
 
3491
      event_len+= my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, MY_GCVT_MAX_FIELD_WIDTH,
 
3492
                          buf + val_offset, NULL);
 
3493
      break;
 
3494
    case INT_RESULT:
 
3495
      if (!(buf= (char*) my_malloc(val_offset + 22, MYF(MY_WME))))
 
3496
        return;
 
3497
      event_len= int64_t10_to_str(uint8korr(val), buf + val_offset,-10)-buf;
 
3498
      break;
 
3499
    case DECIMAL_RESULT:
 
3500
    {
 
3501
      if (!(buf= (char*) my_malloc(val_offset + DECIMAL_MAX_STR_LENGTH,
 
3502
                                   MYF(MY_WME))))
 
3503
        return;
 
3504
      String str(buf+val_offset, DECIMAL_MAX_STR_LENGTH, &my_charset_bin);
 
3505
      my_decimal dec;
 
3506
      binary2my_decimal(E_DEC_FATAL_ERROR, (unsigned char*) (val+2), &dec, val[0],
 
3507
                        val[1]);
 
3508
      my_decimal2string(E_DEC_FATAL_ERROR, &dec, 0, 0, 0, &str);
 
3509
      event_len= str.length() + val_offset;
 
3510
      break;
 
3511
    } 
 
3512
    case STRING_RESULT:
 
3513
      /* 15 is for 'COLLATE' and other chars */
 
3514
      buf= (char*) my_malloc(event_len+val_len*2+1+2*MY_CS_NAME_SIZE+15,
 
3515
                             MYF(MY_WME));
 
3516
      const CHARSET_INFO *cs;
 
3517
      if (!buf)
 
3518
        return;
 
3519
      if (!(cs= get_charset(charset_number, MYF(0))))
 
3520
      {
 
3521
        my_stpcpy(buf+val_offset, "???");
 
3522
        event_len+= 3;
 
3523
      }
 
3524
      else
 
3525
      {
 
3526
        char *p= strxmov(buf + val_offset, "_", cs->csname, " ", NULL);
 
3527
        p= str_to_hex(p, val, val_len);
 
3528
        p= strxmov(p, " COLLATE ", cs->name, NULL);
 
3529
        event_len= p-buf;
 
3530
      }
 
3531
      break;
 
3532
    case ROW_RESULT:
 
3533
    default:
 
3534
      assert(1);
 
3535
      return;
 
3536
    }
 
3537
  }
 
3538
  buf[0]= '@';
 
3539
  buf[1]= '`';
 
3540
  memcpy(buf+2, name, name_len);
 
3541
  buf[2+name_len]= '`';
 
3542
  buf[3+name_len]= '=';
 
3543
  protocol->store(buf, event_len, &my_charset_bin);
 
3544
  free(buf);
 
3545
}
 
3546
 
 
3547
 
 
3548
User_var_log_event::
 
3549
User_var_log_event(const char* buf,
 
3550
                   const Format_description_log_event* description_event)
 
3551
  :Log_event(buf, description_event)
 
3552
{
 
3553
  buf+= description_event->common_header_len;
 
3554
  name_len= uint4korr(buf);
 
3555
  name= (char *) buf + UV_NAME_LEN_SIZE;
 
3556
  buf+= UV_NAME_LEN_SIZE + name_len;
 
3557
  is_null= (bool) *buf;
 
3558
  if (is_null)
 
3559
  {
 
3560
    type= STRING_RESULT;
 
3561
    charset_number= my_charset_bin.number;
 
3562
    val_len= 0;
 
3563
    val= 0;  
 
3564
  }
 
3565
  else
 
3566
  {
 
3567
    type= (Item_result) buf[UV_VAL_IS_NULL];
 
3568
    charset_number= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE);
 
3569
    val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + 
 
3570
                       UV_CHARSET_NUMBER_SIZE);
 
3571
    val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
 
3572
                   UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE);
 
3573
  }
 
3574
}
 
3575
 
 
3576
 
 
3577
bool User_var_log_event::write(IO_CACHE* file)
 
3578
{
 
3579
  char buf[UV_NAME_LEN_SIZE];
 
3580
  char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + 
 
3581
            UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
 
3582
  unsigned char buf2[(8 > DECIMAL_MAX_FIELD_SIZE + 2) ? 8 : DECIMAL_MAX_FIELD_SIZE +2], *pos= buf2;
 
3583
  uint32_t buf1_length;
 
3584
  ulong event_length;
 
3585
 
 
3586
  int4store(buf, name_len);
 
3587
  
 
3588
  if ((buf1[0]= is_null))
 
3589
  {
 
3590
    buf1_length= 1;
 
3591
    val_len= 0;                                 // Length of 'pos'
 
3592
  }    
 
3593
  else
 
3594
  {
 
3595
    buf1[1]= type;
 
3596
    int4store(buf1 + 2, charset_number);
 
3597
 
 
3598
    switch (type) {
 
3599
    case REAL_RESULT:
 
3600
      float8store(buf2, *(double*) val);
 
3601
      break;
 
3602
    case INT_RESULT:
 
3603
      int8store(buf2, *(int64_t*) val);
 
3604
      break;
 
3605
    case DECIMAL_RESULT:
 
3606
    {
 
3607
      my_decimal *dec= (my_decimal *)val;
 
3608
      dec->fix_buffer_pointer();
 
3609
      buf2[0]= (char)(dec->intg + dec->frac);
 
3610
      buf2[1]= (char)dec->frac;
 
3611
      decimal2bin((decimal_t*)val, buf2+2, buf2[0], buf2[1]);
 
3612
      val_len= decimal_bin_size(buf2[0], buf2[1]) + 2;
 
3613
      break;
 
3614
    }
 
3615
    case STRING_RESULT:
 
3616
      pos= (unsigned char*) val;
 
3617
      break;
 
3618
    case ROW_RESULT:
 
3619
    default:
 
3620
      assert(1);
 
3621
      return 0;
 
3622
    }
 
3623
    int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
 
3624
    buf1_length= 10;
 
3625
  }
 
3626
 
 
3627
  /* Length of the whole event */
 
3628
  event_length= sizeof(buf)+ name_len + buf1_length + val_len;
 
3629
 
 
3630
  return (write_header(file, event_length) ||
 
3631
          my_b_safe_write(file, (unsigned char*) buf, sizeof(buf))   ||
 
3632
          my_b_safe_write(file, (unsigned char*) name, name_len)     ||
 
3633
          my_b_safe_write(file, (unsigned char*) buf1, buf1_length) ||
 
3634
          my_b_safe_write(file, pos, val_len));
 
3635
}
 
3636
 
 
3637
 
 
3638
 
 
3639
/*
 
3640
  User_var_log_event::do_apply_event()
 
3641
*/
 
3642
 
 
3643
int User_var_log_event::do_apply_event(Relay_log_info const *rli)
 
3644
{
 
3645
  Item *it= 0;
 
3646
  const CHARSET_INFO *charset;
 
3647
  if (!(charset= get_charset(charset_number, MYF(MY_WME))))
 
3648
    return 1;
 
3649
  LEX_STRING user_var_name;
 
3650
  user_var_name.str= name;
 
3651
  user_var_name.length= name_len;
 
3652
  double real_val;
 
3653
  int64_t int_val;
 
3654
 
 
3655
  /*
 
3656
    We are now in a statement until the associated query log event has
 
3657
    been processed.
 
3658
   */
 
3659
  const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
 
3660
 
 
3661
  if (is_null)
 
3662
  {
 
3663
    it= new Item_null();
 
3664
  }
 
3665
  else
 
3666
  {
 
3667
    switch (type) {
 
3668
    case REAL_RESULT:
 
3669
      float8get(real_val, val);
 
3670
      it= new Item_float(real_val, 0);
 
3671
      val= (char*) &real_val;           // Pointer to value in native format
 
3672
      val_len= 8;
 
3673
      break;
 
3674
    case INT_RESULT:
 
3675
      int_val= (int64_t) uint8korr(val);
 
3676
      it= new Item_int(int_val);
 
3677
      val= (char*) &int_val;            // Pointer to value in native format
 
3678
      val_len= 8;
 
3679
      break;
 
3680
    case DECIMAL_RESULT:
 
3681
    {
 
3682
      Item_decimal *dec= new Item_decimal((unsigned char*) val+2, val[0], val[1]);
 
3683
      it= dec;
 
3684
      val= (char *)dec->val_decimal(NULL);
 
3685
      val_len= sizeof(my_decimal);
 
3686
      break;
 
3687
    }
 
3688
    case STRING_RESULT:
 
3689
      it= new Item_string(val, val_len, charset);
 
3690
      break;
 
3691
    case ROW_RESULT:
 
3692
    default:
 
3693
      assert(1);
 
3694
      return 0;
 
3695
    }
 
3696
  }
 
3697
  Item_func_set_user_var e(user_var_name, it);
 
3698
  /*
 
3699
    Item_func_set_user_var can't substitute something else on its place =>
 
3700
    0 can be passed as last argument (reference on item)
 
3701
  */
 
3702
  e.fix_fields(session, 0);
 
3703
  /*
 
3704
    A variable can just be considered as a table with
 
3705
    a single record and with a single column. Thus, like
 
3706
    a column value, it could always have IMPLICIT derivation.
 
3707
   */
 
3708
  e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0);
 
3709
  free_root(session->mem_root,0);
 
3710
 
 
3711
  return 0;
 
3712
}
 
3713
 
 
3714
int User_var_log_event::do_update_pos(Relay_log_info *rli)
 
3715
{
 
3716
  rli->inc_event_relay_log_pos();
 
3717
  return 0;
 
3718
}
 
3719
 
 
3720
Log_event::enum_skip_reason
 
3721
User_var_log_event::do_shall_skip(Relay_log_info *rli)
 
3722
{
 
3723
  /*
 
3724
    It is a common error to set the slave skip counter to 1 instead
 
3725
    of 2 when recovering from an insert which used a auto increment,
 
3726
    rand, or user var.  Therefore, if the slave skip counter is 1, we
 
3727
    just say that this event should be skipped by ignoring it, meaning
 
3728
    that we do not change the value of the slave skip counter since it
 
3729
    will be decreased by the following insert event.
 
3730
  */
 
3731
  return continue_group(rli);
 
3732
}
 
3733
 
 
3734
 
 
3735
/**************************************************************************
3061
3736
  Slave_log_event methods
3062
3737
**************************************************************************/
3063
3738
 
3064
3739
void Slave_log_event::pack_info(Protocol *protocol)
3065
3740
{
3066
 
  ostringstream stream;
3067
 
  stream << "host=" << master_host << ",port=" << master_port;
3068
 
  stream << ",log=" << master_log << ",pos=" << master_pos;
3069
 
 
3070
 
  protocol->store(stream.str().c_str(), stream.str().length(),
3071
 
                  &my_charset_bin);
 
3741
  char buf[256+HOSTNAME_LENGTH], *pos;
 
3742
  pos= my_stpcpy(buf, "host=");
 
3743
  pos= my_stpncpy(pos, master_host.c_str(), HOSTNAME_LENGTH);
 
3744
  pos= my_stpcpy(pos, ",port=");
 
3745
  pos= int10_to_str((long) master_port, pos, 10);
 
3746
  pos= my_stpcpy(pos, ",log=");
 
3747
  pos= my_stpcpy(pos, master_log.c_str());
 
3748
  pos= my_stpcpy(pos, ",pos=");
 
3749
  pos= int64_t10_to_str(master_pos, pos, 10);
 
3750
  protocol->store(buf, pos-buf, &my_charset_bin);
3072
3751
}
3073
3752
 
3074
3753
 
3088
3767
  pthread_mutex_lock(&mi->data_lock);
3089
3768
  pthread_mutex_lock(&rli->data_lock);
3090
3769
  // on OOM, just do not initialize the structure and print the error
3091
 
  if ((mem_pool = (char*)malloc(get_data_size() + 1)))
 
3770
  if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
 
3771
                                   MYF(MY_WME))))
3092
3772
  {
3093
3773
    master_host.assign(mi->getHostname());
3094
3774
    master_log.assign(rli->group_master_log_name);
3096
3776
    master_pos = rli->group_master_log_pos;
3097
3777
  }
3098
3778
  else
3099
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Out of memory while recording slave event"));
 
3779
    sql_print_error(_("Out of memory while recording slave event"));
3100
3780
  pthread_mutex_unlock(&rli->data_lock);
3101
3781
  pthread_mutex_unlock(&mi->data_lock);
3102
3782
  return;
3139
3819
}
3140
3820
 
3141
3821
 
3142
 
int Slave_log_event::do_apply_event(const Relay_log_info *)
 
3822
int Slave_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
3143
3823
{
3144
 
  if (drizzle_bin_log.is_open())
3145
 
    drizzle_bin_log.write(this);
 
3824
  if (mysql_bin_log.is_open())
 
3825
    mysql_bin_log.write(this);
3146
3826
  return 0;
3147
3827
}
3148
3828
 
3199
3879
  :Load_log_event(session_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
3200
3880
                  using_trans),
3201
3881
   fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg),
3202
 
   file_id(session_arg->file_id = drizzle_bin_log.next_file_id())
 
3882
   file_id(session_arg->file_id = mysql_bin_log.next_file_id())
3203
3883
{
3204
3884
  sql_ex.force_new_format();
3205
3885
  return;
3260
3940
  uint32_t header_len= description_event->common_header_len;
3261
3941
  uint8_t load_header_len= description_event->post_header_len[LOAD_EVENT-1];
3262
3942
  uint8_t create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
3263
 
  if (!(event_buf= (const char*)malloc(len)) ||
3264
 
      memcpy((char *)event_buf, buf, len) ||
 
3943
  if (!(event_buf= (char*) my_memdup(buf, len, MYF(MY_WME))) ||
3265
3944
      copy_log_event(event_buf,len,
3266
3945
                     ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
3267
3946
                      load_header_len + header_len :
3272
3951
    return;
3273
3952
  if (description_event->binlog_version!=1)
3274
3953
  {
3275
 
    file_id= uint4korr(buf +
 
3954
    file_id= uint4korr(buf + 
3276
3955
                       header_len +
3277
3956
                       load_header_len + CF_FILE_ID_OFFSET);
3278
3957
    /*
3284
3963
      as these Load events are not changed between 4.0 and 5.0 (as logging of
3285
3964
      LOAD DATA INFILE does not use Load_log_event in 5.0).
3286
3965
 
3287
 
      The + 1 is for \0 terminating fname
 
3966
      The + 1 is for \0 terminating fname  
3288
3967
    */
3289
3968
    block_offset= (description_event->common_header_len +
3290
3969
                   Load_log_event::get_data_size() +
3310
3989
void Create_file_log_event::pack_info(Protocol *protocol)
3311
3990
{
3312
3991
  char buf[NAME_LEN*2 + 30 + 21*2], *pos;
3313
 
  pos= strcpy(buf, "db=")+3;
 
3992
  pos= my_stpcpy(buf, "db=");
3314
3993
  memcpy(pos, db, db_len);
3315
 
  pos= strcpy(pos + db_len, ";table=")+7;
 
3994
  pos= my_stpcpy(pos + db_len, ";table=");
3316
3995
  memcpy(pos, table_name, table_name_len);
3317
 
  pos= strcpy(pos + table_name_len, ";file_id=")+9;
 
3996
  pos= my_stpcpy(pos + table_name_len, ";file_id=");
3318
3997
  pos= int10_to_str((long) file_id, pos, 10);
3319
 
  pos= strcpy(pos, ";block_len=")+11;
 
3998
  pos= my_stpcpy(pos, ";block_len=");
3320
3999
  pos= int10_to_str((long) block_len, pos, 10);
3321
4000
  protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3322
4001
}
3335
4014
  int error = 1;
3336
4015
 
3337
4016
  memset(&file, 0, sizeof(file));
3338
 
  fname_buf= strcpy(proc_info, "Making temp file ")+17;
 
4017
  fname_buf= my_stpcpy(proc_info, "Making temp file ");
3339
4018
  ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
3340
4019
  session->set_proc_info(proc_info);
3341
4020
  my_delete(fname_buf, MYF(0)); // old copy may exist already
3350
4029
                fname_buf);
3351
4030
    goto err;
3352
4031
  }
3353
 
 
 
4032
  
3354
4033
  // a trick to avoid allocating another buffer
3355
4034
  fname= fname_buf;
3356
 
  fname_len= (uint) ((strcpy(ext, ".data") + 5) - fname);
 
4035
  fname_len= (uint) (my_stpcpy(ext, ".data") - fname);
3357
4036
  if (write_base(&file))
3358
4037
  {
3359
 
    strcpy(ext, ".info"); // to have it right in the error message
 
4038
    my_stpcpy(ext, ".info"); // to have it right in the error message
3360
4039
    rli->report(ERROR_LEVEL, my_errno,
3361
4040
                _("Error in Create_file event: could not write to file '%s'"),
3362
4041
                fname_buf);
3422
4101
                                               const Format_description_log_event* description_event)
3423
4102
  :Log_event(buf, description_event),block(0)
3424
4103
{
3425
 
  uint8_t common_header_len= description_event->common_header_len;
 
4104
  uint8_t common_header_len= description_event->common_header_len; 
3426
4105
  uint8_t append_block_header_len=
3427
4106
    description_event->post_header_len[APPEND_BLOCK_EVENT-1];
3428
4107
  uint32_t total_header_len= common_header_len+append_block_header_len;
3482
4161
  int fd;
3483
4162
  int error = 1;
3484
4163
 
3485
 
  fname= strcpy(proc_info, "Making temp file ")+17;
 
4164
  fname= my_stpcpy(proc_info, "Making temp file ");
3486
4165
  slave_load_file_stem(fname, file_id, server_id, ".data");
3487
4166
  session->set_proc_info(proc_info);
3488
4167
  if (get_create_or_append())
3582
4261
  Delete_file_log_event::do_apply_event()
3583
4262
*/
3584
4263
 
3585
 
int Delete_file_log_event::do_apply_event(const Relay_log_info *)
 
4264
int Delete_file_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
3586
4265
{
3587
4266
  char fname[FN_REFLEN+10];
3588
4267
  char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
3589
4268
  (void) my_delete(fname, MYF(MY_WME));
3590
 
  strcpy(ext, ".info");
 
4269
  my_stpcpy(ext, ".info");
3591
4270
  (void) my_delete(fname, MYF(MY_WME));
3592
4271
  return 0;
3593
4272
}
3607
4286
  :Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
3608
4287
{
3609
4288
}
3610
 
 
 
4289
  
3611
4290
 
3612
4291
/*
3613
4292
  Execute_load_log_event ctor
3633
4312
{
3634
4313
  unsigned char buf[EXEC_LOAD_HEADER_LEN];
3635
4314
  int4store(buf + EL_FILE_ID_OFFSET, file_id);
3636
 
  return (write_header(file, sizeof(buf)) ||
 
4315
  return (write_header(file, sizeof(buf)) || 
3637
4316
          my_b_safe_write(file, buf, sizeof(buf)));
3638
4317
}
3639
4318
 
3697
4376
  */
3698
4377
 
3699
4378
  const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
3700
 
  if (lev->do_apply_event(0,rli,1))
 
4379
  if (lev->do_apply_event(0,rli,1)) 
3701
4380
  {
3702
4381
    /*
3703
4382
      We want to indicate the name of the file that could not be loaded
3707
4386
      don't want to overwrite it with the filename.
3708
4387
      What we want instead is add the filename to the current error message.
3709
4388
    */
3710
 
    char *tmp= strdup(rli->last_error().message);
 
4389
    char *tmp= my_strdup(rli->last_error().message, MYF(MY_WME));
3711
4390
    if (tmp)
3712
4391
    {
3713
4392
      rli->report(ERROR_LEVEL, rli->last_error().number,
3753
4432
  :Append_block_log_event(session_arg, db_arg, block_arg, block_len_arg,
3754
4433
                          using_trans)
3755
4434
{
3756
 
   file_id= session_arg->file_id= drizzle_bin_log.next_file_id();
 
4435
   file_id= session_arg->file_id= mysql_bin_log.next_file_id();
3757
4436
}
3758
4437
 
3759
4438
 
3846
4525
void Execute_load_query_log_event::pack_info(Protocol *protocol)
3847
4526
{
3848
4527
  char *buf, *pos;
3849
 
  if (!(buf= (char*) malloc(9 + db_len + q_len + 10 + 21)))
 
4528
  if (!(buf= (char*) my_malloc(9 + db_len + q_len + 10 + 21, MYF(MY_WME))))
3850
4529
    return;
3851
4530
  pos= buf;
3852
4531
  if (db && db_len)
3853
4532
  {
3854
 
    pos= strcpy(buf, "use `")+5;
 
4533
    pos= my_stpcpy(buf, "use `");
3855
4534
    memcpy(pos, db, db_len);
3856
 
    pos= strcpy(pos+db_len, "`; ")+3;
 
4535
    pos= my_stpcpy(pos+db_len, "`; ");
3857
4536
  }
3858
4537
  if (query && q_len)
3859
4538
  {
3860
4539
    memcpy(pos, query, q_len);
3861
4540
    pos+= q_len;
3862
4541
  }
3863
 
  pos= strcpy(pos, " ;file_id=")+10;
 
4542
  pos= my_stpcpy(pos, " ;file_id=");
3864
4543
  pos= int10_to_str((long) file_id, pos, 10);
3865
4544
  protocol->store(buf, pos-buf, &my_charset_bin);
3866
4545
  free(buf);
3876
4555
  char *fname_end;
3877
4556
  int error;
3878
4557
 
3879
 
  buf= (char*) malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
3880
 
                      (FN_REFLEN + 10) + 10 + 8 + 5);
 
4558
  buf= (char*) my_malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
 
4559
                         (FN_REFLEN + 10) + 10 + 8 + 5, MYF(MY_WME));
3881
4560
 
3882
4561
  /* Replace filename and LOCAL keyword in query before executing it */
3883
4562
  if (buf == NULL)
3891
4570
  p= buf;
3892
4571
  memcpy(p, query, fn_pos_start);
3893
4572
  p+= fn_pos_start;
3894
 
  fname= (p= strncpy(p, STRING_WITH_LEN(" INFILE \'")) + 9);
 
4573
  fname= (p= strmake(p, STRING_WITH_LEN(" INFILE \'")));
3895
4574
  p= slave_load_file_stem(p, file_id, server_id, ".data");
3896
4575
  fname_end= p= strchr(p, '\0');                      // Safer than p=p+5
3897
4576
  *(p++)='\'';
3898
4577
  switch (dup_handling) {
3899
4578
  case LOAD_DUP_IGNORE:
3900
 
    p= strncpy(p, STRING_WITH_LEN(" IGNORE")) + 7;
 
4579
    p= strmake(p, STRING_WITH_LEN(" IGNORE"));
3901
4580
    break;
3902
4581
  case LOAD_DUP_REPLACE:
3903
 
    p= strncpy(p, STRING_WITH_LEN(" REPLACE")) + 8;
 
4582
    p= strmake(p, STRING_WITH_LEN(" REPLACE"));
3904
4583
    break;
3905
4584
  default:
3906
4585
    /* Ordinary load data */
3907
4586
    break;
3908
4587
  }
3909
 
  size_t end_len = q_len-fn_pos_end;
3910
 
  p= strncpy(p, STRING_WITH_LEN(" INTO")) + 5;
3911
 
  p= strncpy(p, query+fn_pos_end, end_len);
3912
 
  p+= end_len;
 
4588
  p= strmake(p, STRING_WITH_LEN(" INTO"));
 
4589
  p= strmake(p, query+fn_pos_end, q_len-fn_pos_end);
3913
4590
 
3914
4591
  error= Query_log_event::do_apply_event(rli, buf, p-buf);
3915
4592
 
3948
4625
            my_b_safe_write(file,(unsigned char*) &opt_flags,1));
3949
4626
  }
3950
4627
  else
3951
 
    assert(0);
3952
 
  return true;
 
4628
  {
 
4629
    /**
 
4630
      @todo This is sensitive to field padding. We should write a
 
4631
      char[7], not an old_sql_ex. /sven
 
4632
    */
 
4633
    old_sql_ex old_ex;
 
4634
    old_ex.field_term= *field_term;
 
4635
    old_ex.enclosed=   *enclosed;
 
4636
    old_ex.line_term=  *line_term;
 
4637
    old_ex.line_start= *line_start;
 
4638
    old_ex.escaped=    *escaped;
 
4639
    old_ex.opt_flags=  opt_flags;
 
4640
    old_ex.empty_flags=empty_flags;
 
4641
    return my_b_safe_write(file, (unsigned char*) &old_ex, sizeof(old_ex)) != 0;
 
4642
  }
3953
4643
}
3954
4644
 
3955
4645
 
4015
4705
    m_table(tbl_arg),
4016
4706
    m_table_id(tid),
4017
4707
    m_width(tbl_arg ? tbl_arg->s->fields : 1),
4018
 
    m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
 
4708
    m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0) 
4019
4709
    , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4020
4710
{
4021
4711
  /*
4128
4818
 
4129
4819
  size_t const data_size= event_len - (ptr_rows_data - (const unsigned char *) buf);
4130
4820
 
4131
 
  m_rows_buf= (unsigned char*) malloc(data_size);
 
4821
  m_rows_buf= (unsigned char*) my_malloc(data_size, MYF(MY_WME));
4132
4822
  if (likely((bool)m_rows_buf))
4133
4823
  {
4134
4824
    m_curr_row= m_rows_buf;
4144
4834
 
4145
4835
Rows_log_event::~Rows_log_event()
4146
4836
{
4147
 
  if (m_cols.bitmap == m_bitbuf) // no malloc happened
 
4837
  if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
4148
4838
    m_cols.bitmap= 0; // so no free in bitmap_free
4149
4839
  bitmap_free(&m_cols); // To pair with bitmap_init().
4150
4840
  free((unsigned char*)m_rows_buf);
4165
4855
    data_size+= no_bytes_in_map(&m_cols_ai);
4166
4856
 
4167
4857
  data_size+= (m_rows_cur - m_rows_buf);
4168
 
  return data_size;
 
4858
  return data_size; 
4169
4859
}
4170
4860
 
4171
4861
 
4196
4886
  if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
4197
4887
  {
4198
4888
    size_t const block_size= 1024;
4199
 
    const size_t cur_size= m_rows_cur - m_rows_buf;
4200
 
    const size_t new_alloc=
 
4889
    my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
 
4890
    my_ptrdiff_t const new_alloc= 
4201
4891
        block_size * ((cur_size + length + block_size - 1) / block_size);
4202
4892
 
4203
 
    unsigned char* new_buf= (unsigned char*)realloc(m_rows_buf, new_alloc);
 
4893
    unsigned char* const new_buf= (unsigned char*)my_realloc((unsigned char*)m_rows_buf, (uint) new_alloc,
 
4894
                                           MYF(MY_ALLOW_ZERO_PTR|MY_WME));
4204
4895
    if (unlikely(!new_buf))
4205
4896
      return(HA_ERR_OUT_OF_MEM);
4206
4897
 
4321
5012
        return(error);
4322
5013
      }
4323
5014
 
 
5015
      /*
 
5016
        So we need to reopen the tables.
 
5017
 
 
5018
        We need to flush the pending RBR event, since it keeps a
 
5019
        pointer to an open table.
 
5020
 
 
5021
        ALTERNATIVE SOLUTION (not implemented): Extract a pointer to
 
5022
        the pending RBR event and reset the table pointer after the
 
5023
        tables has been reopened.
 
5024
 
 
5025
        NOTE: For this new scheme there should be no pending event:
 
5026
        need to add code to assert that is the case.
 
5027
       */
 
5028
      session->binlog_flush_pending_rows_event(false);
4324
5029
      TableList *tables= rli->tables_to_lock;
4325
5030
      close_tables_for_reopen(session, &tables);
4326
5031
 
4389
5094
    }
4390
5095
  }
4391
5096
 
4392
 
  Table*
4393
 
    table=
 
5097
  Table* 
 
5098
    table= 
4394
5099
    m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
4395
5100
 
4396
5101
  if (table)
4424
5129
        session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
4425
5130
    else
4426
5131
        session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
4427
 
 
 
5132
    
4428
5133
    if (slave_allow_batching)
4429
5134
      session->options|= OPTION_ALLOW_BATCH;
4430
5135
    else
4431
5136
      session->options&= ~OPTION_ALLOW_BATCH;
4432
 
 
 
5137
    
4433
5138
    /* A small test to verify that objects have consistent types */
4434
5139
    assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
4435
5140
 
4447
5152
     if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
4448
5153
      set_flags(COMPLETE_ROWS_F);
4449
5154
 
4450
 
    /*
 
5155
    /* 
4451
5156
      Set tables write and read sets.
4452
 
 
 
5157
      
4453
5158
      Read_set contains all slave columns (in case we are going to fetch
4454
5159
      a complete record from slave)
4455
 
 
4456
 
      Write_set equals the m_cols bitmap sent from master but it can be
4457
 
      longer if slave has extra columns.
4458
 
     */
 
5160
      
 
5161
      Write_set equals the m_cols bitmap sent from master but it can be 
 
5162
      longer if slave has extra columns. 
 
5163
     */ 
4459
5164
 
4460
5165
    bitmap_set_all(table->read_set);
4461
5166
    bitmap_set_all(table->write_set);
4464
5169
 
4465
5170
    this->slave_exec_mode= slave_exec_mode_options; // fix the mode
4466
5171
 
4467
 
    // Do event specific preparations
 
5172
    // Do event specific preparations 
4468
5173
    error= do_before_row_operations(rli);
4469
5174
 
4470
5175
    // row processing loop
4486
5191
      /*
4487
5192
        The following list of "idempotent" errors
4488
5193
        means that an error from the list might happen
4489
 
        because of idempotent (more than once)
 
5194
        because of idempotent (more than once) 
4490
5195
        applying of a binlog file.
4491
5196
        Notice, that binlog has a  ddl operation its
4492
5197
        second applying may cause
4493
5198
 
4494
5199
        case HA_ERR_TABLE_DEF_CHANGED:
4495
5200
        case HA_ERR_CANNOT_ADD_FOREIGN:
4496
 
 
 
5201
        
4497
5202
        which are not included into to the list.
4498
5203
      */
4499
5204
      case HA_ERR_RECORD_CHANGED:
4514
5219
          error= 0;
4515
5220
        }
4516
5221
        break;
4517
 
 
 
5222
        
4518
5223
      default:
4519
5224
        session->is_slave_error= 1;
4520
5225
        break;
4523
5228
      /*
4524
5229
       If m_curr_row_end  was not set during event execution (e.g., because
4525
5230
       of errors) we can't proceed to the next row. If the error is transient
4526
 
       (i.e., error==0 at this point) we must call unpack_current_row() to set
 
5231
       (i.e., error==0 at this point) we must call unpack_current_row() to set 
4527
5232
       m_curr_row_end.
4528
 
      */
 
5233
      */ 
4529
5234
      if (!m_curr_row_end && !error)
4530
5235
        unpack_current_row(rli, &m_cols);
4531
 
 
 
5236
  
4532
5237
      // at this moment m_curr_row_end should be set
4533
 
      assert(error || m_curr_row_end != NULL);
 
5238
      assert(error || m_curr_row_end != NULL); 
4534
5239
      assert(error || m_curr_row < m_curr_row_end);
4535
5240
      assert(error || m_curr_row_end <= m_rows_end);
4536
 
 
 
5241
  
4537
5242
      m_curr_row= m_curr_row_end;
4538
 
 
 
5243
 
4539
5244
    } // row processing loop
4540
5245
 
4541
5246
    error= do_after_row_operations(rli, error);
4553
5258
    const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4554
5259
  /* reset OPTION_ALLOW_BATCH as not affect later events */
4555
5260
  session->options&= ~OPTION_ALLOW_BATCH;
4556
 
 
 
5261
  
4557
5262
  if (error)
4558
5263
  {                     /* error has occured during the transaction */
4559
5264
    slave_rows_error_report(ERROR_LEVEL, error, rli, session, table,
4572
5277
      thread is certainly going to stop.
4573
5278
      rollback at the caller along with sbr.
4574
5279
    */
 
5280
    session->reset_current_stmt_binlog_row_based();
4575
5281
    const_cast<Relay_log_info*>(rli)->cleanup_context(session, error);
4576
5282
    session->is_slave_error= 1;
4577
5283
    return(error);
4601
5307
      problem.  When WL#2975 is implemented, just remove the member
4602
5308
      Relay_log_info::last_event_start_time and all its occurrences.
4603
5309
    */
4604
 
    time_t t= time(0);
4605
 
 
4606
 
    /* don't trust time() all the time */
4607
 
    if (t == (time_t)-1)
4608
 
      return (-1);
4609
 
    const_cast<Relay_log_info*>(rli)->last_event_start_time= time(0);
 
5310
    const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
4610
5311
  }
4611
5312
 
4612
5313
  return(0);
4634
5335
  if (get_flags(STMT_END_F))
4635
5336
  {
4636
5337
    /*
 
5338
      This is the end of a statement or transaction, so close (and
 
5339
      unlock) the tables we opened when processing the
 
5340
      Table_map_log_event starting the statement.
 
5341
 
 
5342
      OBSERVER.  This will clear *all* mappings, not only those that
 
5343
      are open for the table. There is not good handle for on-close
 
5344
      actions for tables.
 
5345
 
 
5346
      NOTE. Even if we have no table ('table' == 0) we still need to be
 
5347
      here, so that we increase the group relay log position. If we didn't, we
 
5348
      could have a group relay log position which lags behind "forever"
 
5349
      (assume the last master's transaction is ignored by the slave because of
 
5350
      replicate-ignore rules).
 
5351
    */
 
5352
    session->binlog_flush_pending_rows_event(true);
 
5353
 
 
5354
    /*
4637
5355
      If this event is not in a transaction, the call below will, if some
4638
5356
      transactional storage engines are involved, commit the statement into
4639
5357
      them and flush the pending event to binlog.
4654
5372
      event flushed.
4655
5373
    */
4656
5374
 
 
5375
    session->reset_current_stmt_binlog_row_based();
 
5376
 
4657
5377
    rli->cleanup_context(session, 0);
4658
5378
    if (error == 0)
4659
5379
    {
4746
5466
 
4747
5467
/**
4748
5468
  @page How replication of field metadata works.
4749
 
 
4750
 
  When a table map is created, the master first calls
4751
 
  Table_map_log_event::save_field_metadata() which calculates how many
4752
 
  values will be in the field metadata. Only those fields that require the
4753
 
  extra data are added. The method also loops through all of the fields in
 
5469
  
 
5470
  When a table map is created, the master first calls 
 
5471
  Table_map_log_event::save_field_metadata() which calculates how many 
 
5472
  values will be in the field metadata. Only those fields that require the 
 
5473
  extra data are added. The method also loops through all of the fields in 
4754
5474
  the table calling the method Field::save_field_metadata() which returns the
4755
5475
  values for the field that will be saved in the metadata and replicated to
4756
5476
  the slave. Once all fields have been processed, the table map is written to
4757
5477
  the binlog adding the size of the field metadata and the field metadata to
4758
5478
  the end of the body of the table map.
4759
5479
 
4760
 
  When a table map is read on the slave, the field metadata is read from the
4761
 
  table map and passed to the table_def class constructor which saves the
4762
 
  field metadata from the table map into an array based on the type of the
4763
 
  field. Field metadata values not present (those fields that do not use extra
4764
 
  data) in the table map are initialized as zero (0). The array size is the
 
5480
  When a table map is read on the slave, the field metadata is read from the 
 
5481
  table map and passed to the table_def class constructor which saves the 
 
5482
  field metadata from the table map into an array based on the type of the 
 
5483
  field. Field metadata values not present (those fields that do not use extra 
 
5484
  data) in the table map are initialized as zero (0). The array size is the 
4765
5485
  same as the columns for the table on the slave.
4766
5486
 
4767
 
  Additionally, values saved for field metadata on the master are saved as a
 
5487
  Additionally, values saved for field metadata on the master are saved as a 
4768
5488
  string of bytes (unsigned char) in the binlog. A field may require 1 or more bytes
4769
 
  to store the information. In cases where values require multiple bytes
4770
 
  (e.g. values > 255), the endian-safe methods are used to properly encode
 
5489
  to store the information. In cases where values require multiple bytes 
 
5490
  (e.g. values > 255), the endian-safe methods are used to properly encode 
4771
5491
  the values on the master and decode them on the slave. When the field
4772
5492
  metadata values are captured on the slave, they are stored in an array of
4773
5493
  type uint16_t. This allows the least number of casts to prevent casting bugs
4774
5494
  when the field metadata is used in comparisons of field attributes. When
4775
5495
  the field metadata is used for calculating addresses in pointer math, the
4776
 
  type used is uint32_t.
 
5496
  type used is uint32_t. 
4777
5497
*/
4778
5498
 
4779
5499
/**
4781
5501
  The metadata saved depends on the type of the field. Some fields
4782
5502
  store a single byte for pack_length() while others store two bytes
4783
5503
  for field_length (max length).
4784
 
 
 
5504
  
4785
5505
  @retval  0  Ok.
4786
5506
 
4787
5507
  @todo
4788
5508
  We may want to consider changing the encoding of the information.
4789
 
  Currently, the code attempts to minimize the number of bytes written to
4790
 
  the tablemap. There are at least two other alternatives; 1) using
 
5509
  Currently, the code attempts to minimize the number of bytes written to 
 
5510
  the tablemap. There are at least two other alternatives; 1) using 
4791
5511
  net_store_length() to store the data allowing it to choose the number of
4792
 
  bytes that are appropriate thereby making the code much easier to
 
5512
  bytes that are appropriate thereby making the code much easier to 
4793
5513
  maintain (only 1 place to change the encoding), or 2) use a fixed number
4794
5514
  of bytes for each field. The problem with option 1 is that net_store_length()
4795
5515
  will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
4798
5518
  encoded using 2 parts (e.g., pack_length, field_length) will be numerically
4799
5519
  > 250 therefore will use 3 bytes for eah value. The problem with option 2
4800
5520
  is less wasteful for space but does waste 1 byte for every field that does
4801
 
  not encode 2 parts.
 
5521
  not encode 2 parts. 
4802
5522
*/
4803
5523
int Table_map_log_event::save_field_metadata()
4804
5524
{
4813
5533
  Mats says tbl->s lives longer than this event so it's ok to copy pointers
4814
5534
  (tbl->s->db etc) and not pointer content.
4815
5535
 */
4816
 
Table_map_log_event::Table_map_log_event(Session *session, Table *tbl,
4817
 
                                         ulong tid, bool, uint16_t flags)
 
5536
Table_map_log_event::Table_map_log_event(Session *session, Table *tbl, ulong tid,
 
5537
                                         bool is_transactional __attribute__((unused)),
 
5538
                                         uint16_t flags)
4818
5539
  : Log_event(session, 0, true),
4819
5540
    m_table(tbl),
4820
5541
    m_dbnam(tbl->s->db.str),
4849
5570
  m_data_size+= 1 + m_colcnt;   // COLCNT and column types
4850
5571
 
4851
5572
  /* If malloc fails, caught in is_valid() */
4852
 
  if ((m_memory= (unsigned char*) malloc(m_colcnt)))
 
5573
  if ((m_memory= (unsigned char*) my_malloc(m_colcnt, MYF(MY_WME))))
4853
5574
  {
4854
5575
    m_coltype= reinterpret_cast<unsigned char*>(m_memory);
4855
5576
    for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
4882
5603
    plus one or two bytes for number of elements in the field metadata array.
4883
5604
  */
4884
5605
  if (m_field_metadata_size > 255)
4885
 
    m_data_size+= m_field_metadata_size + 2;
 
5606
    m_data_size+= m_field_metadata_size + 2; 
4886
5607
  else
4887
 
    m_data_size+= m_field_metadata_size + 1;
 
5608
    m_data_size+= m_field_metadata_size + 1; 
4888
5609
 
4889
5610
  memset(m_null_bits, 0, num_null_bytes);
4890
5611
  for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
5007
5728
  RPL_TableList *table_list;
5008
5729
  char *db_mem, *tname_mem;
5009
5730
  Query_id &query_id= Query_id::get_query_id();
 
5731
  size_t dummy_len;
5010
5732
  void *memory;
5011
5733
  assert(rli->sql_session == session);
5012
5734
 
5027
5749
  table_list->next_global= table_list->next_local= 0;
5028
5750
  table_list->table_id= m_table_id;
5029
5751
  table_list->updating= 1;
5030
 
  strcpy(table_list->db, m_dbnam);
5031
 
  strcpy(table_list->table_name, m_tblnam);
 
5752
  my_stpcpy(table_list->db, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
 
5753
  my_stpcpy(table_list->table_name, m_tblnam);
5032
5754
 
5033
5755
  int error= 0;
5034
5756
 
 
5757
  if (!rpl_filter->db_ok(table_list->db) ||
 
5758
      (rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list)))
 
5759
  {
 
5760
    free(memory);
 
5761
  }
 
5762
  else
5035
5763
  {
5036
5764
    /*
5037
5765
      open_tables() reads the contents of session->lex, so they must be
5039
5767
      call mysql_init_query() which does a more complete set of inits.
5040
5768
    */
5041
5769
    lex_start(session);
5042
 
    session->reset_for_next_command();
 
5770
    mysql_reset_session_for_next_command(session);
 
5771
    /*
 
5772
      Check if the slave is set to use SBR.  If so, it should switch
 
5773
      to using RBR until the end of the "statement", i.e., next
 
5774
      STMT_END_F or next error.
 
5775
    */
 
5776
    if (!session->current_stmt_binlog_row_based &&
 
5777
        mysql_bin_log.is_open() && (session->options & OPTION_BIN_LOG))
 
5778
    {
 
5779
      session->set_current_stmt_binlog_row_based();
 
5780
    }
5043
5781
 
5044
5782
    /*
5045
5783
      Open the table if it is not already open and add the table to
5099
5837
      inside Relay_log_info::clear_tables_to_lock() by calling the
5100
5838
      table_def destructor explicitly.
5101
5839
    */
5102
 
    new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt,
 
5840
    new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt, 
5103
5841
         m_field_metadata, m_field_metadata_size, m_null_bits);
5104
5842
    table_list->m_tabledef_valid= true;
5105
5843
 
5218
5956
{
5219
5957
}
5220
5958
 
5221
 
int
 
5959
int 
5222
5960
Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
5223
5961
{
5224
5962
  int error= 0;
5234
5972
      when writing rows, that is: new rows replace old rows.  We need to
5235
5973
      inform the storage engine that it should use this behaviour.
5236
5974
    */
5237
 
 
 
5975
    
5238
5976
    /* Tell the storage engine that we are using REPLACE semantics. */
5239
5977
    session->lex->duplicates= DUP_REPLACE;
5240
 
 
 
5978
    
5241
5979
    /*
5242
5980
      Pretend we're executing a REPLACE command: this is needed for
5243
5981
      InnoDB since it is not (properly) checking the
5244
5982
      lex->duplicates flag.
5245
5983
    */
5246
5984
    session->lex->sql_command= SQLCOM_REPLACE;
5247
 
    /*
 
5985
    /* 
5248
5986
       Do not raise the error flag in case of hitting to an unique attribute
5249
5987
    */
5250
5988
    m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
5271
6009
  return error;
5272
6010
}
5273
6011
 
5274
 
int
 
6012
int 
5275
6013
Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
5276
6014
                                              int error)
5277
6015
{
5281
6019
    m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
5282
6020
    m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
5283
6021
    /*
5284
 
      resetting the extra with
5285
 
      table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
 
6022
      resetting the extra with 
 
6023
      table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); 
5286
6024
      fires bug#27077
5287
6025
      explanation: file->reset() performs this duty
5288
6026
      ultimately. Still todo: fix
5337
6075
  Write the current row into event's table.
5338
6076
 
5339
6077
  The row is located in the row buffer, pointed by @c m_curr_row member.
5340
 
  Number of columns of the row is stored in @c m_width member (it can be
5341
 
  different from the number of columns in the table to which we insert).
5342
 
  Bitmap @c m_cols indicates which columns are present in the row. It is assumed
 
6078
  Number of columns of the row is stored in @c m_width member (it can be 
 
6079
  different from the number of columns in the table to which we insert). 
 
6080
  Bitmap @c m_cols indicates which columns are present in the row. It is assumed 
5343
6081
  that event's table is already open and pointed by @c m_table.
5344
6082
 
5345
 
  If the same record already exists in the table it can be either overwritten
5346
 
  or an error is reported depending on the value of @c overwrite flag
 
6083
  If the same record already exists in the table it can be either overwritten 
 
6084
  or an error is reported depending on the value of @c overwrite flag 
5347
6085
  (error reporting not yet implemented). Note that the matching record can be
5348
6086
  different from the row we insert if we use primary keys to identify records in
5349
6087
  the table.
5350
6088
 
5351
 
  The row to be inserted can contain values only for selected columns. The
5352
 
  missing columns are filled with default values using @c prepare_record()
 
6089
  The row to be inserted can contain values only for selected columns. The 
 
6090
  missing columns are filled with default values using @c prepare_record() 
5353
6091
  function. If a matching record is found in the table and @c overwritte is
5354
6092
  true, the missing columns are taken from it.
5355
6093
 
5356
6094
  @param  rli   Relay log info (needed for row unpacking).
5357
 
  @param  overwrite
5358
 
                Shall we overwrite if the row already exists or signal
 
6095
  @param  overwrite  
 
6096
                Shall we overwrite if the row already exists or signal 
5359
6097
                error (currently ignored).
5360
6098
 
5361
6099
  @returns Error code on failure, 0 on success.
5362
6100
 
5363
6101
  This method, if successful, sets @c m_curr_row_end pointer to point at the
5364
 
  next row in the rows buffer. This is done when unpacking the row to be
 
6102
  next row in the rows buffer. This is done when unpacking the row to be 
5365
6103
  inserted.
5366
6104
 
5367
 
  @note If a matching record is found, it is either updated using
 
6105
  @note If a matching record is found, it is either updated using 
5368
6106
  @c ha_update_row() or first deleted and then new record written.
5369
 
*/
 
6107
*/ 
5370
6108
 
5371
6109
int
5372
6110
Rows_log_event::write_row(const Relay_log_info *const rli,
5377
6115
  Table *table= m_table;  // pointer to event's table
5378
6116
  int error;
5379
6117
  int keynum;
5380
 
  basic_string<unsigned char> key;
 
6118
  auto_afree_ptr<char> key(NULL);
5381
6119
 
5382
6120
  /* fill table->record[0] with default values */
5383
6121
 
5393
6131
  */
5394
6132
  if ((error= prepare_record(table, &m_cols, m_width, true)))
5395
6133
    return(error);
5396
 
 
 
6134
  
5397
6135
  /* unpack row into table->record[0] */
5398
6136
  error= unpack_current_row(rli, &m_cols);
5399
6137
 
5400
6138
  // Temporary fix to find out why it fails [/Matz]
5401
6139
  memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
5402
6140
 
5403
 
  /*
 
6141
  /* 
5404
6142
    Try to write record. If a corresponding record already exists in the table,
5405
6143
    we try to change it using ha_update_row() if possible. Otherwise we delete
5406
 
    it and repeat the whole process again.
 
6144
    it and repeat the whole process again. 
5407
6145
 
5408
 
    TODO: Add safety measures against infinite looping.
 
6146
    TODO: Add safety measures against infinite looping. 
5409
6147
   */
5410
6148
 
5411
6149
  while ((error= table->file->ha_write_row(table->record[0])))
5457
6195
        return(my_errno);
5458
6196
      }
5459
6197
 
5460
 
      key.reserve(table->s->max_unique_length);
 
6198
      if (key.get() == NULL)
 
6199
      {
 
6200
        key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
 
6201
        if (key.get() == NULL)
 
6202
        {
 
6203
          return(ENOMEM);
 
6204
        }
 
6205
      }
5461
6206
 
5462
 
      key_copy(key, table->record[0], table->key_info + keynum, 0);
 
6207
      key_copy((unsigned char*)key.get(), table->record[0], table->key_info + keynum,
 
6208
               0);
5463
6209
      error= table->file->index_read_idx_map(table->record[1], keynum,
5464
 
                                             key.data(),
 
6210
                                             (const unsigned char*)key.get(),
5465
6211
                                             HA_WHOLE_KEY,
5466
6212
                                             HA_READ_KEY_EXACT);
5467
6213
      if (error)
5478
6224
     */
5479
6225
 
5480
6226
    /*
5481
 
      If row is incomplete we will use the record found to fill
5482
 
      missing columns.
 
6227
      If row is incomplete we will use the record found to fill 
 
6228
      missing columns.  
5483
6229
    */
5484
6230
    if (!get_flags(COMPLETE_ROWS_F))
5485
6231
    {
5508
6254
      error=table->file->ha_update_row(table->record[1],
5509
6255
                                       table->record[0]);
5510
6256
      switch (error) {
5511
 
 
 
6257
                
5512
6258
      case HA_ERR_RECORD_IS_THE_SAME:
5513
6259
        error= 0;
5514
 
 
 
6260
      
5515
6261
      case 0:
5516
6262
        break;
5517
 
 
5518
 
      default:
 
6263
        
 
6264
      default:    
5519
6265
        table->file->print_error(error, MYF(0));
5520
6266
      }
5521
 
 
 
6267
      
5522
6268
      return(error);
5523
6269
    }
5524
6270
    else
5550
6296
}
5551
6297
 
5552
6298
 
5553
 
int
 
6299
int 
5554
6300
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
5555
6301
{
5556
6302
  assert(m_table != NULL);
5557
6303
  int error=
5558
6304
    write_row(rli,        /* if 1 then overwrite */
5559
6305
              bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
5560
 
 
 
6306
    
5561
6307
  if (error && !session->is_error())
5562
6308
  {
5563
6309
    assert(0);
5564
6310
    my_error(ER_UNKNOWN_ERROR, MYF(0));
5565
6311
  }
5566
 
 
5567
 
  return error;
 
6312
  
 
6313
  return error; 
5568
6314
}
5569
6315
 
5570
6316
 
5652
6398
/**
5653
6399
  Locate the current row in event's table.
5654
6400
 
5655
 
  The current row is pointed by @c m_curr_row. Member @c m_width tells how many
5656
 
  columns are there in the row (this can be differnet from the number of columns
5657
 
  in the table). It is assumed that event's table is already open and pointed
 
6401
  The current row is pointed by @c m_curr_row. Member @c m_width tells how many 
 
6402
  columns are there in the row (this can be differnet from the number of columns 
 
6403
  in the table). It is assumed that event's table is already open and pointed 
5658
6404
  by @c m_table.
5659
6405
 
5660
 
  If a corresponding record is found in the table it is stored in
5661
 
  @c m_table->record[0]. Note that when record is located based on a primary
 
6406
  If a corresponding record is found in the table it is stored in 
 
6407
  @c m_table->record[0]. Note that when record is located based on a primary 
5662
6408
  key, it is possible that the record found differs from the row being located.
5663
6409
 
5664
 
  If no key is specified or table does not have keys, a table scan is used to
 
6410
  If no key is specified or table does not have keys, a table scan is used to 
5665
6411
  find the row. In that case the row should be complete and contain values for
5666
 
  all columns. However, it can still be shorter than the table, i.e. the table
5667
 
  can contain extra columns not present in the row. It is also possible that
5668
 
  the table has fewer columns than the row being located.
5669
 
 
5670
 
  @returns Error code on failure, 0 on success.
5671
 
 
5672
 
  @post In case of success @c m_table->record[0] contains the record found.
 
6412
  all columns. However, it can still be shorter than the table, i.e. the table 
 
6413
  can contain extra columns not present in the row. It is also possible that 
 
6414
  the table has fewer columns than the row being located. 
 
6415
 
 
6416
  @returns Error code on failure, 0 on success. 
 
6417
  
 
6418
  @post In case of success @c m_table->record[0] contains the record found. 
5673
6419
  Also, the internal "cursor" of the table is positioned at the record found.
5674
6420
 
5675
6421
  @note If the engine allows random access of the records, a combination of
5676
 
  @c position() and @c rnd_pos() will be used.
 
6422
  @c position() and @c rnd_pos() will be used. 
5677
6423
 */
5678
6424
 
5679
6425
int Rows_log_event::find_row(const Relay_log_info *rli)
5684
6430
  int error;
5685
6431
 
5686
6432
  /* unpack row - missing fields get default values */
5687
 
  prepare_record(table, &m_cols, m_width, false/* don't check errors */);
 
6433
  prepare_record(table, &m_cols, m_width, false/* don't check errors */); 
5688
6434
  error= unpack_current_row(rli, &m_cols);
5689
6435
 
5690
6436
  // Temporary fix to find out why it fails [/Matz]
5721
6467
  }
5722
6468
 
5723
6469
  // We can't use position() - try other methods.
5724
 
 
 
6470
  
5725
6471
  /*
5726
 
    Save copy of the record in table->record[1]. It might be needed
 
6472
    Save copy of the record in table->record[1]. It might be needed 
5727
6473
    later if linear search is used to find exact match.
5728
 
   */
5729
 
  store_record(table,record[1]);
 
6474
   */ 
 
6475
  store_record(table,record[1]);    
5730
6476
 
5731
6477
  if (table->s->keys > 0)
5732
6478
  {
5751
6497
    my_ptrdiff_t const pos=
5752
6498
      table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
5753
6499
    table->record[0][pos]= 0xFF;
5754
 
 
5755
 
    if ((error= table->file->index_read_map(table->record[0], m_key,
 
6500
    
 
6501
    if ((error= table->file->index_read_map(table->record[0], m_key, 
5756
6502
                                            HA_WHOLE_KEY,
5757
6503
                                            HA_READ_KEY_EXACT)))
5758
6504
    {
5783
6529
 
5784
6530
    /*
5785
6531
      In case key is not unique, we still have to iterate over records found
5786
 
      and find the one which is identical to the row given. A copy of the
 
6532
      and find the one which is identical to the row given. A copy of the 
5787
6533
      record we are looking for is stored in record[1].
5788
 
     */
 
6534
     */ 
5789
6535
    while (record_compare(table))
5790
6536
    {
5791
6537
      /*
5850
6596
      }
5851
6597
    }
5852
6598
    while (restart_count < 2 && record_compare(table));
5853
 
 
5854
 
    /*
5855
 
      Note: above record_compare will take into accout all record fields
 
6599
    
 
6600
    /* 
 
6601
      Note: above record_compare will take into accout all record fields 
5856
6602
      which might be incorrect in case a partial row was given in the event
5857
6603
     */
5858
6604
    table->file->ha_rnd_end();
5891
6637
}
5892
6638
 
5893
6639
 
5894
 
int
 
6640
int 
5895
6641
Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
5896
6642
{
5897
6643
  if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
5906
6652
  if (m_table->s->keys > 0)
5907
6653
  {
5908
6654
    // Allocate buffer for key searches
5909
 
    m_key= (unsigned char*)malloc(m_table->key_info->key_length);
 
6655
    m_key= (unsigned char*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
5910
6656
    if (!m_key)
5911
6657
      return HA_ERR_OUT_OF_MEM;
5912
6658
  }
5914
6660
  return 0;
5915
6661
}
5916
6662
 
5917
 
int
5918
 
Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
 
6663
int 
 
6664
Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, 
5919
6665
                                               int error)
5920
6666
{
5921
6667
  /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
5931
6677
  int error;
5932
6678
  assert(m_table != NULL);
5933
6679
 
5934
 
  if (!(error= find_row(rli)))
5935
 
  {
 
6680
  if (!(error= find_row(rli))) 
 
6681
  { 
5936
6682
    /*
5937
6683
      Delete the record found, located in record[0]
5938
6684
    */
5977
6723
 
5978
6724
Update_rows_log_event::~Update_rows_log_event()
5979
6725
{
5980
 
  if (m_cols_ai.bitmap == m_bitbuf_ai) // no malloc happened
 
6726
  if (m_cols_ai.bitmap == m_bitbuf_ai) // no my_malloc happened
5981
6727
    m_cols_ai.bitmap= 0; // so no free in bitmap_free
5982
6728
  bitmap_free(&m_cols_ai); // To pair with bitmap_init().
5983
6729
}
5995
6741
}
5996
6742
 
5997
6743
 
5998
 
int
 
6744
int 
5999
6745
Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6000
6746
{
6001
6747
  if (m_table->s->keys > 0)
6002
6748
  {
6003
6749
    // Allocate buffer for key searches
6004
 
    m_key= (unsigned char*)malloc(m_table->key_info->key_length);
 
6750
    m_key= (unsigned char*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
6005
6751
    if (!m_key)
6006
6752
      return HA_ERR_OUT_OF_MEM;
6007
6753
  }
6011
6757
  return 0;
6012
6758
}
6013
6759
 
6014
 
int
6015
 
Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
 
6760
int 
 
6761
Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, 
6016
6762
                                               int error)
6017
6763
{
6018
6764
  /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
6023
6769
  return error;
6024
6770
}
6025
6771
 
6026
 
int
 
6772
int 
6027
6773
Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6028
6774
{
6029
6775
  assert(m_table != NULL);
6030
6776
 
6031
 
  int error= find_row(rli);
 
6777
  int error= find_row(rli); 
6032
6778
  if (error)
6033
6779
  {
6034
6780
    /*