1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008 Sun Microsystems
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; version 2 of the License.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20
#include <drizzled/server_includes.h>
21
#include <drizzled/log_event.h>
22
#include <drizzled/replication/rli.h>
23
#include <drizzled/replication/mi.h>
24
#include <libdrizzle/libdrizzle.h>
25
#include <mysys/hash.h>
26
#include <drizzled/replication/utility.h>
27
#include <drizzled/replication/record.h>
28
#include <mysys/my_dir.h>
29
#include <drizzled/error.h>
30
#include <libdrizzle/pack.h>
31
#include <drizzled/sql_parse.h>
32
#include <drizzled/sql_base.h>
33
#include <drizzled/sql_load.h>
34
#include <drizzled/item/return_int.h>
35
#include <drizzled/item/empty_string.h>
40
#include <mysys/base64.h>
41
#include <mysys/my_bitmap.h>
43
#include <drizzled/gettext.h>
44
#include <libdrizzle/libdrizzle.h>
45
#include <drizzled/error.h>
46
#include <drizzled/query_id.h>
47
#include <drizzled/tztime.h>
48
#include <drizzled/slave.h>
49
#include <drizzled/lock.h>
53
static const char *HA_ERR(int i)
56
case HA_ERR_KEY_NOT_FOUND: return "HA_ERR_KEY_NOT_FOUND";
57
case HA_ERR_FOUND_DUPP_KEY: return "HA_ERR_FOUND_DUPP_KEY";
58
case HA_ERR_RECORD_CHANGED: return "HA_ERR_RECORD_CHANGED";
59
case HA_ERR_WRONG_INDEX: return "HA_ERR_WRONG_INDEX";
60
case HA_ERR_CRASHED: return "HA_ERR_CRASHED";
61
case HA_ERR_WRONG_IN_RECORD: return "HA_ERR_WRONG_IN_RECORD";
62
case HA_ERR_OUT_OF_MEM: return "HA_ERR_OUT_OF_MEM";
63
case HA_ERR_NOT_A_TABLE: return "HA_ERR_NOT_A_TABLE";
64
case HA_ERR_WRONG_COMMAND: return "HA_ERR_WRONG_COMMAND";
65
case HA_ERR_OLD_FILE: return "HA_ERR_OLD_FILE";
66
case HA_ERR_NO_ACTIVE_RECORD: return "HA_ERR_NO_ACTIVE_RECORD";
67
case HA_ERR_RECORD_DELETED: return "HA_ERR_RECORD_DELETED";
68
case HA_ERR_RECORD_FILE_FULL: return "HA_ERR_RECORD_FILE_FULL";
69
case HA_ERR_INDEX_FILE_FULL: return "HA_ERR_INDEX_FILE_FULL";
70
case HA_ERR_END_OF_FILE: return "HA_ERR_END_OF_FILE";
71
case HA_ERR_UNSUPPORTED: return "HA_ERR_UNSUPPORTED";
72
case HA_ERR_TO_BIG_ROW: return "HA_ERR_TO_BIG_ROW";
73
case HA_WRONG_CREATE_OPTION: return "HA_WRONG_CREATE_OPTION";
74
case HA_ERR_FOUND_DUPP_UNIQUE: return "HA_ERR_FOUND_DUPP_UNIQUE";
75
case HA_ERR_UNKNOWN_CHARSET: return "HA_ERR_UNKNOWN_CHARSET";
76
case HA_ERR_WRONG_MRG_TABLE_DEF: return "HA_ERR_WRONG_MRG_TABLE_DEF";
77
case HA_ERR_CRASHED_ON_REPAIR: return "HA_ERR_CRASHED_ON_REPAIR";
78
case HA_ERR_CRASHED_ON_USAGE: return "HA_ERR_CRASHED_ON_USAGE";
79
case HA_ERR_LOCK_WAIT_TIMEOUT: return "HA_ERR_LOCK_WAIT_TIMEOUT";
80
case HA_ERR_LOCK_TABLE_FULL: return "HA_ERR_LOCK_TABLE_FULL";
81
case HA_ERR_READ_ONLY_TRANSACTION: return "HA_ERR_READ_ONLY_TRANSACTION";
82
case HA_ERR_LOCK_DEADLOCK: return "HA_ERR_LOCK_DEADLOCK";
83
case HA_ERR_CANNOT_ADD_FOREIGN: return "HA_ERR_CANNOT_ADD_FOREIGN";
84
case HA_ERR_NO_REFERENCED_ROW: return "HA_ERR_NO_REFERENCED_ROW";
85
case HA_ERR_ROW_IS_REFERENCED: return "HA_ERR_ROW_IS_REFERENCED";
86
case HA_ERR_NO_SAVEPOINT: return "HA_ERR_NO_SAVEPOINT";
87
case HA_ERR_NON_UNIQUE_BLOCK_SIZE: return "HA_ERR_NON_UNIQUE_BLOCK_SIZE";
88
case HA_ERR_NO_SUCH_TABLE: return "HA_ERR_NO_SUCH_TABLE";
89
case HA_ERR_TABLE_EXIST: return "HA_ERR_TABLE_EXIST";
90
case HA_ERR_NO_CONNECTION: return "HA_ERR_NO_CONNECTION";
91
case HA_ERR_NULL_IN_SPATIAL: return "HA_ERR_NULL_IN_SPATIAL";
92
case HA_ERR_TABLE_DEF_CHANGED: return "HA_ERR_TABLE_DEF_CHANGED";
93
case HA_ERR_NO_PARTITION_FOUND: return "HA_ERR_NO_PARTITION_FOUND";
94
case HA_ERR_RBR_LOGGING_FAILED: return "HA_ERR_RBR_LOGGING_FAILED";
95
case HA_ERR_DROP_INDEX_FK: return "HA_ERR_DROP_INDEX_FK";
96
case HA_ERR_FOREIGN_DUPLICATE_KEY: return "HA_ERR_FOREIGN_DUPLICATE_KEY";
97
case HA_ERR_TABLE_NEEDS_UPGRADE: return "HA_ERR_TABLE_NEEDS_UPGRADE";
98
case HA_ERR_TABLE_READONLY: return "HA_ERR_TABLE_READONLY";
99
case HA_ERR_AUTOINC_READ_FAILED: return "HA_ERR_AUTOINC_READ_FAILED";
100
case HA_ERR_AUTOINC_ERANGE: return "HA_ERR_AUTOINC_ERANGE";
101
case HA_ERR_GENERIC: return "HA_ERR_GENERIC";
102
case HA_ERR_RECORD_IS_THE_SAME: return "HA_ERR_RECORD_IS_THE_SAME";
103
case HA_ERR_LOGGING_IMPOSSIBLE: return "HA_ERR_LOGGING_IMPOSSIBLE";
104
case HA_ERR_CORRUPT_EVENT: return "HA_ERR_CORRUPT_EVENT";
105
case HA_ERR_ROWS_EVENT_APPLY : return "HA_ERR_ROWS_EVENT_APPLY";
111
Error reporting facility for Rows_log_event::do_apply_event
113
@param level error, warning or info
114
@param ha_error HA_ERR_ code
115
@param rli pointer to the active Relay_log_info instance
116
@param session pointer to the slave thread's session
117
@param table pointer to the event's table object
118
@param type the type of the event
119
@param log_name the master binlog file name
120
@param pos the master binlog file pos (the next after the event)
123
static void inline slave_rows_error_report(enum loglevel level, int ha_error,
124
Relay_log_info const *rli, Session *session,
125
Table *table, const char * type,
126
const char *log_name, ulong pos)
128
const char *handler_error= HA_ERR(ha_error);
129
char buff[MAX_SLAVE_ERRMSG], *slider;
130
const char *buff_end= buff + sizeof(buff);
132
List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
136
for (err= it++, slider= buff; err && slider < buff_end - 1;
137
slider += len, err= it++)
139
len= snprintf(slider, buff_end - slider,
140
_(" %s, Error_code: %d;"), err->msg, err->code);
143
rli->report(level, session->is_error()? session->main_da.sql_errno() : 0,
144
_("Could not execute %s event on table %s.%s;"
145
"%s handler error %s; "
146
"the event's master log %s, end_log_pos %lu"),
147
type, table->s->db.str,
148
table->s->table_name.str,
150
handler_error == NULL? _("<unknown>") : handler_error,
156
Cache that will automatically be written to a dedicated file on
162
class Write_on_release_cache
170
typedef unsigned short flag_set;
176
Write_on_release_cache
177
cache Pointer to cache to use
178
file File to write cache to upon destruction
179
flags Flags for the cache
183
Class used to guarantee copy of cache to file before exiting the
184
current block. On successful copy of the cache, the cache will
185
be reinited as a WRITE_CACHE.
187
Currently, a pointer to the cache is provided in the
188
constructor, but it would be possible to create a subclass
189
holding the IO_CACHE itself.
191
Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0)
192
: m_cache(cache), m_file(file), m_flags(flags)
194
reinit_io_cache(m_cache, WRITE_CACHE, 0L, false, true);
197
~Write_on_release_cache()
199
copy_event_cache_to_file_and_reinit(m_cache, m_file);
200
/* Test whether the FLUSH_F bit is set in m_flags. A bitwise OR here would
   be non-zero for any non-zero FLUSH_F, making the condition always true
   regardless of the flags passed to the constructor. */
if (m_flags & FLUSH_F)
205
Return a pointer to the internal IO_CACHE.
212
Function to return a pointer to the internal cache, so that the
213
object can be treated as a IO_CACHE and used with the my_b_*
217
A pointer to the internal IO_CACHE.
219
IO_CACHE *operator&()
225
// Hidden, to prevent usage.
226
Write_on_release_cache(Write_on_release_cache const&);
233
uint32_t debug_not_change_ts_if_art_event= 1; // bug#29309 simulation
239
static void clear_all_errors(Session *session, Relay_log_info *rli)
241
session->is_slave_error = 0;
242
session->clear_error();
248
Ignore error code specified on command line.
251
/*
  Tell whether a slave error code is one that should be ignored.

  @param err_code  error code to test

  @return non-zero when err_code is ER_SLAVE_IGNORED_TABLE, or when
          use_slave_mask is set and err_code is flagged in slave_error_mask;
          0 otherwise
*/
inline int ignored_error_code(int err_code)
253
return ((err_code == ER_SLAVE_IGNORED_TABLE) ||
254
(use_slave_mask && bitmap_is_set(&slave_error_mask, err_code)));
262
static char *pretty_print_str(char *packet, const char *str, int len)
264
const char *end= str + len;
270
switch ((c=*str++)) {
271
case '\n': *pos++= '\\'; *pos++= 'n'; break;
272
case '\r': *pos++= '\\'; *pos++= 'r'; break;
273
case '\\': *pos++= '\\'; *pos++= '\\'; break;
274
case '\b': *pos++= '\\'; *pos++= 'b'; break;
275
case '\t': *pos++= '\\'; *pos++= 't'; break;
276
case '\'': *pos++= '\\'; *pos++= '\''; break;
277
case 0 : *pos++= '\\'; *pos++= '0'; break;
289
Creates a temporary name for LOAD DATA INFILE.
291
@param buf Store new filename here
292
@param file_id File_id (part of file name)
293
@param event_server_id Event_id (part of file name)
294
@param ext Extension for file name
297
Pointer to start of extension
300
static char *slave_load_file_stem(char *buf, uint32_t file_id,
301
int event_server_id, const char *ext)
304
fn_format(buf,"SQL_LOAD-",slave_load_tmpdir, "", MY_UNPACK_FILENAME);
307
buf= strchr(buf, '\0');
308
buf= int10_to_str(::server_id, buf, 10);
310
buf= int10_to_str(event_server_id, buf, 10);
312
res= int10_to_str(file_id, buf, 10);
313
strcpy(res, ext); // Add extension last
314
return res; // Pointer to extension
319
Delete all temporary files used for SQL_LOAD.
322
static void cleanup_load_tmpdir()
327
char fname[FN_REFLEN], prefbuf[31], *p;
329
if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
333
When we are deleting temporary files, we should only remove
334
the files associated with the server id of our server.
335
We don't use event_server_id here because, since we've disabled
336
direct binlogging of Create_file/Append_file/Exec_load events
337
we cannot meet Start_log event in the middle of events from one
340
p= strncpy(prefbuf, STRING_WITH_LEN("SQL_LOAD-")) + 9;
341
p= int10_to_str(::server_id, p, 10);
345
for (i=0 ; i < (uint)dirp->number_off_files; i++)
347
file=dirp->dir_entry+i;
348
if (is_prefix(file->name, prefbuf))
350
fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME);
351
my_delete(fname, MYF(0));
363
static bool write_str(IO_CACHE *file, const char *str, uint32_t length)
365
unsigned char tmp[1];
366
tmp[0]= (unsigned char) length;
367
return (my_b_safe_write(file, tmp, sizeof(tmp)) ||
368
my_b_safe_write(file, (unsigned char*) str, length));
376
static inline int read_str(const char **buf, const char *buf_end,
377
const char **str, uint8_t *len)
379
if (*buf + ((uint) (unsigned char) **buf) >= buf_end)
381
*len= (uint8_t) **buf;
383
(*buf)+= (uint) *len+1;
389
Transforms a string into "" or its expression in 0x... form.
392
char *str_to_hex(char *to, const char *from, uint32_t len)
398
to= octet2hex(to, from, len);
401
to= strcpy(to, "\"\"")+2;
402
return to; // pointer to end 0 of 'to'
407
Append a version of the 'from' string suitable for use in a query to
408
the 'to' string. To generate a correct escaping, the character set
409
information in 'csinfo' is used.
413
append_query_string(const CHARSET_INFO * const csinfo,
414
String const *from, String *to)
417
uint32_t const orig_len= to->length();
418
if (to->reserve(orig_len + from->length()*2+3))
421
beg= to->c_ptr_quick() + to->length();
423
if (csinfo->escape_with_backslash_is_dangerous)
424
ptr= str_to_hex(ptr, from->ptr(), from->length());
428
ptr+= drizzle_escape_string(ptr, from->ptr(), from->length());
431
to->length(orig_len + ptr - beg);
436
/**************************************************************************
437
Log_event methods (= the parent class of all events)
438
**************************************************************************/
442
returns the human readable name of the event's type
445
const char* Log_event::get_type_str(Log_event_type type)
448
case START_EVENT_V3: return "Start_v3";
449
case STOP_EVENT: return "Stop";
450
case QUERY_EVENT: return "Query";
451
case ROTATE_EVENT: return "Rotate";
452
case LOAD_EVENT: return "Load";
453
case NEW_LOAD_EVENT: return "New_load";
454
case SLAVE_EVENT: return "Slave";
455
case CREATE_FILE_EVENT: return "Create_file";
456
case APPEND_BLOCK_EVENT: return "Append_block";
457
case DELETE_FILE_EVENT: return "Delete_file";
458
case EXEC_LOAD_EVENT: return "Exec_load";
459
case XID_EVENT: return "Xid";
460
case FORMAT_DESCRIPTION_EVENT: return "Format_desc";
461
case TABLE_MAP_EVENT: return "Table_map";
462
case WRITE_ROWS_EVENT: return "Write_rows";
463
case UPDATE_ROWS_EVENT: return "Update_rows";
464
case DELETE_ROWS_EVENT: return "Delete_rows";
465
case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
466
case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
467
case INCIDENT_EVENT: return "Incident";
468
default: return "Unknown"; /* impossible */
472
/*
  Return the human readable name of this event's own type.

  Convenience overload: looks up get_type_code() through
  get_type_str(Log_event_type).
*/
const char* Log_event::get_type_str()
474
return get_type_str(get_type_code());
479
Log_event::Log_event()
482
Log_event::Log_event(Session* session_arg, uint16_t flags_arg, bool using_trans)
483
:log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), session(session_arg)
485
server_id= session->server_id;
486
when= session->start_time;
487
cache_stmt= using_trans;
492
This minimal constructor is for when you are not even sure that there
493
is a valid Session. For example in the server when we are shutting down or
494
flushing logs after receiving a SIGHUP (then we must write a Rotate to
495
the binlog but we have no Session, so we need this minimal constructor).
498
Log_event::Log_event()
499
:temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
502
server_id= ::server_id;
504
We can't call my_time() here as this would cause a call before
513
Log_event::Log_event()
516
Log_event::Log_event(const char* buf,
517
const Format_description_log_event* description_event)
518
:temp_buf(0), cache_stmt(0)
521
when= uint4korr(buf);
522
server_id= uint4korr(buf + SERVER_ID_OFFSET);
523
data_written= uint4korr(buf + EVENT_LEN_OFFSET);
524
if (description_event->binlog_version==1)
531
log_pos= uint4korr(buf + LOG_POS_OFFSET);
533
If the log is 4.0 (so here it can only be a 4.0 relay log read by
534
the SQL thread or a 4.0 master binlog read by the I/O thread),
535
log_pos is the beginning of the event: we transform it into the end
536
of the event, which is more useful.
537
But how do you know that the log is 4.0: you know it if
538
description_event is version 3 *and* you are not reading a
539
Format_desc (remember that mysqlbinlog starts by assuming that 5.0
540
logs are in 4.0 format, until it finds a Format_desc).
542
if (description_event->binlog_version==3 &&
543
buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
546
If log_pos=0, don't change it. log_pos==0 is a marker to mean
547
"don't change rli->group_master_log_pos" (see
548
inc_group_relay_log_pos()). As it is unreal log_pos, adding the
549
event len's is nonsense. For example, a fake Rotate event should
550
not have its log_pos (which is 0) changed or it will modify
551
Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense
552
value of (a non-zero offset which does not exist in the master's
553
binlog, so which will cause problems if the user uses this value
556
log_pos+= data_written; /* purecov: inspected */
559
flags= uint2korr(buf + FLAGS_OFFSET);
560
if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
561
(buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
564
These events always have a header which stops here (i.e. their
568
Initialization to zero of all other Log_event members as they're
569
not specified. Currently there are no such members; in the future
570
there will be an event UID (but Format_description and Rotate
571
don't need this UID, as they are not propagated through
572
--log-slave-updates (remember the UID is used to not play a query
573
twice when you have two masters which are slaves of a 3rd master).
578
/* otherwise, go on with reading the header from buf (nothing now) */
582
int Log_event::do_update_pos(Relay_log_info *rli)
585
rli is null when (as far as I (Guilhem) know) the caller is
586
Load_log_event::do_apply_event *and* that one is called from
587
Execute_load_log_event::do_apply_event. In this case, we don't
588
do anything here ; Execute_load_log_event::do_apply_event will
589
call Log_event::do_apply_event again later with the proper rli.
590
Strictly speaking, if we were sure that rli is null only in the
591
case discussed above, 'if (rli)' is useless here. But as we are
592
not 100% sure, keep it for now.
594
Matz: I don't think we will need this check with this refactoring.
599
bug#29309 simulation: resetting the flag to force
600
wrong behaviour of artificial event to update
601
rli->last_master_timestamp for only one time -
602
the first FLUSH LOGS in the test.
604
if (debug_not_change_ts_if_art_event == 1
605
&& is_artificial_event())
606
debug_not_change_ts_if_art_event= 0;
607
rli->stmt_done(log_pos,
608
is_artificial_event() &&
609
debug_not_change_ts_if_art_event > 0 ? 0 : when);
610
if (debug_not_change_ts_if_art_event == 0)
611
debug_not_change_ts_if_art_event= 2;
613
return 0; // Cannot fail currently
617
/*
  Decide whether this event should be skipped by the slave.

  @param rli  relay log info; its replicate_same_server_id flag,
              slave_skip_counter and is_in_group() are consulted

  @return EVENT_SKIP_IGNORE when the event carries this server's own
          server_id and rli->replicate_same_server_id is not set, or when
          rli->slave_skip_counter is 1 while rli->is_in_group() holds;
          EVENT_SKIP_COUNT when rli->slave_skip_counter is otherwise
          positive; EVENT_SKIP_NOT in the remaining case
*/
Log_event::enum_skip_reason
618
Log_event::do_shall_skip(Relay_log_info *rli)
620
if ((server_id == ::server_id && !rli->replicate_same_server_id) || (rli->slave_skip_counter == 1 && rli->is_in_group()))
621
return EVENT_SKIP_IGNORE;
622
else if (rli->slave_skip_counter > 0)
623
return EVENT_SKIP_COUNT;
625
return EVENT_SKIP_NOT;
630
Log_event::pack_info()
633
void Log_event::pack_info(Protocol *protocol)
635
protocol->store("", &my_charset_bin);
639
/*
  Return the database name associated with this event.

  @return session->db when the event has a session attached, 0 otherwise
*/
const char* Log_event::get_db()
641
return session ? session->db : 0;
646
init_show_field_list() prepares the column names and types for the
647
output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
651
void Log_event::init_show_field_list(List<Item>* field_list)
653
field_list->push_back(new Item_empty_string("Log_name", 20));
654
field_list->push_back(new Item_return_int("Pos", MY_INT32_NUM_DECIMAL_DIGITS,
655
DRIZZLE_TYPE_LONGLONG));
656
field_list->push_back(new Item_empty_string("Event_type", 20));
657
field_list->push_back(new Item_return_int("Server_id", 10,
659
field_list->push_back(new Item_return_int("End_log_pos",
660
MY_INT32_NUM_DECIMAL_DIGITS,
661
DRIZZLE_TYPE_LONGLONG));
662
field_list->push_back(new Item_empty_string("Info", 20));
669
bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
671
unsigned char header[LOG_EVENT_HEADER_LEN];
674
/* Store number of bytes that will be written by this event */
675
data_written= event_data_length + sizeof(header);
678
log_pos != 0 if this is relay-log event. In this case we should not
682
if (is_artificial_event())
685
We should not do any cleanup on slave when reading this. We
686
mark this by setting log_pos to 0. Start_log_event_v3() will
687
detect this on reading and set artificial_event=1 for the event.
694
Calculate position of end of event
696
Note that with a SEQ_READ_APPEND cache, my_b_tell() does not
697
work well. So this will give slightly wrong positions for the
698
Format_desc/Rotate/Stop events which the slave writes to its
699
relay log. For example, the initial Format_desc will have
700
end_log_pos=91 instead of 95. Because after writing the first 4
701
bytes of the relay log, my_b_tell() still reports 0. Because
702
my_b_append() does not update the counter which my_b_tell()
703
later uses (one should probably use my_b_append_tell() to work
704
around this). To get right positions even when writing to the
705
relay log, we use the (new) my_b_safe_tell().
707
Note that this raises a question on the correctness of all these
708
assert(my_b_tell() == rli->event_relay_log_pos).
710
If in a transaction, the log_pos which we calculate below is not
711
very good (because then my_b_safe_tell() returns start position
712
of the BEGIN, so it's like the statement was at the BEGIN's
713
place), but it's not a very serious problem (as the slave, when
714
it is in a transaction, does not take those end_log_pos into
715
account (as it calls inc_event_relay_log_pos()). To be fixed
716
later, so that it looks less strange. But not a bug.
719
log_pos= my_b_safe_tell(file)+data_written;
722
now= (ulong) get_time(); // Query start time
725
Header will be of size LOG_EVENT_HEADER_LEN for all events, except for
726
FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT, where it will be
727
LOG_EVENT_MINIMAL_HEADER_LEN (remember these 2 have a frozen header,
728
because we read them before knowing the format).
731
int4store(header, now); // timestamp
732
header[EVENT_TYPE_OFFSET]= get_type_code();
733
int4store(header+ SERVER_ID_OFFSET, server_id);
734
int4store(header+ EVENT_LEN_OFFSET, data_written);
735
int4store(header+ LOG_POS_OFFSET, log_pos);
736
int2store(header+ FLAGS_OFFSET, flags);
738
return(my_b_safe_write(file, header, sizeof(header)) != 0);
742
time_t Log_event::get_time()
744
Session *tmp_session;
748
return session->start_time;
749
if ((tmp_session= current_session))
750
return tmp_session->start_time;
756
This needn't be format-tolerant, because we only read
757
LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length).
760
int Log_event::read_log_event(IO_CACHE* file, String* packet,
761
pthread_mutex_t* log_lock)
765
char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
768
pthread_mutex_lock(log_lock);
769
if (my_b_read(file, (unsigned char*) buf, sizeof(buf)))
772
If the read hits eof, we must report it as eof so the caller
773
will know it can go into cond_wait to be woken up on the next
777
result= LOG_READ_EOF;
779
result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
782
data_len= uint4korr(buf + EVENT_LEN_OFFSET);
783
if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
784
data_len > current_session->variables.max_allowed_packet)
786
result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
791
/* Append the log event header to packet */
792
if (packet->append(buf, sizeof(buf)))
794
/* Failed to allocate packet */
795
result= LOG_READ_MEM;
798
data_len-= LOG_EVENT_MINIMAL_HEADER_LEN;
801
/* Append rest of event, read directly from file into packet */
802
if (packet->append(file, data_len))
805
Fatal error occurred when appending rest of the event
806
to packet, possible failures:
807
1. EOF occurred when reading from file; it's really an error
808
as data_len is >=0 there's supposed to be more bytes available.
809
file->error will have been set to number of bytes left to read
810
2. Read was interrupted, file->error would normally be set to -1
811
3. Failed to allocate memory for packet, my_errno
812
will be ENOMEM (file->error should be 0, but since the
813
memory allocation occurs before the call to read it might
816
result= (my_errno == ENOMEM ? LOG_READ_MEM :
817
(file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
818
/* Implicit goto end; */
824
pthread_mutex_unlock(log_lock);
828
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
829
#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
833
Allocates memory; The caller is responsible for clean-up.
835
Log_event* Log_event::read_log_event(IO_CACHE* file,
836
pthread_mutex_t* log_lock,
837
const Format_description_log_event
840
assert(description_event != 0);
841
char head[LOG_EVENT_MINIMAL_HEADER_LEN];
843
First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
844
check the event for sanity and to know its length; no need to really parse
845
it. We say "at most" because this could be a 3.23 master, which has header
846
of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's
847
"minimal" over the set {MySQL >=4.0}).
849
uint32_t header_size= cmin(description_event->common_header_len,
850
LOG_EVENT_MINIMAL_HEADER_LEN);
853
if (my_b_read(file, (unsigned char *) head, header_size))
857
No error here; it could be that we are at the file's end. However
858
if the next my_b_read() fails (below), it will be an error as we
859
were able to read the first bytes.
863
uint32_t data_len = uint4korr(head + EVENT_LEN_OFFSET);
865
const char *error= 0;
867
#ifndef max_allowed_packet
868
Session *session=current_session;
869
uint32_t max_allowed_packet= session ? session->variables.max_allowed_packet : ~(ulong)0;
872
if (data_len > max_allowed_packet)
874
error = "Event too big";
878
if (data_len < header_size)
880
error = "Event too small";
884
// some events use the extra byte to null-terminate strings
885
if (!(buf = (char*) malloc(data_len+1)))
887
error = "Out of memory";
891
memcpy(buf, head, header_size);
892
if (my_b_read(file, (unsigned char*) buf + header_size, data_len - header_size))
894
error = "read error";
897
if ((res= read_log_event(buf, data_len, &error, description_event)))
898
res->register_temp_buf(buf);
905
sql_print_error(_("Error in Log_event::read_log_event(): "
906
"'%s', data_len: %d, event_type: %d"),
907
error,data_len,head[EVENT_TYPE_OFFSET]);
910
The SQL slave thread will check if file->error<0 to know
911
if there was an I/O error. Even if there is no "low-level" I/O errors
912
with 'file', any of the high-level above errors is worrying
913
enough to stop the SQL thread now ; as we are skipping the current event,
914
going on with reading and successfully executing other events can
915
only corrupt the slave's databases. So stop.
924
Binlog format tolerance is in (buf, event_len, description_event)
928
Log_event* Log_event::read_log_event(const char* buf, uint32_t event_len,
930
const Format_description_log_event *description_event)
933
assert(description_event != 0);
935
/* Check the integrity */
936
/* Require enough bytes for the 4-byte length field itself, so that the
   uint4korr(buf+EVENT_LEN_OFFSET) comparison below never reads past the
   end of a short buffer. */
if (event_len < EVENT_LEN_OFFSET + 4 ||
937
buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
938
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
940
*error="Sanity check failed"; // Needed to free buffer
941
return(NULL); // general sanity check - will fail on a partial read
944
uint32_t event_type= buf[EVENT_TYPE_OFFSET];
945
if (event_type > description_event->number_of_event_types &&
946
event_type != FORMAT_DESCRIPTION_EVENT)
949
It is unsafe to use the description_event if its post_header_len
950
array does not include the event type.
957
In some previous versions (see comment in
958
Format_description_log_event::Format_description_log_event(char*,...)),
959
event types were assigned different id numbers than in the
960
present version. In order to replicate from such versions to the
961
present version, we must map those event type id's to our event
962
type id's. The mapping is done with the event_type_permutation
963
array, which was set up when the Format_description_log_event
966
if (description_event->event_type_permutation)
967
event_type= description_event->event_type_permutation[event_type];
971
ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
974
ev = new Load_log_event(buf, event_len, description_event);
977
ev = new Load_log_event(buf, event_len, description_event);
980
ev = new Rotate_log_event(buf, event_len, description_event);
982
case CREATE_FILE_EVENT:
983
ev = new Create_file_log_event(buf, event_len, description_event);
985
case APPEND_BLOCK_EVENT:
986
ev = new Append_block_log_event(buf, event_len, description_event);
988
case DELETE_FILE_EVENT:
989
ev = new Delete_file_log_event(buf, event_len, description_event);
991
case EXEC_LOAD_EVENT:
992
ev = new Execute_load_log_event(buf, event_len, description_event);
994
case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
995
ev = new Start_log_event_v3(buf, description_event);
998
ev = new Stop_log_event(buf, description_event);
1001
ev = new Xid_log_event(buf, description_event);
1003
case FORMAT_DESCRIPTION_EVENT:
1004
ev = new Format_description_log_event(buf, event_len, description_event);
1006
case WRITE_ROWS_EVENT:
1007
ev = new Write_rows_log_event(buf, event_len, description_event);
1009
case UPDATE_ROWS_EVENT:
1010
ev = new Update_rows_log_event(buf, event_len, description_event);
1012
case DELETE_ROWS_EVENT:
1013
ev = new Delete_rows_log_event(buf, event_len, description_event);
1015
case TABLE_MAP_EVENT:
1016
ev = new Table_map_log_event(buf, event_len, description_event);
1018
case BEGIN_LOAD_QUERY_EVENT:
1019
ev = new Begin_load_query_log_event(buf, event_len, description_event);
1021
case EXECUTE_LOAD_QUERY_EVENT:
1022
ev= new Execute_load_query_log_event(buf, event_len, description_event);
1024
case INCIDENT_EVENT:
1025
ev = new Incident_log_event(buf, event_len, description_event);
1034
is_valid() are small event-specific sanity tests which are
1035
important; for example there are some malloc() in constructors
1036
(e.g. Query_log_event::Query_log_event(char*...)); when these
1037
malloc() fail we can't return an error out of the constructor
1038
(because constructor is "void") ; so instead we leave the pointer we
1039
wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
1040
Same for Format_description_log_event, member 'post_header_len'.
1042
if (!ev || !ev->is_valid())
1045
*error= "Found invalid event in binary log";
1051
/*
  Skip decision helper that consults rli->slave_skip_counter before
  falling back to the generic rule.

  @param rli  relay log info whose slave_skip_counter is consulted

  @return Log_event::EVENT_SKIP_IGNORE when rli->slave_skip_counter is 1;
          otherwise whatever Log_event::do_shall_skip(rli) returns
*/
inline Log_event::enum_skip_reason
1052
Log_event::continue_group(Relay_log_info *rli)
1054
if (rli->slave_skip_counter == 1)
1055
return Log_event::EVENT_SKIP_IGNORE;
1056
return Log_event::do_shall_skip(rli);
1059
/**************************************************************************
1060
Query_log_event methods
1061
**************************************************************************/
1064
This (which is used only for SHOW BINLOG EVENTS) could be updated to
1065
print SET @@session_var=. But this is not urgent, as SHOW BINLOG EVENTS is
1066
only informational; it does not produce suitable queries to replay (for
1067
example it does not print LOAD DATA INFILE).
1072
void Query_log_event::pack_info(Protocol *protocol)
1074
// TODO: show the catalog ??
1076
if (!(buf= (char*) malloc(9 + db_len + q_len)))
1079
if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
1082
pos= strcpy(buf, "use `")+5;
1083
memcpy(pos, db, db_len);
1084
pos= strcpy(pos+db_len, "`; ")+3;
1088
memcpy(pos, query, q_len);
1091
protocol->store(buf, pos-buf, &my_charset_bin);
1097
Query_log_event::write().
1100
In this event we have to modify the header to have the correct
1101
EVENT_LEN_OFFSET as we don't yet know how many status variables we
1105
bool Query_log_event::write(IO_CACHE* file)
1108
@todo if catalog can be of length FN_REFLEN==512, then we are not
1109
replicating it correctly, since the length is stored in a byte
1112
unsigned char buf[QUERY_HEADER_LEN+
1113
1+4+ // code of flags2 and flags2
1114
1+8+ // code of sql_mode and sql_mode
1115
1+1+FN_REFLEN+ // code of catalog and catalog length and catalog
1116
1+4+ // code of autoinc and the 2 autoinc variables
1117
1+6+ // code of charset and charset
1118
1+1+MAX_TIME_ZONE_NAME_LENGTH+ // code of tz and tz length and tz name
1119
1+2+ // code of lc_time_names and lc_time_names_number
1120
1+2 // code of charset_database and charset_database_number
1121
], *start, *start_of_status;
1125
return 1; // Something wrong with event
1128
We want to store the thread id:
1129
(- as an information for the user when he reads the binlog)
1130
- if the query uses temporary table: for the slave SQL thread to know to
1131
which master connection the temp table belongs.
1132
Now imagine we (write()) are called by the slave SQL thread (we are
1133
logging a query executed by this thread; the slave runs with
1134
--log-slave-updates). Then this query will be logged with
1135
thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of
1136
the same name were created simultaneously on the master (in the master
1138
CREATE TEMPORARY TABLE t; (thread 1)
1139
CREATE TEMPORARY TABLE t; (thread 2)
1141
then in the slave's binlog there will be
1142
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1143
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1144
which is bad (same thread id!).
1146
To avoid this, we log the thread's thread id EXCEPT for the SQL
1147
slave thread for which we log the original (master's) thread id.
1148
Now this moves the bug: what happens if the thread id on the
1149
master was 10 and when the slave replicates the query, a
1150
connection number 10 is opened by a normal client on the slave,
1151
and updates a temp table of the same name? We get a problem
1152
again. To avoid this, in the handling of temp tables (sql_base.cc)
1153
we use thread_id AND server_id. TODO when this is merged into
1154
4.1: in 4.1, slave_proxy_id has been renamed to pseudo_thread_id
1155
and is a session variable: that's to make mysqlbinlog work with
1156
temp tables. We probably need to introduce
1158
SET PSEUDO_SERVER_ID
1159
for mysqlbinlog in 4.1. mysqlbinlog would print:
1160
SET PSEUDO_SERVER_ID=
1161
SET PSEUDO_THREAD_ID=
1162
for each query using temp tables.
1164
int4store(buf + Q_THREAD_ID_OFFSET, slave_proxy_id);
1165
int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
1166
buf[Q_DB_LEN_OFFSET] = (char) db_len;
1167
int2store(buf + Q_ERR_CODE_OFFSET, error_code);
1170
You MUST always write status vars in increasing order of code. This
1171
guarantees that a slightly older slave will be able to parse those he
1174
start_of_status= start= buf+QUERY_HEADER_LEN;
1177
*start++= Q_FLAGS2_CODE;
1178
int4store(start, flags2);
1181
if (lc_time_names_number)
1183
assert(lc_time_names_number <= 0xFFFF);
1184
*start++= Q_LC_TIME_NAMES_CODE;
1185
int2store(start, lc_time_names_number);
1188
if (charset_database_number)
1190
assert(charset_database_number <= 0xFFFF);
1191
*start++= Q_CHARSET_DATABASE_CODE;
1192
int2store(start, charset_database_number);
1196
Here there could be code like
1197
if (command-line-option-which-says-"log_this_variable" && inited)
1199
*start++= Q_THIS_VARIABLE_CODE;
1200
int4store(start, this_variable);
1205
/* Store length of status variables */
1206
status_vars_len= (uint) (start-start_of_status);
1207
assert(status_vars_len <= MAX_SIZE_LOG_EVENT_STATUS);
1208
int2store(buf + Q_STATUS_VARS_LEN_OFFSET, status_vars_len);
1211
Calculate length of whole event
1212
The "1" below is the \0 in the db's length
1214
event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
1216
return (write_header(file, event_length) ||
1217
my_b_safe_write(file, (unsigned char*) buf, QUERY_HEADER_LEN) ||
1218
write_post_header_for_derived(file) ||
1219
my_b_safe_write(file, (unsigned char*) start_of_status,
1220
(uint) (start-start_of_status)) ||
1221
my_b_safe_write(file, (db) ? (unsigned char*) db : (unsigned char*)"", db_len + 1) ||
1222
my_b_safe_write(file, (unsigned char*) query, q_len)) ? 1 : 0;
1226
The simplest constructor that could possibly work. This is used for
1227
creating static objects that have a special meaning and are invisible
1230
/**
  Default constructor: default-constructs the Log_event base and leaves
  data_buf null. No other member is touched here.
*/
Query_log_event::Query_log_event()
  : Log_event(),
    data_buf(NULL)
{
}
1238
Query_log_event::Query_log_event()
1239
session_arg - thread handle
1240
query_arg - array of char representing the query
1241
query_length - size of the `query_arg' array
1242
using_trans - there is a modified transactional table
1243
suppress_use - suppress the generation of 'USE' statements
1244
killed_status_arg - optional argument; defaults to Session::KILLED_NO_VALUE
1245
if the value equals the default, the arg
1246
is set to the current session->killed value.
1247
A caller might need to masquerade session->killed with
1248
Session::NOT_KILLED.
1250
Creates an event for binlogging
1251
The value for local `killed_status' can be supplied by caller.
1253
Query_log_event::Query_log_event(Session* session_arg, const char* query_arg,
1254
ulong query_length, bool using_trans,
1256
Session::killed_state killed_status_arg)
1257
:Log_event(session_arg,
1258
(session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0) |
1259
(suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1261
data_buf(0), query(query_arg), catalog(session_arg->catalog),
1262
db(session_arg->db), q_len((uint32_t) query_length),
1263
thread_id(session_arg->thread_id),
1264
/* save the original thread id; we already know the server id */
1265
slave_proxy_id(session_arg->variables.pseudo_thread_id),
1266
flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1268
auto_increment_increment(session_arg->variables.auto_increment_increment),
1269
auto_increment_offset(session_arg->variables.auto_increment_offset),
1270
lc_time_names_number(session_arg->variables.lc_time_names->number),
1271
charset_database_number(0)
1275
if (killed_status_arg == Session::KILLED_NO_VALUE)
1276
killed_status_arg= session_arg->killed;
1279
(killed_status_arg == Session::NOT_KILLED) ?
1280
(session_arg->is_error() ? session_arg->main_da.sql_errno() : 0) :
1281
(session_arg->killed_errno());
1284
exec_time = (ulong) (end_time - session_arg->start_time);
1286
@todo this means that if we have no catalog, then it is replicated
1287
as an existing catalog of length zero. is that safe? /sven
1289
catalog_len = (catalog) ? (uint32_t) strlen(catalog) : 0;
1290
/* status_vars_len is set just before writing the event */
1291
db_len = (db) ? (uint32_t) strlen(db) : 0;
1292
if (session_arg->variables.collation_database != session_arg->db_charset)
1293
charset_database_number= session_arg->variables.collation_database->number;
1296
If we don't use flags2 for anything else than options contained in
1297
session_arg->options, it would be more efficient to flags2=session_arg->options
1298
(OPTIONS_WRITTEN_TO_BIN_LOG would be used only at reading time).
1299
But it's likely that we don't want to use 32 bits for 3 bits; in the future
1300
we will probably want to reclaim the 29 bits. So we need the &.
1302
flags2= (uint32_t) (session_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
1303
assert(session_arg->variables.character_set_client->number < 256*256);
1304
assert(session_arg->variables.collation_connection->number < 256*256);
1305
assert(session_arg->variables.collation_server->number < 256*256);
1306
assert(session_arg->variables.character_set_client->mbminlen == 1);
1307
int2store(charset, session_arg->variables.character_set_client->number);
1308
int2store(charset+2, session_arg->variables.collation_connection->number);
1309
int2store(charset+4, session_arg->variables.collation_server->number);
1313
static void copy_str_and_move(const char **src,
1314
Log_event::Byte **dst,
1317
memcpy(*dst, *src, len);
1318
*src= (const char *)*dst;
1325
Macro to check that there is enough space to read from memory.
1327
@param PTR Pointer to memory
1328
@param END End of memory
1329
@param CNT Number of bytes that should be read.
1331
#define CHECK_SPACE(PTR,END,CNT) \
1333
assert((PTR) + (CNT) <= (END)); \
1334
if ((PTR) + (CNT) > (END)) { \
1342
This is used by the SQL slave thread to prepare the event before execution.
1344
Query_log_event::Query_log_event(const char* buf, uint32_t event_len,
1345
const Format_description_log_event
1347
Log_event_type event_type)
1348
:Log_event(buf, description_event), data_buf(0), query(NULL),
1349
db(NULL), catalog_len(0), status_vars_len(0),
1350
flags2_inited(0), sql_mode_inited(0), charset_inited(0),
1351
auto_increment_increment(1), auto_increment_offset(1),
1352
time_zone_len(0), lc_time_names_number(0), charset_database_number(0)
1356
uint8_t common_header_len, post_header_len;
1357
Log_event::Byte *start;
1358
const Log_event::Byte *end;
1361
common_header_len= description_event->common_header_len;
1362
post_header_len= description_event->post_header_len[event_type-1];
1365
We test if the event's length is sensible, and if so we compute data_len.
1366
We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
1367
We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
1369
if (event_len < (uint)(common_header_len + post_header_len))
1371
data_len = event_len - (common_header_len + post_header_len);
1372
buf+= common_header_len;
1374
slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
1375
exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
1376
db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
1377
error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
1380
5.0 format starts here.
1381
Depending on the format, we may or not have affected/warnings etc
1382
The remnant of the post-header still to be parsed has length:
1384
tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
1387
status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
1389
Check if status variable length is corrupt and will lead to very
1390
wrong data. We could be even more strict and require data_len to
1391
be even bigger, but this will suffice to catch most corruption
1392
errors that can lead to a crash.
1394
if (status_vars_len > cmin(data_len, (uint32_t)MAX_SIZE_LOG_EVENT_STATUS))
1399
data_len-= status_vars_len;
1403
We have parsed everything we know in the post header for QUERY_EVENT,
1404
the rest of the post header either comes from an older version of MySQL or is
1405
dedicated to derived events (e.g. Execute_load_query...)
1408
/* variable-part: the status vars; only in MySQL 5.0 */
1410
start= (Log_event::Byte*) (buf+post_header_len);
1411
end= (const Log_event::Byte*) (start+status_vars_len);
1412
for (const Log_event::Byte* pos= start; pos < end;)
1416
CHECK_SPACE(pos, end, 4);
1418
flags2= uint4korr(pos);
1421
case Q_LC_TIME_NAMES_CODE:
1422
CHECK_SPACE(pos, end, 2);
1423
lc_time_names_number= uint2korr(pos);
1426
case Q_CHARSET_DATABASE_CODE:
1427
CHECK_SPACE(pos, end, 2);
1428
charset_database_number= uint2korr(pos);
1432
/* That's why you must write status vars in growing order of code */
1433
pos= (const unsigned char*) end; // Break loop
1437
if (!(start= data_buf = (Log_event::Byte*) malloc(catalog_len + 1 +
1441
if (catalog_len) // If catalog is given
1444
@todo we should clean up and do only copy_str_and_move; it
1445
works for both cases. Then we can remove the catalog_nz
1448
if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
1449
copy_str_and_move(&catalog, &start, catalog_len);
1452
memcpy(start, catalog, catalog_len+1); // copy end 0
1453
catalog= (const char *)start;
1454
start+= catalog_len+1;
1458
copy_str_and_move(&time_zone_str, &start, time_zone_len);
1461
if time_zone_len or catalog_len are 0, then time_zone and catalog
1462
are uninitialized at this point. shouldn't they point to the
1463
zero-length null-terminated strings we allocated space for in the
1464
my_alloc call above? /sven
1467
/* A 2nd variable part; this is common to all versions */
1468
memcpy(start, end, data_len); // Copy db and query
1469
start[data_len]= '\0'; // End query with \0 (For safetly)
1471
query= (char *)(start + db_len + 1);
1472
q_len= data_len - db_len -1;
1478
Query_log_event::do_apply_event()
1480
/**
  Apply this event using its own stored query text.

  Forwards to the three-argument do_apply_event() overload, passing the
  event's `query` and `q_len` members.

  @param rli  Relay log info handed through unchanged.

  @return The value returned by the three-argument overload.
*/
int Query_log_event::do_apply_event(Relay_log_info const *rli)
{
  const int apply_result= do_apply_event(rli, query, q_len);
  return apply_result;
}
1488
Compare the values of "affected rows" around here. Something
1491
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1493
sql_print_error("Slave: did not get the expected number of affected \
1494
rows running query from master - expected %d, got %d (this numbers \
1495
should have matched modulo 4294967296).", 0, ...);
1496
session->query_error = 1;
1499
We may also want an option to tell the slave to ignore "affected"
1500
mismatch. This mismatch could be implemented with a new ER_ code, and
1501
to ignore it you would use --slave-skip-errors...
1503
int Query_log_event::do_apply_event(Relay_log_info const *rli,
1504
const char *query_arg, uint32_t q_len_arg)
1506
int expected_error,actual_error= 0;
1507
Query_id &query_id= Query_id::get_query_id();
1509
Colleagues: please never free(session->catalog) in MySQL. This would
1510
lead to bugs as here session->catalog is a part of an alloced block,
1511
not an entire alloced block (see
1512
Query_log_event::do_apply_event()). Same for session->db. Thank
1515
session->catalog= catalog_len ? (char *) catalog : (char *)"";
1516
session->set_db(db, strlen(db)); /* allocates a copy of 'db' */
1517
session->variables.auto_increment_increment= auto_increment_increment;
1518
session->variables.auto_increment_offset= auto_increment_offset;
1521
InnoDB internally stores the master log position it has executed so far,
1522
i.e. the position just after the COMMIT event.
1523
When InnoDB will want to store, the positions in rli won't have
1524
been updated yet, so group_master_log_* will point to old BEGIN
1525
and event_master_log* will point to the beginning of current COMMIT.
1526
But log_pos of the COMMIT Query event is what we want, i.e. the pos of the
1527
END of the current log event (COMMIT). We save it in rli so that InnoDB can
1530
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
1532
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
1533
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
1536
Note: We do not need to execute reset_one_shot_variables() if this
1538
Reason: The db stored in binlog events is the same for SET and for
1539
its companion query. If the SET is ignored because of
1540
db_ok(), the companion query will also be ignored, and if
1541
the companion query is ignored in the db_ok() test of
1542
::do_apply_event(), then the companion SET has been ignored as well, so
1543
we don't need to reset_one_shot_variables().
1547
session->set_time((time_t)when);
1548
session->query_length= q_len_arg;
1549
session->query= (char*)query_arg;
1550
session->query_id= query_id.next();
1551
session->variables.pseudo_thread_id= thread_id; // for temp tables
1553
if (ignored_error_code((expected_error= error_code)) ||
1554
!check_expected_error(session,rli,expected_error))
1558
all bits of session->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
1559
must take their value from flags2.
1561
session->options= flags2|(session->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
1564
String tmp(time_zone_str, time_zone_len, &my_charset_bin);
1565
if (!(session->variables.time_zone= my_tz_find(session, &tmp)))
1567
my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr());
1568
session->variables.time_zone= global_system_variables.time_zone;
1569
goto compare_errors;
1572
if (lc_time_names_number)
1574
if (!(session->variables.lc_time_names=
1575
my_locale_by_number(lc_time_names_number)))
1577
my_printf_error(ER_UNKNOWN_ERROR,
1578
"Unknown locale: '%d'", MYF(0), lc_time_names_number);
1579
session->variables.lc_time_names= &my_locale_en_US;
1580
goto compare_errors;
1584
session->variables.lc_time_names= &my_locale_en_US;
1585
if (charset_database_number)
1587
const CHARSET_INFO *cs;
1588
if (!(cs= get_charset(charset_database_number, MYF(0))))
1591
int10_to_str((int) charset_database_number, buf, -10);
1592
my_error(ER_UNKNOWN_COLLATION, MYF(0), buf);
1593
goto compare_errors;
1595
session->variables.collation_database= cs;
1598
session->variables.collation_database= session->db_charset;
1600
/* Execute the query (note that we bypass dispatch_command()) */
1601
const char* found_semicolon= NULL;
1602
mysql_parse(session, session->query, session->query_length, &found_semicolon);
1603
log_slow_statement(session);
1608
The query got a really bad error on the master (thread killed etc),
1609
which could be inconsistent. Parse it to test the table names: if the
1610
replicate-*-do|ignore-table rules say "this query must be ignored" then
1611
we exit gracefully; otherwise we warn about the bad error and tell DBA
1614
if (mysql_test_parse_for_slave(session, session->query, session->query_length))
1615
clear_all_errors(session, const_cast<Relay_log_info*>(rli)); /* Can ignore query */
1618
rli->report(ERROR_LEVEL, expected_error,
1619
_("Query partially completed on the master "
1620
"(error on master: %d) and was aborted. There is a "
1621
"chance that your master is inconsistent at this "
1622
"point. If you are sure that your master is ok, run "
1623
"this query manually on the slave and then restart the "
1624
"slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; "
1625
"START SLAVE; . Query: '%s'"),
1626
expected_error, session->query);
1627
session->is_slave_error= 1;
1635
If we expected a non-zero error code, and we don't get the same error
1636
code, and none of them should be ignored.
1638
actual_error= session->is_error() ? session->main_da.sql_errno() : 0;
1639
if ((expected_error != actual_error) &&
1641
!ignored_error_code(actual_error) &&
1642
!ignored_error_code(expected_error))
1644
rli->report(ERROR_LEVEL, 0,
1645
_("Query caused differenxt errors on master and slave.\n"
1646
"Error on master: '%s' (%d), Error on slave: '%s' (%d).\n"
1647
"Default database: '%s'. Query: '%s'"),
1650
actual_error ? session->main_da.message() : _("no error"),
1652
print_slave_db_safe(db), query_arg);
1653
session->is_slave_error= 1;
1656
If we get the same error code as expected, or they should be ignored.
1658
else if (expected_error == actual_error ||
1659
ignored_error_code(actual_error))
1661
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
1662
session->killed= Session::NOT_KILLED;
1665
Other cases: mostly we expected no error and get one.
1667
else if (session->is_slave_error || session->is_fatal_error)
1669
rli->report(ERROR_LEVEL, actual_error,
1670
_("Error '%s' on query. Default database: '%s'. Query: '%s'"),
1671
(actual_error ? session->main_da.message() :
1672
_("unexpected success or fatal error")),
1673
print_slave_db_safe(session->db), query_arg);
1674
session->is_slave_error= 1;
1678
TODO: compare the values of "affected rows" around here. Something
1680
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1682
sql_print_error("Slave: did not get the expected number of affected \
1683
rows running query from master - expected %d, got %d (this numbers \
1684
should have matched modulo 4294967296).", 0, ...);
1685
session->is_slave_error = 1;
1687
We may also want an option to tell the slave to ignore "affected"
1688
mismatch. This mismatch could be implemented with a new ER_ code, and
1689
to ignore it you would use --slave-skip-errors...
1691
To do the comparison we need to know the value of "affected" which the
1692
above mysql_parse() computed. And we need to know the value of
1693
"affected" in the master's binlog. Both will be implemented later. The
1694
important thing is that we now have the format ready to log the values
1695
of "affected" in the binlog. So we can release 5.0.0 before effectively
1696
logging "affected" and effectively comparing it.
1698
} /* End of if (db_ok(... */
1701
pthread_mutex_lock(&LOCK_thread_count);
1703
Probably we have set session->query, session->db, session->catalog to point to places
1704
in the data_buf of this event. Now the event is going to be deleted
1705
probably, so data_buf will be freed, so the session->... listed above will be
1706
pointers to freed memory.
1707
So we must set them to 0, so that those bad pointers values are not later
1708
used. Note that "cleanup" queries like automatic DROP TEMPORARY Table
1709
don't suffer from these assignments to 0 as DROP TEMPORARY
1710
Table uses the db.table syntax.
1712
session->catalog= 0;
1713
session->set_db(NULL, 0); /* will free the current database */
1714
session->query= 0; // just to be sure
1715
session->query_length= 0;
1716
pthread_mutex_unlock(&LOCK_thread_count);
1717
close_thread_tables(session);
1718
session->first_successful_insert_id_in_prev_stmt= 0;
1719
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
1720
return session->is_slave_error;
1723
/**
  Update the relay log position after this event.

  Adds nothing of its own: the work is delegated to the base-class
  Log_event::do_update_pos().

  @param rli  Relay log info handed through unchanged.

  @return The value returned by Log_event::do_update_pos(rli).
*/
int Query_log_event::do_update_pos(Relay_log_info *rli)
{
  const int base_result= Log_event::do_update_pos(rli);
  return base_result;
}
1729
/**
  Decide whether this query event should be skipped.

  Only when rli->slave_skip_counter is greater than zero are the
  transaction-control statements treated specially; the query text is
  compared as a whole string with strcmp():
    - "BEGIN" sets OPTION_BEGIN in session->options and returns
      Log_event::continue_group(rli);
    - "COMMIT" or "ROLLBACK" clears OPTION_BEGIN and returns
      Log_event::EVENT_SKIP_COUNT.
  Every other case is delegated to Log_event::do_shall_skip(rli).

  @param rli  Relay log info providing the skip counter.

  @return The skip reason chosen as described above.
*/
Log_event::enum_skip_reason
Query_log_event::do_shall_skip(Relay_log_info *rli)
{
  assert(query && q_len > 0);

  const bool skip_in_progress= (rli->slave_skip_counter > 0);
  if (!skip_in_progress)
    return Log_event::do_shall_skip(rli);

  if (!strcmp("BEGIN", query))
  {
    session->options|= OPTION_BEGIN;
    return Log_event::continue_group(rli);
  }

  const bool ends_transaction= !strcmp("COMMIT", query) ||
                               !strcmp("ROLLBACK", query);
  if (ends_transaction)
  {
    session->options&= ~OPTION_BEGIN;
    return Log_event::EVENT_SKIP_COUNT;
  }

  return Log_event::do_shall_skip(rli);
}
1752
/**************************************************************************
1753
Start_log_event_v3 methods
1754
**************************************************************************/
1756
/**
  Default constructor.

  Sets `created`, `artificial_event` and `dont_set_created` to 0, sets
  `binlog_version` to BINLOG_VERSION, and copies ST_SERVER_VER_LEN bytes of
  the global ::server_version into this event's server_version member.
*/
Start_log_event_v3::Start_log_event_v3()
  : Log_event(),
    created(0),
    binlog_version(BINLOG_VERSION),
    artificial_event(0),
    dont_set_created(0)
{
  /* The member shadows the global of the same name, hence the `::`. */
  memcpy(this->server_version, ::server_version, ST_SERVER_VER_LEN);
}
1764
Start_log_event_v3::pack_info()
1767
/**
  Build the text "Server ver: <server_version>, Binlog ver: <binlog_version>"
  and pass it to protocol->store() with my_charset_bin.

  @param protocol  Receiver of the formatted text and its length.
*/
void Start_log_event_v3::pack_info(Protocol *protocol)
{
  static const char server_label[]= "Server ver: ";
  static const char binlog_label[]= ", Binlog ver: ";

  /* Sized as: 12-byte label + version string + 14-byte label + number. */
  char text[12 + ST_SERVER_VER_LEN + 14 + 22];
  char *cursor= text;

  strcpy(cursor, server_label);
  cursor+= sizeof(server_label) - 1;

  strcpy(cursor, server_version);
  cursor+= strlen(server_version);

  strcpy(cursor, binlog_label);
  cursor+= sizeof(binlog_label) - 1;

  /* int10_to_str()'s return value is taken as the new end of the text. */
  cursor= int10_to_str(binlog_version, cursor, 10);

  protocol->store(text, (uint) (cursor - text), &my_charset_bin);
}
1779
Start_log_event_v3::Start_log_event_v3()
1782
Start_log_event_v3::Start_log_event_v3(const char* buf,
1783
const Format_description_log_event
1785
:Log_event(buf, description_event)
1787
buf+= description_event->common_header_len;
1788
binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET);
1789
memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
1791
// prevent overrun if log is corrupted on disk
1792
server_version[ST_SERVER_VER_LEN-1]= 0;
1793
created= uint4korr(buf+ST_CREATED_OFFSET);
1794
/* We use log_pos to mark if this was an artificial event or not */
1795
artificial_event= (log_pos == 0);
1796
dont_set_created= 1;
1801
Start_log_event_v3::write()
1804
/**
  Write this event to `file`.

  Fills a START_V3_HEADER_LEN-byte buffer with the binlog version, the
  server version string and the creation time, then emits the common header
  via write_header() followed by that buffer via my_b_safe_write().

  Unless dont_set_created is set, `when` and `created` are both refreshed
  from get_time() before the creation time is stored.

  @param file  Destination IO_CACHE.

  @return The logical OR of the write_header() and my_b_safe_write() results
          (non-zero presumably signals failure -- confirm against those
          helpers).
*/
bool Start_log_event_v3::write(IO_CACHE* file)
{
  char payload[START_V3_HEADER_LEN];

  int2store(payload + ST_BINLOG_VER_OFFSET, binlog_version);
  memcpy(payload + ST_SERVER_VER_OFFSET, server_version, ST_SERVER_VER_LEN);

  if (!dont_set_created)
  {
    when= get_time();
    created= when;
  }
  int4store(payload + ST_CREATED_OFFSET, created);

  /* Stop at the first failing write, exactly as the || chain would. */
  if (write_header(file, sizeof(payload)))
    return true;

  return my_b_safe_write(file, (unsigned char*) payload, sizeof(payload)) != 0;
}
1818
Start_log_event_v3::do_apply_event() .
1822
- To handle the case where the master died without having time to write
1823
DROP TEMPORARY Table, DO RELEASE_LOCK (prepared statements' deletion is
1824
TODO), we clean up all temporary tables that we got, if we are sure we
1828
- Remove all active user locks.
1829
Guilhem 2003-06: this is true but not urgent: the worst it can cause is
1830
the use of a bit of memory for a user lock which will not be used
1831
anymore. If the user lock is later used, the old one will be released. In
1832
other words, no deadlock problem.
1835
int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
1837
switch (binlog_version)
1842
This can either be 4.x (then a Start_log_event_v3 is only at master
1843
startup so we are sure the master has restarted and cleared his temp
1844
tables; the event always has 'created'>0) or 5.0 (then we have to test
1849
close_temporary_tables(session);
1850
cleanup_load_tmpdir();
1855
Now the older formats; in that case load_tmpdir is cleaned up by the I/O
1859
if (strncmp(rli->relay_log.description_event_for_exec->server_version,
1860
"3.23.57",7) >= 0 && created)
1863
Can distinguish, based on the value of 'created': this event was
1864
generated at master startup.
1866
close_temporary_tables(session);
1869
Otherwise, can't distinguish a Start_log_event generated at
1870
master startup and one generated by master FLUSH LOGS, so cannot
1871
be sure temp tables have to be dropped. So do nothing.
1875
/* this case is impossible */
1881
/***************************************************************************
1882
Format_description_log_event methods
1883
****************************************************************************/
1886
Format_description_log_event 1st ctor.
1888
Ctor. Can be used to create the event to write to the binary log (when the
1889
server starts or when FLUSH LOGS), or to create artificial events to parse
1890
binlogs from MySQL 3.23 or 4.x.
1891
When in a client, only the 2nd use is possible.
1893
@param binlog_version the binlog version for which we want to build
1894
an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x
1895
x>=2 and 4.1) or 4 (MySQL 5.0). Note that the
1896
old 4.0 (binlog version 2) is not supported;
1897
it should not be used for replication with
1901
Format_description_log_event::
1902
Format_description_log_event(uint8_t binlog_ver, const char*)
1903
:Start_log_event_v3(), event_type_permutation(0)
1905
binlog_version= binlog_ver;
1906
switch (binlog_ver) {
1907
case 4: /* MySQL 5.0 */
1908
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
1909
common_header_len= LOG_EVENT_HEADER_LEN;
1910
number_of_event_types= LOG_EVENT_TYPES;
1911
/* we'll catch malloc() error in is_valid() */
1912
post_header_len=(uint8_t*) malloc(number_of_event_types*sizeof(uint8_t));
1913
memset(post_header_len, 0, number_of_event_types*sizeof(uint8_t));
1915
This long list of assignments is not beautiful, but I see no way to
1916
make it nicer, as the right members are #defines, not array members, so
1917
it's impossible to write a loop.
1919
if (post_header_len)
1921
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
1922
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN;
1923
post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
1924
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
1925
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
1926
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
1927
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
1928
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
1929
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
1930
post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
1931
post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN;
1932
post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
1933
post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
1934
post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
1935
post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1];
1936
post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
1937
post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
1938
post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
1942
default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
1945
calc_server_version_split();
1950
The problem with this constructor is that the fixed header may have a
1951
length different from this version, but we don't know this length as we
1952
have not read the Format_description_log_event which says it, yet. This
1953
length is in the post-header of the event, but we don't know where the
1956
So this type of event HAS to:
1957
- either have the header's length at the beginning (in the header, at a
1958
fixed position which will never be changed), not in the post-header. That
1959
would make the header be "shifted" compared to other events.
1960
- or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future
1961
versions, so that we know for sure.
1963
I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because
1964
it is sent before Format_description_log_event).
1967
Format_description_log_event::
1968
Format_description_log_event(const char* buf,
1971
Format_description_log_event*
1973
:Start_log_event_v3(buf, description_event), event_type_permutation(0)
1975
buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
1976
if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
1977
return; /* sanity check */
1978
number_of_event_types=
1979
event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1);
1980
post_header_len= (uint8_t*) malloc(number_of_event_types*
1981
sizeof(*post_header_len));
1982
/* If alloc fails, we'll detect it in is_valid() */
1983
if (post_header_len != NULL)
1984
memcpy(post_header_len, buf+ST_COMMON_HEADER_LEN_OFFSET+1,
1985
number_of_event_types* sizeof(*post_header_len));
1986
calc_server_version_split();
1989
In some previous versions, the events were given other event type
1990
id numbers than in the present version. When replicating from such
1991
a version, we therefore set up an array that maps those id numbers
1992
to the id numbers of the present server.
1994
If post_header_len is null, it means malloc failed, and is_valid
1995
will fail, so there is no need to do anything.
1997
The trees in which events have wrong id's are:
1999
mysql-5.1-wl1012.old mysql-5.1-wl2325-5.0-drop6p13-alpha
2000
mysql-5.1-wl2325-5.0-drop6 mysql-5.1-wl2325-5.0
2001
mysql-5.1-wl2325-no-dd
2003
(this was found by grepping for two lines in sequence where the
2004
first matches "FORMAT_DESCRIPTION_EVENT," and the second matches
2005
"TABLE_MAP_EVENT," in log_event.h in all trees)
2007
In these trees, the following server_versions existed since
2008
TABLE_MAP_EVENT was introduced:
2010
5.1.1-a_drop5p3 5.1.1-a_drop5p4 5.1.1-alpha
2011
5.1.2-a_drop5p10 5.1.2-a_drop5p11 5.1.2-a_drop5p12
2012
5.1.2-a_drop5p13 5.1.2-a_drop5p14 5.1.2-a_drop5p15
2013
5.1.2-a_drop5p16 5.1.2-a_drop5p16b 5.1.2-a_drop5p16c
2014
5.1.2-a_drop5p17 5.1.2-a_drop5p4 5.1.2-a_drop5p5
2015
5.1.2-a_drop5p6 5.1.2-a_drop5p7 5.1.2-a_drop5p8
2016
5.1.2-a_drop5p9 5.1.3-a_drop5p17 5.1.3-a_drop5p17b
2017
5.1.3-a_drop5p17c 5.1.4-a_drop5p18 5.1.4-a_drop5p19
2018
5.1.4-a_drop5p20 5.1.4-a_drop6p0 5.1.4-a_drop6p1
2019
5.1.4-a_drop6p2 5.1.5-a_drop5p20 5.2.0-a_drop6p3
2020
5.2.0-a_drop6p4 5.2.0-a_drop6p5 5.2.0-a_drop6p6
2021
5.2.1-a_drop6p10 5.2.1-a_drop6p11 5.2.1-a_drop6p12
2022
5.2.1-a_drop6p6 5.2.1-a_drop6p7 5.2.1-a_drop6p8
2023
5.2.2-a_drop6p13 5.2.2-a_drop6p13-alpha 5.2.2-a_drop6p13b
2026
(this was found by grepping for "mysql," in all historical
2027
versions of configure.in in the trees listed above).
2029
There are 5.1.1-alpha versions that use the new event id's, so we
2030
do not test that version string. So replication from 5.1.1-alpha
2031
with the other event id's to a new version does not work.
2032
Moreover, we can safely ignore the part after drop[56]. This
2033
allows us to simplify the big list above to the following regexes:
2035
5\.1\.[1-5]-a_drop5.*
2037
5\.2\.[0-2]-a_drop6.*
2039
This is what we test for in the 'if' below.
2041
if (post_header_len &&
2042
server_version[0] == '5' && server_version[1] == '.' &&
2043
server_version[3] == '.' &&
2044
strncmp(server_version + 5, "-a_drop", 7) == 0 &&
2045
((server_version[2] == '1' &&
2046
server_version[4] >= '1' && server_version[4] <= '5' &&
2047
server_version[12] == '5') ||
2048
(server_version[2] == '1' &&
2049
server_version[4] == '4' &&
2050
server_version[12] == '6') ||
2051
(server_version[2] == '2' &&
2052
server_version[4] >= '0' && server_version[4] <= '2' &&
2053
server_version[12] == '6')))
2055
if (number_of_event_types != 22)
2057
/* this makes is_valid() return false. */
2058
free(post_header_len);
2059
post_header_len= NULL;
2062
static const uint8_t perm[23]=
2064
UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
2065
LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
2066
APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
2068
FORMAT_DESCRIPTION_EVENT,
2071
BEGIN_LOAD_QUERY_EVENT,
2072
EXECUTE_LOAD_QUERY_EVENT,
2074
event_type_permutation= perm;
2076
Since we use (permuted) event id's to index the post_header_len
2077
array, we need to permute the post_header_len array too.
2079
uint8_t post_header_len_temp[23];
2080
for (int i= 1; i < 23; i++)
2081
post_header_len_temp[perm[i] - 1]= post_header_len[i - 1];
2082
for (int i= 0; i < 22; i++)
2083
post_header_len[i] = post_header_len_temp[i];
2088
bool Format_description_log_event::write(IO_CACHE* file)
2091
We don't call Start_log_event_v3::write() because this would make 2
2094
unsigned char buff[FORMAT_DESCRIPTION_HEADER_LEN];
2095
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
2096
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
2097
if (!dont_set_created)
2098
created= when= get_time();
2099
int4store(buff + ST_CREATED_OFFSET,created);
2100
buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
2101
memcpy(buff+ST_COMMON_HEADER_LEN_OFFSET+1, post_header_len,
2103
return (write_header(file, sizeof(buff)) ||
2104
my_b_safe_write(file, buff, sizeof(buff)));
2108
int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
2111
As a transaction NEVER spans on 2 or more binlogs:
2112
if we have an active transaction at this point, the master died
2113
while writing the transaction to the binary log, i.e. while
2114
flushing the binlog cache to the binlog. XA guarantees that master has
2115
rolled back. So we roll back.
2116
Note: this event could be sent by the master to inform us of the
2117
format of its binlog; in other words maybe it is not at its
2118
original place when it comes to us; we'll know this by checking
2119
log_pos ("artificial" events have log_pos == 0).
2121
if (!artificial_event && created && session->transaction.all.ha_list)
2123
/* This is not an error (XA is safe), just an information */
2124
rli->report(INFORMATION_LEVEL, 0,
2125
_("Rolling back unfinished transaction (no COMMIT "
2126
"or ROLLBACK in relay log). A probable cause is that "
2127
"the master died while writing the transaction to "
2128
"its binary log, thus rolled back too."));
2129
const_cast<Relay_log_info*>(rli)->cleanup_context(session, 1);
2132
If this event comes from ourselves, there is no cleaning task to
2133
perform, we don't call Start_log_event_v3::do_apply_event()
2134
(this was just to update the log's description event).
2136
if (server_id != ::server_id)
2139
If the event was not requested by the slave i.e. the master sent
2140
it while the slave asked for a position >4, the event will make
2141
rli->group_master_log_pos advance. Say that the slave asked for
2142
position 1000, and the Format_desc event's end is 96. Then in
2143
the beginning of replication rli->group_master_log_pos will be
2144
0, then 96, then jump to first really asked event (which is
2145
>96). So this is ok.
2147
return(Start_log_event_v3::do_apply_event(rli));
2152
int Format_description_log_event::do_update_pos(Relay_log_info *rli)
2154
/* save the information describing this binlog */
2155
delete rli->relay_log.description_event_for_exec;
2156
rli->relay_log.description_event_for_exec= this;
2158
if (server_id == ::server_id)
2161
We only increase the relay log position if we are skipping
2162
events and do not touch any group_* variables, nor flush the
2163
relay log info. If there is a crash, we will have to re-skip
2164
the events again, but that is a minor issue.
2166
If we do not skip stepping the group log position (and the
2167
server id was changed when restarting the server), it might well
2168
be that we start executing at a position that is invalid, e.g.,
2169
at a Rows_log_event or a Query_log_event preceeded by a
2170
Intvar_log_event instead of starting at a Table_map_log_event or
2171
the Intvar_log_event respectively.
2173
rli->inc_event_relay_log_pos();
2178
return Log_event::do_update_pos(rli);
2182
Log_event::enum_skip_reason
2183
Format_description_log_event::do_shall_skip(Relay_log_info *)
2185
return Log_event::EVENT_SKIP_NOT;
2190
Splits the event's 'server_version' string into three numeric pieces stored
2191
into 'server_version_split':
2192
X.Y.Zabc (X,Y,Z numbers, a not a digit) -> {X,Y,Z}
2195
'server_version_split' is then used for lookups to find if the server which
2196
created this event has some known bug.
2198
void Format_description_log_event::calc_server_version_split()
2200
char *p= server_version, *r;
2202
for (uint32_t i= 0; i<=2; i++)
2204
number= strtoul(p, &r, 10);
2205
server_version_split[i]= (unsigned char)number;
2206
assert(number < 256); // fit in unsigned char
2208
assert(!((i == 0) && (*r != '.'))); // should be true in practice
2210
p++; // skip the dot
/**************************************************************************
        Load_log_event methods
   General note about Load_log_event: the binlogging of LOAD DATA INFILE is
   going to be changed in 5.0 (or maybe in 5.1; not decided yet).
   However, the 5.0 slave could still have to read such events (from a 4.x
   master), convert them (which just means maybe expand the header, when 5.0
   servers have a UID in events) (remember that whatever is after the header
   will be like in 4.x, as this event's format is not modified in 5.0 as we
   will use new types of events to log the new LOAD DATA INFILE features).
   To be able to read/convert, we just need to not assume that the common
   header is of length LOG_EVENT_HEADER_LEN (we must use the description
   event).
   Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE
   between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the
   positions displayed in SHOW SLAVE STATUS then are fine too).
**************************************************************************/

/*
  Load_log_event::pack_info()
*/
2236
uint32_t Load_log_event::get_query_buffer_length()
2239
5 + db_len + 3 + // "use DB; "
2240
18 + fname_len + 2 + // "LOAD DATA INFILE 'file''"
2242
9 + // " REPLACE or IGNORE "
2243
13 + table_name_len*2 + // "INTO Table `table`"
2244
21 + sql_ex.field_term_len*4 + 2 + // " FIELDS TERMINATED BY 'str'"
2245
23 + sql_ex.enclosed_len*4 + 2 + // " OPTIONALLY ENCLOSED BY 'str'"
2246
12 + sql_ex.escaped_len*4 + 2 + // " ESCAPED BY 'str'"
2247
21 + sql_ex.line_term_len*4 + 2 + // " LINES TERMINATED BY 'str'"
2248
19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'"
2249
15 + 22 + // " IGNORE xxx LINES"
2250
3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)"
2254
void Load_log_event::print_query(bool need_db, char *buf,
2255
char **end, char **fn_start, char **fn_end)
2259
if (need_db && db && db_len)
2261
pos= strcpy(pos, "use `")+5;
2262
memcpy(pos, db, db_len);
2263
pos= strcpy(pos+db_len, "`; ")+3;
2266
pos= strcpy(pos, "LOAD DATA ")+10;
2271
if (check_fname_outside_temp_buf())
2272
pos= strcpy(pos, "LOCAL ")+6;
2273
pos= strcpy(pos, "INFILE '")+8;
2274
memcpy(pos, fname, fname_len);
2275
pos= strcpy(pos+fname_len, "' ")+2;
2277
if (sql_ex.opt_flags & REPLACE_FLAG)
2278
pos= strcpy(pos, " REPLACE ")+9;
2279
else if (sql_ex.opt_flags & IGNORE_FLAG)
2280
pos= strcpy(pos, " IGNORE ")+8;
2282
pos= strcpy(pos ,"INTO")+4;
2287
pos= strcpy(pos ," Table `")+8;
2288
memcpy(pos, table_name, table_name_len);
2289
pos+= table_name_len;
2291
/* We have to create all optinal fields as the default is not empty */
2292
pos= strcpy(pos, "` FIELDS TERMINATED BY ")+23;
2293
pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len);
2294
if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
2295
pos= strcpy(pos, " OPTIONALLY ")+12;
2296
pos= strcpy(pos, " ENCLOSED BY ")+13;
2297
pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len);
2299
pos= strcpy(pos, " ESCAPED BY ")+12;
2300
pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
2302
pos= strcpy(pos, " LINES TERMINATED BY ")+21;
2303
pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
2304
if (sql_ex.line_start_len)
2306
pos= strcpy(pos, " STARTING BY ")+13;
2307
pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
2310
if ((long) skip_lines > 0)
2312
pos= strcpy(pos, " IGNORE ")+8;
2313
pos= int64_t10_to_str((int64_t) skip_lines, pos, 10);
2314
pos= strcpy(pos," LINES ")+7;
2320
const char *field= fields;
2321
pos= strcpy(pos, " (")+2;
2322
for (i = 0; i < num_fields; i++)
2329
memcpy(pos, field, field_lens[i]);
2330
pos+= field_lens[i];
2331
field+= field_lens[i] + 1;
2340
void Load_log_event::pack_info(Protocol *protocol)
2344
if (!(buf= (char*) malloc(get_query_buffer_length())))
2346
print_query(true, buf, &end, 0, 0);
2347
protocol->store(buf, end-buf, &my_charset_bin);
2353
Load_log_event::write_data_header()
2356
bool Load_log_event::write_data_header(IO_CACHE* file)
2358
char buf[LOAD_HEADER_LEN];
2359
int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
2360
int4store(buf + L_EXEC_TIME_OFFSET, exec_time);
2361
int4store(buf + L_SKIP_LINES_OFFSET, skip_lines);
2362
buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
2363
buf[L_DB_LEN_OFFSET] = (char)db_len;
2364
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
2365
return my_b_safe_write(file, (unsigned char*)buf, LOAD_HEADER_LEN) != 0;
2370
Load_log_event::write_data_body()
2373
bool Load_log_event::write_data_body(IO_CACHE* file)
2375
if (sql_ex.write_data(file))
2377
if (num_fields && fields && field_lens)
2379
if (my_b_safe_write(file, (unsigned char*)field_lens, num_fields) ||
2380
my_b_safe_write(file, (unsigned char*)fields, field_block_len))
2383
return (my_b_safe_write(file, (unsigned char*)table_name, table_name_len + 1) ||
2384
my_b_safe_write(file, (unsigned char*)db, db_len + 1) ||
2385
my_b_safe_write(file, (unsigned char*)fname, fname_len));
2390
Load_log_event::Load_log_event()
2393
Load_log_event::Load_log_event(Session *session_arg, sql_exchange *ex,
2394
const char *db_arg, const char *table_name_arg,
2395
List<Item> &fields_arg,
2396
enum enum_duplicates handle_dup,
2397
bool ignore, bool using_trans)
2398
:Log_event(session_arg,
2399
session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
2401
thread_id(session_arg->thread_id),
2402
slave_proxy_id(session_arg->variables.pseudo_thread_id),
2403
num_fields(0),fields(0),
2404
field_lens(0),field_block_len(0),
2405
table_name(table_name_arg ? table_name_arg : ""),
2406
db(db_arg), fname(ex->file_name), local_fname(false)
2410
exec_time = (ulong) (end_time - session_arg->start_time);
2411
/* db can never be a zero pointer in 4.0 */
2412
db_len = (uint32_t) strlen(db);
2413
table_name_len = (uint32_t) strlen(table_name);
2414
fname_len = (fname) ? (uint) strlen(fname) : 0;
2415
sql_ex.field_term = (char*) ex->field_term->ptr();
2416
sql_ex.field_term_len = (uint8_t) ex->field_term->length();
2417
sql_ex.enclosed = (char*) ex->enclosed->ptr();
2418
sql_ex.enclosed_len = (uint8_t) ex->enclosed->length();
2419
sql_ex.line_term = (char*) ex->line_term->ptr();
2420
sql_ex.line_term_len = (uint8_t) ex->line_term->length();
2421
sql_ex.line_start = (char*) ex->line_start->ptr();
2422
sql_ex.line_start_len = (uint8_t) ex->line_start->length();
2423
sql_ex.escaped = (char*) ex->escaped->ptr();
2424
sql_ex.escaped_len = (uint8_t) ex->escaped->length();
2425
sql_ex.opt_flags = 0;
2426
sql_ex.cached_new_format = -1;
2429
sql_ex.opt_flags|= DUMPFILE_FLAG;
2430
if (ex->opt_enclosed)
2431
sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
2433
sql_ex.empty_flags= 0;
2435
switch (handle_dup) {
2437
sql_ex.opt_flags|= REPLACE_FLAG;
2439
case DUP_UPDATE: // Impossible here
2444
sql_ex.opt_flags|= IGNORE_FLAG;
2446
if (!ex->field_term->length())
2447
sql_ex.empty_flags |= FIELD_TERM_EMPTY;
2448
if (!ex->enclosed->length())
2449
sql_ex.empty_flags |= ENCLOSED_EMPTY;
2450
if (!ex->line_term->length())
2451
sql_ex.empty_flags |= LINE_TERM_EMPTY;
2452
if (!ex->line_start->length())
2453
sql_ex.empty_flags |= LINE_START_EMPTY;
2454
if (!ex->escaped->length())
2455
sql_ex.empty_flags |= ESCAPED_EMPTY;
2457
skip_lines = ex->skip_lines;
2459
List_iterator<Item> li(fields_arg);
2460
field_lens_buf.length(0);
2461
fields_buf.length(0);
2463
while ((item = li++))
2466
unsigned char len = (unsigned char) strlen(item->name);
2467
field_block_len += len + 1;
2468
fields_buf.append(item->name, len + 1);
2469
field_lens_buf.append((char*)&len, 1);
2472
field_lens = (const unsigned char*)field_lens_buf.ptr();
2473
fields = fields_buf.ptr();
2479
The caller must do buf[event_len] = 0 before he starts using the
2482
Load_log_event::Load_log_event(const char *buf, uint32_t event_len,
2483
const Format_description_log_event *description_event)
2484
:Log_event(buf, description_event), num_fields(0), fields(0),
2485
field_lens(0),field_block_len(0),
2486
table_name(0), db(0), fname(0), local_fname(false)
2489
I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0,
2490
4.0->5.0 and 5.0->5.0 and it works.
2493
copy_log_event(buf, event_len,
2494
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
2496
description_event->common_header_len :
2497
LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
2499
/* otherwise it's a derived class, will call copy_log_event() itself */
2505
Load_log_event::copy_log_event()
2508
int Load_log_event::copy_log_event(const char *buf, ulong event_len,
2510
const Format_description_log_event *description_event)
2513
char* buf_end = (char*)buf + event_len;
2514
/* this is the beginning of the post-header */
2515
const char* data_head = buf + description_event->common_header_len;
2516
slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
2517
exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
2518
skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
2519
table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
2520
db_len = (uint)data_head[L_DB_LEN_OFFSET];
2521
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
2523
if ((int) event_len < body_offset)
2526
Sql_ex.init() on success returns the pointer to the first byte after
2527
the sql_ex structure, which is the start of field lengths array.
2529
if (!(field_lens= (unsigned char*)sql_ex.init((char*)buf + body_offset,
2531
buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
2534
data_len = event_len - body_offset;
2535
if (num_fields > data_len) // simple sanity check against corruption
2537
for (uint32_t i = 0; i < num_fields; i++)
2538
field_block_len += (uint)field_lens[i] + 1;
2540
fields = (char*)field_lens + num_fields;
2541
table_name = fields + field_block_len;
2542
db = table_name + table_name_len + 1;
2543
fname = db + db_len + 1;
2544
fname_len = strlen(fname);
2545
// null termination is accomplished by the caller doing buf[event_len]=0
2552
Load_log_event::set_fields()
2555
This function can not use the member variable
2556
for the database, since LOAD DATA INFILE on the slave
2557
can be for a different database than the current one.
2558
This is the reason for the affected_db argument to this method.
2561
void Load_log_event::set_fields(const char* affected_db,
2562
List<Item> &field_list,
2563
Name_resolution_context *context)
2566
const char* field = fields;
2567
for (i= 0; i < num_fields; i++)
2569
field_list.push_back(new Item_field(context,
2570
affected_db, table_name, field));
2571
field+= field_lens[i] + 1;
2577
Does the data loading job when executing a LOAD DATA on the slave.
2581
@param use_rli_only_for_errors If set to 1, rli is provided to
2582
Load_log_event::exec_event only for this
2583
function to have RPL_LOG_NAME and
2584
rli->last_slave_error, both being used by
2585
error reports. rli's position advancing
2586
is skipped (done by the caller which is
2587
Execute_load_log_event::exec_event).
2588
If set to 0, rli is provided for full use,
2589
i.e. for error reports and position
2593
fix this; this can be done by testing rules in
2594
Create_file_log_event::exec_event() and then discarding Append_block and
2597
this is a bug - this needs to be moved to the I/O thread
/*
  Executes the LOAD DATA INFILE described by this event on the slave.

  Visible steps: switch the session to the event's database, reset
  per-statement session state, forge the statement text with
  print_query() into session memory, rebuild an sql_exchange and the
  option Strings from sql_ex, call mysql_load(), then clear the session's
  database/query and report any error through rli->report().

  net                      when non-null its vio and packet number are
                           lent to session->net around mysql_load(), and
                           it is passed to skip_load_data_infile() on the
                           path that does not load.
  rli                      used for error reporting; when
                           use_rli_only_for_errors is false its
                           future_group_master_log_pos is also updated.
  use_rli_only_for_errors  when true the function returns 0 at the end
                           instead of calling Log_event::do_apply_event().

  NOTE(review): this block is extraction-damaged. The bare integers are
  stray line numbers, and lines shorter than about 20 characters (braces,
  else, short conditions, several local declarations, the "error:" label
  and "return 1;" statements) are missing. The code is left untouched
  here because the dropped conditions cannot be recovered with certainty.
*/
2605
int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
2606
bool use_rli_only_for_errors)
2608
Query_id &query_id= Query_id::get_query_id();
2609
session->set_db(db, strlen(db));
2610
assert(session->query == 0);
2611
session->query_length= 0; // Should not be needed
2612
session->is_slave_error= 0;
2613
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
2615
/* see Query_log_event::do_apply_event() and BUG#13360 */
2616
assert(!rli->m_table_map.count());
2618
Usually lex_start() is called by mysql_parse(), but we need it here
2619
as the present method does not call mysql_parse().
2622
mysql_reset_session_for_next_command(session);
2624
if (!use_rli_only_for_errors)
2627
Saved for InnoDB, see comment in
2628
Query_log_event::do_apply_event()
2630
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
2634
We test replicate_*_db rules. Note that we have already prepared
2635
the file to load, even if we are going to ignore and delete it
2636
now. So it is possible that we did a lot of disk writes for
2637
nothing. In other words, a big LOAD DATA INFILE on the master will
2638
still consume a lot of space on the slave (space in the relay log
2639
+ space of temp files: twice the space of the file to load...)
2640
even if it will finally be ignored. TODO: fix this; this can be
2641
done by testing rules in Create_file_log_event::do_apply_event()
2642
and then discarding Append_block and al. Another way is do the
2643
filtering in the I/O thread (more efficient: no disk writes at
2647
Note: We do not need to execute reset_one_shot_variables() if this
2649
Reason: The db stored in binlog events is the same for SET and for
2650
its companion query. If the SET is ignored because of
2651
db_ok(), the companion query will also be ignored, and if
2652
the companion query is ignored in the db_ok() test of
2653
::do_apply_event(), then the companion SET also have so
2654
we don't need to reset_one_shot_variables().
2658
session->set_time((time_t)when);
2659
session->query_id = query_id.next();
2661
Initing session->row_count is not necessary in theory as this variable has no
2662
influence in the case of the slave SQL thread (it is used to generate a
2663
"data truncated" warning but which is absorbed and never gets to the
2664
error log); still we init it to avoid a Valgrind message.
2666
drizzle_reset_errors(session, 0);
2669
memset(&tables, 0, sizeof(tables));
2670
tables.db= session->strmake(session->db, session->db_length);
2671
tables.alias = tables.table_name = (char*) table_name;
2672
tables.lock_type = TL_WRITE;
2675
// the table will be opened in mysql_load
2679
enum enum_duplicates handle_dup;
2681
char *load_data_query;
2684
Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST
2685
and written to slave's binlog if binlogging is on.
/* One byte beyond get_query_buffer_length() is requested from session memory. */
2687
if (!(load_data_query= (char *)session->alloc(get_query_buffer_length() + 1)))
2690
This will set session->fatal_error in case of OOM. So we surely will notice
2691
that something is wrong.
2696
print_query(false, load_data_query, &end, (char **)&session->lex->fname_start,
2697
(char **)&session->lex->fname_end);
2699
session->query_length= end - load_data_query;
2700
session->query= load_data_query;
2702
if (sql_ex.opt_flags & REPLACE_FLAG)
2704
handle_dup= DUP_REPLACE;
2706
else if (sql_ex.opt_flags & IGNORE_FLAG)
2709
handle_dup= DUP_ERROR;
2714
When replication is running fine, if it was DUP_ERROR on the
2715
master then we could choose IGNORE here, because if DUP_ERROR
2716
suceeded on master, and data is identical on the master and slave,
2717
then there should be no uniqueness errors on slave, so IGNORE is
2718
the same as DUP_ERROR. But in the unlikely case of uniqueness errors
2719
(because the data on the master and slave happen to be different
2720
(user error or bug), we want LOAD DATA to print an error message on
2721
the slave to discover the problem.
2723
If reading from net (a 3.23 master), mysql_load() will change this
2726
handle_dup= DUP_ERROR;
2729
We need to set session->lex->sql_command and session->lex->duplicates
2730
since InnoDB tests these variables to decide if this is a LOAD
2731
DATA ... REPLACE INTO ... statement even though mysql_parse()
2732
is not called. This is not needed in 5.0 since there the LOAD
2733
DATA ... statement is replicated using mysql_parse(), which
2734
sets the session->lex fields correctly.
2736
session->lex->sql_command= SQLCOM_LOAD;
2737
session->lex->duplicates= handle_dup;
/* Rebuild the exchange descriptor and option strings from the decoded sql_ex. */
2739
sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
2740
String field_term(sql_ex.field_term,sql_ex.field_term_len,&my_charset_utf8_general_ci);
2741
String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,&my_charset_utf8_general_ci);
2742
String line_term(sql_ex.line_term,sql_ex.line_term_len,&my_charset_utf8_general_ci);
2743
String line_start(sql_ex.line_start,sql_ex.line_start_len,&my_charset_utf8_general_ci);
2744
String escaped(sql_ex.escaped,sql_ex.escaped_len, &my_charset_utf8_general_ci);
2745
ex.field_term= &field_term;
2746
ex.enclosed= &enclosed;
2747
ex.line_term= &line_term;
2748
ex.line_start= &line_start;
2749
ex.escaped= &escaped;
2751
ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
2752
if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
2753
ex.field_term->length(0);
2755
ex.skip_lines = skip_lines;
2756
List<Item> field_list;
2757
session->lex->select_lex.context.resolve_in_table_list_only(&tables);
2758
set_fields(tables.db, field_list, &session->lex->select_lex.context);
2759
session->variables.pseudo_thread_id= thread_id;
2762
// mysql_load will use session->net to read the file
2763
session->net.vio = net->vio;
2765
Make sure the client does not get confused about the packet sequence
2767
session->net.pkt_nr = net->pkt_nr;
2770
It is safe to use tmp_list twice because we are not going to
2771
update it inside mysql_load().
2773
List<Item> tmp_list;
2774
if (mysql_load(session, &ex, &tables, field_list, tmp_list, tmp_list,
2775
handle_dup, ignore, net != 0))
2776
session->is_slave_error= 1;
2777
if (session->cuted_fields)
2779
/* log_pos is the position of the LOAD event in the master log */
2780
sql_print_warning(_("Slave: load data infile on table '%s' at "
2781
"log position %s in log '%s' produced %ld "
2782
"warning(s). Default database: '%s'"),
2784
llstr(log_pos,llbuff), RPL_LOG_NAME,
2785
(ulong) session->cuted_fields,
2786
print_slave_db_safe(session->db));
2789
net->pkt_nr= session->net.pkt_nr;
2795
We will just ask the master to send us /dev/null if we do not
2796
want to load the data.
2797
TODO: this a bug - needs to be done in I/O thread
2800
skip_load_data_infile(net);
/*
  NOTE(review): remember_db keeps the pointer held in session->db, and the
  set_db(NULL, 0) call below is commented as freeing the current database.
  remember_db is later passed to print_slave_db_safe() on both error
  paths, so it looks like it may be dangling there - confirm against
  Session::set_db() and take a copy if so.
*/
2804
session->net.vio = 0;
2805
const char *remember_db= session->db;
2806
pthread_mutex_lock(&LOCK_thread_count);
2807
session->catalog= 0;
2808
session->set_db(NULL, 0); /* will free the current database */
2810
session->query_length= 0;
2811
pthread_mutex_unlock(&LOCK_thread_count);
2812
close_thread_tables(session);
2814
if (session->is_slave_error)
2816
/* this err/sql_errno code is copy-paste from net_send_error() */
2819
if (session->is_error())
2821
err= session->main_da.message();
2822
sql_errno= session->main_da.sql_errno();
2826
sql_errno=ER_UNKNOWN_ERROR;
2829
rli->report(ERROR_LEVEL, sql_errno,
2830
_("Error '%s' running LOAD DATA INFILE on table '%s'. "
2831
"Default database: '%s'"),
2832
err, (char*)table_name, print_slave_db_safe(remember_db));
2833
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
2836
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
2838
if (session->is_fatal_error)
2841
snprintf(buf, sizeof(buf),
2842
_("Running LOAD DATA INFILE on table '%-.64s'."
2843
" Default database: '%-.64s'"),
2845
print_slave_db_safe(remember_db));
2847
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2848
ER(ER_SLAVE_FATAL_ERROR), buf);
/* Position handling is left to the caller when rli is only used for errors. */
2852
return ( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) );
2856
/**************************************************************************
2857
Rotate_log_event methods
2858
**************************************************************************/
2861
Rotate_log_event::pack_info()
2864
void Rotate_log_event::pack_info(Protocol *protocol)
2866
char buf1[256], buf[22];
2867
String tmp(buf1, sizeof(buf1), &my_charset_utf8_general_ci);
2869
tmp.append(new_log_ident.c_str(), ident_len);
2870
tmp.append(STRING_WITH_LEN(";pos="));
2871
tmp.append(llstr(pos,buf));
2872
protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
2877
Rotate_log_event::Rotate_log_event() (2 constructors)
2881
Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
2882
uint32_t ident_len_arg, uint64_t pos_arg,
2884
:Log_event(), pos(pos_arg),
2885
ident_len(ident_len_arg
2887
: strlen(new_log_ident_arg)),
2890
new_log_ident.assign(new_log_ident_arg, ident_len);
2895
Rotate_log_event::Rotate_log_event(const char* buf, uint32_t event_len,
2896
const Format_description_log_event* description_event)
2897
:Log_event(buf, description_event), flags(DUP_NAME)
2899
// The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
2900
uint8_t header_size= description_event->common_header_len;
2901
uint8_t post_header_len= description_event->post_header_len[ROTATE_EVENT-1];
2902
uint32_t ident_offset;
2903
if (event_len < header_size)
2906
pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
2907
ident_len = (uint)(event_len -
2908
(header_size+post_header_len));
2909
ident_offset = post_header_len;
2910
set_if_smaller(ident_len,FN_REFLEN-1);
2911
new_log_ident.assign(buf + ident_offset, ident_len);
2917
Rotate_log_event::write()
2920
bool Rotate_log_event::write(IO_CACHE* file)
2922
char buf[ROTATE_HEADER_LEN];
2923
int8store(buf + R_POS_OFFSET, pos);
2924
return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
2925
my_b_safe_write(file, (unsigned char*)buf, ROTATE_HEADER_LEN) ||
2926
my_b_safe_write(file, (const unsigned char*)new_log_ident.c_str(),
2932
Got a rotate log event from the master.
2934
This is mainly used so that we can later figure out the logname and
2935
position for the master.
2937
We can't rotate the slave's BINlog as this will cause infinitive rotations
2938
in a A -> B -> A setup.
2939
The NOTES below is a wrong comment which will disappear when 4.1 is merged.
2944
int Rotate_log_event::do_update_pos(Relay_log_info *rli)
2946
pthread_mutex_lock(&rli->data_lock);
2947
rli->event_relay_log_pos= my_b_tell(rli->cur_log);
2949
If we are in a transaction or in a group: the only normal case is
2950
when the I/O thread was copying a big transaction, then it was
2951
stopped and restarted: we have this in the relay log:
2959
In that case, we don't want to touch the coordinates which
2960
correspond to the beginning of the transaction. Starting from
2961
5.0.0, there also are some rotates from the slave itself, in the
2962
relay log, which shall not change the group positions.
2964
if ((server_id != ::server_id || rli->replicate_same_server_id) &&
2965
!rli->is_in_group())
2967
rli->group_master_log_name.assign(new_log_ident);
2968
rli->notify_group_master_log_name_update();
2969
rli->group_master_log_pos= pos;
2970
rli->group_relay_log_name.assign(rli->event_relay_log_name);
2971
rli->notify_group_relay_log_name_update();
2972
rli->group_relay_log_pos= rli->event_relay_log_pos;
2974
Reset session->options and sql_mode etc, because this could be the signal of
2975
a master's downgrade from 5.0 to 4.0.
2976
However, no need to reset description_event_for_exec: indeed, if the next
2977
master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
2978
master is 4.0 then the events are in the slave's format (conversion).
2980
set_slave_thread_options(session);
2981
session->variables.auto_increment_increment=
2982
session->variables.auto_increment_offset= 1;
2984
pthread_mutex_unlock(&rli->data_lock);
2985
pthread_cond_broadcast(&rli->data_cond);
2986
flush_relay_log_info(rli);
2992
Log_event::enum_skip_reason
2993
Rotate_log_event::do_shall_skip(Relay_log_info *rli)
2995
enum_skip_reason reason= Log_event::do_shall_skip(rli);
2998
case Log_event::EVENT_SKIP_NOT:
2999
case Log_event::EVENT_SKIP_COUNT:
3000
return Log_event::EVENT_SKIP_NOT;
3002
case Log_event::EVENT_SKIP_IGNORE:
3003
return Log_event::EVENT_SKIP_IGNORE;
3006
return Log_event::EVENT_SKIP_NOT; // To keep compiler happy
3010
/**************************************************************************
3011
Xid_log_event methods
3012
**************************************************************************/
3014
void Xid_log_event::pack_info(Protocol *protocol)
3016
char buf[128], *pos;
3017
pos= strcpy(buf, "COMMIT /* xid=")+14;
3018
pos= int64_t10_to_str(xid, pos, 10);
3019
pos= strcpy(pos, " */")+3;
3020
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3025
It's ok not to use int8store here,
3026
as long as XID::set(uint64_t) and
3027
XID::get_my_xid doesn't do it either.
3028
We don't care about actual values of xids as long as
3029
identical numbers compare identically
3033
Xid_log_event(const char* buf,
3034
const Format_description_log_event *description_event)
3035
:Log_event(buf, description_event)
3037
buf+= description_event->common_header_len;
3038
memcpy(&xid, buf, sizeof(xid));
3042
bool Xid_log_event::write(IO_CACHE* file)
3044
return write_header(file, sizeof(xid)) ||
3045
my_b_safe_write(file, (unsigned char*) &xid, sizeof(xid));
3049
int Xid_log_event::do_apply_event(const Relay_log_info *)
3051
return end_trans(session, COMMIT);
3054
Log_event::enum_skip_reason
3055
Xid_log_event::do_shall_skip(Relay_log_info *rli)
3057
if (rli->slave_skip_counter > 0) {
3058
session->options&= ~OPTION_BEGIN;
3059
return(Log_event::EVENT_SKIP_COUNT);
3061
return(Log_event::do_shall_skip(rli));
3065
/**************************************************************************
3066
Slave_log_event methods
3067
**************************************************************************/
3069
void Slave_log_event::pack_info(Protocol *protocol)
3071
ostringstream stream;
3072
stream << "host=" << master_host << ",port=" << master_port;
3073
stream << ",log=" << master_log << ",pos=" << master_pos;
3075
protocol->store(stream.str().c_str(), stream.str().length(),
3082
re-write this better without holding both locks at the same time
3084
Slave_log_event::Slave_log_event(Session* session_arg,
3085
Relay_log_info* rli)
3086
:Log_event(session_arg, 0, 0) , mem_pool(0), master_host(0)
3088
if (!rli->inited) // QQ When can this happen ?
3091
Master_info* mi = rli->mi;
3092
// TODO: re-write this better without holding both locks at the same time
3093
pthread_mutex_lock(&mi->data_lock);
3094
pthread_mutex_lock(&rli->data_lock);
3095
// on OOM, just do not initialize the structure and print the error
3096
if ((mem_pool = (char*)malloc(get_data_size() + 1)))
3098
master_host.assign(mi->getHostname());
3099
master_log.assign(rli->group_master_log_name);
3100
master_port = mi->getPort();
3101
master_pos = rli->group_master_log_pos;
3104
sql_print_error(_("Out of memory while recording slave event"));
3105
pthread_mutex_unlock(&rli->data_lock);
3106
pthread_mutex_unlock(&mi->data_lock);
/*
  Destructor.
  NOTE(review): the body is not visible in this extract (short lines were
  dropped). The constructor obtains mem_pool with malloc(), so presumably
  it is released here - confirm.
*/
3111
Slave_log_event::~Slave_log_event()
3117
int Slave_log_event::get_data_size()
3119
return master_host.length() + master_log.length() + 1 + SL_MASTER_HOST_OFFSET;
3123
bool Slave_log_event::write(IO_CACHE* file)
3125
ulong event_length= get_data_size();
3126
int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
3127
int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
3128
// log and host are already there
3130
return (write_header(file, event_length) ||
3131
my_b_safe_write(file, (unsigned char*) mem_pool, event_length));
/*
  Re-reads master_pos, master_port and master_host from the raw event
  image held in mem_pool.

  NOTE(review): this block is extraction-damaged (bare integers are stray
  line numbers; short lines such as braces or preprocessor guards are
  missing). master_log.assign() is called without an argument, which does
  not match any assign() of a standard string; presumably the log name is
  the NUL-terminated string that follows the host name in mem_pool -
  confirm and complete.
*/
3135
void Slave_log_event::init_from_mem_pool()
3137
master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
3138
master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
3140
/* Assign these correctly */
3141
master_host.assign(mem_pool + SL_MASTER_HOST_OFFSET);
3142
master_log.assign();
3147
int Slave_log_event::do_apply_event(const Relay_log_info *)
3149
if (drizzle_bin_log.is_open())
3150
drizzle_bin_log.write(this);
3155
/**************************************************************************
3156
Stop_log_event methods
3157
**************************************************************************/
3160
The master stopped. We used to clean up all temporary tables but
3161
this is useless as, as the master has shut down properly, it has
3162
written all DROP TEMPORARY Table (prepared statements' deletion is
3163
TODO only when we binlog prep stmts). We used to clean up
3164
slave_load_tmpdir, but this is useless as it has been cleared at the
3165
end of LOAD DATA INFILE. So we have nothing to do here. The place
3166
where we must do this cleaning is in
3167
Start_log_event_v3::do_apply_event(), not here. Because if we come
3168
here, the master was sane.
3170
/**
  Advance the relay log position after a Stop event.

  @param rli  Relay log info whose positions are advanced.
  @return always 0.
*/
int Stop_log_event::do_update_pos(Relay_log_info *rli)
{
  /*
    We do not want to update master_log pos because we get a rotate event
    before stop, so by now group_master_log_name is set to the next log.
    If we updated it, we will have incorrect master coordinates and this
    could give false triggers in MASTER_POS_WAIT() that we have reached
    the target position when in fact we have not.
  */
  const bool begin_pending= (session->options & OPTION_BEGIN) != 0;
  if (begin_pending)
  {
    /* OPTION_BEGIN is set: only step the event position. */
    rli->inc_event_relay_log_pos();
    return 0;
  }

  /* Otherwise advance the group position and flush the relay log info. */
  rli->inc_group_relay_log_pos(0);
  flush_relay_log_info(rli);
  return 0;
}
3190
/**************************************************************************
3191
Create_file_log_event methods
3192
**************************************************************************/
3195
Create_file_log_event ctor
3198
Create_file_log_event::
3199
Create_file_log_event(Session* session_arg, sql_exchange* ex,
3200
const char* db_arg, const char* table_name_arg,
3201
List<Item>& fields_arg, enum enum_duplicates handle_dup,
3203
unsigned char* block_arg, uint32_t block_len_arg, bool using_trans)
3204
:Load_log_event(session_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
3206
fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg),
3207
file_id(session_arg->file_id = drizzle_bin_log.next_file_id())
3209
sql_ex.force_new_format();
3215
Create_file_log_event::write_data_body()
3218
/**
  Write the event body: the Load_log_event body and, unless the event is
  currently posing as a plain Load event (fake_base), a single zero byte
  followed by the data block.

  @param file  Cache to write to.
  @return the base-class result when it is non-zero or fake_base is set;
          otherwise true if either of the extra writes reports non-zero.
*/
bool Create_file_log_event::write_data_body(IO_CACHE* file)
{
  const bool base_result= Load_log_event::write_data_body(file);
  if (base_result || fake_base)
    return base_result;

  if (my_b_safe_write(file, (unsigned char*) "", 1))
    return true;
  return my_b_safe_write(file, (unsigned char*) block, block_len);
}
3229
Create_file_log_event::write_data_header()
3232
/**
  Write the event header: the Load_log_event header and, unless the event
  is currently posing as a plain Load event (fake_base), the file id.

  @param file  Cache to write to.
  @return the base-class result when it is non-zero or fake_base is set;
          otherwise whether writing the file id reported non-zero.
*/
bool Create_file_log_event::write_data_header(IO_CACHE* file)
{
  const bool base_result= Load_log_event::write_data_header(file);
  if (base_result || fake_base)
    return base_result;

  unsigned char id_header[CREATE_FILE_HEADER_LEN];
  int4store(id_header + CF_FILE_ID_OFFSET, file_id);
  return my_b_safe_write(file, id_header, CREATE_FILE_HEADER_LEN) != 0;
}
3244
Create_file_log_event::write_base()
3247
bool Create_file_log_event::write_base(IO_CACHE* file)
3250
fake_base= 1; // pretend we are Load event
3257
Create_file_log_event ctor
3260
/**
  Build a Create_file_log_event from its serialized form.

  A private copy of the event is kept in event_buf and parsed with
  copy_log_event(). For binlog versions other than 1 the file id and the
  location of the data block are then read from buf; for version 1 the
  event is marked as coming from the old format.

  On allocation failure, on a copy_log_event() error, or when the event is
  too short to contain a data block, the constructor returns early and
  block stays 0.

  @param buf                Serialized event.
  @param len                Length of buf in bytes.
  @param description_event  Format description used to decode the headers.
*/
Create_file_log_event::Create_file_log_event(const char* buf, uint32_t len,
                                             const Format_description_log_event* description_event)
  :Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0)
{
  uint32_t block_offset;
  uint32_t header_len= description_event->common_header_len;
  uint8_t load_header_len= description_event->post_header_len[LOAD_EVENT-1];
  uint8_t create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];

  if (!(event_buf= (const char*)malloc(len)))
    return;
  /*
    memcpy() returns its destination pointer, which is never null here, so
    it must stay out of the failure test below; otherwise the constructor
    would always bail out before copy_log_event() runs.
  */
  memcpy((char *)event_buf, buf, len);

  if (copy_log_event(event_buf,len,
                     ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
                      load_header_len + header_len :
                      (fake_base ? (header_len+load_header_len) :
                       (header_len+load_header_len) +
                       create_file_header_len)),
                     description_event))
    return;

  if (description_event->binlog_version!=1)
  {
    file_id= uint4korr(buf +
                       header_len +
                       load_header_len + CF_FILE_ID_OFFSET);
    /*
      Note that it's ok to use get_data_size() below, because it is computed
      with values we have already read from this event (because we called
      copy_log_event()); we are not using slave's format info to decode
      master's format, we are really using master's format info.
      Anyway, both formats should be identical (except the common_header_len)
      as these Load events are not changed between 4.0 and 5.0 (as logging of
      LOAD DATA INFILE does not use Load_log_event in 5.0).

      The + 1 is for \0 terminating fname
    */
    block_offset= (description_event->common_header_len +
                   Load_log_event::get_data_size() +
                   create_file_header_len + 1);
    if (len < block_offset)
      return;
    block = (unsigned char*)buf + block_offset;
    block_len = len - block_offset;
  }
  else
  {
    sql_ex.force_new_format();
    inited_from_old = 1;
  }
}
3312
Create_file_log_event::pack_info()
3315
/**
  Render this event as one text field of the form
  "db=<db>;table=<table>;file_id=<id>;block_len=<len>" and hand it to the
  protocol.

  @param protocol  Destination the formatted text is stored into.
*/
void Create_file_log_event::pack_info(Protocol *protocol)
{
  char text[NAME_LEN*2 + 30 + 21*2];
  char *cursor= text;

  memcpy(cursor, "db=", 3);
  cursor+= 3;
  memcpy(cursor, db, db_len);
  cursor+= db_len;

  memcpy(cursor, ";table=", 7);
  cursor+= 7;
  memcpy(cursor, table_name, table_name_len);
  cursor+= table_name_len;

  memcpy(cursor, ";file_id=", 9);
  cursor+= 9;
  cursor= int10_to_str((long) file_id, cursor, 10);

  memcpy(cursor, ";block_len=", 11);
  cursor+= 11;
  cursor= int10_to_str((long) block_len, cursor, 10);

  protocol->store(text, (uint) (cursor - text), &my_charset_bin);
}
3331
Create_file_log_event::do_apply_event()
3334
int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
3336
char proc_info[17+FN_REFLEN+10], *fname_buf;
3342
memset(&file, 0, sizeof(file));
3343
fname_buf= strcpy(proc_info, "Making temp file ")+17;
3344
ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
3345
session->set_proc_info(proc_info);
3346
my_delete(fname_buf, MYF(0)); // old copy may exist already
3347
if ((fd= my_create(fname_buf, CREATE_MODE,
3349
MYF(MY_WME))) < 0 ||
3350
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
3351
MYF(MY_WME|MY_NABP)))
3353
rli->report(ERROR_LEVEL, my_errno,
3354
_("Error in Create_file event: could not open file '%s'"),
3359
// a trick to avoid allocating another buffer
3361
fname_len= (uint) ((strcpy(ext, ".data") + 5) - fname);
3362
if (write_base(&file))
3364
strcpy(ext, ".info"); // to have it right in the error message
3365
rli->report(ERROR_LEVEL, my_errno,
3366
_("Error in Create_file event: could not write to file '%s'"),
3370
end_io_cache(&file);
3371
my_close(fd, MYF(0));
3373
// fname_buf now already has .data, not .info, because we did our trick
3374
my_delete(fname_buf, MYF(0)); // old copy may exist already
3375
if ((fd= my_create(fname_buf, CREATE_MODE,
3379
rli->report(ERROR_LEVEL, my_errno,
3380
_("Error in Create_file event: could not open file '%s'"),
3384
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
3386
rli->report(ERROR_LEVEL, my_errno,
3387
_("Error in Create_file event: write to '%s' failed"),
3391
error=0; // Everything is ok
3395
end_io_cache(&file);
3397
my_close(fd, MYF(0));
3398
session->set_proc_info(0);
3403
/**************************************************************************
3404
Append_block_log_event methods
3405
**************************************************************************/
3408
Append_block_log_event ctor
3411
Append_block_log_event::Append_block_log_event(Session *session_arg,
3413
unsigned char *block_arg,
3414
uint32_t block_len_arg,
3416
:Log_event(session_arg,0, using_trans), block(block_arg),
3417
block_len(block_len_arg), file_id(session_arg->file_id), db(db_arg)
3423
Append_block_log_event ctor
3426
/**
  Build an Append_block_log_event from its serialized form.

  When the event is shorter than its two headers nothing is decoded and
  block stays 0. Otherwise the file id is read from the post header and
  block / block_len describe the bytes following the headers inside buf
  (no copy is made).

  @param buf                Serialized event.
  @param len                Length of buf in bytes.
  @param description_event  Format description supplying the header sizes.
*/
Append_block_log_event::Append_block_log_event(const char* buf, uint32_t len,
                                               const Format_description_log_event* description_event)
  :Log_event(buf, description_event),block(0)
{
  const uint8_t common_len= description_event->common_header_len;
  const uint8_t post_len=
    description_event->post_header_len[APPEND_BLOCK_EVENT-1];
  const uint32_t headers_len= common_len + post_len;

  if (len >= headers_len)
  {
    file_id= uint4korr(buf + common_len + AB_FILE_ID_OFFSET);
    block= (unsigned char*) buf + headers_len;
    block_len= len - headers_len;
  }
}
3444
Append_block_log_event::write()
3447
/**
  Write the event: common header, the post header carrying the file id,
  then the data block.

  @param file  Cache to write to.
  @return true as soon as one of the writes reports a non-zero result.
*/
bool Append_block_log_event::write(IO_CACHE* file)
{
  unsigned char post_header[APPEND_BLOCK_HEADER_LEN];
  int4store(post_header + AB_FILE_ID_OFFSET, file_id);

  if (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len))
    return true;
  if (my_b_safe_write(file, post_header, APPEND_BLOCK_HEADER_LEN))
    return true;
  return my_b_safe_write(file, (unsigned char*) block, block_len);
}
3458
Append_block_log_event::pack_info()
3461
/**
  Render this event as one text field of the form
  ";file_id=<id>;block_len=<len>" and hand it to the protocol.

  @param protocol  Destination the formatted text is stored into.
*/
void Append_block_log_event::pack_info(Protocol *protocol)
{
  char text[256];
  const uint32_t text_len= (uint) sprintf(text, ";file_id=%u;block_len=%u",
                                          file_id, block_len);
  protocol->store(text, text_len, &my_charset_bin);
}
3472
Append_block_log_event::get_create_or_append()
3475
int Append_block_log_event::get_create_or_append() const
3477
return 0; /* append to the file, fail if not exists */
3481
Append_block_log_event::do_apply_event()
3484
int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
3486
char proc_info[17+FN_REFLEN+10], *fname= proc_info+17;
3490
fname= strcpy(proc_info, "Making temp file ")+17;
3491
slave_load_file_stem(fname, file_id, server_id, ".data");
3492
session->set_proc_info(proc_info);
3493
if (get_create_or_append())
3495
my_delete(fname, MYF(0)); // old copy may exist already
3496
if ((fd= my_create(fname, CREATE_MODE,
3500
rli->report(ERROR_LEVEL, my_errno,
3501
_("Error in %s event: could not create file '%s'"),
3502
get_type_str(), fname);
3506
else if ((fd = my_open(fname, O_WRONLY | O_APPEND,
3509
rli->report(ERROR_LEVEL, my_errno,
3510
_("Error in %s event: could not open file '%s'"),
3511
get_type_str(), fname);
3514
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
3516
rli->report(ERROR_LEVEL, my_errno,
3517
_("Error in %s event: write to '%s' failed"),
3518
get_type_str(), fname);
3525
my_close(fd, MYF(0));
3526
session->set_proc_info(0);
3531
/**************************************************************************
3532
Delete_file_log_event methods
3533
**************************************************************************/
3536
Delete_file_log_event ctor
3539
Delete_file_log_event::Delete_file_log_event(Session *session_arg, const char* db_arg,
3541
:Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
3546
Delete_file_log_event ctor
3549
/**
  Build a Delete_file_log_event from its serialized form.

  file_id stays 0 when the event is shorter than its two headers.

  @param buf                Serialized event.
  @param len                Length of buf in bytes.
  @param description_event  Format description supplying the header sizes.
*/
Delete_file_log_event::Delete_file_log_event(const char* buf, uint32_t len,
                                             const Format_description_log_event* description_event)
  :Log_event(buf, description_event),file_id(0)
{
  const uint8_t common_len= description_event->common_header_len;
  const uint8_t post_len= description_event->post_header_len[DELETE_FILE_EVENT-1];

  if (len >= (uint)(common_len + post_len))
    file_id= uint4korr(buf + common_len + DF_FILE_ID_OFFSET);
}
3562
Delete_file_log_event::write()
3565
/**
  Write the event: common header followed by the post header carrying the
  file id.

  @param file  Cache to write to.
  @return true if either write reports a non-zero result.
*/
bool Delete_file_log_event::write(IO_CACHE* file)
{
  unsigned char post_header[DELETE_FILE_HEADER_LEN];
  int4store(post_header + DF_FILE_ID_OFFSET, file_id);

  if (write_header(file, sizeof(post_header)))
    return true;
  return my_b_safe_write(file, post_header, sizeof(post_header));
}
3575
Delete_file_log_event::pack_info()
3578
/**
  Render this event as one text field of the form ";file_id=<id>" and hand
  it to the protocol.

  @param protocol  Destination the formatted text is stored into.
*/
void Delete_file_log_event::pack_info(Protocol *protocol)
{
  char text[64];
  const uint32_t text_len= (uint) sprintf(text, ";file_id=%u", (uint) file_id);
  protocol->store(text, (int32_t) text_len, &my_charset_bin);
}
3587
Delete_file_log_event::do_apply_event()
3590
int Delete_file_log_event::do_apply_event(const Relay_log_info *)
3592
char fname[FN_REFLEN+10];
3593
char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
3594
(void) my_delete(fname, MYF(MY_WME));
3595
strcpy(ext, ".info");
3596
(void) my_delete(fname, MYF(MY_WME));
3601
/**************************************************************************
3602
Execute_load_log_event methods
3603
**************************************************************************/
3606
Execute_load_log_event ctor
3609
Execute_load_log_event::Execute_load_log_event(Session *session_arg,
3612
:Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
3618
Execute_load_log_event ctor
3621
/**
  Build an Execute_load_log_event from its serialized form.

  file_id stays 0 when the event is shorter than its two headers.

  @param buf                Serialized event.
  @param len                Length of buf in bytes.
  @param description_event  Format description supplying the header sizes.
*/
Execute_load_log_event::Execute_load_log_event(const char* buf, uint32_t len,
                                               const Format_description_log_event* description_event)
  :Log_event(buf, description_event), file_id(0)
{
  const uint8_t common_len= description_event->common_header_len;
  const uint8_t post_len= description_event->post_header_len[EXEC_LOAD_EVENT-1];

  if (len >= (uint)(common_len+post_len))
    file_id= uint4korr(buf + common_len + EL_FILE_ID_OFFSET);
}
3634
Execute_load_log_event::write()
3637
/**
  Write the event: common header followed by the post header carrying the
  file id.

  @param file  Cache to write to.
  @return true if either write reports a non-zero result.
*/
bool Execute_load_log_event::write(IO_CACHE* file)
{
  unsigned char post_header[EXEC_LOAD_HEADER_LEN];
  int4store(post_header + EL_FILE_ID_OFFSET, file_id);

  if (write_header(file, sizeof(post_header)))
    return true;
  return my_b_safe_write(file, post_header, sizeof(post_header));
}
3647
Execute_load_log_event::pack_info()
3650
/**
  Render this event as one text field of the form ";file_id=<id>" and hand
  it to the protocol.

  @param protocol  Destination the formatted text is stored into.
*/
void Execute_load_log_event::pack_info(Protocol *protocol)
{
  char text[64];
  const uint32_t text_len= (uint) sprintf(text, ";file_id=%u", (uint) file_id);
  protocol->store(text, (int32_t) text_len, &my_charset_bin);
}
3660
Execute_load_log_event::do_apply_event()
3663
int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
3665
char fname[FN_REFLEN+10];
3670
Load_log_event *lev= 0;
3672
ext= slave_load_file_stem(fname, file_id, server_id, ".info");
3673
if ((fd = my_open(fname, O_RDONLY,
3674
MYF(MY_WME))) < 0 ||
3675
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
3676
MYF(MY_WME|MY_NABP)))
3678
rli->report(ERROR_LEVEL, my_errno,
3679
_("Error in Exec_load event: could not open file '%s'"),
3683
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
3684
(pthread_mutex_t*)0,
3685
rli->relay_log.description_event_for_exec)) ||
3686
lev->get_type_code() != NEW_LOAD_EVENT)
3688
rli->report(ERROR_LEVEL, 0,
3689
_("Error in Exec_load event: "
3690
"file '%s' appears corrupted"),
3695
lev->session = session;
3697
lev->do_apply_event should use rli only for errors i.e. should
3698
not advance rli's position.
3700
lev->do_apply_event is the place where the table is loaded (it
3701
calls mysql_load()).
3704
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
3705
if (lev->do_apply_event(0,rli,1))
3708
We want to indicate the name of the file that could not be loaded
3710
But as we are here we are sure the error is in rli->last_slave_error and
3711
rli->last_slave_errno (example of error: duplicate entry for key), so we
3712
don't want to overwrite it with the filename.
3713
What we want instead is add the filename to the current error message.
3715
char *tmp= strdup(rli->last_error().message);
3718
rli->report(ERROR_LEVEL, rli->last_error().number,
3719
_("%s. Failed executing load from '%s'"),
3726
We have an open file descriptor to the .info file; we need to close it
3727
or Windows will refuse to delete the file in my_delete().
3731
my_close(fd, MYF(0));
3732
end_io_cache(&file);
3735
(void) my_delete(fname, MYF(MY_WME));
3736
memcpy(ext, ".data", 6);
3737
(void) my_delete(fname, MYF(MY_WME));
3744
my_close(fd, MYF(0));
3745
end_io_cache(&file);
3751
/**************************************************************************
3752
Begin_load_query_log_event methods
3753
**************************************************************************/
3755
Begin_load_query_log_event::
3756
Begin_load_query_log_event(Session* session_arg, const char* db_arg, unsigned char* block_arg,
3757
uint32_t block_len_arg, bool using_trans)
3758
:Append_block_log_event(session_arg, db_arg, block_arg, block_len_arg,
3761
file_id= session_arg->file_id= drizzle_bin_log.next_file_id();
3765
Begin_load_query_log_event::
3766
Begin_load_query_log_event(const char* buf, uint32_t len,
3767
const Format_description_log_event* desc_event)
3768
:Append_block_log_event(buf, len, desc_event)
3773
int Begin_load_query_log_event::get_create_or_append() const
3775
return 1; /* create the file */
3779
Log_event::enum_skip_reason
3780
Begin_load_query_log_event::do_shall_skip(Relay_log_info *rli)
3783
If the slave skip counter is 1, then we should not start executing
3786
return continue_group(rli);
3790
/**************************************************************************
3791
Execute_load_query_log_event methods
3792
**************************************************************************/
3795
Execute_load_query_log_event::
3796
Execute_load_query_log_event(Session *session_arg, const char* query_arg,
3797
ulong query_length_arg, uint32_t fn_pos_start_arg,
3798
uint32_t fn_pos_end_arg,
3799
enum_load_dup_handling dup_handling_arg,
3800
bool using_trans, bool suppress_use,
3801
Session::killed_state killed_err_arg):
3802
Query_log_event(session_arg, query_arg, query_length_arg, using_trans,
3803
suppress_use, killed_err_arg),
3804
file_id(session_arg->file_id), fn_pos_start(fn_pos_start_arg),
3805
fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
3810
/**
  Build an Execute_load_query_log_event from its serialized form.

  Nothing is decoded when the Query_log_event part is invalid. Otherwise the
  file-name positions and the duplicate handling mode are read from the post
  header; file_id is read only when both positions lie within the query and
  the mode does not exceed LOAD_DUP_REPLACE, and stays 0 otherwise.

  @param buf         Serialized event.
  @param event_len   Length of buf in bytes.
  @param desc_event  Format description supplying the common header size.
*/
Execute_load_query_log_event::
Execute_load_query_log_event(const char* buf, uint32_t event_len,
                             const Format_description_log_event* desc_event):
  Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT),
  file_id(0), fn_pos_start(0), fn_pos_end(0)
{
  if (!Query_log_event::is_valid())
    return;

  const char *post_header= buf + desc_event->common_header_len;

  fn_pos_start= uint4korr(post_header + ELQ_FN_POS_START_OFFSET);
  fn_pos_end= uint4korr(post_header + ELQ_FN_POS_END_OFFSET);
  dup_handling= (enum_load_dup_handling)(*(post_header + ELQ_DUP_HANDLING_OFFSET));

  const bool positions_in_query= fn_pos_start <= q_len && fn_pos_end <= q_len;
  if (positions_in_query && dup_handling <= LOAD_DUP_REPLACE)
    file_id= uint4korr(post_header + ELQ_FILE_ID_OFFSET);
}
3833
/**
  Number of post-header bytes this class adds to the Query_log_event header.

  @return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN.
*/
ulong Execute_load_query_log_event::get_post_header_size_for_derived()
{
  const ulong extra_header_len= EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN;
  return extra_header_len;
}
3840
Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
3842
unsigned char buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
3843
int4store(buf, file_id);
3844
int4store(buf + 4, fn_pos_start);
3845
int4store(buf + 4 + 4, fn_pos_end);
3846
*(buf + 4 + 4 + 4)= (unsigned char) dup_handling;
3847
return my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
3851
/**
  Render this event as one text field and hand it to the protocol.

  The text is "use `<db>`; " when a database is set, followed by the query
  when there is one, followed by " ;file_id=<id>". Nothing is stored when
  the temporary buffer cannot be allocated.

  @param protocol  Destination the formatted text is stored into.
*/
void Execute_load_query_log_event::pack_info(Protocol *protocol)
{
  char *text= (char*) malloc(9 + db_len + q_len + 10 + 21);
  if (!text)
    return;

  char *cursor= text;
  if (db && db_len)
  {
    memcpy(cursor, "use `", 5);
    cursor+= 5;
    memcpy(cursor, db, db_len);
    cursor+= db_len;
    memcpy(cursor, "`; ", 3);
    cursor+= 3;
  }
  if (query && q_len)
  {
    memcpy(cursor, query, q_len);
    cursor+= q_len;
  }
  memcpy(cursor, " ;file_id=", 10);
  cursor+= 10;
  cursor= int10_to_str((long) file_id, cursor, 10);

  protocol->store(text, cursor - text, &my_charset_bin);
  free(text);
}
3876
Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli)
3884
buf= (char*) malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
3885
(FN_REFLEN + 10) + 10 + 8 + 5);
3887
/* Replace filename and LOCAL keyword in query before executing it */
3890
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
3891
ER(ER_SLAVE_FATAL_ERROR),
3892
_("Not enough memory"));
3897
memcpy(p, query, fn_pos_start);
3899
fname= (p= strncpy(p, STRING_WITH_LEN(" INFILE \'")) + 9);
3900
p= slave_load_file_stem(p, file_id, server_id, ".data");
3901
fname_end= p= strchr(p, '\0'); // Safer than p=p+5
3903
switch (dup_handling) {
3904
case LOAD_DUP_IGNORE:
3905
p= strncpy(p, STRING_WITH_LEN(" IGNORE")) + 7;
3907
case LOAD_DUP_REPLACE:
3908
p= strncpy(p, STRING_WITH_LEN(" REPLACE")) + 8;
3911
/* Ordinary load data */
3914
size_t end_len = q_len-fn_pos_end;
3915
p= strncpy(p, STRING_WITH_LEN(" INTO")) + 5;
3916
p= strncpy(p, query+fn_pos_end, end_len);
3919
error= Query_log_event::do_apply_event(rli, buf, p-buf);
3921
/* Forging file name for deletion in same buffer */
3925
If there was an error the slave is going to stop, leave the
3926
file so that we can re-execute this event at START SLAVE.
3929
(void) my_delete(fname, MYF(MY_WME));
3936
/**************************************************************************
3938
**************************************************************************/
3941
sql_ex_info::write_data()
3944
bool sql_ex_info::write_data(IO_CACHE* file)
3948
return (write_str(file, field_term, (uint) field_term_len) ||
3949
write_str(file, enclosed, (uint) enclosed_len) ||
3950
write_str(file, line_term, (uint) line_term_len) ||
3951
write_str(file, line_start, (uint) line_start_len) ||
3952
write_str(file, escaped, (uint) escaped_len) ||
3953
my_b_safe_write(file,(unsigned char*) &opt_flags,1));
3965
const char *sql_ex_info::init(const char *buf, const char *buf_end,
3966
bool use_new_format)
3968
cached_new_format = use_new_format;
3973
The code below assumes that buf will not disappear from
3974
under our feet during the lifetime of the event. This assumption
3975
holds true in the slave thread if the log is in new format, but is not
3976
the case when we have old format because we will be reusing net buffer
3977
to read the actual file before we write out the Create_file event.
3979
if (read_str(&buf, buf_end, &field_term, &field_term_len) ||
3980
read_str(&buf, buf_end, &enclosed, &enclosed_len) ||
3981
read_str(&buf, buf_end, &line_term, &line_term_len) ||
3982
read_str(&buf, buf_end, &line_start, &line_start_len) ||
3983
read_str(&buf, buf_end, &escaped, &escaped_len))
3989
field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
3990
field_term = buf++; // Use first byte in string
3996
empty_flags= *buf++;
3997
if (empty_flags & FIELD_TERM_EMPTY)
3999
if (empty_flags & ENCLOSED_EMPTY)
4001
if (empty_flags & LINE_TERM_EMPTY)
4003
if (empty_flags & LINE_START_EMPTY)
4005
if (empty_flags & ESCAPED_EMPTY)
4012
/**************************************************************************
4013
Rows_log_event member functions
4014
**************************************************************************/
4016
Rows_log_event::Rows_log_event(Session *session_arg, Table *tbl_arg, ulong tid,
4017
MY_BITMAP const *cols, bool is_transactional)
4018
: Log_event(session_arg, 0, is_transactional),
4022
m_width(tbl_arg ? tbl_arg->s->fields : 1),
4023
m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
4024
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4027
We allow a special form of dummy event when the table, and cols
4028
are null and the table id is UINT32_MAX. This is a temporary
4029
solution, to be able to terminate a started statement in the
4030
binary log: the extraneous events will be removed in the future.
4032
assert((tbl_arg && tbl_arg->s && tid != UINT32_MAX) || (!tbl_arg && !cols && tid == UINT32_MAX));
4034
if (session_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
4035
set_flags(NO_FOREIGN_KEY_CHECKS_F);
4036
if (session_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
4037
set_flags(RELAXED_UNIQUE_CHECKS_F);
4038
/* if bitmap_init fails, caught in is_valid() */
4039
if (likely(!bitmap_init(&m_cols,
4040
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4044
/* Cols can be zero if this is a dummy binrows event */
4045
if (likely(cols != NULL))
4047
memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
4048
create_last_word_mask(&m_cols);
4053
// Needed because bitmap_init() does not set it to null on failure
4059
Rows_log_event::Rows_log_event(const char *buf, uint32_t event_len,
4060
Log_event_type event_type,
4061
const Format_description_log_event
4063
: Log_event(buf, description_event),
4066
m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
4067
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4069
uint8_t const common_header_len= description_event->common_header_len;
4070
uint8_t const post_header_len= description_event->post_header_len[event_type-1];
4072
const char *post_start= buf + common_header_len;
4073
post_start+= RW_MAPID_OFFSET;
4074
if (post_header_len == 6)
4076
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
4077
m_table_id= uint4korr(post_start);
4082
m_table_id= (ulong) uint6korr(post_start);
4083
post_start+= RW_FLAGS_OFFSET;
4086
m_flags= uint2korr(post_start);
4088
unsigned char const *const var_start=
4089
(const unsigned char *)buf + common_header_len + post_header_len;
4090
unsigned char const *const ptr_width= var_start;
4091
unsigned char *ptr_after_width= (unsigned char*) ptr_width;
4092
m_width = net_field_length(&ptr_after_width);
4093
/* if bitmap_init fails, caught in is_valid() */
4094
if (likely(!bitmap_init(&m_cols,
4095
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4099
memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
4100
create_last_word_mask(&m_cols);
4101
ptr_after_width+= (m_width + 7) / 8;
4105
// Needed because bitmap_init() does not set it to null on failure
4106
m_cols.bitmap= NULL;
4110
m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
4112
if (event_type == UPDATE_ROWS_EVENT)
4114
/* if bitmap_init fails, caught in is_valid() */
4115
if (likely(!bitmap_init(&m_cols_ai,
4116
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
4120
memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
4121
create_last_word_mask(&m_cols_ai);
4122
ptr_after_width+= (m_width + 7) / 8;
4126
// Needed because bitmap_init() does not set it to null on failure
4127
m_cols_ai.bitmap= 0;
4132
const unsigned char* const ptr_rows_data= (const unsigned char*) ptr_after_width;
4134
size_t const data_size= event_len - (ptr_rows_data - (const unsigned char *) buf);
4136
m_rows_buf= (unsigned char*) malloc(data_size);
4137
if (likely((bool)m_rows_buf))
4139
m_curr_row= m_rows_buf;
4140
m_rows_end= m_rows_buf + data_size;
4141
m_rows_cur= m_rows_end;
4142
memcpy(m_rows_buf, ptr_rows_data, data_size);
4145
m_cols.bitmap= 0; // to not free it
4150
/**
  Release the column bitmap and the row buffer.

  When the bitmap lives in the in-object m_bitbuf array no allocation took
  place, so the pointer is cleared first to keep bitmap_free() from freeing
  it.
*/
Rows_log_event::~Rows_log_event()
{
  const bool bitmap_is_inline= (m_cols.bitmap == m_bitbuf);
  if (bitmap_is_inline)
    m_cols.bitmap= 0;

  bitmap_free(&m_cols);                         // To pair with bitmap_init().
  free((unsigned char*) m_rows_buf);
}
4158
int Rows_log_event::get_data_size()
4160
int const type_code= get_type_code();
4162
unsigned char buf[sizeof(m_width)+1];
4163
unsigned char *end= net_store_length(buf, (m_width + 7) / 8);
4165
int data_size= ROWS_HEADER_LEN;
4166
data_size+= no_bytes_in_map(&m_cols);
4167
data_size+= end - buf;
4169
if (type_code == UPDATE_ROWS_EVENT)
4170
data_size+= no_bytes_in_map(&m_cols_ai);
4172
data_size+= (m_rows_cur - m_rows_buf);
4177
int Rows_log_event::do_add_row_data(unsigned char *row_data, size_t length)
4180
When the table has a primary key, we would probably want, by default, to
4181
log only the primary key value instead of the entire "before image". This
4182
would save binlog space. TODO
4186
If length is zero, there is nothing to write, so we just
4187
return. Note that this is not an optimization, since calling
4188
realloc() with size 0 means free().
4196
assert(m_rows_buf <= m_rows_cur);
4197
assert(!m_rows_buf || (m_rows_end && m_rows_buf <= m_rows_end));
4198
assert(m_rows_cur <= m_rows_end);
4200
/* The cast will always work since m_rows_cur <= m_rows_end */
4201
if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
4203
size_t const block_size= 1024;
4204
const size_t cur_size= m_rows_cur - m_rows_buf;
4205
const size_t new_alloc=
4206
block_size * ((cur_size + length + block_size - 1) / block_size);
4208
unsigned char* new_buf= (unsigned char*)realloc(m_rows_buf, new_alloc);
4209
if (unlikely(!new_buf))
4210
return(HA_ERR_OUT_OF_MEM);
4212
/* If the memory moved, we need to move the pointers */
4213
if (new_buf != m_rows_buf)
4215
m_rows_buf= new_buf;
4216
m_rows_cur= m_rows_buf + cur_size;
4220
The end pointer should always be changed to point to the end of
4221
the allocated memory.
4223
m_rows_end= m_rows_buf + new_alloc;
4226
assert(m_rows_cur + length <= m_rows_end);
4227
memcpy(m_rows_cur, row_data, length);
4228
m_rows_cur+= length;
4233
int Rows_log_event::do_apply_event(Relay_log_info const *rli)
4237
If m_table_id == UINT32_MAX, then we have a dummy event that does not
4238
contain any data. In that case, we just remove all tables in the
4239
tables_to_lock list, close the thread tables, and return with
4242
if (m_table_id == UINT32_MAX)
4245
This one is supposed to be set: just an extra check so that
4246
nothing strange has happened.
4248
assert(get_flags(STMT_END_F));
4250
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4251
close_thread_tables(session);
4252
session->clear_error();
4257
'session' has been set by exec_relay_log_event(), just before calling
4258
do_apply_event(). We still check here to prevent future coding
4261
assert(rli->sql_session == session);
4264
If there is no locks taken, this is the first binrow event seen
4265
after the table map events. We should then lock all the tables
4266
used in the transaction and proceed with execution of the actual
4271
bool need_reopen= 1; /* To execute the first lap of the loop below */
4274
lock_tables() reads the contents of session->lex, so they must be
4275
initialized. Contrary to in
4276
Table_map_log_event::do_apply_event() we don't call
4277
mysql_init_query() as that may reset the binlog format.
4282
There are a few flags that are replicated with each row event.
4283
Make sure to set/clear them before executing the main body of
4286
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
4287
session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
4289
session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
4291
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
4292
session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
4294
session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
4295
/* A small test to verify that objects have consistent types */
4296
assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
4299
while ((error= lock_tables(session, rli->tables_to_lock,
4300
rli->tables_to_lock_count, &need_reopen)))
4304
if (session->is_slave_error || session->is_fatal_error)
4307
Error reporting borrowed from Query_log_event with many excessive
4308
simplifications (we don't honour --slave-skip-errors)
4310
uint32_t actual_error= session->main_da.sql_errno();
4311
rli->report(ERROR_LEVEL, actual_error,
4312
_("Error '%s' in %s event: when locking tables"),
4314
? session->main_da.message()
4315
: _("unexpected success or fatal error")),
4317
session->is_fatal_error= 1;
4321
rli->report(ERROR_LEVEL, error,
4322
_("Error in %s event: when locking tables"),
4325
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4329
TableList *tables= rli->tables_to_lock;
4330
close_tables_for_reopen(session, &tables);
4332
uint32_t tables_count= rli->tables_to_lock_count;
4333
if ((error= open_tables(session, &tables, &tables_count, 0)))
4335
if (session->is_slave_error || session->is_fatal_error)
4338
Error reporting borrowed from Query_log_event with many excessive
4339
simplifications (we don't honour --slave-skip-errors)
4341
uint32_t actual_error= session->main_da.sql_errno();
4342
rli->report(ERROR_LEVEL, actual_error,
4343
_("Error '%s' on reopening tables"),
4345
? session->main_da.message()
4346
: _("unexpected success or fatal error")));
4347
session->is_slave_error= 1;
4349
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4355
When the open and locking succeeded, we check all tables to
4356
ensure that they still have the correct type.
4358
We can use a down cast here since we know that every table added
4359
to the tables_to_lock is a RPL_TableList.
4363
RPL_TableList *ptr= rli->tables_to_lock;
4364
for ( ; ptr ; ptr= static_cast<RPL_TableList*>(ptr->next_global))
4366
if (ptr->m_tabledef.compatible_with(rli, ptr->table))
4368
mysql_unlock_tables(session, session->lock);
4370
session->is_slave_error= 1;
4371
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4372
return(ERR_BAD_TABLE_DEF);
4378
... and then we add all the tables to the table map and remove
4379
them from tables to lock.
4381
We also invalidate the query cache for all the tables, since
4382
they will now be changed.
4384
TODO [/Matz]: Maybe the query cache should not be invalidated
4385
here? It might be that a table is not changed, even though it
4386
was locked for the statement. We do know that each
4387
Rows_log_event contain at least one row, so after processing one
4388
Rows_log_event, we can invalidate the query cache for the
4391
for (TableList *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
4393
const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
4399
m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
4404
table == NULL means that this table should not be replicated
4405
(this was set up by Table_map_log_event::do_apply_event()
4406
which tested replicate-* rules).
4410
It's not needed to set_time() but
4411
1) it continues the property that "Time" in SHOW PROCESSLIST shows how
4412
much slave is behind
4413
2) it will be needed when we allow replication from a table with no
4414
TIMESTAMP column to a table with one.
4415
So we call set_time(), like in SBR. Presently it changes nothing.
4417
session->set_time((time_t)when);
4419
There are a few flags that are replicated with each row event.
4420
Make sure to set/clear them before executing the main body of
4423
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
4424
session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
4426
session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
4428
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
4429
session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
4431
session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
4433
if (slave_allow_batching)
4434
session->options|= OPTION_ALLOW_BATCH;
4436
session->options&= ~OPTION_ALLOW_BATCH;
4438
/* A small test to verify that objects have consistent types */
4439
assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
4442
Now we are in a statement and will stay in a statement until we
4445
We set this flag here, before actually applying any rows, in
4446
case the SQL thread is stopped and we need to detect that we're
4447
inside a statement and halting abruptly might cause problems
4450
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
4452
if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
4453
set_flags(COMPLETE_ROWS_F);
4456
Set tables write and read sets.
4458
Read_set contains all slave columns (in case we are going to fetch
4459
a complete record from slave)
4461
Write_set equals the m_cols bitmap sent from master but it can be
4462
longer if slave has extra columns.
4465
bitmap_set_all(table->read_set);
4466
bitmap_set_all(table->write_set);
4467
if (!get_flags(COMPLETE_ROWS_F))
4468
bitmap_intersect(table->write_set,&m_cols);
4470
this->slave_exec_mode= slave_exec_mode_options; // fix the mode
4472
// Do event specific preparations
4473
error= do_before_row_operations(rli);
4475
// row processing loop
4477
while (error == 0 && m_curr_row < m_rows_end)
4479
/* in_use can have been set to NULL in close_tables_for_reopen */
4480
Session* old_session= table->in_use;
4482
table->in_use= session;
4484
error= do_exec_row(rli);
4486
table->in_use = old_session;
4492
The following list of "idempotent" errors
4493
means that an error from the list might happen
4494
because of idempotent (more than once)
4495
applying of a binlog file.
4496
Notice, that binlog has a ddl operation its
4497
second applying may cause
4499
case HA_ERR_TABLE_DEF_CHANGED:
4500
case HA_ERR_CANNOT_ADD_FOREIGN:
4502
which are not included in the list.
4504
case HA_ERR_RECORD_CHANGED:
4505
case HA_ERR_RECORD_DELETED:
4506
case HA_ERR_KEY_NOT_FOUND:
4507
case HA_ERR_END_OF_FILE:
4508
case HA_ERR_FOUND_DUPP_KEY:
4509
case HA_ERR_FOUND_DUPP_UNIQUE:
4510
case HA_ERR_FOREIGN_DUPLICATE_KEY:
4511
case HA_ERR_NO_REFERENCED_ROW:
4512
case HA_ERR_ROW_IS_REFERENCED:
4513
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
4515
if (global_system_variables.log_warnings)
4516
slave_rows_error_report(WARNING_LEVEL, error, rli, session, table,
4518
RPL_LOG_NAME, (ulong) log_pos);
4524
session->is_slave_error= 1;
4529
If m_curr_row_end was not set during event execution (e.g., because
4530
of errors) we can't proceed to the next row. If the error is transient
4531
(i.e., error==0 at this point) we must call unpack_current_row() to set
4534
if (!m_curr_row_end && !error)
4535
unpack_current_row(rli, &m_cols);
4537
// at this moment m_curr_row_end should be set
4538
assert(error || m_curr_row_end != NULL);
4539
assert(error || m_curr_row < m_curr_row_end);
4540
assert(error || m_curr_row_end <= m_rows_end);
4542
m_curr_row= m_curr_row_end;
4544
} // row processing loop
4546
error= do_after_row_operations(rli, error);
4549
session->options|= OPTION_KEEP_LOG;
4554
We need to delay this clear until here because unpack_current_row() uses
4555
master-side table definitions stored in rli.
4557
if (rli->tables_to_lock && get_flags(STMT_END_F))
4558
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4559
/* reset OPTION_ALLOW_BATCH as not affect later events */
4560
session->options&= ~OPTION_ALLOW_BATCH;
4563
{ /* error has occured during the transaction */
4564
slave_rows_error_report(ERROR_LEVEL, error, rli, session, table,
4565
get_type_str(), RPL_LOG_NAME, (ulong) log_pos);
4570
If one day we honour --skip-slave-errors in row-based replication, and
4571
the error should be skipped, then we would clear mappings, rollback,
4572
close tables, but the slave SQL thread would not stop and then may
4573
assume the mapping is still available, the tables are still open...
4574
So then we should clear mappings/rollback/close here only if this is a
4576
For now we code, knowing that error is not skippable and so slave SQL
4577
thread is certainly going to stop.
4578
rollback at the caller along with sbr.
4580
const_cast<Relay_log_info*>(rli)->cleanup_context(session, error);
4581
session->is_slave_error= 1;
4586
This code would ideally be placed in do_update_pos() instead, but
4587
since we have no access to table there, we do the setting of
4588
last_event_start_time here instead.
4590
if (table && (table->s->primary_key == MAX_KEY) &&
4591
!cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS)
4594
------------ Temporary fix until WL#2975 is implemented ---------
4596
This event is not the last one (no STMT_END_F). If we stop now
4597
(in case of terminate_slave_thread()), how will we restart? We
4598
have to restart from Table_map_log_event, but as this table is
4599
not transactional, the rows already inserted will still be
4600
present, and idempotency is not guaranteed (no PK) so we risk
4601
that repeating leads to double insert. So we desperately try to
4602
continue, hope we'll eventually leave this buggy situation (by
4603
executing the final Rows_log_event). If we are in a hopeless
4604
wait (reached end of last relay log and nothing gets appended
4605
there), we timeout after one minute, and notify DBA about the
4606
problem. When WL#2975 is implemented, just remove the member
4607
Relay_log_info::last_event_start_time and all its occurrences.
4609
const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
4615
Log_event::enum_skip_reason
Rows_log_event::do_shall_skip(Relay_log_info *rli)
{
  /*
    When the slave skip counter is down to 1 and this rows event does not
    end its statement, execution must not start on the next event, so the
    event is ignored. In every other situation the decision is left to the
    normal skipping logic of Log_event.
  */
  const bool last_skip_pending= (rli->slave_skip_counter == 1);
  if (!last_skip_pending || get_flags(STMT_END_F))
    return Log_event::do_shall_skip(rli);

  return Log_event::EVENT_SKIP_IGNORE;
}
4630
Rows_log_event::do_update_pos(Relay_log_info *rli)
4634
if (get_flags(STMT_END_F))
4637
If this event is not in a transaction, the call below will, if some
4638
transactional storage engines are involved, commit the statement into
4639
them and flush the pending event to binlog.
4640
If this event is in a transaction, the call will do nothing, but a
4641
Xid_log_event will come next which will, if some transactional engines
4642
are involved, commit the transaction and flush the pending event to the
4645
error= ha_autocommit_or_rollback(session, 0);
4648
Now what if this is not a transactional engine? we still need to
4649
flush the pending event to the binlog; we did it with
4650
session->binlog_flush_pending_rows_event(). Note that we imitate
4651
what is done for real queries: a call to
4652
ha_autocommit_or_rollback() (sometimes only if involves a
4653
transactional engine), and a call to be sure to have the pending
4657
rli->cleanup_context(session, 0);
4661
Indicate that a statement is finished.
4662
Step the group log position if we are not in a transaction,
4663
otherwise increase the event log position.
4665
rli->stmt_done(log_pos, when);
4668
Clear any errors pushed in session->net.last_err* if for example "no key
4669
found" (as this is allowed). This is a safety measure; apparently
4670
those errors (e.g. when executing a Delete_rows_log_event of a
4671
non-existing row, like in rpl_row_mystery22.test,
4672
session->net.last_error = "Can't find record in 't1'" and last_errno=1032)
4673
do not become visible. We still prefer to wipe them out.
4675
session->clear_error();
4678
rli->report(ERROR_LEVEL, error,
4679
_("Error in %s event: commit of row events failed, "
4681
get_type_str(), m_table->s->db.str,
4682
m_table->s->table_name.str);
4686
rli->inc_event_relay_log_pos();
4692
bool Rows_log_event::write_data_header(IO_CACHE *file)
{
  assert(m_table_id != UINT32_MAX);

  /* The header buffer is deliberately left uninitialised before the stores. */
  unsigned char header[ROWS_HEADER_LEN];
  unsigned char *const table_id_field= header + RW_MAPID_OFFSET;
  unsigned char *const flags_field= header + RW_FLAGS_OFFSET;

  /* Table id is stored in 6 bytes, the event flags in 2 bytes. */
  int6store(table_id_field, (uint64_t)m_table_id);
  int2store(flags_field, m_flags);

  return (my_b_safe_write(file, header, ROWS_HEADER_LEN));
}
4701
bool Rows_log_event::write_data_body(IO_CACHE*file)
{
  /*
    Note that this should be the number of *bits*, not the number of
    bytes.
  */
  unsigned char width_buf[sizeof(m_width)];
  my_ptrdiff_t const rows_size= m_rows_cur - m_rows_buf;

  unsigned char *const width_end= net_store_length(width_buf, (size_t) m_width);
  assert(static_cast<size_t>(width_end - width_buf) <= sizeof(width_buf));

  /*
    The pieces are written in order: packed width, column bitmap, the
    after-image bitmap (update events only) and finally the row data.
    A write is attempted only while every earlier write returned zero.
  */
  bool failed= my_b_safe_write(file, width_buf,
                               (size_t) (width_end - width_buf));

  if (!failed)
    failed= my_b_safe_write(file, (unsigned char*) m_cols.bitmap,
                            no_bytes_in_map(&m_cols));

  /*
    TODO[refactor write]: Remove the "down cast" here (and elsewhere).
  */
  if (get_type_code() == UPDATE_ROWS_EVENT)
  {
    if (!failed)
      failed= my_b_safe_write(file, (unsigned char*) m_cols_ai.bitmap,
                              no_bytes_in_map(&m_cols_ai));
  }

  if (!failed)
    failed= my_b_safe_write(file, m_rows_buf, (size_t) rows_size);

  return failed;
}
4732
void Rows_log_event::pack_info(Protocol *protocol)
{
  char buf[256];

  /* Only mention the flag when this event ends its statement. */
  const char *stmt_end_note= "";
  if (get_flags(STMT_END_F))
    stmt_end_note= " flags: STMT_END_F";

  size_t bytes= snprintf(buf, sizeof(buf),
                         "table_id: %lu%s", m_table_id, stmt_end_note);
  protocol->store(buf, bytes, &my_charset_bin);
}
4743
/**************************************************************************
4744
Table_map_log_event member functions and support functions
4745
**************************************************************************/
4748
@page How replication of field metadata works.
4750
When a table map is created, the master first calls
4751
Table_map_log_event::save_field_metadata() which calculates how many
4752
values will be in the field metadata. Only those fields that require the
4753
extra data are added. The method also loops through all of the fields in
4754
the table calling the method Field::save_field_metadata() which returns the
4755
values for the field that will be saved in the metadata and replicated to
4756
the slave. Once all fields have been processed, the table map is written to
4757
the binlog adding the size of the field metadata and the field metadata to
4758
the end of the body of the table map.
4760
When a table map is read on the slave, the field metadata is read from the
4761
table map and passed to the table_def class constructor which saves the
4762
field metadata from the table map into an array based on the type of the
4763
field. Field metadata values not present (those fields that do not use extra
4764
data) in the table map are initialized as zero (0). The array size is the
4765
same as the columns for the table on the slave.
4767
Additionally, values saved for field metadata on the master are saved as a
4768
string of bytes (unsigned char) in the binlog. A field may require 1 or more bytes
4769
to store the information. In cases where values require multiple bytes
4770
(e.g. values > 255), the endian-safe methods are used to properly encode
4771
the values on the master and decode them on the slave. When the field
4772
metadata values are captured on the slave, they are stored in an array of
4773
type uint16_t. This allows the least number of casts to prevent casting bugs
4774
when the field metadata is used in comparisons of field attributes. When
4775
the field metadata is used for calculating addresses in pointer math, the
4776
type used is uint32_t.
4780
Save the field metadata based on the real_type of the field.
4781
The metadata saved depends on the type of the field. Some fields
4782
store a single byte for pack_length() while others store two bytes
4783
for field_length (max length).
4788
We may want to consider changing the encoding of the information.
4789
Currently, the code attempts to minimize the number of bytes written to
4790
the tablemap. There are at least two other alternatives; 1) using
4791
net_store_length() to store the data allowing it to choose the number of
4792
bytes that are appropriate thereby making the code much easier to
4793
maintain (only 1 place to change the encoding), or 2) use a fixed number
4794
of bytes for each field. The problem with option 1 is that net_store_length()
4795
will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
4796
for fields like CHAR which can be no larger than 255 characters, the method
4797
will use 3 bytes when the value is > 250. Further, every value that is
4798
encoded using 2 parts (e.g., pack_length, field_length) will be numerically
4799
> 250 therefore will use 3 bytes for each value. The problem with option 2
4800
is less wasteful for space but does waste 1 byte for every field that does
4803
int Table_map_log_event::save_field_metadata()
4806
for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
4807
index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
4812
Constructor used to build an event for writing to the binary log.
4813
Mats says tbl->s lives longer than this event so it's ok to copy pointers
4814
(tbl->s->db etc) and not pointer content.
4816
Table_map_log_event::Table_map_log_event(Session *session, Table *tbl,
4817
ulong tid, bool, uint16_t flags)
4818
: Log_event(session, 0, true),
4820
m_dbnam(tbl->s->db.str),
4821
m_dblen(m_dbnam ? tbl->s->db.length : 0),
4822
m_tblnam(tbl->s->table_name.str),
4823
m_tbllen(tbl->s->table_name.length),
4824
m_colcnt(tbl->s->fields),
4829
m_field_metadata(0),
4830
m_field_metadata_size(0),
4834
assert(m_table_id != UINT32_MAX);
4836
In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
4837
table.cc / alloc_table_share():
4838
Use the fact the key is db/0/table_name/0
4839
As we rely on this let's assert it.
4841
assert((tbl->s->db.str == 0) ||
4842
(tbl->s->db.str[tbl->s->db.length] == 0));
4843
assert(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
4846
m_data_size= TABLE_MAP_HEADER_LEN;
4847
m_data_size+= m_dblen + 2; // Include length and terminating \0
4848
m_data_size+= m_tbllen + 2; // Include length and terminating \0
4849
m_data_size+= 1 + m_colcnt; // COLCNT and column types
4851
/* If malloc fails, caught in is_valid() */
4852
if ((m_memory= (unsigned char*) malloc(m_colcnt)))
4854
m_coltype= reinterpret_cast<unsigned char*>(m_memory);
4855
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
4856
m_coltype[i]= m_table->field[i]->type();
4860
Calculate a bitmap for the results of maybe_null() for all columns.
4861
The bitmap is used to determine when there is a column from the master
4862
that is not on the slave and is null and thus not in the row data during
4865
uint32_t num_null_bytes= (m_table->s->fields + 7) / 8;
4866
m_data_size+= num_null_bytes;
4867
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
4868
&m_null_bits, num_null_bytes,
4869
&m_field_metadata, (m_colcnt * 2),
4872
memset(m_field_metadata, 0, (m_colcnt * 2));
4875
Create an array for the field metadata and store it.
4877
m_field_metadata_size= save_field_metadata();
4878
assert(m_field_metadata_size <= (m_colcnt * 2));
4881
Now set the size of the data to the size of the field metadata array
4882
plus one or two bytes for number of elements in the field metadata array.
4884
if (m_field_metadata_size > 255)
4885
m_data_size+= m_field_metadata_size + 2;
4887
m_data_size+= m_field_metadata_size + 1;
4889
memset(m_null_bits, 0, num_null_bytes);
4890
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
4891
if (m_table->field[i]->maybe_null())
4892
m_null_bits[(i / 8)]+= 1 << (i % 8);
4898
Constructor used by slave to read the event from the binary log.
4900
Table_map_log_event::Table_map_log_event(const char *buf, uint32_t event_len,
4901
const Format_description_log_event
4904
: Log_event(buf, description_event),
4906
m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
4907
m_colcnt(0), m_coltype(0),
4908
m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
4909
m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
4910
m_null_bits(0), m_meta_memory(NULL)
4912
unsigned int bytes_read= 0;
4914
uint8_t common_header_len= description_event->common_header_len;
4915
uint8_t post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
4917
/* Read the post-header */
4918
const char *post_start= buf + common_header_len;
4920
post_start+= TM_MAPID_OFFSET;
4921
if (post_header_len == 6)
4923
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
4924
m_table_id= uint4korr(post_start);
4929
assert(post_header_len == TABLE_MAP_HEADER_LEN);
4930
m_table_id= (ulong) uint6korr(post_start);
4931
post_start+= TM_FLAGS_OFFSET;
4934
assert(m_table_id != UINT32_MAX);
4936
m_flags= uint2korr(post_start);
4938
/* Read the variable part of the event */
4939
const char *const vpart= buf + common_header_len + post_header_len;
4941
/* Extract the length of the various parts from the buffer */
4942
unsigned char const *const ptr_dblen= (unsigned char const*)vpart + 0;
4943
m_dblen= *(unsigned char*) ptr_dblen;
4945
/* Length of database name + counter + terminating null */
4946
unsigned char const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
4947
m_tbllen= *(unsigned char*) ptr_tbllen;
4949
/* Length of table name + counter + terminating null */
4950
unsigned char const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
4951
unsigned char *ptr_after_colcnt= (unsigned char*) ptr_colcnt;
4952
m_colcnt= net_field_length(&ptr_after_colcnt);
4954
/* Allocate mem for all fields in one go. If fails, caught in is_valid() */
4955
m_memory= (unsigned char*) my_multi_malloc(MYF(MY_WME),
4956
&m_dbnam, (uint) m_dblen + 1,
4957
&m_tblnam, (uint) m_tbllen + 1,
4958
&m_coltype, (uint) m_colcnt,
4963
/* Copy the different parts into their memory */
4964
strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
4965
strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
4966
memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
4968
ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
4969
bytes_read= ptr_after_colcnt - (unsigned char *)buf;
4970
if (bytes_read < event_len)
4972
m_field_metadata_size= net_field_length(&ptr_after_colcnt);
4973
assert(m_field_metadata_size <= (m_colcnt * 2));
4974
uint32_t num_null_bytes= (m_colcnt + 7) / 8;
4975
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
4976
&m_null_bits, num_null_bytes,
4977
&m_field_metadata, m_field_metadata_size,
4979
memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
4980
ptr_after_colcnt= (unsigned char*)ptr_after_colcnt + m_field_metadata_size;
4981
memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
4988
Table_map_log_event::~Table_map_log_event()
4990
free(m_meta_memory);
4995
Return value is an error code, one of:
4997
-1 Failure to open table [from open_tables()]
4999
1 No room for more tables [from set_table()]
5000
2 Out of memory [from set_table()]
5001
3 Wrong table definition
5002
4 Daisy-chaining RBR with SBR not possible
5005
int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
5007
RPL_TableList *table_list;
5008
char *db_mem, *tname_mem;
5009
Query_id &query_id= Query_id::get_query_id();
5011
assert(rli->sql_session == session);
5013
/* Step the query id to mark what columns that are actually used. */
5014
session->query_id= query_id.next();
5016
if (!(memory= my_multi_malloc(MYF(MY_WME),
5017
&table_list, (uint) sizeof(RPL_TableList),
5018
&db_mem, (uint) NAME_LEN + 1,
5019
&tname_mem, (uint) NAME_LEN + 1,
5021
return(HA_ERR_OUT_OF_MEM);
5023
memset(table_list, 0, sizeof(*table_list));
5024
table_list->db = db_mem;
5025
table_list->alias= table_list->table_name = tname_mem;
5026
table_list->lock_type= TL_WRITE;
5027
table_list->next_global= table_list->next_local= 0;
5028
table_list->table_id= m_table_id;
5029
table_list->updating= 1;
5030
strcpy(table_list->db, m_dbnam);
5031
strcpy(table_list->table_name, m_tblnam);
5037
open_tables() reads the contents of session->lex, so they must be
5038
initialized, so we should call lex_start(); to be even safer, we
5039
call mysql_init_query() which does a more complete set of inits.
5042
mysql_reset_session_for_next_command(session);
5045
Open the table if it is not already open and add the table to
5046
table map. Note that for any table that should not be
5047
replicated, a filter is needed.
5049
The creation of a new TableList is used to up-cast the
5050
table_list consisting of RPL_TableList items. This will work
5051
since the only case where the argument to open_tables() is
5052
changed, is when session->lex->query_tables == table_list, i.e.,
5053
when the statement requires prelocking. Since this is not
5054
executed when a statement is executed, this case will not occur.
5055
As a precaution, an assertion is added to ensure that the bad
5058
Either way, the memory in the list is *never* released
5059
internally in the open_tables() function, hence we take a copy
5060
of the pointer to make sure that it's not lost.
5063
assert(session->lex->query_tables != table_list);
5064
TableList *tmp_table_list= table_list;
5065
if ((error= open_tables(session, &tmp_table_list, &count, 0)))
5067
if (session->is_slave_error || session->is_fatal_error)
5070
Error reporting borrowed from Query_log_event with many excessive
5071
simplifications (we don't honour --slave-skip-errors)
5073
uint32_t actual_error= session->main_da.sql_errno();
5074
rli->report(ERROR_LEVEL, actual_error,
5075
_("Error '%s' on opening table `%s`.`%s`"),
5077
? session->main_da.message()
5078
: _("unexpected success or fatal error")),
5079
table_list->db, table_list->table_name);
5080
session->is_slave_error= 1;
5085
m_table= table_list->table;
5088
This will fail later otherwise, the 'in_use' field should be
5089
set to the current thread.
5091
assert(m_table->in_use);
5094
Use placement new to construct the table_def instance in the
5095
memory allocated for it inside table_list.
5097
The memory allocated by the table_def structure (i.e., not the
5098
memory allocated *for* the table_def structure) is released
5099
inside Relay_log_info::clear_tables_to_lock() by calling the
5100
table_def destructor explicitly.
5102
new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt,
5103
m_field_metadata, m_field_metadata_size, m_null_bits);
5104
table_list->m_tabledef_valid= true;
5107
We record in the slave's information that the table should be
5108
locked by linking the table into the list of tables to lock.
5110
table_list->next_global= table_list->next_local= rli->tables_to_lock;
5111
const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
5112
const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
5113
/* 'memory' is freed in clear_tables_to_lock */
5123
Log_event::enum_skip_reason
Table_map_log_event::do_shall_skip(Relay_log_info *rli)
{
  /*
    If the slave skip counter is 1, then we should not start executing
    on the next event. The decision is delegated to continue_group().
  */
  Log_event::enum_skip_reason const verdict= continue_group(rli);
  return verdict;
}
5133
int Table_map_log_event::do_update_pos(Relay_log_info *rli)
{
  /* A table map only advances the event position in the relay log. */
  rli->inc_event_relay_log_pos();

  const int no_error= 0;
  return no_error;
}
5140
bool Table_map_log_event::write_data_header(IO_CACHE *file)
{
  assert(m_table_id != UINT32_MAX);

  unsigned char header[TABLE_MAP_HEADER_LEN];
  unsigned char *const table_id_field= header + TM_MAPID_OFFSET;
  unsigned char *const flags_field= header + TM_FLAGS_OFFSET;

  /* Table id is stored in 6 bytes, the event flags in 2 bytes. */
  int6store(table_id_field, (uint64_t)m_table_id);
  int2store(flags_field, m_flags);

  return (my_b_safe_write(file, header, TABLE_MAP_HEADER_LEN));
}
5149
/**
  Write the variable part of the table map event to the cache.

  The pieces are written in this order: database name length, database
  name (with terminating null), table name length, table name (with
  terminating null), packed column count, column types, packed size of
  the field metadata, the field metadata itself, and the null bits.

  @param file  Cache the event body is written to.

  @return the logical OR of the my_b_safe_write() results: true as soon as
          one write returns non-zero (later writes are then not attempted),
          false when every write returned zero.
*/
bool Table_map_log_event::write_data_body(IO_CACHE *file)
{
  assert(m_dbnam != NULL);
  assert(m_tblnam != NULL);
  /* We use only one byte per length for storage in event: */
  assert(m_dblen < 128);
  assert(m_tbllen < 128);

  unsigned char const dbuf[]= { (unsigned char) m_dblen };
  unsigned char const tbuf[]= { (unsigned char) m_tbllen };

  unsigned char cbuf[sizeof(m_colcnt)];
  unsigned char *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
  assert(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));

  /*
    Store the size of the field metadata.
  */
  unsigned char mbuf[sizeof(m_field_metadata_size)];
  unsigned char *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
  /* Same bounds check as for cbuf above. */
  assert(static_cast<size_t>(mbuf_end - mbuf) <= sizeof(mbuf));

  /*
    Every step must be chained with ||. A comma before the last write would
    discard the status of all preceding writes and write the null bits even
    after an earlier failure.
  */
  return (my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
          my_b_safe_write(file, (const unsigned char*)m_dbnam, m_dblen+1) ||
          my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
          my_b_safe_write(file, (const unsigned char*)m_tblnam, m_tbllen+1) ||
          my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
          my_b_safe_write(file, m_coltype, m_colcnt) ||
          my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
          my_b_safe_write(file, m_field_metadata, m_field_metadata_size) ||
          my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
}
5183
Print some useful information for the SHOW BINARY LOG information
5187
/*
  Format "table_id: <id> (<db>.<table>)" and hand it to the protocol.

  snprintf() returns the length the complete text would have had, which is
  larger than what was stored when the output is truncated, and negative on
  error. The byte count passed on is therefore clamped to what the buffer
  really holds, so that protocol->store() never reads past the end of buf.
*/
void Table_map_log_event::pack_info(Protocol *protocol)
{
  char buf[256];
  const int needed= snprintf(buf, sizeof(buf),
                             "table_id: %lu (%s.%s)",
                             m_table_id, m_dbnam, m_tblnam);

  size_t bytes= 0;
  if (needed > 0)
  {
    bytes= static_cast<size_t>(needed);
    if (bytes >= sizeof(buf))
      bytes= sizeof(buf) - 1;           /* output was truncated */
  }

  protocol->store(buf, bytes, &my_charset_bin);
}
5197
/**************************************************************************
5198
Write_rows_log_event member functions
5199
**************************************************************************/
5202
Constructor used to build an event for writing to the binary log.
5204
Write_rows_log_event::Write_rows_log_event(Session *session_arg, Table *tbl_arg,
5206
bool is_transactional)
5207
: Rows_log_event(session_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
5212
Constructor used by slave to read the event from the binary log.
5214
Write_rows_log_event::Write_rows_log_event(const char *buf, uint32_t event_len,
5215
const Format_description_log_event
5217
: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
5222
Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
5227
todo: to introduce a property for the event (handler?) which forces
5228
applying the event in the replace (idempotent) fashion.
5230
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5233
We are using REPLACE semantics and not INSERT IGNORE semantics
5234
when writing rows, that is: new rows replace old rows. We need to
5235
inform the storage engine that it should use this behaviour.
5238
/* Tell the storage engine that we are using REPLACE semantics. */
5239
session->lex->duplicates= DUP_REPLACE;
5242
Pretend we're executing a REPLACE command: this is needed for
5243
InnoDB since it is not (properly) checking the
5244
lex->duplicates flag.
5246
session->lex->sql_command= SQLCOM_REPLACE;
5248
Do not raise the error flag in case of hitting to an unique attribute
5250
m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
5253
m_table->file->ha_start_bulk_insert(0);
5255
We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
5256
any TIMESTAMP column with data from the row but instead will use
5257
the event's current time.
5258
As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
5259
columns, we know that all TIMESTAMP columns on slave will receive explicit
5260
data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
5261
When we allow a table without TIMESTAMP to be replicated to a table having
5262
more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
5263
column to be replicated into a BIGINT column and the slave's table has a
5264
TIMESTAMP column, then the slave's TIMESTAMP column will take its value
5265
from set_time() which we called earlier (consistent with SBR). And then in
5266
some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
5267
analyze if explicit data is provided for slave's TIMESTAMP columns).
5269
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
5275
Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
5279
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5281
m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
5282
m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
5284
resetting the extra with
5285
table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
5287
explanation: file->reset() performs this duty
5288
ultimately. Still todo: fix
5291
if ((local_error= m_table->file->ha_end_bulk_insert()))
5293
m_table->file->print_error(local_error, MYF(0));
5295
return error? error : local_error;
5300
Check if there are more UNIQUE keys after the given key.
5303
last_uniq_key(Table *table, uint32_t keyno)
5305
while (++keyno < table->s->keys)
5306
if (table->key_info[keyno].flags & HA_NOSAME)
5312
Check if an error is a duplicate key error.
5314
This function is used to check if an error code is one of the
5315
duplicate key errors, i.e., an error code for which it is sensible
5316
to do a <code>get_dup_key()</code> to retrieve the duplicate key.
5318
@param errcode The error code to check.
5320
@return <code>true</code> if the error code is such that
5321
<code>get_dup_key()</code> will return true, <code>false</code>
5325
is_duplicate_key_error(int errcode)
5329
case HA_ERR_FOUND_DUPP_KEY:
5330
case HA_ERR_FOUND_DUPP_UNIQUE:
5337
Write the current row into event's table.
5339
The row is located in the row buffer, pointed by @c m_curr_row member.
5340
Number of columns of the row is stored in @c m_width member (it can be
5341
different from the number of columns in the table to which we insert).
5342
Bitmap @c m_cols indicates which columns are present in the row. It is assumed
5343
that event's table is already open and pointed by @c m_table.
5345
If the same record already exists in the table it can be either overwritten
5346
or an error is reported depending on the value of @c overwrite flag
5347
(error reporting not yet implemented). Note that the matching record can be
5348
different from the row we insert if we use primary keys to identify records in
5351
The row to be inserted can contain values only for selected columns. The
5352
missing columns are filled with default values using @c prepare_record()
5353
function. If a matching record is found in the table and @c overwrite is
5354
true, the missing columns are taken from it.
5356
@param rli Relay log info (needed for row unpacking).
5358
Shall we overwrite if the row already exists or signal
5359
error (currently ignored).
5361
@returns Error code on failure, 0 on success.
5363
This method, if successful, sets @c m_curr_row_end pointer to point at the
5364
next row in the rows buffer. This is done when unpacking the row to be
5367
@note If a matching record is found, it is either updated using
5368
@c ha_update_row() or first deleted and then new record written.
5372
Rows_log_event::write_row(const Relay_log_info *const rli,
5373
const bool overwrite)
5375
assert(m_table != NULL && session != NULL);
5377
Table *table= m_table; // pointer to event's table
5380
basic_string<unsigned char> key;
5382
/* fill table->record[0] with default values */
5385
We only check if the columns have default values for non-NDB
5386
engines, for NDB we ignore the check since updates are sent as
5387
writes, causing errors when trying to prepare the record.
5389
TODO[ndb]: Eliminate this hard-coded dependency on NDB. Ideally,
5390
the engine should be able to set a flag that it wants the default
5391
values filled in and one flag to handle the case that the default
5392
values should be checked. Maybe these two flags can be combined.
5394
if ((error= prepare_record(table, &m_cols, m_width, true)))
5397
/* unpack row into table->record[0] */
5398
error= unpack_current_row(rli, &m_cols);
5400
// Temporary fix to find out why it fails [/Matz]
5401
memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
5404
Try to write record. If a corresponding record already exists in the table,
5405
we try to change it using ha_update_row() if possible. Otherwise we delete
5406
it and repeat the whole process again.
5408
TODO: Add safety measures against infinite looping.
5411
while ((error= table->file->ha_write_row(table->record[0])))
5413
if (error == HA_ERR_LOCK_DEADLOCK ||
5414
error == HA_ERR_LOCK_WAIT_TIMEOUT ||
5415
(keynum= table->file->get_dup_key(error)) < 0 ||
5419
Deadlock, waiting for lock or just an error from the handler
5420
such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
5421
Retrieval of the duplicate key number may fail
5422
- either because the error was not "duplicate key" error
5423
- or because the information on which key was duplicated is not available
5425
table->file->print_error(error, MYF(0));
5429
We need to retrieve the old row into record[1] to be able to
5430
either update or delete the offending record. We either:
5432
- use rnd_pos() with a row-id (available as dupp_row) to the
5433
offending row, if that is possible (MyISAM and Blackhole), or else
5435
- use index_read_idx() with the key that is duplicated, to
5436
retrieve the offending row.
5438
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
5440
if (table->file->inited && (error= table->file->ha_index_end()))
5442
if ((error= table->file->ha_rnd_init(false)))
5445
error= table->file->rnd_pos(table->record[1], table->file->dup_ref);
5446
table->file->ha_rnd_end();
5449
table->file->print_error(error, MYF(0));
5455
if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
5460
key.reserve(table->s->max_unique_length);
5462
key_copy(key, table->record[0], table->key_info + keynum, 0);
5463
error= table->file->index_read_idx_map(table->record[1], keynum,
5469
table->file->print_error(error, MYF(0));
5475
Now, record[1] should contain the offending row. That
5476
will enable us to update it or, alternatively, delete it (so
5477
that we can insert the new row afterwards).
5481
If row is incomplete we will use the record found to fill
5484
if (!get_flags(COMPLETE_ROWS_F))
5486
restore_record(table,record[1]);
5487
error= unpack_current_row(rli, &m_cols);
5491
REPLACE is defined as either INSERT or DELETE + INSERT. If
5492
possible, we can replace it with an UPDATE, but that will not
5493
work on InnoDB if FOREIGN KEY checks are necessary.
5495
I (Matz) am not sure of the reason for the last_uniq_key()
5496
check, but I'm guessing that it's something along the
5499
Suppose that we got the duplicate key to be a key that is not
5500
the last unique key for the table and we perform an update:
5501
then there might be another key for which the unique check will
5502
fail, so we're better off just deleting the row and inserting
5505
if (last_uniq_key(table, keynum) &&
5506
!table->file->referenced_by_foreign_key())
5508
error=table->file->ha_update_row(table->record[1],
5512
case HA_ERR_RECORD_IS_THE_SAME:
5519
table->file->print_error(error, MYF(0));
5526
if ((error= table->file->ha_delete_row(table->record[1])))
5528
table->file->print_error(error, MYF(0));
5531
/* Will retry ha_write_row() with the offending row removed. */
5539
int Rows_log_event::unpack_current_row(const Relay_log_info *const rli,
5540
MY_BITMAP const *cols)
5543
ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT);
5544
int const result= ::unpack_row(rli, m_table, m_width, m_curr_row, cols,
5545
&m_curr_row_end, &m_master_reclength);
5546
if (m_curr_row_end > m_rows_end)
5547
my_error(ER_SLAVE_CORRUPT_EVENT, MYF(0));
5548
ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT);
5554
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
5556
assert(m_table != NULL);
5558
write_row(rli, /* if 1 then overwrite */
5559
bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
5561
if (error && !session->is_error())
5564
my_error(ER_UNKNOWN_ERROR, MYF(0));
5571
/**************************************************************************
5572
Delete_rows_log_event member functions
5573
**************************************************************************/
5576
Compares table->record[0] and table->record[1]
5578
Returns TRUE if different.
5580
static bool record_compare(Table *table)
5583
Need to set the X bit and the filler bits in both records since
5584
there are engines that do not set it correctly.
5586
In addition, since MyISAM checks that one hasn't tampered with the
5587
record, it is necessary to restore the old bytes into the record
5588
after doing the comparison.
5590
TODO[record format ndb]: Remove it once NDB returns correct
5591
records. Check that the other engines also return correct records.
5594
unsigned char saved_x[2], saved_filler[2];
5596
if (table->s->null_bytes > 0)
5598
for (int i = 0 ; i < 2 ; ++i)
5600
saved_x[i]= table->record[i][0];
5601
saved_filler[i]= table->record[i][table->s->null_bytes - 1];
5602
table->record[i][0]|= 1U;
5603
table->record[i][table->s->null_bytes - 1]|=
5604
256U - (1U << table->s->last_null_bit_pos);
5608
if (table->s->blob_fields + table->s->varchar_fields == 0)
5610
result= cmp_record(table,record[1]);
5611
goto record_compare_exit;
5614
/* Compare null bits */
5615
if (memcmp(table->null_flags,
5616
table->null_flags+table->s->rec_buff_length,
5617
table->s->null_bytes))
5619
result= true; // Diff in NULL value
5620
goto record_compare_exit;
5623
/* Compare updated fields */
5624
for (Field **ptr=table->field ; *ptr ; ptr++)
5626
if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
5629
goto record_compare_exit;
5633
record_compare_exit:
5635
Restore the saved bytes.
5637
TODO[record format ndb]: Remove this code once NDB returns the
5638
correct record format.
5640
if (table->s->null_bytes > 0)
5642
for (int i = 0 ; i < 2 ; ++i)
5644
table->record[i][0]= saved_x[i];
5645
table->record[i][table->s->null_bytes - 1]= saved_filler[i];
5653
Locate the current row in event's table.
5655
The current row is pointed by @c m_curr_row. Member @c m_width tells how many
5656
columns are there in the row (this can be different from the number of columns
5657
in the table). It is assumed that event's table is already open and pointed
5660
If a corresponding record is found in the table it is stored in
5661
@c m_table->record[0]. Note that when record is located based on a primary
5662
key, it is possible that the record found differs from the row being located.
5664
If no key is specified or table does not have keys, a table scan is used to
5665
find the row. In that case the row should be complete and contain values for
5666
all columns. However, it can still be shorter than the table, i.e. the table
5667
can contain extra columns not present in the row. It is also possible that
5668
the table has fewer columns than the row being located.
5670
@returns Error code on failure, 0 on success.
5672
@post In case of success @c m_table->record[0] contains the record found.
5673
Also, the internal "cursor" of the table is positioned at the record found.
5675
@note If the engine allows random access of the records, a combination of
5676
@c position() and @c rnd_pos() will be used.
5679
int Rows_log_event::find_row(const Relay_log_info *rli)
5681
assert(m_table && m_table->in_use != NULL);
5683
Table *table= m_table;
5686
/* unpack row - missing fields get default values */
5687
prepare_record(table, &m_cols, m_width, false/* don't check errors */);
5688
error= unpack_current_row(rli, &m_cols);
5690
// Temporary fix to find out why it fails [/Matz]
5691
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
5693
if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
5694
table->s->primary_key < MAX_KEY)
5697
Use a more efficient method to fetch the record given by
5698
table->record[0] if the engine allows it. We first compute a
5699
row reference using the position() member function (it will be
5700
stored in table->file->ref) and then use rnd_pos() to position
5701
the "cursor" (i.e., record[0] in this case) at the correct row.
5703
TODO: Add a check that the correct record has been fetched by
5704
comparing with the original record. Take into account that the
5705
record on the master and slave can be of different
5706
length. Something along these lines should work:
5708
ADD>>> store_record(table,record[1]);
5709
int error= table->file->rnd_pos(table->record[0], table->file->ref);
5710
ADD>>> assert(memcmp(table->record[1], table->record[0],
5711
table->s->reclength) == 0);
5714
int error= table->file->rnd_pos_by_record(table->record[0]);
5715
table->file->ha_rnd_end();
5718
table->file->print_error(error, MYF(0));
5723
// We can't use position() - try other methods.
5726
Save copy of the record in table->record[1]. It might be needed
5727
later if linear search is used to find exact match.
5729
store_record(table,record[1]);
5731
if (table->s->keys > 0)
5733
/* We have a key: search the table using the index */
5734
if (!table->file->inited && (error= table->file->ha_index_init(0, false)))
5736
table->file->print_error(error, MYF(0));
5740
/* Fill key data for the row */
5743
key_copy(m_key, table->record[0], table->key_info, 0);
5746
We need to set the null bytes to ensure that the filler bits are
5747
all set when returning. There are storage engines that just set
5748
the necessary bits on the bytes and don't set the filler bits
5751
my_ptrdiff_t const pos=
5752
table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
5753
table->record[0][pos]= 0xFF;
5755
if ((error= table->file->index_read_map(table->record[0], m_key,
5757
HA_READ_KEY_EXACT)))
5759
table->file->print_error(error, MYF(0));
5760
table->file->ha_index_end();
5765
Below is a minor "optimization". If the key (i.e., key number
5766
0) has the HA_NOSAME flag set, we know that we have found the
5767
correct record (since there can be no duplicates); otherwise, we
5768
have to compare the record with the one found to see if it is
5771
CAVEAT! This behaviour is essential for the replication of,
5772
e.g., the mysql.proc table since the correct record *shall* be
5773
found using the primary key *only*. There shall be no
5774
comparison of non-PK columns to decide if the correct record is
5775
found. I can see no scenario where it would be incorrect to
5776
choose the row to change only using a PK or an UNNI.
5778
if (table->key_info->flags & HA_NOSAME)
5780
table->file->ha_index_end();
5785
In case key is not unique, we still have to iterate over records found
5786
and find the one which is identical to the row given. A copy of the
5787
record we are looking for is stored in record[1].
5789
while (record_compare(table))
5792
We need to set the null bytes to ensure that the filler bits
5793
are all set when returning. There are storage engines that
5794
just set the necessary bits on the bytes and don't set the
5795
filler bits correctly.
5797
TODO[record format ndb]: Remove this code once NDB returns the
5798
correct record format.
5800
if (table->s->null_bytes > 0)
5802
table->record[0][table->s->null_bytes - 1]|=
5803
256U - (1U << table->s->last_null_bit_pos);
5806
if ((error= table->file->index_next(table->record[0])))
5808
table->file->print_error(error, MYF(0));
5809
table->file->ha_index_end();
5815
Have to restart the scan to be able to fetch the next row.
5817
table->file->ha_index_end();
5821
int restart_count= 0; // Number of times scanning has restarted from top
5823
/* We don't have a key: search the table using rnd_next() */
5824
if ((error= table->file->ha_rnd_init(1)))
5826
table->file->print_error(error, MYF(0));
5830
/* Continue until we find the right record or have made a full loop */
5833
error= table->file->rnd_next(table->record[0]);
5838
case HA_ERR_RECORD_DELETED:
5841
case HA_ERR_END_OF_FILE:
5842
if (++restart_count < 2)
5843
table->file->ha_rnd_init(1);
5847
table->file->print_error(error, MYF(0));
5848
table->file->ha_rnd_end();
5852
while (restart_count < 2 && record_compare(table));
5855
Note: above record_compare will take into account all record fields
5856
which might be incorrect in case a partial row was given in the event
5858
table->file->ha_rnd_end();
5860
assert(error == HA_ERR_END_OF_FILE || error == HA_ERR_RECORD_DELETED || error == 0);
5864
table->default_column_bitmaps();
5867
table->default_column_bitmaps();
5873
Constructor used to build an event for writing to the binary log.
5876
Delete_rows_log_event::Delete_rows_log_event(Session *session_arg, Table *tbl_arg,
5878
bool is_transactional)
5879
: Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
5884
Constructor used by slave to read the event from the binary log.
5886
Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint32_t event_len,
5887
const Format_description_log_event
5889
: Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
5895
Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
5897
if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
5898
m_table->s->primary_key < MAX_KEY)
5901
We don't need to allocate any memory for m_key since it is not used.
5906
if (m_table->s->keys > 0)
5908
// Allocate buffer for key searches
5909
m_key= (unsigned char*)malloc(m_table->key_info->key_length);
5911
return HA_ERR_OUT_OF_MEM;
5918
Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
5921
/*error= ToDo:find out what this should really be, this triggers close_scan in ndb, returning error?*/
5922
m_table->file->ha_index_or_rnd_end();
5929
int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
5932
assert(m_table != NULL);
5934
if (!(error= find_row(rli)))
5937
Delete the record found, located in record[0]
5939
error= m_table->file->ha_delete_row(m_table->record[0]);
5945
/**************************************************************************
5946
Update_rows_log_event member functions
5947
**************************************************************************/
5950
Constructor used to build an event for writing to the binary log.
5952
Update_rows_log_event::Update_rows_log_event(Session *session_arg, Table *tbl_arg,
5954
bool is_transactional)
5955
: Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
5957
init(tbl_arg->write_set);
5960
void Update_rows_log_event::init(MY_BITMAP const *cols)
5962
/* if bitmap_init fails, caught in is_valid() */
5963
if (likely(!bitmap_init(&m_cols_ai,
5964
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
5968
/* Cols can be zero if this is a dummy binrows event */
5969
if (likely(cols != NULL))
5971
memcpy(m_cols_ai.bitmap, cols->bitmap, no_bytes_in_map(cols));
5972
create_last_word_mask(&m_cols_ai);
5978
Update_rows_log_event::~Update_rows_log_event()
5980
if (m_cols_ai.bitmap == m_bitbuf_ai) // no malloc happened
5981
m_cols_ai.bitmap= 0; // so no free in bitmap_free
5982
bitmap_free(&m_cols_ai); // To pair with bitmap_init().
5987
Constructor used by slave to read the event from the binary log.
5989
Update_rows_log_event::Update_rows_log_event(const char *buf, uint32_t event_len,
5991
Format_description_log_event
5993
: Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
5999
Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6001
if (m_table->s->keys > 0)
6003
// Allocate buffer for key searches
6004
m_key= (unsigned char*)malloc(m_table->key_info->key_length);
6006
return HA_ERR_OUT_OF_MEM;
6009
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
6015
Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6018
/*error= ToDo:find out what this should really be, this triggers close_scan in ndb, returning error?*/
6019
m_table->file->ha_index_or_rnd_end();
6020
free(m_key); // Free for multi_malloc
6027
Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6029
assert(m_table != NULL);
6031
int error= find_row(rli);
6035
We need to read the second image in the event of error to be
6036
able to skip to the next pair of updates
6038
m_curr_row= m_curr_row_end;
6039
unpack_current_row(rli, &m_cols_ai);
6044
This is the situation after locating BI:
6046
===|=== before image ====|=== after image ===|===
6048
m_curr_row m_curr_row_end
6050
BI found in the table is stored in record[0]. We copy it to record[1]
6051
and unpack AI to record[0].
6054
store_record(m_table,record[1]);
6056
m_curr_row= m_curr_row_end;
6057
error= unpack_current_row(rli, &m_cols_ai); // this also updates m_curr_row_end
6060
Now we have the right row to update. The old row (the one we're
6061
looking for) is in record[1] and the new row is in record[0].
6064
// Temporary fix to find out why it fails [/Matz]
6065
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
6066
memcpy(m_table->write_set->bitmap, m_cols_ai.bitmap, (m_table->write_set->n_bits + 7) / 8);
6068
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
6069
if (error == HA_ERR_RECORD_IS_THE_SAME)
6076
/**
  Construct an incident event from its serialized form.

  The incident number is read from the bytes that follow the common
  header; the optional message string is read from the bytes that follow
  the post header, bounded by the end of the event (buf + event_len).

  @param buf          Start of the serialized event.
  @param event_len    Total length of the serialized event in bytes.
  @param descr_event  Format description supplying the common header
                      length and the post header length for this event
                      type.
*/
Incident_log_event::Incident_log_event(const char *buf, uint32_t event_len,
6077
const Format_description_log_event *descr_event)
6078
: Log_event(buf, descr_event)
6080
uint8_t const common_header_len=
6081
descr_event->common_header_len;
6082
uint8_t const post_header_len=
6083
descr_event->post_header_len[INCIDENT_EVENT-1];
6085
// NOTE(review): the value read from the buffer is cast to Incident with
// no range check visible here, and description() later uses m_incident
// as a table index -- confirm it is validated somewhere.
m_incident= static_cast<Incident>(uint2korr(buf + common_header_len));
6086
char const *ptr= buf + common_header_len + post_header_len;
6087
char const *const str_end= buf + event_len;
6088
uint8_t len= 0; // Assignment to keep compiler happy
6089
const char *str= NULL; // Assignment to keep compiler happy
6090
// NOTE(review): the result of read_str() is not checked; presumably str
// and len keep their NULL/0 initial values when nothing is read --
// confirm against read_str()'s failure contract.
read_str(&ptr, str_end, &str, &len);
6091
m_message.str= const_cast<char*>(str);
6092
m_message.length= len;
6097
Incident_log_event::~Incident_log_event()
6103
/**
  Return the human-readable name of this event's incident.

  The name is looked up in a function-local static table indexed by
  @c m_incident.

  @return Pointer to a statically allocated string.
*/
Incident_log_event::description() const
6105
static const char *const description[]= {
6106
"NOTHING", // Not used
6110
assert(0 <= m_incident);
6111
/* Valid indexes are 0 .. count-1, so the upper bound must be strict. */
assert((size_t) m_incident < sizeof(description)/sizeof(*description));
6113
return description[m_incident];
6117
void Incident_log_event::pack_info(Protocol *protocol)
6121
if (m_message.length > 0)
6122
bytes= snprintf(buf, sizeof(buf), "#%d (%s)",
6123
m_incident, description());
6125
bytes= snprintf(buf, sizeof(buf), "#%d (%s): %s",
6126
m_incident, description(), m_message.str);
6127
protocol->store(buf, bytes, &my_charset_bin);
6132
Incident_log_event::do_apply_event(Relay_log_info const *rli)
6134
rli->report(ERROR_LEVEL, ER_SLAVE_INCIDENT,
6135
ER(ER_SLAVE_INCIDENT),
6137
m_message.length > 0 ? m_message.str : "<none>");
6143
Incident_log_event::write_data_header(IO_CACHE *file)
6145
unsigned char buf[sizeof(int16_t)];
6146
int2store(buf, (int16_t) m_incident);
6147
return(my_b_safe_write(file, buf, sizeof(buf)));
6151
Incident_log_event::write_data_body(IO_CACHE *file)
6153
return(write_str(file, m_message.str, m_message.length));
6156
Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint32_t event_len,
6157
const Format_description_log_event* description_event)
6158
:Log_event(buf, description_event)
6160
uint8_t header_size= description_event->common_header_len;
6161
ident_len = event_len - header_size;
6162
set_if_smaller(ident_len,FN_REFLEN-1);
6163
log_ident= buf + header_size;