1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008 Sun Microsystems
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; version 2 of the License.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20
#include <drizzled/server_includes.h>
21
#include <drizzled/log_event.h>
22
#include <drizzled/replication/rli.h>
23
#include <drizzled/replication/mi.h>
24
#include <libdrizzle/libdrizzle.h>
25
#include <mysys/hash.h>
26
#include <drizzled/replication/utility.h>
27
#include <drizzled/replication/record.h>
28
#include <mysys/my_dir.h>
29
#include <drizzled/error.h>
30
#include <libdrizzle/pack.h>
31
#include <drizzled/sql_parse.h>
32
#include <drizzled/sql_base.h>
33
#include <drizzled/sql_load.h>
34
#include <drizzled/item/return_int.h>
35
#include <drizzled/item/empty_string.h>
40
#include <mysys/base64.h>
41
#include <mysys/my_bitmap.h>
43
#include <drizzled/gettext.h>
44
#include <libdrizzle/libdrizzle.h>
45
#include <drizzled/error.h>
46
#include <drizzled/query_id.h>
47
#include <drizzled/tztime.h>
48
#include <drizzled/slave.h>
49
#include <drizzled/lock.h>
53
static const char *HA_ERR(int i)
56
case HA_ERR_KEY_NOT_FOUND: return "HA_ERR_KEY_NOT_FOUND";
57
case HA_ERR_FOUND_DUPP_KEY: return "HA_ERR_FOUND_DUPP_KEY";
58
case HA_ERR_RECORD_CHANGED: return "HA_ERR_RECORD_CHANGED";
59
case HA_ERR_WRONG_INDEX: return "HA_ERR_WRONG_INDEX";
60
case HA_ERR_CRASHED: return "HA_ERR_CRASHED";
61
case HA_ERR_WRONG_IN_RECORD: return "HA_ERR_WRONG_IN_RECORD";
62
case HA_ERR_OUT_OF_MEM: return "HA_ERR_OUT_OF_MEM";
63
case HA_ERR_NOT_A_TABLE: return "HA_ERR_NOT_A_TABLE";
64
case HA_ERR_WRONG_COMMAND: return "HA_ERR_WRONG_COMMAND";
65
case HA_ERR_OLD_FILE: return "HA_ERR_OLD_FILE";
66
case HA_ERR_NO_ACTIVE_RECORD: return "HA_ERR_NO_ACTIVE_RECORD";
67
case HA_ERR_RECORD_DELETED: return "HA_ERR_RECORD_DELETED";
68
case HA_ERR_RECORD_FILE_FULL: return "HA_ERR_RECORD_FILE_FULL";
69
case HA_ERR_INDEX_FILE_FULL: return "HA_ERR_INDEX_FILE_FULL";
70
case HA_ERR_END_OF_FILE: return "HA_ERR_END_OF_FILE";
71
case HA_ERR_UNSUPPORTED: return "HA_ERR_UNSUPPORTED";
72
case HA_ERR_TO_BIG_ROW: return "HA_ERR_TO_BIG_ROW";
73
case HA_WRONG_CREATE_OPTION: return "HA_WRONG_CREATE_OPTION";
74
case HA_ERR_FOUND_DUPP_UNIQUE: return "HA_ERR_FOUND_DUPP_UNIQUE";
75
case HA_ERR_UNKNOWN_CHARSET: return "HA_ERR_UNKNOWN_CHARSET";
76
case HA_ERR_WRONG_MRG_TABLE_DEF: return "HA_ERR_WRONG_MRG_TABLE_DEF";
77
case HA_ERR_CRASHED_ON_REPAIR: return "HA_ERR_CRASHED_ON_REPAIR";
78
case HA_ERR_CRASHED_ON_USAGE: return "HA_ERR_CRASHED_ON_USAGE";
79
case HA_ERR_LOCK_WAIT_TIMEOUT: return "HA_ERR_LOCK_WAIT_TIMEOUT";
80
case HA_ERR_LOCK_TABLE_FULL: return "HA_ERR_LOCK_TABLE_FULL";
81
case HA_ERR_READ_ONLY_TRANSACTION: return "HA_ERR_READ_ONLY_TRANSACTION";
82
case HA_ERR_LOCK_DEADLOCK: return "HA_ERR_LOCK_DEADLOCK";
83
case HA_ERR_CANNOT_ADD_FOREIGN: return "HA_ERR_CANNOT_ADD_FOREIGN";
84
case HA_ERR_NO_REFERENCED_ROW: return "HA_ERR_NO_REFERENCED_ROW";
85
case HA_ERR_ROW_IS_REFERENCED: return "HA_ERR_ROW_IS_REFERENCED";
86
case HA_ERR_NO_SAVEPOINT: return "HA_ERR_NO_SAVEPOINT";
87
case HA_ERR_NON_UNIQUE_BLOCK_SIZE: return "HA_ERR_NON_UNIQUE_BLOCK_SIZE";
88
case HA_ERR_NO_SUCH_TABLE: return "HA_ERR_NO_SUCH_TABLE";
89
case HA_ERR_TABLE_EXIST: return "HA_ERR_TABLE_EXIST";
90
case HA_ERR_NO_CONNECTION: return "HA_ERR_NO_CONNECTION";
91
case HA_ERR_NULL_IN_SPATIAL: return "HA_ERR_NULL_IN_SPATIAL";
92
case HA_ERR_TABLE_DEF_CHANGED: return "HA_ERR_TABLE_DEF_CHANGED";
93
case HA_ERR_NO_PARTITION_FOUND: return "HA_ERR_NO_PARTITION_FOUND";
94
case HA_ERR_RBR_LOGGING_FAILED: return "HA_ERR_RBR_LOGGING_FAILED";
95
case HA_ERR_DROP_INDEX_FK: return "HA_ERR_DROP_INDEX_FK";
96
case HA_ERR_FOREIGN_DUPLICATE_KEY: return "HA_ERR_FOREIGN_DUPLICATE_KEY";
97
case HA_ERR_TABLE_NEEDS_UPGRADE: return "HA_ERR_TABLE_NEEDS_UPGRADE";
98
case HA_ERR_TABLE_READONLY: return "HA_ERR_TABLE_READONLY";
99
case HA_ERR_AUTOINC_READ_FAILED: return "HA_ERR_AUTOINC_READ_FAILED";
100
case HA_ERR_AUTOINC_ERANGE: return "HA_ERR_AUTOINC_ERANGE";
101
case HA_ERR_GENERIC: return "HA_ERR_GENERIC";
102
case HA_ERR_RECORD_IS_THE_SAME: return "HA_ERR_RECORD_IS_THE_SAME";
103
case HA_ERR_LOGGING_IMPOSSIBLE: return "HA_ERR_LOGGING_IMPOSSIBLE";
104
case HA_ERR_CORRUPT_EVENT: return "HA_ERR_CORRUPT_EVENT";
105
case HA_ERR_ROWS_EVENT_APPLY : return "HA_ERR_ROWS_EVENT_APPLY";
111
Error reporting facility for Rows_log_event::do_apply_event
113
@param level error, warning or info
114
@param ha_error HA_ERR_ code
115
@param rli pointer to the active Relay_log_info instance
116
@param session pointer to the slave thread's session
117
@param table pointer to the event's table object
118
@param type the type of the event
119
@param log_name the master binlog file name
120
@param pos the master binlog file pos (the next after the event)
123
static void inline slave_rows_error_report(enum loglevel level, int ha_error,
124
Relay_log_info const *rli, Session *session,
125
Table *table, const char * type,
126
const char *log_name, ulong pos)
128
const char *handler_error= HA_ERR(ha_error);
129
char buff[MAX_SLAVE_ERRMSG], *slider;
130
const char *buff_end= buff + sizeof(buff);
132
List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
136
for (err= it++, slider= buff; err && slider < buff_end - 1;
137
slider += len, err= it++)
139
len= snprintf(slider, buff_end - slider,
140
_(" %s, Error_code: %d;"), err->msg, err->code);
143
rli->report(level, session->is_error()? session->main_da.sql_errno() : 0,
144
_("Could not execute %s event on table %s.%s;"
145
"%s handler error %s; "
146
"the event's master log %s, end_log_pos %lu"),
147
type, table->s->db.str,
148
table->s->table_name.str,
150
handler_error == NULL? _("<unknown>") : handler_error,
156
Cache that will automatically be written to a dedicated file on
162
class Write_on_release_cache
170
typedef unsigned short flag_set;
176
Write_on_release_cache
177
cache Pointer to cache to use
178
file File to write cache to upon destruction
179
flags Flags for the cache
183
Class used to guarantee copy of cache to file before exiting the
184
current block. On successful copy of the cache, the cache will
185
be reinited as a WRITE_CACHE.
187
Currently, a pointer to the cache is provided in the
188
constructor, but it would be possible to create a subclass
189
holding the IO_CACHE itself.
191
Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0)
192
: m_cache(cache), m_file(file), m_flags(flags)
194
reinit_io_cache(m_cache, WRITE_CACHE, 0L, false, true);
197
/* Destructor: hands m_cache and m_file to copy_event_cache_to_file_and_reinit(). */
~Write_on_release_cache()
199
copy_event_cache_to_file_and_reinit(m_cache, m_file);
200
/*
  NOTE(review): this is a bitwise OR, so the condition is true whenever
  either m_flags or FLUSH_F has any bit set; it does not test whether
  FLUSH_F is set in m_flags (a mask test would use '&'). The value of
  FLUSH_F is not visible here - confirm the intent before changing it.
*/
if (m_flags | FLUSH_F)
205
Return a pointer to the internal IO_CACHE.
212
Function to return a pointer to the internal cache, so that the
213
object can be treated as a IO_CACHE and used with the my_b_*
217
A pointer to the internal IO_CACHE.
219
IO_CACHE *operator&()
225
// Hidden, to prevent usage.
226
Write_on_release_cache(Write_on_release_cache const&);
233
uint32_t debug_not_change_ts_if_art_event= 1; // bug#29309 simulation
239
static void clear_all_errors(Session *session, Relay_log_info *rli)
241
session->is_slave_error = 0;
242
session->clear_error();
248
Ignore error code specified on command line.
251
inline int ignored_error_code(int err_code)
253
return ((err_code == ER_SLAVE_IGNORED_TABLE) ||
254
(use_slave_mask && bitmap_is_set(&slave_error_mask, err_code)));
262
static char *pretty_print_str(char *packet, const char *str, int len)
264
const char *end= str + len;
270
switch ((c=*str++)) {
271
case '\n': *pos++= '\\'; *pos++= 'n'; break;
272
case '\r': *pos++= '\\'; *pos++= 'r'; break;
273
case '\\': *pos++= '\\'; *pos++= '\\'; break;
274
case '\b': *pos++= '\\'; *pos++= 'b'; break;
275
case '\t': *pos++= '\\'; *pos++= 't'; break;
276
case '\'': *pos++= '\\'; *pos++= '\''; break;
277
case 0 : *pos++= '\\'; *pos++= '0'; break;
289
Creates a temporary name for load data infile:.
291
@param buf Store new filename here
292
@param file_id File_id (part of file name)
293
@param event_server_id Event_id (part of file name)
294
@param ext Extension for file name
297
Pointer to start of extension
300
static char *slave_load_file_stem(char *buf, uint32_t file_id,
301
int event_server_id, const char *ext)
304
fn_format(buf,"SQL_LOAD-",slave_load_tmpdir, "", MY_UNPACK_FILENAME);
306
buf= strchr(buf, '\0');
307
buf= int10_to_str(::server_id, buf, 10);
309
buf= int10_to_str(event_server_id, buf, 10);
311
res= int10_to_str(file_id, buf, 10);
312
strcpy(res, ext); // Add extension last
313
return res; // Pointer to extension
318
Delete all temporary files used for SQL_LOAD.
321
static void cleanup_load_tmpdir()
326
char fname[FN_REFLEN], prefbuf[31], *p;
328
if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
332
When we are deleting temporary files, we should only remove
333
the files associated with the server id of our server.
334
We don't use event_server_id here because since we've disabled
335
direct binlogging of Create_file/Append_file/Exec_load events
336
we cannot meet Start_log event in the middle of events from one
339
p= strncpy(prefbuf, STRING_WITH_LEN("SQL_LOAD-")) + 9;
340
p= int10_to_str(::server_id, p, 10);
344
for (i=0 ; i < (uint)dirp->number_off_files; i++)
346
file=dirp->dir_entry+i;
347
if (is_prefix(file->name, prefbuf))
349
fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME);
350
my_delete(fname, MYF(0));
362
/**
  Write a string to an IO_CACHE, preceded by a one-byte length.

  Only the low 8 bits of 'length' go into the prefix byte, while the full
  'length' bytes of 'str' are written after it.

  @param file    cache to write to
  @param str     bytes to write
  @param length  number of bytes in str

  @return true when either my_b_safe_write() call returns non-zero,
          false otherwise
*/
static bool write_str(IO_CACHE *file, const char *str, uint32_t length)
{
  unsigned char length_byte= (unsigned char) length;

  if (my_b_safe_write(file, &length_byte, sizeof(length_byte)))
    return true;
  return my_b_safe_write(file, (unsigned char*) str, length) != 0;
}
375
/**
  Read a string stored as a one-byte length followed by that many bytes.

  @param[in,out] buf      cursor into the buffer; on success it is advanced
                          past the length byte and the string
  @param         buf_end  one past the last valid byte of the buffer
  @param[out]    str      set to the first byte of the string (not
                          NUL-terminated by this function)
  @param[out]    len      set to the string length

  @return 0 on success; 1 when the length byte or the string it announces
          does not fit before buf_end (nothing is modified in that case)
*/
static inline int read_str(const char **buf, const char *buf_end,
                           const char **str, uint8_t *len)
{
  /* The length byte itself must be inside the buffer before we read it. */
  if (*buf >= buf_end)
    return 1;

  const unsigned int str_len= (unsigned char) **buf;

  /*
    The length byte plus str_len bytes must fit. Compare sizes instead of
    forming a pointer that may lie beyond the end of the buffer.
  */
  if ((size_t) (buf_end - *buf) <= str_len)
    return 1;

  *len= (uint8_t) str_len;
  *str= *buf + 1;
  *buf+= str_len + 1;
  return 0;
}
388
Transforms a string into "" or its expression in 0x... form.
391
char *str_to_hex(char *to, const char *from, uint32_t len)
397
to= octet2hex(to, from, len);
400
to= strcpy(to, "\"\"")+2;
401
return to; // pointer to end 0 of 'to'
406
Append a version of the 'from' string suitable for use in a query to
407
the 'to' string. To generate a correct escaping, the character set
408
information in 'csinfo' is used.
412
append_query_string(const CHARSET_INFO * const csinfo,
413
String const *from, String *to)
416
uint32_t const orig_len= to->length();
417
if (to->reserve(orig_len + from->length()*2+3))
420
beg= to->c_ptr_quick() + to->length();
422
if (csinfo->escape_with_backslash_is_dangerous)
423
ptr= str_to_hex(ptr, from->ptr(), from->length());
427
ptr+= drizzle_escape_string(ptr, from->ptr(), from->length());
430
to->length(orig_len + ptr - beg);
435
/**************************************************************************
436
Log_event methods (= the parent class of all events)
437
**************************************************************************/
441
returns the human readable name of the event's type
444
/**
  Return the human readable name of an event type.

  @param type  event type code

  @return a string literal naming the type; "Unknown" for any code that is
          not listed below
*/
const char* Log_event::get_type_str(Log_event_type type)
{
  const char *name= "Unknown"; /* impossible */

  switch (type) {
  case START_EVENT_V3:           name= "Start_v3"; break;
  case STOP_EVENT:               name= "Stop"; break;
  case QUERY_EVENT:              name= "Query"; break;
  case ROTATE_EVENT:             name= "Rotate"; break;
  case LOAD_EVENT:               name= "Load"; break;
  case NEW_LOAD_EVENT:           name= "New_load"; break;
  case SLAVE_EVENT:              name= "Slave"; break;
  case CREATE_FILE_EVENT:        name= "Create_file"; break;
  case APPEND_BLOCK_EVENT:       name= "Append_block"; break;
  case DELETE_FILE_EVENT:        name= "Delete_file"; break;
  case EXEC_LOAD_EVENT:          name= "Exec_load"; break;
  case XID_EVENT:                name= "Xid"; break;
  case FORMAT_DESCRIPTION_EVENT: name= "Format_desc"; break;
  case TABLE_MAP_EVENT:          name= "Table_map"; break;
  case WRITE_ROWS_EVENT:         name= "Write_rows"; break;
  case UPDATE_ROWS_EVENT:        name= "Update_rows"; break;
  case DELETE_ROWS_EVENT:        name= "Delete_rows"; break;
  case BEGIN_LOAD_QUERY_EVENT:   name= "Begin_load_query"; break;
  case EXECUTE_LOAD_QUERY_EVENT: name= "Execute_load_query"; break;
  case INCIDENT_EVENT:           name= "Incident"; break;
  default:                       break;
  }
  return name;
}
471
const char* Log_event::get_type_str()
473
return get_type_str(get_type_code());
478
Log_event::Log_event()
481
/**
  Build an event on behalf of a session.

  @param session_arg  session the event belongs to; its server_id and
                      start_time are copied into the event
  @param flags_arg    initial value of the event flags
  @param using_trans  stored in cache_stmt
*/
Log_event::Log_event(Session* session_arg, uint16_t flags_arg, bool using_trans)
  :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), session(session_arg)
{
  cache_stmt= using_trans;
  when= session_arg->start_time;
  server_id= session_arg->server_id;
}
491
This minimal constructor is for when you are not even sure that there
492
is a valid Session. For example in the server when we are shutting down or
493
flushing logs after receiving a SIGHUP (then we must write a Rotate to
494
the binlog but we have no Session, so we need this minimal constructor).
497
Log_event::Log_event()
498
:temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
501
server_id= ::server_id;
503
We can't call time() here as this would cause a call before
512
Log_event::Log_event()
515
Log_event::Log_event(const char* buf,
516
const Format_description_log_event* description_event)
517
:temp_buf(0), cache_stmt(0)
520
when= uint4korr(buf);
521
server_id= uint4korr(buf + SERVER_ID_OFFSET);
522
data_written= uint4korr(buf + EVENT_LEN_OFFSET);
523
if (description_event->binlog_version==1)
530
log_pos= uint4korr(buf + LOG_POS_OFFSET);
532
If the log is 4.0 (so here it can only be a 4.0 relay log read by
533
the SQL thread or a 4.0 master binlog read by the I/O thread),
534
log_pos is the beginning of the event: we transform it into the end
535
of the event, which is more useful.
536
But how do you know that the log is 4.0: you know it if
537
description_event is version 3 *and* you are not reading a
538
Format_desc (remember that mysqlbinlog starts by assuming that 5.0
539
logs are in 4.0 format, until it finds a Format_desc).
541
if (description_event->binlog_version==3 &&
542
buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
545
If log_pos=0, don't change it. log_pos==0 is a marker to mean
546
"don't change rli->group_master_log_pos" (see
547
inc_group_relay_log_pos()). As it is unreal log_pos, adding the
548
event len's is nonsense. For example, a fake Rotate event should
549
not have its log_pos (which is 0) changed or it will modify
550
Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense
551
value of (a non-zero offset which does not exist in the master's
552
binlog, so which will cause problems if the user uses this value
555
log_pos+= data_written; /* purecov: inspected */
558
flags= uint2korr(buf + FLAGS_OFFSET);
559
if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
560
(buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
563
These events always have a header which stops here (i.e. their
567
Initialization to zero of all other Log_event members as they're
568
not specified. Currently there are no such members; in the future
569
there will be an event UID (but Format_description and Rotate
570
don't need this UID, as they are not propagated through
571
--log-slave-updates (remember the UID is used to not play a query
572
twice when you have two masters which are slaves of a 3rd master).
577
/* otherwise, go on with reading the header from buf (nothing now) */
581
/**
  Tell the relay log info that this event is done (rli->stmt_done()).

  @param rli  relay log info to update; when null nothing is done

  @return 0 always (cannot fail currently)
*/
int Log_event::do_update_pos(Relay_log_info *rli)
{
  /*
    rli is null when (as far as is known) the caller is
    Load_log_event::do_apply_event *and* that one is called from
    Execute_load_log_event::do_apply_event; the latter calls
    Log_event::do_apply_event again later with the proper rli, so there
    is nothing to do here. Whether the check is still needed is not
    certain, so it is kept for now.
  */
  if (!rli)
    return 0; // Cannot fail currently

  /*
    bug#29309 simulation: reset the flag to force the wrong behaviour of
    an artificial event updating rli->last_master_timestamp exactly once -
    for the first FLUSH LOGS in the test.
  */
  if (debug_not_change_ts_if_art_event == 1
      && is_artificial_event())
    debug_not_change_ts_if_art_event= 0;

  const bool pass_zero_timestamp= is_artificial_event() &&
                                  debug_not_change_ts_if_art_event > 0;
  rli->stmt_done(log_pos, pass_zero_timestamp ? 0 : when);

  if (debug_not_change_ts_if_art_event == 0)
    debug_not_change_ts_if_art_event= 2;

  return 0; // Cannot fail currently
}
616
/**
  Decide whether this event is to be skipped.

  @param rli  relay log info holding replicate_same_server_id and
              slave_skip_counter

  @retval EVENT_SKIP_IGNORE  the event carries this server's id and
                             replicate_same_server_id is off, or the skip
                             counter is 1 while rli->is_in_group()
  @retval EVENT_SKIP_COUNT   otherwise, when the skip counter is positive
  @retval EVENT_SKIP_NOT     in all remaining cases
*/
Log_event::enum_skip_reason
Log_event::do_shall_skip(Relay_log_info *rli)
{
  if (server_id == ::server_id && !rli->replicate_same_server_id)
    return EVENT_SKIP_IGNORE;

  if (rli->slave_skip_counter == 1 && rli->is_in_group())
    return EVENT_SKIP_IGNORE;

  if (rli->slave_skip_counter > 0)
    return EVENT_SKIP_COUNT;

  return EVENT_SKIP_NOT;
}
629
Log_event::pack_info()
632
/**
  Base-class implementation of pack_info(): stores an empty string
  (binary charset) in the protocol.

  @param protocol  protocol object receiving the value
*/
void Log_event::pack_info(Protocol *protocol)
{
  protocol->store("", &my_charset_bin);
}
638
const char* Log_event::get_db()
640
return session ? session->db : 0;
645
init_show_field_list() prepares the column names and types for the
646
output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
650
void Log_event::init_show_field_list(List<Item>* field_list)
652
field_list->push_back(new Item_empty_string("Log_name", 20));
653
field_list->push_back(new Item_return_int("Pos", MY_INT32_NUM_DECIMAL_DIGITS,
654
DRIZZLE_TYPE_LONGLONG));
655
field_list->push_back(new Item_empty_string("Event_type", 20));
656
field_list->push_back(new Item_return_int("Server_id", 10,
658
field_list->push_back(new Item_return_int("End_log_pos",
659
MY_INT32_NUM_DECIMAL_DIGITS,
660
DRIZZLE_TYPE_LONGLONG));
661
field_list->push_back(new Item_empty_string("Info", 20));
668
bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
670
unsigned char header[LOG_EVENT_HEADER_LEN];
673
/* Store number of bytes that will be written by this event */
674
data_written= event_data_length + sizeof(header);
677
log_pos != 0 if this is relay-log event. In this case we should not
681
if (is_artificial_event())
684
We should not do any cleanup on slave when reading this. We
685
mark this by setting log_pos to 0. Start_log_event_v3() will
686
detect this on reading and set artificial_event=1 for the event.
693
Calculate position of end of event
695
Note that with a SEQ_READ_APPEND cache, my_b_tell() does not
696
work well. So this will give slightly wrong positions for the
697
Format_desc/Rotate/Stop events which the slave writes to its
698
relay log. For example, the initial Format_desc will have
699
end_log_pos=91 instead of 95. Because after writing the first 4
700
bytes of the relay log, my_b_tell() still reports 0. Because
701
my_b_append() does not update the counter which my_b_tell()
702
later uses (one should probably use my_b_append_tell() to work
703
around this). To get right positions even when writing to the
704
relay log, we use the (new) my_b_safe_tell().
706
Note that this raises a question on the correctness of all these
707
assert(my_b_tell()=rli->event_relay_log_pos).
709
If in a transaction, the log_pos which we calculate below is not
710
very good (because then my_b_safe_tell() returns start position
711
of the BEGIN, so it's like the statement was at the BEGIN's
712
place), but it's not a very serious problem (as the slave, when
713
it is in a transaction, does not take those end_log_pos into
714
account (as it calls inc_event_relay_log_pos()). To be fixed
715
later, so that it looks less strange. But not bug.
718
log_pos= my_b_safe_tell(file)+data_written;
721
now= (ulong) get_time(); // Query start time
724
Header will be of size LOG_EVENT_HEADER_LEN for all events, except for
725
FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT, where it will be
726
LOG_EVENT_MINIMAL_HEADER_LEN (remember these 2 have a frozen header,
727
because we read them before knowing the format).
730
int4store(header, now); // timestamp
731
header[EVENT_TYPE_OFFSET]= get_type_code();
732
int4store(header+ SERVER_ID_OFFSET, server_id);
733
int4store(header+ EVENT_LEN_OFFSET, data_written);
734
int4store(header+ LOG_POS_OFFSET, log_pos);
735
int2store(header+ FLAGS_OFFSET, flags);
737
return(my_b_safe_write(file, header, sizeof(header)) != 0);
741
time_t Log_event::get_time()
743
Session *tmp_session;
747
return session->start_time;
748
if ((tmp_session= current_session))
749
return tmp_session->start_time;
755
This needn't be format-tolerant, because we only read
756
LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length).
759
int Log_event::read_log_event(IO_CACHE* file, String* packet,
760
pthread_mutex_t* log_lock)
764
char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
767
pthread_mutex_lock(log_lock);
768
if (my_b_read(file, (unsigned char*) buf, sizeof(buf)))
771
If the read hits eof, we must report it as eof so the caller
772
will know it can go into cond_wait to be woken up on the next
776
result= LOG_READ_EOF;
778
result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
781
data_len= uint4korr(buf + EVENT_LEN_OFFSET);
782
if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
783
data_len > current_session->variables.max_allowed_packet)
785
result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
790
/* Append the log event header to packet */
791
if (packet->append(buf, sizeof(buf)))
793
/* Failed to allocate packet */
794
result= LOG_READ_MEM;
797
data_len-= LOG_EVENT_MINIMAL_HEADER_LEN;
800
/* Append rest of event, read directly from file into packet */
801
if (packet->append(file, data_len))
804
Fatal error occurred when appending rest of the event
805
to packet, possible failures:
806
1. EOF occurred when reading from file, it's really an error
807
as data_len is >=0 there's supposed to be more bytes available.
808
file->error will have been set to number of bytes left to read
809
2. Read was interrupted, file->error would normally be set to -1
810
3. Failed to allocate memory for packet, my_errno
811
will be ENOMEM(file->error should be 0, but since the
812
memory allocation occurs before the call to read it might
815
result= (my_errno == ENOMEM ? LOG_READ_MEM :
816
(file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
817
/* Implicit goto end; */
823
pthread_mutex_unlock(log_lock);
827
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
828
#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
832
Allocates memory; The caller is responsible for clean-up.
834
Log_event* Log_event::read_log_event(IO_CACHE* file,
835
pthread_mutex_t* log_lock,
836
const Format_description_log_event
839
assert(description_event != 0);
840
char head[LOG_EVENT_MINIMAL_HEADER_LEN];
842
First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
843
check the event for sanity and to know its length; no need to really parse
844
it. We say "at most" because this could be a 3.23 master, which has header
845
of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's
846
"minimal" over the set {MySQL >=4.0}).
848
uint32_t header_size= cmin(description_event->common_header_len,
849
LOG_EVENT_MINIMAL_HEADER_LEN);
852
if (my_b_read(file, (unsigned char *) head, header_size))
856
No error here; it could be that we are at the file's end. However
857
if the next my_b_read() fails (below), it will be an error as we
858
were able to read the first bytes.
862
uint32_t data_len = uint4korr(head + EVENT_LEN_OFFSET);
864
const char *error= 0;
866
#ifndef max_allowed_packet
867
Session *session=current_session;
868
uint32_t max_allowed_packet= session ? session->variables.max_allowed_packet : ~(ulong)0;
871
if (data_len > max_allowed_packet)
873
error = "Event too big";
877
if (data_len < header_size)
879
error = "Event too small";
883
// some events use the extra byte to null-terminate strings
884
if (!(buf = (char*) malloc(data_len+1)))
886
error = "Out of memory";
890
memcpy(buf, head, header_size);
891
if (my_b_read(file, (unsigned char*) buf + header_size, data_len - header_size))
893
error = "read error";
896
if ((res= read_log_event(buf, data_len, &error, description_event)))
897
res->register_temp_buf(buf);
904
errmsg_printf(ERRMSG_LVL_ERROR, _("Error in Log_event::read_log_event(): "
905
"'%s', data_len: %d, event_type: %d"),
906
error,data_len,head[EVENT_TYPE_OFFSET]);
909
The SQL slave thread will check if file->error<0 to know
910
if there was an I/O error. Even if there is no "low-level" I/O errors
911
with 'file', any of the high-level above errors is worrying
912
enough to stop the SQL thread now ; as we are skipping the current event,
913
going on with reading and successfully executing other events can
914
only corrupt the slave's databases. So stop.
923
Binlog format tolerance is in (buf, event_len, description_event)
927
Log_event* Log_event::read_log_event(const char* buf, uint32_t event_len,
929
const Format_description_log_event *description_event)
932
assert(description_event != 0);
934
/* Check the integrity */
935
if (event_len < EVENT_LEN_OFFSET ||
936
buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
937
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
939
*error="Sanity check failed"; // Needed to free buffer
940
return(NULL); // general sanity check - will fail on a partial read
943
uint32_t event_type= buf[EVENT_TYPE_OFFSET];
944
if (event_type > description_event->number_of_event_types &&
945
event_type != FORMAT_DESCRIPTION_EVENT)
948
It is unsafe to use the description_event if its post_header_len
949
array does not include the event type.
956
In some previous versions (see comment in
957
Format_description_log_event::Format_description_log_event(char*,...)),
958
event types were assigned different id numbers than in the
959
present version. In order to replicate from such versions to the
960
present version, we must map those event type id's to our event
961
type id's. The mapping is done with the event_type_permutation
962
array, which was set up when the Format_description_log_event
965
if (description_event->event_type_permutation)
966
event_type= description_event->event_type_permutation[event_type];
970
ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
973
ev = new Load_log_event(buf, event_len, description_event);
976
ev = new Load_log_event(buf, event_len, description_event);
979
ev = new Rotate_log_event(buf, event_len, description_event);
981
case CREATE_FILE_EVENT:
982
ev = new Create_file_log_event(buf, event_len, description_event);
984
case APPEND_BLOCK_EVENT:
985
ev = new Append_block_log_event(buf, event_len, description_event);
987
case DELETE_FILE_EVENT:
988
ev = new Delete_file_log_event(buf, event_len, description_event);
990
case EXEC_LOAD_EVENT:
991
ev = new Execute_load_log_event(buf, event_len, description_event);
993
case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
994
ev = new Start_log_event_v3(buf, description_event);
997
ev = new Stop_log_event(buf, description_event);
1000
ev = new Xid_log_event(buf, description_event);
1002
case FORMAT_DESCRIPTION_EVENT:
1003
ev = new Format_description_log_event(buf, event_len, description_event);
1005
case WRITE_ROWS_EVENT:
1006
ev = new Write_rows_log_event(buf, event_len, description_event);
1008
case UPDATE_ROWS_EVENT:
1009
ev = new Update_rows_log_event(buf, event_len, description_event);
1011
case DELETE_ROWS_EVENT:
1012
ev = new Delete_rows_log_event(buf, event_len, description_event);
1014
case TABLE_MAP_EVENT:
1015
ev = new Table_map_log_event(buf, event_len, description_event);
1017
case BEGIN_LOAD_QUERY_EVENT:
1018
ev = new Begin_load_query_log_event(buf, event_len, description_event);
1020
case EXECUTE_LOAD_QUERY_EVENT:
1021
ev= new Execute_load_query_log_event(buf, event_len, description_event);
1023
case INCIDENT_EVENT:
1024
ev = new Incident_log_event(buf, event_len, description_event);
1033
is_valid() are small event-specific sanity tests which are
1034
important; for example there are some malloc() in constructors
1035
(e.g. Query_log_event::Query_log_event(char*...)); when these
1036
malloc() fail we can't return an error out of the constructor
1037
(because constructor is "void") ; so instead we leave the pointer we
1038
wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
1039
Same for Format_description_log_event, member 'post_header_len'.
1041
if (!ev || !ev->is_valid())
1044
*error= "Found invalid event in binary log";
1050
/**
  Skip decision for an event that continues a group.

  @param rli  relay log info holding slave_skip_counter

  @return EVENT_SKIP_IGNORE when the skip counter is exactly 1; otherwise
          the result of the base Log_event::do_shall_skip()
*/
inline Log_event::enum_skip_reason
Log_event::continue_group(Relay_log_info *rli)
{
  return (rli->slave_skip_counter == 1) ? Log_event::EVENT_SKIP_IGNORE
                                        : Log_event::do_shall_skip(rli);
}
1058
/**************************************************************************
1059
Query_log_event methods
1060
**************************************************************************/
1063
This (which is used only for SHOW BINLOG EVENTS) could be updated to
1064
print SET @@session_var=. But this is not urgent, as SHOW BINLOG EVENTS is
1065
only an information, it does not produce suitable queries to replay (for
1066
example it does not print LOAD DATA INFILE).
1071
void Query_log_event::pack_info(Protocol *protocol)
1073
// TODO: show the catalog ??
1075
if (!(buf= (char*) malloc(9 + db_len + q_len)))
1078
if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
1081
pos= strcpy(buf, "use `")+5;
1082
memcpy(pos, db, db_len);
1083
pos= strcpy(pos+db_len, "`; ")+3;
1087
memcpy(pos, query, q_len);
1090
protocol->store(buf, pos-buf, &my_charset_bin);
1096
Query_log_event::write().
1099
In this event we have to modify the header to have the correct
1100
EVENT_LEN_OFFSET as we don't yet know how many status variables we
1104
bool Query_log_event::write(IO_CACHE* file)
1107
@todo if catalog can be of length FN_REFLEN==512, then we are not
1108
replicating it correctly, since the length is stored in a byte
1111
unsigned char buf[QUERY_HEADER_LEN+
1112
1+4+ // code of flags2 and flags2
1113
1+8+ // code of sql_mode and sql_mode
1114
1+1+FN_REFLEN+ // code of catalog and catalog length and catalog
1115
1+4+ // code of autoinc and the 2 autoinc variables
1116
1+6+ // code of charset and charset
1117
1+1+MAX_TIME_ZONE_NAME_LENGTH+ // code of tz and tz length and tz name
1118
1+2+ // code of lc_time_names and lc_time_names_number
1119
1+2 // code of charset_database and charset_database_number
1120
], *start, *start_of_status;
1124
return 1; // Something wrong with event
1127
We want to store the thread id:
1128
(- as an information for the user when he reads the binlog)
1129
- if the query uses temporary table: for the slave SQL thread to know to
1130
which master connection the temp table belongs.
1131
Now imagine we (write()) are called by the slave SQL thread (we are
1132
logging a query executed by this thread; the slave runs with
1133
--log-slave-updates). Then this query will be logged with
1134
thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of
1135
the same name were created simultaneously on the master (in the master
1137
CREATE TEMPORARY TABLE t; (thread 1)
1138
CREATE TEMPORARY TABLE t; (thread 2)
1140
then in the slave's binlog there will be
1141
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1142
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1143
which is bad (same thread id!).
1145
To avoid this, we log the thread's thread id EXCEPT for the SQL
1146
slave thread for which we log the original (master's) thread id.
1147
Now this moves the bug: what happens if the thread id on the
1148
master was 10 and when the slave replicates the query, a
1149
connection number 10 is opened by a normal client on the slave,
1150
and updates a temp table of the same name? We get a problem
1151
again. To avoid this, in the handling of temp tables (sql_base.cc)
1152
we use thread_id AND server_id. TODO when this is merged into
1153
4.1: in 4.1, slave_proxy_id has been renamed to pseudo_thread_id
1154
and is a session variable: that's to make mysqlbinlog work with
1155
temp tables. We probably need to introduce
1157
SET PSEUDO_SERVER_ID
1158
for mysqlbinlog in 4.1. mysqlbinlog would print:
1159
SET PSEUDO_SERVER_ID=
1160
SET PSEUDO_THREAD_ID=
1161
for each query using temp tables.
1163
int4store(buf + Q_THREAD_ID_OFFSET, slave_proxy_id);
1164
int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
1165
buf[Q_DB_LEN_OFFSET] = (char) db_len;
1166
int2store(buf + Q_ERR_CODE_OFFSET, error_code);
1169
You MUST always write status vars in increasing order of code. This
1170
guarantees that a slightly older slave will be able to parse those he
1173
start_of_status= start= buf+QUERY_HEADER_LEN;
1176
*start++= Q_FLAGS2_CODE;
1177
int4store(start, flags2);
1180
if (lc_time_names_number)
1182
assert(lc_time_names_number <= 0xFFFF);
1183
*start++= Q_LC_TIME_NAMES_CODE;
1184
int2store(start, lc_time_names_number);
1187
if (charset_database_number)
1189
assert(charset_database_number <= 0xFFFF);
1190
*start++= Q_CHARSET_DATABASE_CODE;
1191
int2store(start, charset_database_number);
1195
Here there could be code like
1196
if (command-line-option-which-says-"log_this_variable" && inited)
1198
*start++= Q_THIS_VARIABLE_CODE;
1199
int4store(start, this_variable);
1204
/* Store length of status variables */
1205
status_vars_len= (uint) (start-start_of_status);
1206
assert(status_vars_len <= MAX_SIZE_LOG_EVENT_STATUS);
1207
int2store(buf + Q_STATUS_VARS_LEN_OFFSET, status_vars_len);
1210
Calculate length of whole event
1211
The "1" below is the \0 in the db's length
1213
event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
1215
return (write_header(file, event_length) ||
1216
my_b_safe_write(file, (unsigned char*) buf, QUERY_HEADER_LEN) ||
1217
write_post_header_for_derived(file) ||
1218
my_b_safe_write(file, (unsigned char*) start_of_status,
1219
(uint) (start-start_of_status)) ||
1220
my_b_safe_write(file, (db) ? (unsigned char*) db : (unsigned char*)"", db_len + 1) ||
1221
my_b_safe_write(file, (unsigned char*) query, q_len)) ? 1 : 0;
1225
The simplest constructor that could possibly work. This is used for
1226
creating static objects that have a special meaning and are invisible
1229
/*
  Default constructor: default-constructs the Log_event base and starts with
  no data buffer (data_buf is null). Nothing else is set up here.
*/
Query_log_event::Query_log_event()
  : Log_event(),
    data_buf(0)
{
}
1237
Query_log_event::Query_log_event()
1238
session_arg - thread handle
1239
query_arg - array of char representing the query
1240
query_length - size of the `query_arg' array
1241
using_trans - there is a modified transactional table
1242
suppress_use - suppress the generation of 'USE' statements
1243
killed_status_arg - an optional with default to Session::KILLED_NO_VALUE
1244
if the value equals the default, the arg
1245
is set to the current session->killed value.
1246
A caller might need to masquerade session->killed with
1247
Session::NOT_KILLED.
1249
Creates an event for binlogging
1250
The value for local `killed_status' can be supplied by caller.
1252
Query_log_event::Query_log_event(Session* session_arg, const char* query_arg,
1253
ulong query_length, bool using_trans,
1255
Session::killed_state killed_status_arg)
1256
:Log_event(session_arg,
1257
(session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0) |
1258
(suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1260
data_buf(0), query(query_arg), catalog(session_arg->catalog),
1261
db(session_arg->db), q_len((uint32_t) query_length),
1262
thread_id(session_arg->thread_id),
1263
/* save the original thread id; we already know the server id */
1264
slave_proxy_id(session_arg->variables.pseudo_thread_id),
1265
flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1267
auto_increment_increment(session_arg->variables.auto_increment_increment),
1268
auto_increment_offset(session_arg->variables.auto_increment_offset),
1269
lc_time_names_number(session_arg->variables.lc_time_names->number),
1270
charset_database_number(0)
1274
if (killed_status_arg == Session::KILLED_NO_VALUE)
1275
killed_status_arg= session_arg->killed;
1278
(killed_status_arg == Session::NOT_KILLED) ?
1279
(session_arg->is_error() ? session_arg->main_da.sql_errno() : 0) :
1280
(session_arg->killed_errno());
1283
exec_time = (ulong) (end_time - session_arg->start_time);
1285
@todo this means that if we have no catalog, then it is replicated
1286
as an existing catalog of length zero. is that safe? /sven
1288
catalog_len = (catalog) ? (uint32_t) strlen(catalog) : 0;
1289
/* status_vars_len is set just before writing the event */
1290
db_len = (db) ? (uint32_t) strlen(db) : 0;
1291
if (session_arg->variables.collation_database != session_arg->db_charset)
1292
charset_database_number= session_arg->variables.collation_database->number;
1295
If we don't use flags2 for anything else than options contained in
1296
session_arg->options, it would be more efficient to flags2=session_arg->options
1297
(OPTIONS_WRITTEN_TO_BIN_LOG would be used only at reading time).
1298
But it's likely that we don't want to use 32 bits for 3 bits; in the future
1299
we will probably want to reclaim the 29 bits. So we need the &.
1301
flags2= (uint32_t) (session_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
1302
assert(session_arg->variables.collation_server->number < 256*256);
1303
int2store(charset+4, session_arg->variables.collation_server->number);
1307
/**
  Copies a string into a destination buffer and repoints the source at the
  copy.

  From the visible code: len bytes are copied from *src to *dst with
  memcpy(), and *src is then set to the start of the copied bytes (*dst).

  @param src  In/out: on entry the string to copy, on exit the copy.
  @param dst  In/out: pointer to the write position in the destination.

  NOTE(review): the declaration of len and the remaining statements of this
  helper are not visible in this copy of the file. The name suggests that
  *dst is advanced past the copy afterwards -- confirm against the complete
  source.
*/
static void copy_str_and_move(const char **src,
1308
Log_event::Byte **dst,
1311
memcpy(*dst, *src, len);
1312
*src= (const char *)*dst;
1319
Macro to check that there is enough space to read from memory.
1321
@param PTR Pointer to memory
1322
@param END End of memory
1323
@param CNT Number of bytes that should be read.
1325
#define CHECK_SPACE(PTR,END,CNT) \
1327
assert((PTR) + (CNT) <= (END)); \
1328
if ((PTR) + (CNT) > (END)) { \
1336
This is used by the SQL slave thread to prepare the event before execution.
1338
Query_log_event::Query_log_event(const char* buf, uint32_t event_len,
1339
const Format_description_log_event
1341
Log_event_type event_type)
1342
:Log_event(buf, description_event), data_buf(0), query(NULL),
1343
db(NULL), catalog_len(0), status_vars_len(0),
1344
flags2_inited(0), sql_mode_inited(0), charset_inited(0),
1345
auto_increment_increment(1), auto_increment_offset(1),
1346
time_zone_len(0), lc_time_names_number(0), charset_database_number(0)
1350
uint8_t common_header_len, post_header_len;
1351
Log_event::Byte *start;
1352
const Log_event::Byte *end;
1355
common_header_len= description_event->common_header_len;
1356
post_header_len= description_event->post_header_len[event_type-1];
1359
We test if the event's length is sensible, and if so we compute data_len.
1360
We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
1361
We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
1363
if (event_len < (uint)(common_header_len + post_header_len))
1365
data_len = event_len - (common_header_len + post_header_len);
1366
buf+= common_header_len;
1368
slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
1369
exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
1370
db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
1371
error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
1374
5.0 format starts here.
1375
Depending on the format, we may or not have affected/warnings etc
1376
The remnant post-header to be parsed has length:
1378
tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
1381
status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
1383
Check if status variable length is corrupt and will lead to very
1384
wrong data. We could be even more strict and require data_len to
1385
be even bigger, but this will suffice to catch most corruption
1386
errors that can lead to a crash.
1388
if (status_vars_len > cmin(data_len, (uint32_t)MAX_SIZE_LOG_EVENT_STATUS))
1393
data_len-= status_vars_len;
1397
We have parsed everything we know in the post header for QUERY_EVENT,
1398
the rest of the post header either comes from an older MySQL version or is
1399
dedicated to derived events (e.g. Execute_load_query...)
1402
/* variable-part: the status vars; only in MySQL 5.0 */
1404
start= (Log_event::Byte*) (buf+post_header_len);
1405
end= (const Log_event::Byte*) (start+status_vars_len);
1406
for (const Log_event::Byte* pos= start; pos < end;)
1410
CHECK_SPACE(pos, end, 4);
1412
flags2= uint4korr(pos);
1415
case Q_LC_TIME_NAMES_CODE:
1416
CHECK_SPACE(pos, end, 2);
1417
lc_time_names_number= uint2korr(pos);
1420
case Q_CHARSET_DATABASE_CODE:
1421
CHECK_SPACE(pos, end, 2);
1422
charset_database_number= uint2korr(pos);
1426
/* That's why you must write status vars in growing order of code */
1427
pos= (const unsigned char*) end; // Break loop
1431
if (!(start= data_buf = (Log_event::Byte*) malloc(catalog_len + 1 +
1435
if (catalog_len) // If catalog is given
1438
@todo we should clean up and do only copy_str_and_move; it
1439
works for both cases. Then we can remove the catalog_nz
1442
if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
1443
copy_str_and_move(&catalog, &start, catalog_len);
1446
memcpy(start, catalog, catalog_len+1); // copy end 0
1447
catalog= (const char *)start;
1448
start+= catalog_len+1;
1452
copy_str_and_move(&time_zone_str, &start, time_zone_len);
1455
if time_zone_len or catalog_len are 0, then time_zone and catalog
1456
are uninitialized at this point. shouldn't they point to the
1457
zero-length null-terminated strings we allocated space for in the
1458
my_alloc call above? /sven
1461
/* A 2nd variable part; this is common to all versions */
1462
memcpy(start, end, data_len); // Copy db and query
1463
start[data_len]= '\0'; // End query with \0 (For safetly)
1465
query= (char *)(start + db_len + 1);
1466
q_len= data_len - db_len -1;
1472
Query_log_event::do_apply_event()
1474
/**
  Applies this event using its own query text.

  Thin wrapper that forwards to the three-argument overload with the
  members @c query and @c q_len.

  @param rli  Relay log info passed through unchanged.
  @return Whatever the three-argument do_apply_event() returns.
*/
int Query_log_event::do_apply_event(Relay_log_info const *rli)
{
  const int apply_status= do_apply_event(rli, query, q_len);
  return apply_status;
}
1482
Compare the values of "affected rows" around here. Something
1485
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1487
errmsg_printf(ERRMSG_LVL_ERROR, "Slave: did not get the expected number of affected \
1488
rows running query from master - expected %d, got %d (this numbers \
1489
should have matched modulo 4294967296).", 0, ...);
1490
session->query_error = 1;
1493
We may also want an option to tell the slave to ignore "affected"
1494
mismatch. This mismatch could be implemented with a new ER_ code, and
1495
to ignore it you would use --slave-skip-errors...
1497
int Query_log_event::do_apply_event(Relay_log_info const *rli,
1498
const char *query_arg, uint32_t q_len_arg)
1500
int expected_error,actual_error= 0;
1501
Query_id &query_id= Query_id::get_query_id();
1503
Colleagues: please never free(session->catalog) in MySQL. This would
1504
lead to bugs as here session->catalog is a part of an alloced block,
1505
not an entire alloced block (see
1506
Query_log_event::do_apply_event()). Same for session->db. Thank
1509
session->catalog= catalog_len ? (char *) catalog : (char *)"";
1510
session->set_db(db, strlen(db)); /* allocates a copy of 'db' */
1511
session->variables.auto_increment_increment= auto_increment_increment;
1512
session->variables.auto_increment_offset= auto_increment_offset;
1515
InnoDB internally stores the master log position it has executed so far,
1516
i.e. the position just after the COMMIT event.
1517
When InnoDB will want to store, the positions in rli won't have
1518
been updated yet, so group_master_log_* will point to old BEGIN
1519
and event_master_log* will point to the beginning of current COMMIT.
1520
But log_pos of the COMMIT Query event is what we want, i.e. the pos of the
1521
END of the current log event (COMMIT). We save it in rli so that InnoDB can
1524
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
1526
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
1527
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
1530
Note: We do not need to execute reset_one_shot_variables() if this
1532
Reason: The db stored in binlog events is the same for SET and for
1533
its companion query. If the SET is ignored because of
1534
db_ok(), the companion query will also be ignored, and if
1535
the companion query is ignored in the db_ok() test of
1536
::do_apply_event(), then the companion SET also has, so
1537
we don't need to reset_one_shot_variables().
1541
session->set_time((time_t)when);
1542
session->query_length= q_len_arg;
1543
session->query= (char*)query_arg;
1544
session->query_id= query_id.next();
1545
session->variables.pseudo_thread_id= thread_id; // for temp tables
1547
if (ignored_error_code((expected_error= error_code)) ||
1548
!check_expected_error(session,rli,expected_error))
1552
all bits of session->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
1553
must take their value from flags2.
1555
session->options= flags2|(session->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
1558
String tmp(time_zone_str, time_zone_len, &my_charset_bin);
1559
if (!(session->variables.time_zone= my_tz_find(session, &tmp)))
1561
my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr());
1562
session->variables.time_zone= global_system_variables.time_zone;
1563
goto compare_errors;
1566
if (lc_time_names_number)
1568
if (!(session->variables.lc_time_names=
1569
my_locale_by_number(lc_time_names_number)))
1571
my_printf_error(ER_UNKNOWN_ERROR,
1572
"Unknown locale: '%d'", MYF(0), lc_time_names_number);
1573
session->variables.lc_time_names= &my_locale_en_US;
1574
goto compare_errors;
1578
session->variables.lc_time_names= &my_locale_en_US;
1579
if (charset_database_number)
1581
const CHARSET_INFO *cs;
1582
if (!(cs= get_charset(charset_database_number, MYF(0))))
1585
int10_to_str((int) charset_database_number, buf, -10);
1586
my_error(ER_UNKNOWN_COLLATION, MYF(0), buf);
1587
goto compare_errors;
1589
session->variables.collation_database= cs;
1592
session->variables.collation_database= session->db_charset;
1594
/* Execute the query (note that we bypass dispatch_command()) */
1595
const char* found_semicolon= NULL;
1596
mysql_parse(session, session->query, session->query_length, &found_semicolon);
1597
log_slow_statement(session);
1602
The query got a really bad error on the master (thread killed etc),
1603
which could be inconsistent. Parse it to test the table names: if the
1604
replicate-*-do|ignore-table rules say "this query must be ignored" then
1605
we exit gracefully; otherwise we warn about the bad error and tell DBA
1608
if (mysql_test_parse_for_slave(session, session->query, session->query_length))
1609
clear_all_errors(session, const_cast<Relay_log_info*>(rli)); /* Can ignore query */
1612
rli->report(ERROR_LEVEL, expected_error,
1613
_("Query partially completed on the master "
1614
"(error on master: %d) and was aborted. There is a "
1615
"chance that your master is inconsistent at this "
1616
"point. If you are sure that your master is ok, run "
1617
"this query manually on the slave and then restart the "
1618
"slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; "
1619
"START SLAVE; . Query: '%s'"),
1620
expected_error, session->query);
1621
session->is_slave_error= 1;
1629
If we expected a non-zero error code, and we don't get the same error
1630
code, and none of them should be ignored.
1632
actual_error= session->is_error() ? session->main_da.sql_errno() : 0;
1633
if ((expected_error != actual_error) &&
1635
!ignored_error_code(actual_error) &&
1636
!ignored_error_code(expected_error))
1638
rli->report(ERROR_LEVEL, 0,
1639
_("Query caused differenxt errors on master and slave.\n"
1640
"Error on master: '%s' (%d), Error on slave: '%s' (%d).\n"
1641
"Default database: '%s'. Query: '%s'"),
1644
actual_error ? session->main_da.message() : _("no error"),
1646
print_slave_db_safe(db), query_arg);
1647
session->is_slave_error= 1;
1650
If we get the same error code as expected, or they should be ignored.
1652
else if (expected_error == actual_error ||
1653
ignored_error_code(actual_error))
1655
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
1656
session->killed= Session::NOT_KILLED;
1659
Other cases: mostly we expected no error and get one.
1661
else if (session->is_slave_error || session->is_fatal_error)
1663
rli->report(ERROR_LEVEL, actual_error,
1664
_("Error '%s' on query. Default database: '%s'. Query: '%s'"),
1665
(actual_error ? session->main_da.message() :
1666
_("unexpected success or fatal error")),
1667
print_slave_db_safe(session->db), query_arg);
1668
session->is_slave_error= 1;
1672
TODO: compare the values of "affected rows" around here. Something
1674
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1676
errmsg_printf(ERRMSG_LVL_ERROR, "Slave: did not get the expected number of affected \
1677
rows running query from master - expected %d, got %d (this numbers \
1678
should have matched modulo 4294967296).", 0, ...);
1679
session->is_slave_error = 1;
1681
We may also want an option to tell the slave to ignore "affected"
1682
mismatch. This mismatch could be implemented with a new ER_ code, and
1683
to ignore it you would use --slave-skip-errors...
1685
To do the comparison we need to know the value of "affected" which the
1686
above mysql_parse() computed. And we need to know the value of
1687
"affected" in the master's binlog. Both will be implemented later. The
1688
important thing is that we now have the format ready to log the values
1689
of "affected" in the binlog. So we can release 5.0.0 before effectively
1690
logging "affected" and effectively comparing it.
1692
} /* End of if (db_ok(... */
1695
pthread_mutex_lock(&LOCK_thread_count);
1697
Probably we have set session->query, session->db, session->catalog to point to places
1698
in the data_buf of this event. Now the event is going to be deleted
1699
probably, so data_buf will be freed, so the session->... listed above will be
1700
pointers to freed memory.
1701
So we must set them to 0, so that those bad pointers values are not later
1702
used. Note that "cleanup" queries like automatic DROP TEMPORARY Table
1703
don't suffer from these assignments to 0 as DROP TEMPORARY
1704
Table uses the db.table syntax.
1706
session->catalog= 0;
1707
session->set_db(NULL, 0); /* will free the current database */
1708
session->query= 0; // just to be sure
1709
session->query_length= 0;
1710
pthread_mutex_unlock(&LOCK_thread_count);
1711
close_thread_tables(session);
1712
session->first_successful_insert_id_in_prev_stmt= 0;
1713
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
1714
return session->is_slave_error;
1717
/**
  Position update for a query event.

  No query-specific handling is done; the call is forwarded to the base
  class implementation.

  @param rli  Relay log info passed through unchanged.
  @return Whatever Log_event::do_update_pos() returns.
*/
int Query_log_event::do_update_pos(Relay_log_info *rli)
{
  const int update_status= Log_event::do_update_pos(rli);
  return update_status;
}
1723
/**
  Skip decision for a query event.

  While rli->slave_skip_counter is greater than zero:
    - a query equal to "BEGIN" sets OPTION_BEGIN in session->options and
      defers to Log_event::continue_group();
    - a query equal to "COMMIT" or "ROLLBACK" clears OPTION_BEGIN and yields
      Log_event::EVENT_SKIP_COUNT.
  Every other case is delegated to Log_event::do_shall_skip().

  @param rli  Relay log info consulted for the skip counter.
  @return The skip reason selected as described above.
*/
Log_event::enum_skip_reason
Query_log_event::do_shall_skip(Relay_log_info *rli)
{
  assert(query && q_len > 0);

  /* Not skipping at all: the generic rule decides. */
  const bool skip_active= (rli->slave_skip_counter > 0);
  if (!skip_active)
    return Log_event::do_shall_skip(rli);

  /* Transaction start. */
  if (!strcmp("BEGIN", query))
  {
    session->options|= OPTION_BEGIN;
    return Log_event::continue_group(rli);
  }

  /* Transaction end, either way. */
  const bool closes_transaction= !strcmp("COMMIT", query) ||
                                 !strcmp("ROLLBACK", query);
  if (closes_transaction)
  {
    session->options&= ~OPTION_BEGIN;
    return Log_event::EVENT_SKIP_COUNT;
  }

  return Log_event::do_shall_skip(rli);
}
1746
/**************************************************************************
1747
Start_log_event_v3 methods
1748
**************************************************************************/
1750
/*
  Default constructor: an event with created == 0, the current
  BINLOG_VERSION, not artificial, and with dont_set_created cleared.
  server_version is filled with ST_SERVER_VER_LEN bytes of the global
  ::server_version.
*/
Start_log_event_v3::Start_log_event_v3()
  : Log_event(),
    created(0),
    binlog_version(BINLOG_VERSION),
    artificial_event(0),
    dont_set_created(0)
{
  memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
}
1758
Start_log_event_v3::pack_info()
1761
/**
  Formats "Server ver: <server_version>, Binlog ver: <binlog_version>" and
  hands the text to @c protocol.

  @param protocol  Receiver of the formatted text via store(); the text is
                   passed with its length and my_charset_bin.
*/
void Start_log_event_v3::pack_info(Protocol *protocol)
{
  static const char server_label[]= "Server ver: ";
  static const char binlog_label[]= ", Binlog ver: ";

  /* Both labels, the version string and room for the decimal number. */
  char text[(sizeof(server_label) - 1) + ST_SERVER_VER_LEN +
            (sizeof(binlog_label) - 1) + 22];
  char *cursor= text;

  strcpy(cursor, server_label);
  cursor+= sizeof(server_label) - 1;

  strcpy(cursor, server_version);
  cursor+= strlen(server_version);

  strcpy(cursor, binlog_label);
  cursor+= sizeof(binlog_label) - 1;

  /* int10_to_str() returns the position just past the digits it wrote. */
  cursor= int10_to_str(binlog_version, cursor, 10);

  protocol->store(text, (uint) (cursor - text), &my_charset_bin);
}
1773
Start_log_event_v3::Start_log_event_v3()
1776
/**
  Builds the event from its serialized form.

  The fields are read starting common_header_len bytes into @c buf, as given
  by @c description_event.

  @param buf                Serialized event, starting at the common header.
  @param description_event  Format description supplying common_header_len;
                            also forwarded to the Log_event base.
*/
Start_log_event_v3::Start_log_event_v3(const char* buf,
                                       const Format_description_log_event
                                       *description_event)
  :Log_event(buf, description_event)
{
  const char *body= buf + description_event->common_header_len;

  binlog_version= uint2korr(body + ST_BINLOG_VER_OFFSET);

  memcpy(server_version, body + ST_SERVER_VER_OFFSET, ST_SERVER_VER_LEN);
  /* Force termination so a corrupted log cannot cause an overrun. */
  server_version[ST_SERVER_VER_LEN-1]= 0;

  created= uint4korr(body + ST_CREATED_OFFSET);

  /* log_pos == 0 is the marker for an artificial event. */
  artificial_event= (log_pos == 0);
  dont_set_created= 1;
}
1795
Start_log_event_v3::write()
1798
/**
  Serializes the event into @c file.

  Unless dont_set_created is set, both created and when are refreshed from
  get_time() first. The payload holds binlog_version, ST_SERVER_VER_LEN bytes
  of server_version and created at their ST_*_OFFSET positions, and is
  written after the common header.

  @param file  Destination IO_CACHE.
  @return true if writing the header or the payload reported failure.
*/
bool Start_log_event_v3::write(IO_CACHE* file)
{
  char payload[START_V3_HEADER_LEN];

  if (!dont_set_created)
    created= when= get_time();

  int2store(payload + ST_BINLOG_VER_OFFSET, binlog_version);
  memcpy(payload + ST_SERVER_VER_OFFSET, server_version, ST_SERVER_VER_LEN);
  int4store(payload + ST_CREATED_OFFSET, created);

  if (write_header(file, sizeof(payload)))
    return true;

  return my_b_safe_write(file, (unsigned char*) payload, sizeof(payload)) != 0;
}
1812
Start_log_event_v3::do_apply_event() .
1816
- To handle the case where the master died without having time to write
1817
DROP TEMPORARY Table, DO RELEASE_LOCK (prepared statements' deletion is
1818
TODO), we clean up all temporary tables that we got, if we are sure we
1822
- Remove all active user locks.
1823
Guilhem 2003-06: this is true but not urgent: the worst it can cause is
1824
the use of a bit of memory for a user lock which will not be used
1825
anymore. If the user lock is later used, the old one will be released. In
1826
other words, no deadlock problem.
1829
/**
  Applies a start event on the slave.

  From the visible code: the action depends on binlog_version. One branch
  calls session->close_temporary_tables() and cleanup_load_tmpdir(). Another
  branch closes temporary tables only when the executing description event
  has a server_version comparing at or above 3.23.57 (first 7 characters)
  and created is non-zero.

  @param rli  Relay log info; its relay_log.description_event_for_exec is
              read for the server version test.

  NOTE(review): the case labels, braces, comment delimiters and the return
  statement of this function are not visible in this copy of the file, so
  the mapping of branches to binlog versions cannot be confirmed here.
*/
int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
1831
switch (binlog_version)
1836
This can either be 4.x (then a Start_log_event_v3 is only at master
1837
startup so we are sure the master has restarted and cleared his temp
1838
tables; the event always has 'created'>0) or 5.0 (then we have to test
1843
session->close_temporary_tables();
1844
cleanup_load_tmpdir();
1849
Now the older formats; in that case load_tmpdir is cleaned up by the I/O
1853
if (strncmp(rli->relay_log.description_event_for_exec->server_version,
1854
"3.23.57",7) >= 0 && created)
1857
Can distinguish, based on the value of 'created': this event was
1858
generated at master startup.
1860
session->close_temporary_tables();
1863
Otherwise, can't distinguish a Start_log_event generated at
1864
master startup and one generated by master FLUSH LOGS, so cannot
1865
be sure temp tables have to be dropped. So do nothing.
1869
/* this case is impossible */
1875
/***************************************************************************
1876
Format_description_log_event methods
1877
****************************************************************************/
1880
Format_description_log_event 1st ctor.
1882
Ctor. Can be used to create the event to write to the binary log (when the
1883
server starts or when FLUSH LOGS), or to create artificial events to parse
1884
binlogs from MySQL 3.23 or 4.x.
1885
When in a client, only the 2nd use is possible.
1887
@param binlog_version the binlog version for which we want to build
1888
an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x
1889
x>=2 and 4.1) or 4 (MySQL 5.0). Note that the
1890
old 4.0 (binlog version 2) is not supported;
1891
it should not be used for replication with
1895
/**
  Constructor building a format description event for a binlog version.

  From the visible code, for binlog_ver 4: the global ::server_version is
  copied, common_header_len is set to LOG_EVENT_HEADER_LEN and
  number_of_event_types to LOG_EVENT_TYPES, post_header_len is allocated
  with malloc() and zero-filled, and, when the pointer is non-null, the
  post-header length of each event type is assigned.
  calc_server_version_split() is also called.

  @param binlog_ver  Binlog version to describe; stored in binlog_version.
  The second, unnamed const char* parameter is not used.

  NOTE(review): memset() is applied to post_header_len before the
  if (post_header_len) test, so a failed malloc() passes a null pointer to
  memset() instead of being left for is_valid() to detect. Consider calloc()
  or moving the zero-fill inside the null check.

  NOTE(review): some short lines of this constructor (braces, comment
  delimiters, break statements, the body of the default case) are not
  visible in this copy of the file; verify against the complete source.
*/
Format_description_log_event::
1896
Format_description_log_event(uint8_t binlog_ver, const char*)
1897
:Start_log_event_v3(), event_type_permutation(0)
1899
binlog_version= binlog_ver;
1900
switch (binlog_ver) {
1901
case 4: /* MySQL 5.0 */
1902
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
1903
common_header_len= LOG_EVENT_HEADER_LEN;
1904
number_of_event_types= LOG_EVENT_TYPES;
1905
/* we'll catch malloc() error in is_valid() */
1906
post_header_len=(uint8_t*) malloc(number_of_event_types*sizeof(uint8_t));
1907
memset(post_header_len, 0, number_of_event_types*sizeof(uint8_t));
1909
This long list of assignments is not beautiful, but I see no way to
1910
make it nicer, as the right members are #defines, not array members, so
1911
it's impossible to write a loop.
1913
if (post_header_len)
1915
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
1916
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN;
1917
post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
1918
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
1919
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
1920
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
1921
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
1922
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
1923
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
1924
post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
1925
post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN;
1926
post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
1927
post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
1928
post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
1929
post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1];
1930
post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
1931
post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
1932
post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
1936
default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
1939
calc_server_version_split();
1944
The problem with this constructor is that the fixed header may have a
1945
length different from this version, but we don't know this length as we
1946
have not read the Format_description_log_event which says it, yet. This
1947
length is in the post-header of the event, but we don't know where the
1950
So this type of event HAS to:
1951
- either have the header's length at the beginning (in the header, at a
1952
fixed position which will never be changed), not in the post-header. That
1953
would make the header be "shifted" compared to other events.
1954
- or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future
1955
versions, so that we know for sure.
1957
I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because
1958
it is sent before Format_description_log_event).
1961
Format_description_log_event::
1962
Format_description_log_event(const char* buf,
1965
Format_description_log_event*
1967
:Start_log_event_v3(buf, description_event), event_type_permutation(0)
1969
buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
1970
if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
1971
return; /* sanity check */
1972
number_of_event_types=
1973
event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1);
1974
post_header_len= (uint8_t*) malloc(number_of_event_types*
1975
sizeof(*post_header_len));
1976
/* If alloc fails, we'll detect it in is_valid() */
1977
if (post_header_len != NULL)
1978
memcpy(post_header_len, buf+ST_COMMON_HEADER_LEN_OFFSET+1,
1979
number_of_event_types* sizeof(*post_header_len));
1980
calc_server_version_split();
1983
In some previous versions, the events were given other event type
1984
id numbers than in the present version. When replicating from such
1985
a version, we therefore set up an array that maps those id numbers
1986
to the id numbers of the present server.
1988
If post_header_len is null, it means malloc failed, and is_valid
1989
will fail, so there is no need to do anything.
1991
The trees in which events have wrong id's are:
1993
mysql-5.1-wl1012.old mysql-5.1-wl2325-5.0-drop6p13-alpha
1994
mysql-5.1-wl2325-5.0-drop6 mysql-5.1-wl2325-5.0
1995
mysql-5.1-wl2325-no-dd
1997
(this was found by grepping for two lines in sequence where the
1998
first matches "FORMAT_DESCRIPTION_EVENT," and the second matches
1999
"TABLE_MAP_EVENT," in log_event.h in all trees)
2001
In these trees, the following server_versions existed since
2002
TABLE_MAP_EVENT was introduced:
2004
5.1.1-a_drop5p3 5.1.1-a_drop5p4 5.1.1-alpha
2005
5.1.2-a_drop5p10 5.1.2-a_drop5p11 5.1.2-a_drop5p12
2006
5.1.2-a_drop5p13 5.1.2-a_drop5p14 5.1.2-a_drop5p15
2007
5.1.2-a_drop5p16 5.1.2-a_drop5p16b 5.1.2-a_drop5p16c
2008
5.1.2-a_drop5p17 5.1.2-a_drop5p4 5.1.2-a_drop5p5
2009
5.1.2-a_drop5p6 5.1.2-a_drop5p7 5.1.2-a_drop5p8
2010
5.1.2-a_drop5p9 5.1.3-a_drop5p17 5.1.3-a_drop5p17b
2011
5.1.3-a_drop5p17c 5.1.4-a_drop5p18 5.1.4-a_drop5p19
2012
5.1.4-a_drop5p20 5.1.4-a_drop6p0 5.1.4-a_drop6p1
2013
5.1.4-a_drop6p2 5.1.5-a_drop5p20 5.2.0-a_drop6p3
2014
5.2.0-a_drop6p4 5.2.0-a_drop6p5 5.2.0-a_drop6p6
2015
5.2.1-a_drop6p10 5.2.1-a_drop6p11 5.2.1-a_drop6p12
2016
5.2.1-a_drop6p6 5.2.1-a_drop6p7 5.2.1-a_drop6p8
2017
5.2.2-a_drop6p13 5.2.2-a_drop6p13-alpha 5.2.2-a_drop6p13b
2020
(this was found by grepping for "mysql," in all historical
2021
versions of configure.in in the trees listed above).
2023
There are 5.1.1-alpha versions that use the new event id's, so we
2024
do not test that version string. So replication from 5.1.1-alpha
2025
with the other event id's to a new version does not work.
2026
Moreover, we can safely ignore the part after drop[56]. This
2027
allows us to simplify the big list above to the following regexes:
2029
5\.1\.[1-5]-a_drop5.*
2031
5\.2\.[0-2]-a_drop6.*
2033
This is what we test for in the 'if' below.
2035
if (post_header_len &&
2036
server_version[0] == '5' && server_version[1] == '.' &&
2037
server_version[3] == '.' &&
2038
strncmp(server_version + 5, "-a_drop", 7) == 0 &&
2039
((server_version[2] == '1' &&
2040
server_version[4] >= '1' && server_version[4] <= '5' &&
2041
server_version[12] == '5') ||
2042
(server_version[2] == '1' &&
2043
server_version[4] == '4' &&
2044
server_version[12] == '6') ||
2045
(server_version[2] == '2' &&
2046
server_version[4] >= '0' && server_version[4] <= '2' &&
2047
server_version[12] == '6')))
2049
if (number_of_event_types != 22)
2051
/* this makes is_valid() return false. */
2052
free(post_header_len);
2053
post_header_len= NULL;
2056
static const uint8_t perm[23]=
2058
UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
2059
LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
2060
APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
2062
FORMAT_DESCRIPTION_EVENT,
2065
BEGIN_LOAD_QUERY_EVENT,
2066
EXECUTE_LOAD_QUERY_EVENT,
2068
event_type_permutation= perm;
2070
Since we use (permuted) event id's to index the post_header_len
2071
array, we need to permute the post_header_len array too.
2073
uint8_t post_header_len_temp[23];
2074
for (int i= 1; i < 23; i++)
2075
post_header_len_temp[perm[i] - 1]= post_header_len[i - 1];
2076
for (int i= 0; i < 22; i++)
2077
post_header_len[i] = post_header_len_temp[i];
2082
// Serializes this format description event to `file`.
// A FORMAT_DESCRIPTION_HEADER_LEN-byte buffer receives the binlog version,
// the server version string, the creation time, the common header length
// and the post_header_len table. The common event header is written first,
// then the buffer.
// Returns write_header() || my_b_safe_write(); presumably non-zero means
// failure -- confirm against those helpers.
bool Format_description_log_event::write(IO_CACHE* file)
2085
We don't call Start_log_event_v3::write() because this would make 2
2088
unsigned char buff[FORMAT_DESCRIPTION_HEADER_LEN];
2089
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
2090
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
2091
// Stamp created/when with the current time unless dont_set_created is set.
if (!dont_set_created)
2092
created= when= get_time();
2093
int4store(buff + ST_CREATED_OFFSET,created);
2094
buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
2095
// The post_header_len table goes right after the common header length byte.
memcpy(buff+ST_COMMON_HEADER_LEN_OFFSET+1, post_header_len,
2097
return (write_header(file, sizeof(buff)) ||
2098
my_b_safe_write(file, buff, sizeof(buff)));
2102
// Applies a format description event on the slave.
// When the event is not artificial, has a creation time and the session
// still has an open transaction, the unfinished transaction is reported and
// cleaned up through rli->cleanup_context(). Events that come from another
// server are then handed to Start_log_event_v3::do_apply_event().
// NOTE(review): the return value for events from this same server is not
// visible in this listing -- confirm.
int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
2105
As a transaction NEVER spans on 2 or more binlogs:
2106
if we have an active transaction at this point, the master died
2107
while writing the transaction to the binary log, i.e. while
2108
flushing the binlog cache to the binlog. XA guarantees that master has
2109
rolled back. So we roll back.
2110
Note: this event could be sent by the master to inform us of the
2111
format of its binlog; in other words maybe it is not at its
2112
original place when it comes to us; we'll know this by checking
2113
log_pos ("artificial" events have log_pos == 0).
2115
if (!artificial_event && created && session->transaction.all.ha_list)
2117
/* Not an error (XA keeps this safe); the report is purely informational. */
2118
rli->report(INFORMATION_LEVEL, 0,
2119
_("Rolling back unfinished transaction (no COMMIT "
2120
"or ROLLBACK in relay log). A probable cause is that "
2121
"the master died while writing the transaction to "
2122
"its binary log, thus rolled back too."));
2123
// rli is const here, so the cast is needed to run the cleanup.
const_cast<Relay_log_info*>(rli)->cleanup_context(session, 1);
2126
If this event comes from ourselves, there is no cleaning task to
2127
perform, we don't call Start_log_event_v3::do_apply_event()
2128
(this was just to update the log's description event).
2130
if (server_id != ::server_id)
2133
If the event was not requested by the slave i.e. the master sent
2134
it while the slave asked for a position >4, the event will make
2135
rli->group_master_log_pos advance. Say that the slave asked for
2136
position 1000, and the Format_desc event's end is 96. Then in
2137
the beginning of replication rli->group_master_log_pos will be
2138
0, then 96, then jump to first really asked event (which is
2139
>96). So this is ok.
2141
return(Start_log_event_v3::do_apply_event(rli));
2146
// Installs this event as the description event used while executing the
// relay log, then advances the relay log position.
// The previously stored description event is deleted and replaced by `this`.
// For an event written by this same server only the event relay log
// position is incremented; the other visible path delegates to
// Log_event::do_update_pos().
// NOTE(review): the branch structure between the two paths is not fully
// visible in this listing -- confirm.
int Format_description_log_event::do_update_pos(Relay_log_info *rli)
2148
/* Remember the format of the binlog being executed. */
2149
delete rli->relay_log.description_event_for_exec;
2150
rli->relay_log.description_event_for_exec= this;
2152
if (server_id == ::server_id)
2155
We only increase the relay log position if we are skipping
2156
events and do not touch any group_* variables, nor flush the
2157
relay log info. If there is a crash, we will have to re-skip
2158
the events again, but that is a minor issue.
2160
If we do not skip stepping the group log position (and the
2161
server id was changed when restarting the server), it might well
2162
be that we start executing at a position that is invalid, e.g.,
2163
at a Rows_log_event or a Query_log_event preceeded by a
2164
Intvar_log_event instead of starting at a Table_map_log_event or
2165
the Intvar_log_event respectively.
2167
rli->inc_event_relay_log_pos();
2172
return Log_event::do_update_pos(rli);
2176
/**
  Decides whether the slave should skip this event.

  A format description event is never skipped: the unnamed Relay_log_info
  argument is ignored and EVENT_SKIP_NOT is returned unconditionally.
*/
Log_event::enum_skip_reason
Format_description_log_event::do_shall_skip(Relay_log_info * /* unused */)
{
  const Log_event::enum_skip_reason never_skip= Log_event::EVENT_SKIP_NOT;
  return never_skip;
}
2184
Splits the event's 'server_version' string into three numeric pieces stored
2185
into 'server_version_split':
2186
X.Y.Zabc (X,Y,Z numbers, a not a digit) -> {X,Y,Z}
2189
'server_version_split' is then used for lookups to find if the server which
2190
created this event has some known bug.
2192
// Parses the leading numeric components of server_version into
// server_version_split[0..2].
// Each component is read with strtoul() in base 10 and stored as an
// unsigned char.
// NOTE(review): the declaration of `number` and the statement that moves p
// to r are not visible in this listing -- confirm.
void Format_description_log_event::calc_server_version_split()
2194
char *p= server_version, *r;
2196
for (uint32_t i= 0; i<=2; i++)
2198
number= strtoul(p, &r, 10);
2199
server_version_split[i]= (unsigned char)number;
2200
assert(number < 256); // the value must fit in an unsigned char
2202
assert(!((i == 0) && (*r != '.'))); // the first number is expected to end at a dot
2204
p++; // skip the dot
2209
/**************************************************************************
2210
Load_log_event methods
2211
General note about Load_log_event: the binlogging of LOAD DATA INFILE is
2212
going to be changed in 5.0 (or maybe in 5.1; not decided yet).
2213
However, the 5.0 slave could still have to read such events (from a 4.x
2214
master), convert them (which just means maybe expand the header, when 5.0
2215
servers have a UID in events) (remember that whatever is after the header
2216
will be like in 4.x, as this event's format is not modified in 5.0 as we
2217
will use new types of events to log the new LOAD DATA INFILE features).
2218
To be able to read/convert, we just need to not assume that the common
2219
header is of length LOG_EVENT_HEADER_LEN (we must use the description
2221
Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE
2222
between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the
2223
positions displayed in SHOW SLAVE STATUS then are fine too).
2224
**************************************************************************/
2227
Load_log_event::pack_info()
2230
// Computes an upper bound for the buffer that print_query() fills.
// Each term pairs the fixed text of one clause with the variable-length
// data it embeds; the trailing comment on each line names the clause.
// The separator and terminator lengths are multiplied by 4; presumably this
// leaves room for escaping in pretty_print_str() -- confirm.
// NOTE(review): the start of the summed expression is not visible in this
// listing.
uint32_t Load_log_event::get_query_buffer_length()
2233
5 + db_len + 3 + // "use DB; "
2234
18 + fname_len + 2 + // "LOAD DATA INFILE 'file''"
2236
9 + // " REPLACE or IGNORE "
2237
13 + table_name_len*2 + // "INTO Table `table`"
2238
21 + sql_ex.field_term_len*4 + 2 + // " FIELDS TERMINATED BY 'str'"
2239
23 + sql_ex.enclosed_len*4 + 2 + // " OPTIONALLY ENCLOSED BY 'str'"
2240
12 + sql_ex.escaped_len*4 + 2 + // " ESCAPED BY 'str'"
2241
21 + sql_ex.line_term_len*4 + 2 + // " LINES TERMINATED BY 'str'"
2242
19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'"
2243
15 + 22 + // " IGNORE xxx LINES"
2244
3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)"
2248
// Writes the textual LOAD DATA statement for this event into buf.
// Clauses are appended one after another at the running cursor pos:
// an optional database switch, LOAD DATA with optional LOCAL, the file
// name, REPLACE or IGNORE, the table name, the FIELDS and LINES options,
// an optional IGNORE n LINES clause and the field list.
// The caller must supply a buffer large enough for the whole statement;
// no bounds are checked in the visible code.
// NOTE(review): the initialisation of pos and the assignments to *end,
// *fn_start and *fn_end are not visible in this listing -- confirm.
void Load_log_event::print_query(bool need_db, char *buf,
2249
char **end, char **fn_start, char **fn_end)
2253
if (need_db && db && db_len)
2255
pos= strcpy(pos, "use `")+5;
2256
memcpy(pos, db, db_len);
2257
pos= strcpy(pos+db_len, "`; ")+3;
2260
pos= strcpy(pos, "LOAD DATA ")+10;
2265
if (check_fname_outside_temp_buf())
2266
pos= strcpy(pos, "LOCAL ")+6;
2267
pos= strcpy(pos, "INFILE '")+8;
2268
memcpy(pos, fname, fname_len);
2269
pos= strcpy(pos+fname_len, "' ")+2;
2271
// REPLACE takes precedence over IGNORE when both flags are set.
if (sql_ex.opt_flags & REPLACE_FLAG)
2272
pos= strcpy(pos, " REPLACE ")+9;
2273
else if (sql_ex.opt_flags & IGNORE_FLAG)
2274
pos= strcpy(pos, " IGNORE ")+8;
2276
pos= strcpy(pos ,"INTO")+4;
2281
pos= strcpy(pos ," Table `")+8;
2282
memcpy(pos, table_name, table_name_len);
2283
pos+= table_name_len;
2285
/* All optional clauses are always written because their defaults are not empty */
2286
pos= strcpy(pos, "` FIELDS TERMINATED BY ")+23;
2287
pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len);
2288
if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
2289
pos= strcpy(pos, " OPTIONALLY ")+12;
2290
pos= strcpy(pos, " ENCLOSED BY ")+13;
2291
pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len);
2293
pos= strcpy(pos, " ESCAPED BY ")+12;
2294
pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
2296
pos= strcpy(pos, " LINES TERMINATED BY ")+21;
2297
pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
2298
if (sql_ex.line_start_len)
2300
pos= strcpy(pos, " STARTING BY ")+13;
2301
pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
2304
if ((long) skip_lines > 0)
2306
pos= strcpy(pos, " IGNORE ")+8;
2307
pos= int64_t10_to_str((int64_t) skip_lines, pos, 10);
2308
pos= strcpy(pos," LINES ")+7;
2314
// Field names are stored back to back, each followed by one extra byte.
const char *field= fields;
2315
pos= strcpy(pos, " (")+2;
2316
for (i = 0; i < num_fields; i++)
2323
memcpy(pos, field, field_lens[i]);
2324
pos+= field_lens[i];
2325
field+= field_lens[i] + 1;
2334
// Sends the textual form of this event to the client through protocol.
// A buffer of get_query_buffer_length() bytes is allocated with malloc(),
// filled by print_query() with the database prefix enabled, and the
// resulting text is stored in the protocol as binary.
// NOTE(review): the allocation-failure exit and the release of buf are not
// visible in this listing -- confirm that buf is freed.
void Load_log_event::pack_info(Protocol *protocol)
2338
if (!(buf= (char*) malloc(get_query_buffer_length())))
2340
print_query(true, buf, &end, 0, 0);
2341
protocol->store(buf, end-buf, &my_charset_bin);
2347
Load_log_event::write_data_header()
2350
/**
  Writes the fixed-size LOAD post-header of this event to 'file'.

  The header carries the slave proxy thread id, the execution time, the
  number of lines to skip, the one-byte table and database name lengths,
  and the number of fields, each at its L_*_OFFSET position.

  @param file  cache the header is written to

  @return true when my_b_safe_write() reports a non-zero result,
          false otherwise
*/
bool Load_log_event::write_data_header(IO_CACHE* file)
{
  char header[LOAD_HEADER_LEN];

  /* Four-byte fields first, in on-disk order. */
  int4store(header + L_THREAD_ID_OFFSET, slave_proxy_id);
  int4store(header + L_EXEC_TIME_OFFSET, exec_time);
  int4store(header + L_SKIP_LINES_OFFSET, skip_lines);

  /* Name lengths occupy a single byte each. */
  header[L_TBL_LEN_OFFSET]= (char) table_name_len;
  header[L_DB_LEN_OFFSET]= (char) db_len;

  int4store(header + L_NUM_FIELDS_OFFSET, num_fields);

  const bool write_failed=
    my_b_safe_write(file, (unsigned char*) header, LOAD_HEADER_LEN) != 0;
  return write_failed;
}
2364
Load_log_event::write_data_body()
2367
// Writes the variable-length body of this event to `file`.
// Order: the sql_ex block, then (only when there are fields) the array of
// field lengths followed by the field name block, then the table name and
// the database name (each with one extra trailing byte) and finally the
// file name without a trailing byte.
// NOTE(review): the statements executed when an intermediate write fails
// are not visible in this listing; presumably they return a failure value.
bool Load_log_event::write_data_body(IO_CACHE* file)
2369
if (sql_ex.write_data(file))
2371
if (num_fields && fields && field_lens)
2373
if (my_b_safe_write(file, (unsigned char*)field_lens, num_fields) ||
2374
my_b_safe_write(file, (unsigned char*)fields, field_block_len))
2377
return (my_b_safe_write(file, (unsigned char*)table_name, table_name_len + 1) ||
2378
my_b_safe_write(file, (unsigned char*)db, db_len + 1) ||
2379
my_b_safe_write(file, (unsigned char*)fname, fname_len));
2384
Load_log_event::Load_log_event()
2387
// Builds a Load_log_event from a LOAD DATA statement run in session_arg.
// Copies the thread ids from the session, records the database, table and
// file names with their lengths, and mirrors the separators of `ex` into
// sql_ex: pointer plus one-byte length for each, option flags for
// ENCLOSED/REPLACE/IGNORE and an empty-flag bit for every zero-length
// separator. Finally the names of fields_arg are packed into fields_buf and
// their lengths into field_lens_buf.
// NOTE(review): several statements (the case labels of the switch, the
// condition guarding DUMPFILE_FLAG, the declaration of item and the
// increment of num_fields) are not visible in this listing.
Load_log_event::Load_log_event(Session *session_arg, sql_exchange *ex,
2388
const char *db_arg, const char *table_name_arg,
2389
List<Item> &fields_arg,
2390
enum enum_duplicates handle_dup,
2391
bool ignore, bool using_trans)
2392
:Log_event(session_arg,
2393
session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
2395
thread_id(session_arg->thread_id),
2396
slave_proxy_id(session_arg->variables.pseudo_thread_id),
2397
num_fields(0),fields(0),
2398
field_lens(0),field_block_len(0),
2399
// A null table name is replaced by an empty string.
table_name(table_name_arg ? table_name_arg : ""),
2400
db(db_arg), fname(ex->file_name), local_fname(false)
2404
exec_time = (ulong) (end_time - session_arg->start_time);
2405
/* db can never be a zero pointer in 4.0 */
2406
db_len = (uint32_t) strlen(db);
2407
table_name_len = (uint32_t) strlen(table_name);
2408
fname_len = (fname) ? (uint) strlen(fname) : 0;
2409
// The separator lengths below are narrowed to uint8_t.
sql_ex.field_term = (char*) ex->field_term->ptr();
2410
sql_ex.field_term_len = (uint8_t) ex->field_term->length();
2411
sql_ex.enclosed = (char*) ex->enclosed->ptr();
2412
sql_ex.enclosed_len = (uint8_t) ex->enclosed->length();
2413
sql_ex.line_term = (char*) ex->line_term->ptr();
2414
sql_ex.line_term_len = (uint8_t) ex->line_term->length();
2415
sql_ex.line_start = (char*) ex->line_start->ptr();
2416
sql_ex.line_start_len = (uint8_t) ex->line_start->length();
2417
sql_ex.escaped = (char*) ex->escaped->ptr();
2418
sql_ex.escaped_len = (uint8_t) ex->escaped->length();
2419
sql_ex.opt_flags = 0;
2420
sql_ex.cached_new_format = -1;
2423
sql_ex.opt_flags|= DUMPFILE_FLAG;
2424
if (ex->opt_enclosed)
2425
sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
2427
sql_ex.empty_flags= 0;
2429
switch (handle_dup) {
2431
sql_ex.opt_flags|= REPLACE_FLAG;
2433
case DUP_UPDATE: // Impossible here
2438
sql_ex.opt_flags|= IGNORE_FLAG;
2440
// Record which separators are empty.
if (!ex->field_term->length())
2441
sql_ex.empty_flags |= FIELD_TERM_EMPTY;
2442
if (!ex->enclosed->length())
2443
sql_ex.empty_flags |= ENCLOSED_EMPTY;
2444
if (!ex->line_term->length())
2445
sql_ex.empty_flags |= LINE_TERM_EMPTY;
2446
if (!ex->line_start->length())
2447
sql_ex.empty_flags |= LINE_START_EMPTY;
2448
if (!ex->escaped->length())
2449
sql_ex.empty_flags |= ESCAPED_EMPTY;
2451
skip_lines = ex->skip_lines;
2453
List_iterator<Item> li(fields_arg);
2454
field_lens_buf.length(0);
2455
fields_buf.length(0);
2457
while ((item = li++))
2460
// NOTE(review): the length is narrowed to one byte, so a name longer than
// 255 bytes would be recorded with a wrapped length -- confirm names are
// bounded.
unsigned char len = (unsigned char) strlen(item->name);
2461
field_block_len += len + 1;
2462
fields_buf.append(item->name, len + 1);
2463
field_lens_buf.append((char*)&len, 1);
2466
field_lens = (const unsigned char*)field_lens_buf.ptr();
2467
fields = fields_buf.ptr();
2473
The caller must do buf[event_len] = 0 before he starts using the
2476
// Builds a Load_log_event from the raw event bytes in buf.
// All pointer members start out null; the actual parsing is delegated to
// copy_log_event(), with an offset argument that depends on whether the
// event type byte is LOAD_EVENT.
// NOTE(review): the condition guarding the copy_log_event() call and part
// of the offset expression are not visible in this listing -- confirm.
Load_log_event::Load_log_event(const char *buf, uint32_t event_len,
2477
const Format_description_log_event *description_event)
2478
:Log_event(buf, description_event), num_fields(0), fields(0),
2479
field_lens(0),field_block_len(0),
2480
table_name(0), db(0), fname(0), local_fname(false)
2483
I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0,
2484
4.0->5.0 and 5.0->5.0 and it works.
2487
copy_log_event(buf, event_len,
2488
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
2490
description_event->common_header_len :
2491
LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
2493
/* otherwise it's a derived class, will call copy_log_event() itself */
2499
Load_log_event::copy_log_event()
2502
// Parses a serialized LOAD event from buf into this object.
// The post-header after the common header supplies the thread id, the
// execution time, the lines to skip, the table and database name lengths
// and the field count. sql_ex.init() then consumes the separator block at
// body_offset and returns the start of the field length array. After it
// come the field names, the table name, the database name and the file
// name; the members are set to point into buf, nothing is copied.
// NOTE(review): the body_offset parameter, the declaration of data_len and
// the error exits are not visible in this listing -- confirm.
int Load_log_event::copy_log_event(const char *buf, ulong event_len,
2504
const Format_description_log_event *description_event)
2507
char* buf_end = (char*)buf + event_len;
2508
/* data_head marks the beginning of the post-header */
2509
const char* data_head = buf + description_event->common_header_len;
2510
slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
2511
exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
2512
skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
2513
// NOTE(review): data_head is a plain char pointer; where char is signed a
// length byte above 127 converts to a very large unsigned value here.
// Consider reading these two bytes through unsigned char.
table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
2514
db_len = (uint)data_head[L_DB_LEN_OFFSET];
2515
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
2517
if ((int) event_len < body_offset)
2520
Sql_ex.init() on success returns the pointer to the first byte after
2521
the sql_ex structure, which is the start of field lengths array.
2523
if (!(field_lens= (unsigned char*)sql_ex.init((char*)buf + body_offset,
2525
buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
2528
data_len = event_len - body_offset;
2529
if (num_fields > data_len) // simple sanity check against corruption
2531
// Every field name is followed by one extra byte in the field block.
for (uint32_t i = 0; i < num_fields; i++)
2532
field_block_len += (uint)field_lens[i] + 1;
2534
fields = (char*)field_lens + num_fields;
2535
table_name = fields + field_block_len;
2536
db = table_name + table_name_len + 1;
2537
fname = db + db_len + 1;
2538
// NOTE(review): no visible check that fname still lies before buf_end.
fname_len = strlen(fname);
2539
// null termination is accomplished by the caller doing buf[event_len]=0
2546
Load_log_event::set_fields()
2549
This function can not use the member variable
2550
for the database, since LOAD DATA INFILE on the slave
2551
can be for a different database than the current one.
2552
This is the reason for the affected_db argument to this method.
2555
// Appends one Item_field per stored field name to field_list.
// Each item is created with the given context, affected_db, this event's
// table_name and the current field name; the cursor then advances by the
// recorded field length plus one byte.
// The items are allocated with new; their ownership is not visible here.
// NOTE(review): the declaration of i is not visible in this listing.
void Load_log_event::set_fields(const char* affected_db,
2556
List<Item> &field_list,
2557
Name_resolution_context *context)
2560
const char* field = fields;
2561
for (i= 0; i < num_fields; i++)
2563
field_list.push_back(new Item_field(context,
2564
affected_db, table_name, field));
2565
field+= field_lens[i] + 1;
2571
Does the data loading job when executing a LOAD DATA on the slave.
2575
@param use_rli_only_for_errors If set to 1, rli is provided to
2576
Load_log_event::exec_event only for this
2577
function to have RPL_LOG_NAME and
2578
rli->last_slave_error, both being used by
2579
error reports. rli's position advancing
2580
is skipped (done by the caller which is
2581
Execute_load_log_event::exec_event).
2582
If set to 0, rli is provided for full use,
2583
i.e. for error reports and position
2587
fix this; this can be done by testing rules in
2588
Create_file_log_event::exec_event() and then discarding Append_block and
2591
this is a bug - this needs to be moved to the I/O thread
2599
// Executes the LOAD DATA described by this event in the slave session.
// Visible steps: switch the session to the event database and clear error
// state; fill a table list entry for the target table; forge the textual
// query with print_query() and publish it in session->query; derive
// handle_dup from the REPLACE and IGNORE flags; rebuild a sql_exchange from
// sql_ex; run mysql_load(); restore session state and close tables; report
// any error through rli->report().
// Returns 0 when use_rli_only_for_errors is set, otherwise the result of
// Log_event::do_apply_event(rli) on the final visible return.
// NOTE(review): many lines (declarations, braces, else branches and early
// returns) are not visible in this listing, so the exact control flow
// between the steps must be confirmed against the full source.
int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
2600
bool use_rli_only_for_errors)
2602
Query_id &query_id= Query_id::get_query_id();
2603
session->set_db(db, strlen(db));
2604
assert(session->query == 0);
2605
session->query_length= 0; // Should not be needed
2606
session->is_slave_error= 0;
2607
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
2609
/* see Query_log_event::do_apply_event() and BUG#13360 */
2610
assert(!rli->m_table_map.count());
2612
Usually lex_start() is called by mysql_parse(), but we need it here
2613
as the present method does not call mysql_parse().
2616
session->reset_for_next_command();
2618
if (!use_rli_only_for_errors)
2621
Saved for InnoDB, see comment in
2622
Query_log_event::do_apply_event()
2624
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
2628
We test replicate_*_db rules. Note that we have already prepared
2629
the file to load, even if we are going to ignore and delete it
2630
now. So it is possible that we did a lot of disk writes for
2631
nothing. In other words, a big LOAD DATA INFILE on the master will
2632
still consume a lot of space on the slave (space in the relay log
2633
+ space of temp files: twice the space of the file to load...)
2634
even if it will finally be ignored. TODO: fix this; this can be
2635
done by testing rules in Create_file_log_event::do_apply_event()
2636
and then discarding Append_block and al. Another way is do the
2637
filtering in the I/O thread (more efficient: no disk writes at
2641
Note: We do not need to execute reset_one_shot_variables() if this
2643
Reason: The db stored in binlog events is the same for SET and for
2644
its companion query. If the SET is ignored because of
2645
db_ok(), the companion query will also be ignored, and if
2646
the companion query is ignored in the db_ok() test of
2647
::do_apply_event(), then the companion SET also have so
2648
we don't need to reset_one_shot_variables().
2652
session->set_time((time_t)when);
2653
session->query_id = query_id.next();
2655
Initing session->row_count is not necessary in theory as this variable has no
2656
influence in the case of the slave SQL thread (it is used to generate a
2657
"data truncated" warning but which is absorbed and never gets to the
2658
error log); still we init it to avoid a Valgrind message.
2660
drizzle_reset_errors(session, 0);
2663
// Describe the target table; it is locked for writing.
memset(&tables, 0, sizeof(tables));
2664
tables.db= session->strmake(session->db, session->db_length);
2665
tables.alias = tables.table_name = (char*) table_name;
2666
tables.lock_type = TL_WRITE;
2669
// the table will be opened in mysql_load
2673
enum enum_duplicates handle_dup;
2675
char *load_data_query;
2678
Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST
2679
and written to slave's binlog if binlogging is on.
2681
if (!(load_data_query= (char *)session->alloc(get_query_buffer_length() + 1)))
2684
This will set session->fatal_error in case of OOM. So we surely will notice
2685
that something is wrong.
2690
print_query(false, load_data_query, &end, (char **)&session->lex->fname_start,
2691
(char **)&session->lex->fname_end);
2693
session->query_length= end - load_data_query;
2694
session->query= load_data_query;
2696
if (sql_ex.opt_flags & REPLACE_FLAG)
2698
handle_dup= DUP_REPLACE;
2700
else if (sql_ex.opt_flags & IGNORE_FLAG)
2703
handle_dup= DUP_ERROR;
2708
When replication is running fine, if it was DUP_ERROR on the
2709
master then we could choose IGNORE here, because if DUP_ERROR
2710
suceeded on master, and data is identical on the master and slave,
2711
then there should be no uniqueness errors on slave, so IGNORE is
2712
the same as DUP_ERROR. But in the unlikely case of uniqueness errors
2713
(because the data on the master and slave happen to be different
2714
(user error or bug), we want LOAD DATA to print an error message on
2715
the slave to discover the problem.
2717
If reading from net (a 3.23 master), mysql_load() will change this
2720
handle_dup= DUP_ERROR;
2723
We need to set session->lex->sql_command and session->lex->duplicates
2724
since InnoDB tests these variables to decide if this is a LOAD
2725
DATA ... REPLACE INTO ... statement even though mysql_parse()
2726
is not called. This is not needed in 5.0 since there the LOAD
2727
DATA ... statement is replicated using mysql_parse(), which
2728
sets the session->lex fields correctly.
2730
session->lex->sql_command= SQLCOM_LOAD;
2731
session->lex->duplicates= handle_dup;
2733
// Rebuild the exchange description from the separators stored in sql_ex.
sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
2734
String field_term(sql_ex.field_term,sql_ex.field_term_len,&my_charset_utf8_general_ci);
2735
String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,&my_charset_utf8_general_ci);
2736
String line_term(sql_ex.line_term,sql_ex.line_term_len,&my_charset_utf8_general_ci);
2737
String line_start(sql_ex.line_start,sql_ex.line_start_len,&my_charset_utf8_general_ci);
2738
String escaped(sql_ex.escaped,sql_ex.escaped_len, &my_charset_utf8_general_ci);
2739
ex.field_term= &field_term;
2740
ex.enclosed= &enclosed;
2741
ex.line_term= &line_term;
2742
ex.line_start= &line_start;
2743
ex.escaped= &escaped;
2745
ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
2746
if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
2747
ex.field_term->length(0);
2749
ex.skip_lines = skip_lines;
2750
List<Item> field_list;
2751
session->lex->select_lex.context.resolve_in_table_list_only(&tables);
2752
set_fields(tables.db, field_list, &session->lex->select_lex.context);
2753
session->variables.pseudo_thread_id= thread_id;
2756
// mysql_load will use session->net to read the file
2757
session->net.vio = net->vio;
2759
Make sure the client does not get confused about the packet sequence
2761
session->net.pkt_nr = net->pkt_nr;
2764
It is safe to use tmp_list twice because we are not going to
2765
update it inside mysql_load().
2767
List<Item> tmp_list;
2768
// A non-zero result from mysql_load() marks the session as failed.
if (mysql_load(session, &ex, &tables, field_list, tmp_list, tmp_list,
2769
handle_dup, ignore, net != 0))
2770
session->is_slave_error= 1;
2771
if (session->cuted_fields)
2773
/* log_pos is the position of the LOAD event in the master log */
2774
errmsg_printf(ERRMSG_LVL_WARN, _("Slave: load data infile on table '%s' at "
2775
"log position %s in log '%s' produced %ld "
2776
"warning(s). Default database: '%s'"),
2778
llstr(log_pos,llbuff), RPL_LOG_NAME,
2779
(ulong) session->cuted_fields,
2780
print_slave_db_safe(session->db));
2783
net->pkt_nr= session->net.pkt_nr;
2789
We will just ask the master to send us /dev/null if we do not
2790
want to load the data.
2791
TODO: this a bug - needs to be done in I/O thread
2794
skip_load_data_infile(net);
2798
session->net.vio = 0;
2799
// Keep the database name for the error messages below.
const char *remember_db= session->db;
2800
pthread_mutex_lock(&LOCK_thread_count);
2801
session->catalog= 0;
2802
session->set_db(NULL, 0); /* will free the current database */
2804
session->query_length= 0;
2805
pthread_mutex_unlock(&LOCK_thread_count);
2806
close_thread_tables(session);
2808
if (session->is_slave_error)
2810
/* this err/sql_errno code is copy-paste from net_send_error() */
2813
if (session->is_error())
2815
err= session->main_da.message();
2816
sql_errno= session->main_da.sql_errno();
2820
sql_errno=ER_UNKNOWN_ERROR;
2823
rli->report(ERROR_LEVEL, sql_errno,
2824
_("Error '%s' running LOAD DATA INFILE on table '%s'. "
2825
"Default database: '%s'"),
2826
err, (char*)table_name, print_slave_db_safe(remember_db));
2827
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
2830
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
2832
if (session->is_fatal_error)
2835
snprintf(buf, sizeof(buf),
2836
_("Running LOAD DATA INFILE on table '%-.64s'."
2837
" Default database: '%-.64s'"),
2839
print_slave_db_safe(remember_db));
2841
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2842
ER(ER_SLAVE_FATAL_ERROR), buf);
2846
return ( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) );
2850
/**************************************************************************
2851
Rotate_log_event methods
2852
**************************************************************************/
2855
Rotate_log_event::pack_info()
2858
// Sends a one-line description of this rotate event to the client.
// The text is the new log name, the literal ";pos=" and the position
// formatted by llstr(), assembled in a String backed by the local buffer
// buf1 and stored in the protocol as binary.
// NOTE(review): a statement between the String construction and the first
// append is not visible in this listing.
void Rotate_log_event::pack_info(Protocol *protocol)
2860
char buf1[256], buf[22];
2861
String tmp(buf1, sizeof(buf1), &my_charset_utf8_general_ci);
2863
tmp.append(new_log_ident.c_str(), ident_len);
2864
tmp.append(STRING_WITH_LEN(";pos="));
2865
tmp.append(llstr(pos,buf));
2866
protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
2871
Rotate_log_event::Rotate_log_event() (2 constructors)
2875
// Builds a rotate event that points at the log new_log_ident_arg and the
// position pos_arg.
// ident_len is taken from ident_len_arg, with strlen(new_log_ident_arg) as
// the visible alternative; presumably strlen is the fallback for a zero
// ident_len_arg -- confirm. The name is then copied into new_log_ident.
// NOTE(review): the last parameter and parts of the initialiser list are
// not visible in this listing.
Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
2876
uint32_t ident_len_arg, uint64_t pos_arg,
2878
:Log_event(), pos(pos_arg),
2879
ident_len(ident_len_arg
2881
: strlen(new_log_ident_arg)),
2884
new_log_ident.assign(new_log_ident_arg, ident_len);
2889
// Builds a rotate event from the raw event bytes in buf.
// The position is read as 8 bytes at R_POS_OFFSET when the format has a
// post-header for ROTATE_EVENT, and defaults to 4 otherwise. The log name
// length is whatever remains of event_len after the common header and the
// post-header, capped at FN_REFLEN-1.
// NOTE(review): only event_len < header_size is checked in the visible
// code; if event_len is smaller than header_size + post_header_len the
// unsigned subtraction below wraps before the cap is applied -- confirm.
// NOTE(review): the statement after the size check and any adjustment of
// buf past the common header are not visible in this listing.
Rotate_log_event::Rotate_log_event(const char* buf, uint32_t event_len,
2890
const Format_description_log_event* description_event)
2891
:Log_event(buf, description_event), flags(DUP_NAME)
2893
// The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
2894
uint8_t header_size= description_event->common_header_len;
2895
uint8_t post_header_len= description_event->post_header_len[ROTATE_EVENT-1];
2896
uint32_t ident_offset;
2897
if (event_len < header_size)
2900
pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
2901
ident_len = (uint)(event_len -
2902
(header_size+post_header_len));
2903
ident_offset = post_header_len;
2904
set_if_smaller(ident_len,FN_REFLEN-1);
2905
new_log_ident.assign(buf + ident_offset, ident_len);
2911
Rotate_log_event::write()
2914
// Serializes this rotate event to `file`.
// Writes the common header for ROTATE_HEADER_LEN + ident_len bytes of data,
// then the post-header holding the 8-byte position, then the new log name.
// Returns the short-circuit OR of the three writes; presumably non-zero
// means failure -- confirm against my_b_safe_write.
// NOTE(review): the length argument of the last write is not visible in
// this listing.
bool Rotate_log_event::write(IO_CACHE* file)
2916
char buf[ROTATE_HEADER_LEN];
2917
int8store(buf + R_POS_OFFSET, pos);
2918
return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
2919
my_b_safe_write(file, (unsigned char*)buf, ROTATE_HEADER_LEN) ||
2920
my_b_safe_write(file, (const unsigned char*)new_log_ident.c_str(),
2926
Got a rotate log event from the master.
2928
This is mainly used so that we can later figure out the logname and
2929
position for the master.
2931
We can't rotate the slave's BINlog as this will cause infinite rotations
2932
in a A -> B -> A setup.
2933
The NOTES below is a wrong comment which will disappear when 4.1 is merged.
2938
// Updates the relay log coordinates after a rotate event.
// Under rli->data_lock the event relay log position is set to the current
// read position of rli->cur_log. When the event may be applied (it comes
// from another server or replicate_same_server_id is set) and the slave is
// not inside a group, the group master log name and position are switched
// to the new log, the group relay coordinates are synchronised with the
// event coordinates, and the session options and auto-increment settings
// are reset. Waiters on data_cond are then woken and the relay log info is
// flushed.
// NOTE(review): the return statement is not visible in this listing.
int Rotate_log_event::do_update_pos(Relay_log_info *rli)
2940
pthread_mutex_lock(&rli->data_lock);
2941
rli->event_relay_log_pos= my_b_tell(rli->cur_log);
2943
If we are in a transaction or in a group: the only normal case is
2944
when the I/O thread was copying a big transaction, then it was
2945
stopped and restarted: we have this in the relay log:
2953
In that case, we don't want to touch the coordinates which
2954
correspond to the beginning of the transaction. Starting from
2955
5.0.0, there also are some rotates from the slave itself, in the
2956
relay log, which shall not change the group positions.
2958
if ((server_id != ::server_id || rli->replicate_same_server_id) &&
2959
!rli->is_in_group())
2961
rli->group_master_log_name.assign(new_log_ident);
2962
rli->notify_group_master_log_name_update();
2963
rli->group_master_log_pos= pos;
2964
rli->group_relay_log_name.assign(rli->event_relay_log_name);
2965
rli->notify_group_relay_log_name_update();
2966
rli->group_relay_log_pos= rli->event_relay_log_pos;
2968
Reset session->options and sql_mode etc, because this could be the signal of
2969
a master's downgrade from 5.0 to 4.0.
2970
However, no need to reset description_event_for_exec: indeed, if the next
2971
master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
2972
master is 4.0 then the events are in the slave's format (conversion).
2974
set_slave_thread_options(session);
2975
session->variables.auto_increment_increment=
2976
session->variables.auto_increment_offset= 1;
2978
// The lock is released before waiters are signalled and the info is flushed.
pthread_mutex_unlock(&rli->data_lock);
2979
pthread_cond_broadcast(&rli->data_cond);
2980
flush_relay_log_info(rli);
2986
// Decides whether the slave should skip this rotate event.
// The generic decision of Log_event::do_shall_skip() is remapped:
// EVENT_SKIP_NOT and EVENT_SKIP_COUNT both become EVENT_SKIP_NOT, while
// EVENT_SKIP_IGNORE is passed through unchanged.
// NOTE(review): the switch header over `reason` is not visible in this
// listing.
Log_event::enum_skip_reason
2987
Rotate_log_event::do_shall_skip(Relay_log_info *rli)
2989
enum_skip_reason reason= Log_event::do_shall_skip(rli);
2992
case Log_event::EVENT_SKIP_NOT:
2993
case Log_event::EVENT_SKIP_COUNT:
2994
return Log_event::EVENT_SKIP_NOT;
2996
case Log_event::EVENT_SKIP_IGNORE:
2997
return Log_event::EVENT_SKIP_IGNORE;
3000
return Log_event::EVENT_SKIP_NOT; // fallback return so the compiler sees every path return
3004
/**************************************************************************
3005
Xid_log_event methods
3006
**************************************************************************/
3008
/**
  Sends a one-line description of this event to the client.

  The text has the form "COMMIT /" "* xid=<xid> *" "/", with the xid
  rendered in base 10 by int64_t10_to_str(), and is stored in the protocol
  as binary.

  @param protocol  destination of the formatted text
*/
void Xid_log_event::pack_info(Protocol *protocol)
{
  static const char prefix[]= "COMMIT /* xid=";
  static const char suffix[]= " */";
  char text[128];

  strcpy(text, prefix);
  char *cursor= int64_t10_to_str(xid, text + (sizeof(prefix) - 1), 10);
  strcpy(cursor, suffix);
  cursor+= sizeof(suffix) - 1;

  protocol->store(text, (uint) (cursor - text), &my_charset_bin);
}
3019
It's ok not to use int8store here,
3020
as long as XID::set(uint64_t) and
3021
XID::get_my_xid doesn't do it either.
3022
We don't care about actual values of xids as long as
3023
identical numbers compare identically
3027
Xid_log_event(const char* buf,
3028
const Format_description_log_event *description_event)
3029
:Log_event(buf, description_event)
3031
buf+= description_event->common_header_len;
3032
memcpy(&xid, buf, sizeof(xid));
3036
/**
  Serializes this event to 'file': the common header followed by the raw
  bytes of the xid.

  @param file  cache the event is written to

  @return true when either write reports a non-zero result, false otherwise
*/
bool Xid_log_event::write(IO_CACHE* file)
{
  if (write_header(file, sizeof(xid)))
    return true;

  return my_b_safe_write(file, (unsigned char*) &xid, sizeof(xid)) != 0;
}
3043
int Xid_log_event::do_apply_event(const Relay_log_info *)
3045
return end_trans(session, COMMIT);
3048
/**
  Decides whether the slave should skip this event.

  While the slave skip counter is positive the event is counted as skipped
  and OPTION_BEGIN is cleared from the session options; otherwise the
  generic Log_event decision applies.

  @param rli  relay log info holding slave_skip_counter
*/
Log_event::enum_skip_reason
Xid_log_event::do_shall_skip(Relay_log_info *rli)
{
  const bool counter_active= rli->slave_skip_counter > 0;
  if (!counter_active)
    return Log_event::do_shall_skip(rli);

  session->options&= ~OPTION_BEGIN;
  return Log_event::EVENT_SKIP_COUNT;
}
3059
/**************************************************************************
3060
Slave_log_event methods
3061
**************************************************************************/
3063
// Sends a one-line description of this event to the client, formatted as
// host=<master_host>,port=<master_port>,log=<master_log>,pos=<master_pos>.
// NOTE(review): stream.str() is called twice and each call builds a
// separate temporary string; holding the result in one local would avoid
// the second copy.
// NOTE(review): the last argument of the store call is not visible in this
// listing.
void Slave_log_event::pack_info(Protocol *protocol)
3065
ostringstream stream;
3066
stream << "host=" << master_host << ",port=" << master_port;
3067
stream << ",log=" << master_log << ",pos=" << master_pos;
3069
protocol->store(stream.str().c_str(), stream.str().length(),
3076
re-write this better without holding both locks at the same time
3078
// Builds a slave event that records the current master coordinates.
// With both mi->data_lock and rli->data_lock held, a pool of
// get_data_size() + 1 bytes is allocated and the master host, log name,
// port and position are copied from mi and rli. When the allocation fails
// an out-of-memory error is logged and mem_pool stays null.
// NOTE(review): get_data_size() runs before master_host and master_log are
// assigned, so the pool size reflects their lengths at that point --
// confirm this is intended.
// NOTE(review): master_host is initialised from 0; if it is a std::string
// that is construction from a null pointer -- confirm the member type.
// NOTE(review): the statements taken when rli is not inited and the else
// keyword before the error message are not visible in this listing.
Slave_log_event::Slave_log_event(Session* session_arg,
3079
Relay_log_info* rli)
3080
:Log_event(session_arg, 0, 0) , mem_pool(0), master_host(0)
3082
if (!rli->inited) // QQ When can this happen ?
3085
Master_info* mi = rli->mi;
3086
// TODO: re-write this better without holding both locks at the same time
3087
pthread_mutex_lock(&mi->data_lock);
3088
pthread_mutex_lock(&rli->data_lock);
3089
// on OOM, just do not initialize the structure and print the error
3090
if ((mem_pool = (char*)malloc(get_data_size() + 1)))
3092
master_host.assign(mi->getHostname());
3093
master_log.assign(rli->group_master_log_name);
3094
master_port = mi->getPort();
3095
master_pos = rli->group_master_log_pos;
3098
errmsg_printf(ERRMSG_LVL_ERROR, _("Out of memory while recording slave event"));
3099
// Locks are released in the reverse order of acquisition.
pthread_mutex_unlock(&rli->data_lock);
3100
pthread_mutex_unlock(&mi->data_lock);
3105
Slave_log_event::~Slave_log_event()
3111
int Slave_log_event::get_data_size()
3113
return master_host.length() + master_log.length() + 1 + SL_MASTER_HOST_OFFSET;
3117
/**
  Serializes this event to 'file'.

  The master position and port are stored into mem_pool at their fixed
  offsets, then the common header and the whole pool are written.

  mem_pool is left null by the constructor when the relay log info is not
  initialised or the allocation fails. In that case nothing can be
  serialized, so failure is reported instead of storing through a null
  pointer.

  @param file  cache the event is written to

  @return true on failure (null pool or a failed write), false on success
*/
bool Slave_log_event::write(IO_CACHE* file)
{
  if (!mem_pool)
    return true;

  const ulong event_length= get_data_size();
  int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
  int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
  /* The host and log names are expected to be in mem_pool already. */

  return (write_header(file, event_length) ||
          my_b_safe_write(file, (unsigned char*) mem_pool, event_length));
}
3129
void Slave_log_event::init_from_mem_pool()
3131
master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
3132
master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
3134
/* Assign these correctly */
3135
master_host.assign(mem_pool + SL_MASTER_HOST_OFFSET);
3136
master_log.assign();
3141
// Applying this event only re-logs it: when the binary log is open, the
// event object itself is written to it. The Relay_log_info argument is unnamed
// and therefore unused.
int Slave_log_event::do_apply_event(const Relay_log_info *)
3143
if (drizzle_bin_log.is_open())
3144
drizzle_bin_log.write(this);
3149
/**************************************************************************
3150
Stop_log_event methods
3151
**************************************************************************/
3154
The master stopped. We used to clean up all temporary tables but
3155
this is useless as, as the master has shut down properly, it has
3156
written all DROP TEMPORARY Table (prepared statements' deletion is
3157
TODO only when we binlog prep stmts). We used to clean up
3158
slave_load_tmpdir, but this is useless as it has been cleared at the
3159
end of LOAD DATA INFILE. So we have nothing to do here. The place
3160
where we must do this cleaning is in
3161
Start_log_event_v3::do_apply_event(), not here. Because if we come
3162
here, the master was sane.
3164
// Advances the relay-log position after a Stop event. The master log
// coordinates are deliberately left alone (see the explanation below).
int Stop_log_event::do_update_pos(Relay_log_info *rli)
3167
We do not want to update master_log pos because we get a rotate event
3168
before stop, so by now group_master_log_name is set to the next log.
3169
If we updated it, we will have incorrect master coordinates and this
3170
could give false triggers in MASTER_POS_WAIT() that we have reached
3171
the target position when in fact we have not.
3173
// With OPTION_BEGIN set on the session only the event position is advanced.
if (session->options & OPTION_BEGIN)
3174
rli->inc_event_relay_log_pos();
3177
// NOTE(review): the lines between are not visible here; presumably the two
// calls below form the else branch - confirm.
rli->inc_group_relay_log_pos(0);
3178
flush_relay_log_info(rli);
3184
/**************************************************************************
3185
Create_file_log_event methods
3186
**************************************************************************/
3189
Create_file_log_event ctor
3192
// Constructor taking a session and a data block: forwards the LOAD
// description to Load_log_event and remembers the block pointer and length.
// The block is stored by pointer, not copied.
Create_file_log_event::
3193
Create_file_log_event(Session* session_arg, sql_exchange* ex,
3194
const char* db_arg, const char* table_name_arg,
3195
List<Item>& fields_arg, enum enum_duplicates handle_dup,
3197
unsigned char* block_arg, uint32_t block_len_arg, bool using_trans)
3198
:Load_log_event(session_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
3200
fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg),
3201
// Takes the next file id from the binlog and also records it on the session.
file_id(session_arg->file_id = drizzle_bin_log.next_file_id())
3203
sql_ex.force_new_format();
3209
Create_file_log_event::write_data_body()
3212
// Writes the Load_log_event body first. If that fails, or if fake_base is set,
// the condition below is taken (the statement it guards is not visible).
// Otherwise one zero byte is written, followed by block_len bytes of block.
bool Create_file_log_event::write_data_body(IO_CACHE* file)
3215
if ((res= Load_log_event::write_data_body(file)) || fake_base)
3217
return (my_b_safe_write(file, (unsigned char*) "", 1) ||
3218
my_b_safe_write(file, (unsigned char*) block, block_len));
3223
Create_file_log_event::write_data_header()
3226
// Writes the Load_log_event header first. If that fails, or if fake_base is
// set, the condition below is taken (the statement it guards is not visible).
// Otherwise a CREATE_FILE_HEADER_LEN-byte header carrying file_id is written.
bool Create_file_log_event::write_data_header(IO_CACHE* file)
3229
unsigned char buf[CREATE_FILE_HEADER_LEN];
3230
if ((res= Load_log_event::write_data_header(file)) || fake_base)
3232
int4store(buf + CF_FILE_ID_OFFSET, file_id);
3233
return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0;
3238
Create_file_log_event::write_base()
3241
// Sets fake_base, the flag that write_data_header()/write_data_body() test
// before adding the Create_file-specific parts. The rest of the body is not
// visible here.
bool Create_file_log_event::write_base(IO_CACHE* file)
3244
fake_base= 1; // pretend we are Load event
3251
Create_file_log_event ctor
3254
/**
  Create_file_log_event ctor: decode an event from a raw buffer.

  A private copy of the event is kept in event_buf and handed to
  copy_log_event(); the file id and the data block are then located behind
  the Load_log_event part. On any failure the constructor returns early and
  leaves block == 0.

  @param buf                Raw event bytes.
  @param len                Number of bytes in buf.
  @param description_event  Format description used for the header lengths.
*/
Create_file_log_event::Create_file_log_event(const char* buf, uint32_t len,
                                             const Format_description_log_event* description_event)
  :Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0)
{
  uint32_t block_offset;
  uint32_t header_len= description_event->common_header_len;
  uint8_t load_header_len= description_event->post_header_len[LOAD_EVENT-1];
  uint8_t create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];

  if (!(event_buf= (const char*)malloc(len)))
    return;
  /*
    memcpy() returns its (non-null) destination, so it must not be part of
    the error test below: it would make the test always true.
  */
  memcpy((char *)event_buf, buf, len);

  if (copy_log_event(event_buf,len,
                     ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
                      load_header_len + header_len :
                      (fake_base ? (header_len+load_header_len) :
                       (header_len+load_header_len) +
                       create_file_header_len)),
                     description_event))
    return;

  if (description_event->binlog_version!=1)
  {
    file_id= uint4korr(buf +
                       header_len +
                       load_header_len + CF_FILE_ID_OFFSET);
    /*
      Note that it's ok to use get_data_size() below, because it is computed
      with values we have already read from this event (because we called
      copy_log_event()); we are not using slave's format info to decode
      master's format, we are really using master's format info.
      Anyway, both formats should be identical (except the common_header_len)
      as these Load events are not changed between 4.0 and 5.0 (as logging of
      LOAD DATA INFILE does not use Load_log_event in 5.0).

      The + 1 is for \0 terminating fname
    */
    block_offset= (description_event->common_header_len +
                   Load_log_event::get_data_size() +
                   create_file_header_len + 1);
    if (len < block_offset)
      return;
    /* block points into the caller's buffer, not into event_buf. */
    block = (unsigned char*)buf + block_offset;
    block_len = len - block_offset;
  }
  else
  {
    sql_ex.force_new_format();
    inited_from_old = 1;
  }
  return;
}
3306
Create_file_log_event::pack_info()
3309
// Builds "db=<db>;table=<table>;file_id=<n>;block_len=<n>" in a stack buffer
// and passes it to protocol->store(). db and table_name are copied with
// memcpy using their stored lengths; each following strcpy writes at the end
// of the bytes just copied.
void Create_file_log_event::pack_info(Protocol *protocol)
3311
char buf[NAME_LEN*2 + 30 + 21*2], *pos;
3312
pos= strcpy(buf, "db=")+3;
3313
memcpy(pos, db, db_len);
3314
pos= strcpy(pos + db_len, ";table=")+7;
3315
memcpy(pos, table_name, table_name_len);
3316
pos= strcpy(pos + table_name_len, ";file_id=")+9;
3317
pos= int10_to_str((long) file_id, pos, 10);
3318
pos= strcpy(pos, ";block_len=")+11;
3319
pos= int10_to_str((long) block_len, pos, 10);
3320
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3325
Create_file_log_event::do_apply_event()
3328
// Materialises the event as two temporary files whose names come from
// slave_load_file_stem(): a ".info" file written through write_base() and a
// ".data" file holding the raw block. Failures are reported through
// rli->report().
int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
3330
// proc_info holds the status text; the file name is built directly after it.
char proc_info[17+FN_REFLEN+10], *fname_buf;
3336
memset(&file, 0, sizeof(file));
3337
fname_buf= strcpy(proc_info, "Making temp file ")+17;
3338
ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
3339
session->set_proc_info(proc_info);
3340
my_delete(fname_buf, MYF(0)); // old copy may exist already
3341
if ((fd= my_create(fname_buf, CREATE_MODE,
3343
MYF(MY_WME))) < 0 ||
3344
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
3345
MYF(MY_WME|MY_NABP)))
3347
rli->report(ERROR_LEVEL, my_errno,
3348
_("Error in Create_file event: could not open file '%s'"),
3353
// a trick to avoid allocating another buffer
3355
// Overwrites the extension in place, so the same buffer now names the .data file.
fname_len= (uint) ((strcpy(ext, ".data") + 5) - fname);
3356
if (write_base(&file))
3358
strcpy(ext, ".info"); // to have it right in the error message
3359
rli->report(ERROR_LEVEL, my_errno,
3360
_("Error in Create_file event: could not write to file '%s'"),
3364
end_io_cache(&file);
3365
my_close(fd, MYF(0));
3367
// fname_buf now already has .data, not .info, because we did our trick
3368
my_delete(fname_buf, MYF(0)); // old copy may exist already
3369
if ((fd= my_create(fname_buf, CREATE_MODE,
3373
rli->report(ERROR_LEVEL, my_errno,
3374
_("Error in Create_file event: could not open file '%s'"),
3378
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
3380
rli->report(ERROR_LEVEL, my_errno,
3381
_("Error in Create_file event: write to '%s' failed"),
3385
error=0; // Everything is ok
3389
// NOTE(review): the guards around this cleanup are not visible here.
end_io_cache(&file);
3391
my_close(fd, MYF(0));
3392
session->set_proc_info(0);
3397
/**************************************************************************
3398
Append_block_log_event methods
3399
**************************************************************************/
3402
Append_block_log_event ctor
3405
// Constructor taking a session and a data block: stores the block pointer and
// length (no copy is made), the database name, and the file id currently
// recorded on the session.
Append_block_log_event::Append_block_log_event(Session *session_arg,
3407
unsigned char *block_arg,
3408
uint32_t block_len_arg,
3410
:Log_event(session_arg,0, using_trans), block(block_arg),
3411
block_len(block_len_arg), file_id(session_arg->file_id), db(db_arg)
3417
Append_block_log_event ctor
3420
// Decoding constructor: reads file_id from the post-header and points block
// at the bytes that follow both headers. block stays 0 when the event is
// shorter than the combined header length.
Append_block_log_event::Append_block_log_event(const char* buf, uint32_t len,
3421
const Format_description_log_event* description_event)
3422
:Log_event(buf, description_event),block(0)
3424
uint8_t common_header_len= description_event->common_header_len;
3425
uint8_t append_block_header_len=
3426
description_event->post_header_len[APPEND_BLOCK_EVENT-1];
3427
uint32_t total_header_len= common_header_len+append_block_header_len;
3428
if (len < total_header_len)
3430
file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET);
3431
// block aliases the caller's buffer; nothing is copied.
block= (unsigned char*)buf + total_header_len;
3432
block_len= len - total_header_len;
3438
Append_block_log_event::write()
3441
// Writes the common header, then the APPEND_BLOCK_HEADER_LEN-byte post-header
// carrying file_id, then block_len bytes of block. The result is the logical
// OR of the three write calls.
bool Append_block_log_event::write(IO_CACHE* file)
3443
unsigned char buf[APPEND_BLOCK_HEADER_LEN];
3444
int4store(buf + AB_FILE_ID_OFFSET, file_id);
3445
return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) ||
3446
my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
3447
my_b_safe_write(file, (unsigned char*) block, block_len));
3452
Append_block_log_event::pack_info()
3455
// Formats ";file_id=<n>;block_len=<n>" and passes it to protocol->store().
// The buffer declaration and the end of the sprintf call are not visible here.
void Append_block_log_event::pack_info(Protocol *protocol)
3459
length= (uint) sprintf(buf, ";file_id=%u;block_len=%u", file_id,
3461
protocol->store(buf, length, &my_charset_bin);
3466
Append_block_log_event::get_create_or_append()
3469
// Selects the file mode used by do_apply_event(): 0 means append to an
// existing file. Begin_load_query_log_event returns 1 instead.
int Append_block_log_event::get_create_or_append() const
3471
return 0; /* append to the file, fail if not exists */
3475
Append_block_log_event::do_apply_event()
3478
// Writes this event's block to the ".data" temporary file for file_id.
// get_create_or_append() decides between recreating the file and opening it
// with O_APPEND. Failures are reported through rli->report().
int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
3480
char proc_info[17+FN_REFLEN+10], *fname= proc_info+17;
3484
fname= strcpy(proc_info, "Making temp file ")+17;
3485
slave_load_file_stem(fname, file_id, server_id, ".data");
3486
session->set_proc_info(proc_info);
3487
if (get_create_or_append())
3489
my_delete(fname, MYF(0)); // old copy may exist already
3490
if ((fd= my_create(fname, CREATE_MODE,
3494
rli->report(ERROR_LEVEL, my_errno,
3495
_("Error in %s event: could not create file '%s'"),
3496
get_type_str(), fname);
3500
else if ((fd = my_open(fname, O_WRONLY | O_APPEND,
3503
rli->report(ERROR_LEVEL, my_errno,
3504
_("Error in %s event: could not open file '%s'"),
3505
get_type_str(), fname);
3508
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
3510
rli->report(ERROR_LEVEL, my_errno,
3511
_("Error in %s event: write to '%s' failed"),
3512
get_type_str(), fname);
3519
my_close(fd, MYF(0));
3520
session->set_proc_info(0);
3525
/**************************************************************************
3526
Delete_file_log_event methods
3527
**************************************************************************/
3530
Delete_file_log_event ctor
3533
// Constructor taking a session: stores the database name and the file id
// currently recorded on the session.
Delete_file_log_event::Delete_file_log_event(Session *session_arg, const char* db_arg,
3535
:Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
3540
Delete_file_log_event ctor
3543
// Decoding constructor: reads file_id from the post-header. file_id starts
// as 0; the length test below guards the read (the guarded statement is not
// visible here).
Delete_file_log_event::Delete_file_log_event(const char* buf, uint32_t len,
3544
const Format_description_log_event* description_event)
3545
:Log_event(buf, description_event),file_id(0)
3547
uint8_t common_header_len= description_event->common_header_len;
3548
uint8_t delete_file_header_len= description_event->post_header_len[DELETE_FILE_EVENT-1];
3549
if (len < (uint)(common_header_len + delete_file_header_len))
3551
file_id= uint4korr(buf + common_header_len + DF_FILE_ID_OFFSET);
3556
Delete_file_log_event::write()
3559
// Writes the common header followed by a DELETE_FILE_HEADER_LEN-byte
// post-header that carries file_id.
bool Delete_file_log_event::write(IO_CACHE* file)
3561
unsigned char buf[DELETE_FILE_HEADER_LEN];
3562
int4store(buf + DF_FILE_ID_OFFSET, file_id);
3563
return (write_header(file, sizeof(buf)) ||
3564
my_b_safe_write(file, buf, sizeof(buf)));
3569
Delete_file_log_event::pack_info()
3572
// Formats ";file_id=<n>" and passes it to protocol->store().
// The buffer declaration is not visible here.
void Delete_file_log_event::pack_info(Protocol *protocol)
3576
length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
3577
protocol->store(buf, (int32_t) length, &my_charset_bin);
3581
Delete_file_log_event::do_apply_event()
3584
// Removes both temporary files for file_id: first the ".data" file, then,
// after rewriting the extension in place, the ".info" file. The results of
// my_delete() are intentionally discarded.
int Delete_file_log_event::do_apply_event(const Relay_log_info *)
3586
char fname[FN_REFLEN+10];
3587
char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
3588
(void) my_delete(fname, MYF(MY_WME));
3589
strcpy(ext, ".info");
3590
(void) my_delete(fname, MYF(MY_WME));
3595
/**************************************************************************
3596
Execute_load_log_event methods
3597
**************************************************************************/
3600
Execute_load_log_event ctor
3603
// Constructor taking a session: stores the database name and the file id
// currently recorded on the session.
Execute_load_log_event::Execute_load_log_event(Session *session_arg,
3606
:Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
3612
Execute_load_log_event ctor
3615
// Decoding constructor: reads file_id from the post-header. file_id starts
// as 0; the length test below guards the read (the guarded statement is not
// visible here).
Execute_load_log_event::Execute_load_log_event(const char* buf, uint32_t len,
3616
const Format_description_log_event* description_event)
3617
:Log_event(buf, description_event), file_id(0)
3619
uint8_t common_header_len= description_event->common_header_len;
3620
uint8_t exec_load_header_len= description_event->post_header_len[EXEC_LOAD_EVENT-1];
3621
if (len < (uint)(common_header_len+exec_load_header_len))
3623
file_id= uint4korr(buf + common_header_len + EL_FILE_ID_OFFSET);
3628
Execute_load_log_event::write()
3631
// Writes the common header followed by an EXEC_LOAD_HEADER_LEN-byte
// post-header that carries file_id.
bool Execute_load_log_event::write(IO_CACHE* file)
3633
unsigned char buf[EXEC_LOAD_HEADER_LEN];
3634
int4store(buf + EL_FILE_ID_OFFSET, file_id);
3635
return (write_header(file, sizeof(buf)) ||
3636
my_b_safe_write(file, buf, sizeof(buf)));
3641
Execute_load_log_event::pack_info()
3644
// Formats ";file_id=<n>" and passes it to protocol->store().
// The buffer declaration is not visible here.
void Execute_load_log_event::pack_info(Protocol *protocol)
3648
length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
3649
protocol->store(buf, (int32_t) length, &my_charset_bin);
3654
Execute_load_log_event::do_apply_event()
3657
// Executes a load prepared by earlier events: opens the ".info" temporary
// file for file_id, reads the Load_log_event stored in it, applies that
// event, and then deletes both the ".info" and ".data" files. Failures are
// reported through rli->report().
int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
3659
char fname[FN_REFLEN+10];
3664
Load_log_event *lev= 0;
3666
ext= slave_load_file_stem(fname, file_id, server_id, ".info");
3667
if ((fd = my_open(fname, O_RDONLY,
3668
MYF(MY_WME))) < 0 ||
3669
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
3670
MYF(MY_WME|MY_NABP)))
3672
rli->report(ERROR_LEVEL, my_errno,
3673
_("Error in Exec_load event: could not open file '%s'"),
3677
// The stored event must be readable and of type NEW_LOAD_EVENT.
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
3678
(pthread_mutex_t*)0,
3679
rli->relay_log.description_event_for_exec)) ||
3680
lev->get_type_code() != NEW_LOAD_EVENT)
3682
rli->report(ERROR_LEVEL, 0,
3683
_("Error in Exec_load event: "
3684
"file '%s' appears corrupted"),
3689
lev->session = session;
3691
lev->do_apply_event should use rli only for errors i.e. should
3692
not advance rli's position.
3694
lev->do_apply_event is the place where the table is loaded (it
3695
calls mysql_load()).
3698
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
3699
if (lev->do_apply_event(0,rli,1))
3702
We want to indicate the name of the file that could not be loaded
3704
But as we are here we are sure the error is in rli->last_slave_error and
3705
rli->last_slave_errno (example of error: duplicate entry for key), so we
3706
don't want to overwrite it with the filename.
3707
What we want instead is add the filename to the current error message.
3709
// The message is duplicated first because report() replaces the stored error.
// NOTE(review): the NULL check and the free of tmp are not visible here.
char *tmp= strdup(rli->last_error().message);
3712
rli->report(ERROR_LEVEL, rli->last_error().number,
3713
_("%s. Failed executing load from '%s'"),
3720
We have an open file descriptor to the .info file; we need to close it
3721
or Windows will refuse to delete the file in my_delete().
3725
my_close(fd, MYF(0));
3726
end_io_cache(&file);
3729
(void) my_delete(fname, MYF(MY_WME));
3730
// Copies six bytes, i.e. the extension including its terminating NUL.
memcpy(ext, ".data", 6);
3731
(void) my_delete(fname, MYF(MY_WME));
3738
// NOTE(review): second cleanup site; the guard around it is not visible here.
my_close(fd, MYF(0));
3739
end_io_cache(&file);
3745
/**************************************************************************
3746
Begin_load_query_log_event methods
3747
**************************************************************************/
3749
// Constructor taking a session and a data block: builds the
// Append_block_log_event base, then takes the next file id from the binlog
// and stores it both in this event and on the session.
Begin_load_query_log_event::
3750
Begin_load_query_log_event(Session* session_arg, const char* db_arg, unsigned char* block_arg,
3751
uint32_t block_len_arg, bool using_trans)
3752
:Append_block_log_event(session_arg, db_arg, block_arg, block_len_arg,
3755
file_id= session_arg->file_id= drizzle_bin_log.next_file_id();
3759
// Decoding constructor: all parsing is delegated to the
// Append_block_log_event constructor that takes the same arguments.
Begin_load_query_log_event::
3760
Begin_load_query_log_event(const char* buf, uint32_t len,
3761
const Format_description_log_event* desc_event)
3762
:Append_block_log_event(buf, len, desc_event)
3767
// Returns 1 so that Append_block_log_event::do_apply_event() deletes any old
// copy and creates the file, instead of opening it for append.
int Begin_load_query_log_event::get_create_or_append() const
3769
return 1; /* create the file */
3773
// Skip decision for this event: delegates to continue_group(rli).
Log_event::enum_skip_reason
3774
Begin_load_query_log_event::do_shall_skip(Relay_log_info *rli)
3777
If the slave skip counter is 1, then we should not start executing
3780
return continue_group(rli);
3784
/**************************************************************************
3785
Execute_load_query_log_event methods
3786
**************************************************************************/
3789
// Constructor taking a session and a query: builds the Query_log_event base
// and records the session's current file id, the start and end offsets of the
// file-name portion within the query text, and the duplicate-handling mode.
Execute_load_query_log_event::
3790
Execute_load_query_log_event(Session *session_arg, const char* query_arg,
3791
ulong query_length_arg, uint32_t fn_pos_start_arg,
3792
uint32_t fn_pos_end_arg,
3793
enum_load_dup_handling dup_handling_arg,
3794
bool using_trans, bool suppress_use,
3795
Session::killed_state killed_err_arg):
3796
Query_log_event(session_arg, query_arg, query_length_arg, using_trans,
3797
suppress_use, killed_err_arg),
3798
file_id(session_arg->file_id), fn_pos_start(fn_pos_start_arg),
3799
fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
3804
// Decoding constructor: after the Query_log_event part has been parsed and
// validated, reads the file-name offsets, the duplicate-handling mode and the
// file id from the post-header. file_id is assigned last, after the range
// test on the other fields.
Execute_load_query_log_event::
3805
Execute_load_query_log_event(const char* buf, uint32_t event_len,
3806
const Format_description_log_event* desc_event):
3807
Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT),
3808
file_id(0), fn_pos_start(0), fn_pos_end(0)
3810
if (!Query_log_event::is_valid())
3813
// From here on the ELQ_* offsets are relative to the end of the common header.
buf+= desc_event->common_header_len;
3815
fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET);
3816
fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET);
3817
dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET));
3819
// Tests that both offsets lie within the query and the mode is a known value.
// NOTE(review): fn_pos_start <= fn_pos_end is not checked here.
if (fn_pos_start > q_len || fn_pos_end > q_len ||
3820
dup_handling > LOAD_DUP_REPLACE)
3823
file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET);
3827
// Number of extra post-header bytes this subclass adds; matches the buffer
// size used by write_post_header_for_derived().
ulong Execute_load_query_log_event::get_post_header_size_for_derived()
3829
return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN;
3834
// Writes the subclass post-header: file_id (4 bytes), fn_pos_start (4 bytes),
// fn_pos_end (4 bytes) and dup_handling (1 byte), in that order.
Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
3836
unsigned char buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
3837
int4store(buf, file_id);
3838
int4store(buf + 4, fn_pos_start);
3839
int4store(buf + 4 + 4, fn_pos_end);
3840
*(buf + 4 + 4 + 4)= (unsigned char) dup_handling;
3841
return my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
3845
// Builds "use `<db>`; <query> ;file_id=<n>" in a heap buffer and passes it
// to protocol->store(). The allocation covers the fixed text, db_len, q_len
// and the digits of file_id.
// NOTE(review): the conditions around the "use" prefix, the pointer advance
// after the query copy, and the release of buf are not visible here.
void Execute_load_query_log_event::pack_info(Protocol *protocol)
3848
if (!(buf= (char*) malloc(9 + db_len + q_len + 10 + 21)))
3853
pos= strcpy(buf, "use `")+5;
3854
memcpy(pos, db, db_len);
3855
pos= strcpy(pos+db_len, "`; ")+3;
3859
memcpy(pos, query, q_len);
3862
pos= strcpy(pos, " ;file_id=")+10;
3863
pos= int10_to_str((long) file_id, pos, 10);
3864
protocol->store(buf, pos-buf, &my_charset_bin);
3870
// Rewrites the logged LOAD DATA query so that it reads from the local ".data"
// temporary file for file_id, executes the rewritten text through
// Query_log_event::do_apply_event(), and afterwards deletes the file (see the
// note near the end about the error case).
Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli)
3878
// Sized as the query minus the original file-name span, plus room for the
// generated file name and the inserted keywords.
buf= (char*) malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
3879
(FN_REFLEN + 10) + 10 + 8 + 5);
3881
/* Replace filename and LOCAL keyword in query before executing it */
3884
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
3885
ER(ER_SLAVE_FATAL_ERROR),
3886
_("Not enough memory"));
3891
// Copies the query text that precedes the original file name.
memcpy(p, query, fn_pos_start);
3893
fname= (p= strncpy(p, STRING_WITH_LEN(" INFILE \'")) + 9);
3894
p= slave_load_file_stem(p, file_id, server_id, ".data");
3895
fname_end= p= strchr(p, '\0'); // Safer than p=p+5
3897
switch (dup_handling) {
3898
case LOAD_DUP_IGNORE:
3899
p= strncpy(p, STRING_WITH_LEN(" IGNORE")) + 7;
3901
case LOAD_DUP_REPLACE:
3902
p= strncpy(p, STRING_WITH_LEN(" REPLACE")) + 8;
3905
/* Ordinary load data */
3908
size_t end_len = q_len-fn_pos_end;
3909
p= strncpy(p, STRING_WITH_LEN(" INTO")) + 5;
3910
// strncpy returns its destination, so p is not advanced by this call.
// NOTE(review): the advance past the copied tail is presumably on a line not visible here.
p= strncpy(p, query+fn_pos_end, end_len);
3913
error= Query_log_event::do_apply_event(rli, buf, p-buf);
3915
/* Forging file name for deletion in same buffer */
3919
If there was an error the slave is going to stop, leave the
3920
file so that we can re-execute this event at START SLAVE.
3923
(void) my_delete(fname, MYF(MY_WME));
3930
/**************************************************************************
3932
**************************************************************************/
3935
sql_ex_info::write_data()
3938
// Writes the five delimiter strings (field_term, enclosed, line_term,
// line_start, escaped) with write_str(), followed by the single opt_flags
// byte. The result is the logical OR of all write calls.
// NOTE(review): this looks like one branch of a format switch; the
// surrounding condition is not visible here.
bool sql_ex_info::write_data(IO_CACHE* file)
3942
return (write_str(file, field_term, (uint) field_term_len) ||
3943
write_str(file, enclosed, (uint) enclosed_len) ||
3944
write_str(file, line_term, (uint) line_term_len) ||
3945
write_str(file, line_start, (uint) line_start_len) ||
3946
write_str(file, escaped, (uint) escaped_len) ||
3947
my_b_safe_write(file,(unsigned char*) &opt_flags,1));
3959
// Decodes the LOAD DATA delimiter description from buf. Two layouts are
// visible: one reads five strings with read_str(), bounded by buf_end; the
// other takes one byte per field straight from buf and then applies the
// *_EMPTY flag bits. use_new_format is remembered in cached_new_format.
// The decoded members point into buf; nothing is copied.
const char *sql_ex_info::init(const char *buf, const char *buf_end,
3960
bool use_new_format)
3962
cached_new_format = use_new_format;
3967
The code below assumes that buf will not disappear from
3968
under our feet during the lifetime of the event. This assumption
3969
holds true in the slave thread if the log is in new format, but is not
3970
the case when we have old format because we will be reusing net buffer
3971
to read the actual file before we write out the Create_file event.
3973
if (read_str(&buf, buf_end, &field_term, &field_term_len) ||
3974
read_str(&buf, buf_end, &enclosed, &enclosed_len) ||
3975
read_str(&buf, buf_end, &line_term, &line_term_len) ||
3976
read_str(&buf, buf_end, &line_start, &line_start_len) ||
3977
read_str(&buf, buf_end, &escaped, &escaped_len))
3983
// One-byte layout: every field has length 1 and aliases a byte of buf.
// NOTE(review): no comparison against buf_end is visible on this path.
field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
3984
field_term = buf++; // Use first byte in string
3990
empty_flags= *buf++;
3991
if (empty_flags & FIELD_TERM_EMPTY)
3993
if (empty_flags & ENCLOSED_EMPTY)
3995
if (empty_flags & LINE_TERM_EMPTY)
3997
if (empty_flags & LINE_START_EMPTY)
3999
if (empty_flags & ESCAPED_EMPTY)
4006
/**************************************************************************
4007
Rows_log_event member functions
4008
**************************************************************************/
4010
// Constructor taking a session and a table: records the table width, copies
// the session's foreign-key and unique-check options into the event flags,
// and initialises the column bitmap from cols. The inline m_bitbuf is used as
// bitmap storage when the width fits in it.
Rows_log_event::Rows_log_event(Session *session_arg, Table *tbl_arg, ulong tid,
4011
MY_BITMAP const *cols, bool is_transactional)
4012
: Log_event(session_arg, 0, is_transactional),
4016
// Width falls back to 1 for the dummy event that has no table.
m_width(tbl_arg ? tbl_arg->s->fields : 1),
4017
m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
4018
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4021
We allow a special form of dummy event when the table, and cols
4022
are null and the table id is UINT32_MAX. This is a temporary
4023
solution, to be able to terminate a started statement in the
4024
binary log: the extraneous events will be removed in the future.
4026
assert((tbl_arg && tbl_arg->s && tid != UINT32_MAX) || (!tbl_arg && !cols && tid == UINT32_MAX));
4028
if (session_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
4029
set_flags(NO_FOREIGN_KEY_CHECKS_F);
4030
if (session_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
4031
set_flags(RELAXED_UNIQUE_CHECKS_F);
4032
/* if bitmap_init fails, caught in is_valid() */
4033
if (likely(!bitmap_init(&m_cols,
4034
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4038
/* Cols can be zero if this is a dummy binrows event */
4039
if (likely(cols != NULL))
4041
memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
4042
create_last_word_mask(&m_cols);
4047
// Needed because bitmap_init() does not set it to null on failure
4053
// Decoding constructor. Reads, in order: the table id (4 or 6 bytes depending
// on the post-header length), the flags, the packed column count, the column
// bitmap, a second bitmap for UPDATE_ROWS_EVENT, and finally copies the
// remaining bytes of the event into a freshly allocated rows buffer.
Rows_log_event::Rows_log_event(const char *buf, uint32_t event_len,
4054
Log_event_type event_type,
4055
const Format_description_log_event
4057
: Log_event(buf, description_event),
4060
m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
4061
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4063
uint8_t const common_header_len= description_event->common_header_len;
4064
uint8_t const post_header_len= description_event->post_header_len[event_type-1];
4066
const char *post_start= buf + common_header_len;
4067
post_start+= RW_MAPID_OFFSET;
4068
if (post_header_len == 6)
4070
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
4071
m_table_id= uint4korr(post_start);
4076
m_table_id= (ulong) uint6korr(post_start);
4077
post_start+= RW_FLAGS_OFFSET;
4080
m_flags= uint2korr(post_start);
4082
unsigned char const *const var_start=
4083
(const unsigned char *)buf + common_header_len + post_header_len;
4084
unsigned char const *const ptr_width= var_start;
4085
unsigned char *ptr_after_width= (unsigned char*) ptr_width;
4086
// The packed integer is the column count itself; the bitmap that follows
// occupies (m_width + 7) / 8 bytes.
m_width = net_field_length(&ptr_after_width);
4087
/* if bitmap_init fails, caught in is_valid() */
4088
if (likely(!bitmap_init(&m_cols,
4089
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4093
memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
4094
create_last_word_mask(&m_cols);
4095
ptr_after_width+= (m_width + 7) / 8;
4099
// Needed because bitmap_init() does not set it to null on failure
4100
m_cols.bitmap= NULL;
4104
m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
4106
if (event_type == UPDATE_ROWS_EVENT)
4108
/* if bitmap_init fails, caught in is_valid() */
4109
if (likely(!bitmap_init(&m_cols_ai,
4110
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
4114
memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
4115
create_last_word_mask(&m_cols_ai);
4116
ptr_after_width+= (m_width + 7) / 8;
4120
// Needed because bitmap_init() does not set it to null on failure
4121
m_cols_ai.bitmap= 0;
4126
const unsigned char* const ptr_rows_data= (const unsigned char*) ptr_after_width;
4128
// NOTE(review): no check is visible that the parsed prefix fits inside
// event_len; if it does not, this unsigned subtraction wraps.
size_t const data_size= event_len - (ptr_rows_data - (const unsigned char *) buf);
4130
m_rows_buf= (unsigned char*) malloc(data_size);
4131
if (likely((bool)m_rows_buf))
4133
m_curr_row= m_rows_buf;
4134
m_rows_end= m_rows_buf + data_size;
4135
// The buffer is full from the start: cur and end coincide.
m_rows_cur= m_rows_end;
4136
memcpy(m_rows_buf, ptr_rows_data, data_size);
4139
m_cols.bitmap= 0; // to not free it
4144
// Releases the column bitmap and the rows buffer. When the bitmap lives in
// the inline m_bitbuf, the pointer is cleared first so that bitmap_free()
// has nothing to free.
Rows_log_event::~Rows_log_event()
4146
if (m_cols.bitmap == m_bitbuf) // no malloc happened
4147
m_cols.bitmap= 0; // so no free in bitmap_free
4148
bitmap_free(&m_cols); // To pair with bitmap_init().
4149
free((unsigned char*)m_rows_buf);
4152
int Rows_log_event::get_data_size()
4154
int const type_code= get_type_code();
4156
unsigned char buf[sizeof(m_width)+1];
4157
unsigned char *end= net_store_length(buf, (m_width + 7) / 8);
4159
int data_size= ROWS_HEADER_LEN;
4160
data_size+= no_bytes_in_map(&m_cols);
4161
data_size+= end - buf;
4163
if (type_code == UPDATE_ROWS_EVENT)
4164
data_size+= no_bytes_in_map(&m_cols_ai);
4166
data_size+= (m_rows_cur - m_rows_buf);
4171
// Appends `length` bytes from row_data to the rows buffer, growing the buffer
// with realloc() in multiples of 1024 bytes when the free space is not larger
// than `length`. Returns HA_ERR_OUT_OF_MEM if the reallocation fails.
int Rows_log_event::do_add_row_data(unsigned char *row_data, size_t length)
4174
When the table has a primary key, we would probably want, by default, to
4175
log only the primary key value instead of the entire "before image". This
4176
would save binlog space. TODO
4180
If length is zero, there is nothing to write, so we just
4181
return. Note that this is not an optimization, since calling
4182
realloc() with size 0 means free().
4190
assert(m_rows_buf <= m_rows_cur);
4191
assert(!m_rows_buf || (m_rows_end && m_rows_buf <= m_rows_end));
4192
assert(m_rows_cur <= m_rows_end);
4194
/* The cast will always work since m_rows_cur <= m_rows_end */
4195
if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
4197
size_t const block_size= 1024;
4198
const size_t cur_size= m_rows_cur - m_rows_buf;
4199
// Rounds the required size up to the next multiple of block_size.
const size_t new_alloc=
4200
block_size * ((cur_size + length + block_size - 1) / block_size);
4202
unsigned char* new_buf= (unsigned char*)realloc(m_rows_buf, new_alloc);
4203
// On failure the old buffer and all three pointers are left untouched.
if (unlikely(!new_buf))
4204
return(HA_ERR_OUT_OF_MEM);
4206
/* If the memory moved, we need to move the pointers */
4207
if (new_buf != m_rows_buf)
4209
m_rows_buf= new_buf;
4210
m_rows_cur= m_rows_buf + cur_size;
4214
The end pointer should always be changed to point to the end of
4215
the allocated memory.
4217
m_rows_end= m_rows_buf + new_alloc;
4220
assert(m_rows_cur + length <= m_rows_end);
4221
memcpy(m_rows_cur, row_data, length);
4222
m_rows_cur+= length;
4227
/**
  Apply this rows event on the slave.

  As far as this listing shows, the work is done in these stages:
  - a dummy event (m_table_id == UINT32_MAX) only clears the
    tables_to_lock list, closes the thread tables and clears the session
    error;
  - the tables in rli->tables_to_lock are locked with lock_tables() in a
    retry loop; inside that loop the tables are closed with
    close_tables_for_reopen() and opened again with open_tables(), and
    failures are reported through rli->report();
  - every locked table is checked with m_tabledef.compatible_with(); when
    that call returns non-zero the tables are unlocked and
    ERR_BAD_TABLE_DEF is returned;
  - the tables are registered in rli->m_table_map and m_table is looked up
    by m_table_id;
  - the session option flags and the table read/write sets are set up,
    then do_before_row_operations(), do_exec_row() for each row up to
    m_rows_end, and do_after_row_operations() are run;
  - on error the failure is reported with slave_rows_error_report(), the
    context is cleaned up and session->is_slave_error is set.

  @param rli  Relay log info of the applying thread. It is declared const,
              so every modification below goes through a const_cast.

  @return ERR_BAD_TABLE_DEF on a table definition mismatch; the other
          return statements are not part of this listing -- presumably 0
          on success and the error code otherwise (TODO confirm).
*/
int Rows_log_event::do_apply_event(Relay_log_info const *rli)
4231
If m_table_id == UINT32_MAX, then we have a dummy event that does not
4232
contain any data. In that case, we just remove all tables in the
4233
tables_to_lock list, close the thread tables, and return with
4236
if (m_table_id == UINT32_MAX)
4239
This one is supposed to be set: just an extra check so that
4240
nothing strange has happened.
4242
assert(get_flags(STMT_END_F));
4244
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4245
close_thread_tables(session);
4246
session->clear_error();
4251
'session' has been set by exec_relay_log_event(), just before calling
4252
do_apply_event(). We still check here to prevent future coding
4255
assert(rli->sql_session == session);
4258
If there is no locks taken, this is the first binrow event seen
4259
after the table map events. We should then lock all the tables
4260
used in the transaction and proceed with execution of the actual
4265
bool need_reopen= 1; /* To execute the first lap of the loop below */
4268
lock_tables() reads the contents of session->lex, so they must be
4269
initialized. Contrary to in
4270
Table_map_log_event::do_apply_event() we don't call
4271
mysql_init_query() as that may reset the binlog format.
4276
There are a few flags that are replicated with each row event.
4277
Make sure to set/clear them before executing the main body of
4280
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
4281
session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
4283
session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
4285
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
4286
session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
4288
session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
4289
/* A small test to verify that objects have consistent types */
4290
assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
4293
while ((error= lock_tables(session, rli->tables_to_lock,
4294
rli->tables_to_lock_count, &need_reopen)))
4298
if (session->is_slave_error || session->is_fatal_error)
4301
Error reporting borrowed from Query_log_event with many excessive
4302
simplifications (we don't honour --slave-skip-errors)
4304
uint32_t actual_error= session->main_da.sql_errno();
4305
rli->report(ERROR_LEVEL, actual_error,
4306
_("Error '%s' in %s event: when locking tables"),
4308
? session->main_da.message()
4309
: _("unexpected success or fatal error")),
4311
session->is_fatal_error= 1;
4315
rli->report(ERROR_LEVEL, error,
4316
_("Error in %s event: when locking tables"),
4319
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4323
TableList *tables= rli->tables_to_lock;
4324
close_tables_for_reopen(session, &tables);
4326
uint32_t tables_count= rli->tables_to_lock_count;
4327
if ((error= open_tables(session, &tables, &tables_count, 0)))
4329
if (session->is_slave_error || session->is_fatal_error)
4332
Error reporting borrowed from Query_log_event with many excessive
4333
simplifications (we don't honour --slave-skip-errors)
4335
uint32_t actual_error= session->main_da.sql_errno();
4336
rli->report(ERROR_LEVEL, actual_error,
4337
_("Error '%s' on reopening tables"),
4339
? session->main_da.message()
4340
: _("unexpected success or fatal error")));
4341
session->is_slave_error= 1;
4343
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4349
When the open and locking succeeded, we check all tables to
4350
ensure that they still have the correct type.
4352
We can use a down cast here since we know that every table added
4353
to the tables_to_lock is a RPL_TableList.
4357
RPL_TableList *ptr= rli->tables_to_lock;
4358
for ( ; ptr ; ptr= static_cast<RPL_TableList*>(ptr->next_global))
4360
/* A non-zero result from compatible_with() takes the failure path below */
if (ptr->m_tabledef.compatible_with(rli, ptr->table))
4362
mysql_unlock_tables(session, session->lock);
4364
session->is_slave_error= 1;
4365
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4366
return(ERR_BAD_TABLE_DEF);
4372
... and then we add all the tables to the table map and remove
4373
them from tables to lock.
4375
We also invalidate the query cache for all the tables, since
4376
they will now be changed.
4378
TODO [/Matz]: Maybe the query cache should not be invalidated
4379
here? It might be that a table is not changed, even though it
4380
was locked for the statement. We do know that each
4381
Rows_log_event contain at least one row, so after processing one
4382
Rows_log_event, we can invalidate the query cache for the
4385
for (TableList *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
4387
const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
4393
m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
4398
table == NULL means that this table should not be replicated
4399
(this was set up by Table_map_log_event::do_apply_event()
4400
which tested replicate-* rules).
4404
It's not needed to set_time() but
4405
1) it continues the property that "Time" in SHOW PROCESSLIST shows how
4406
much slave is behind
4407
2) it will be needed when we allow replication from a table with no
4408
TIMESTAMP column to a table with one.
4409
So we call set_time(), like in SBR. Presently it changes nothing.
4411
session->set_time((time_t)when);
4413
There are a few flags that are replicated with each row event.
4414
Make sure to set/clear them before executing the main body of
4417
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
4418
session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
4420
session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
4422
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
4423
session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
4425
session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
4427
if (slave_allow_batching)
4428
session->options|= OPTION_ALLOW_BATCH;
4430
session->options&= ~OPTION_ALLOW_BATCH;
4432
/* A small test to verify that objects have consistent types */
4433
assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
4436
Now we are in a statement and will stay in a statement until we
4439
We set this flag here, before actually applying any rows, in
4440
case the SQL thread is stopped and we need to detect that we're
4441
inside a statement and halting abruptly might cause problems
4444
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
4446
if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
4447
set_flags(COMPLETE_ROWS_F);
4450
Set tables write and read sets.
4452
Read_set contains all slave columns (in case we are going to fetch
4453
a complete record from slave)
4455
Write_set equals the m_cols bitmap sent from master but it can be
4456
longer if slave has extra columns.
4459
bitmap_set_all(table->read_set);
4460
bitmap_set_all(table->write_set);
4461
if (!get_flags(COMPLETE_ROWS_F))
4462
bitmap_intersect(table->write_set,&m_cols);
4464
this->slave_exec_mode= slave_exec_mode_options; // fix the mode
4466
// Do event specific preparations
4467
error= do_before_row_operations(rli);
4469
// row processing loop
4471
while (error == 0 && m_curr_row < m_rows_end)
4473
/* in_use can have been set to NULL in close_tables_for_reopen */
4474
Session* old_session= table->in_use;
4476
table->in_use= session;
4478
error= do_exec_row(rli);
4480
table->in_use = old_session;
4486
The following list of "idempotent" errors
4487
means that an error from the list might happen
4488
because of idempotent (more than once)
4489
applying of a binlog file.
4490
Notice, that binlog has a ddl operation its
4491
second applying may cause
4493
case HA_ERR_TABLE_DEF_CHANGED:
4494
case HA_ERR_CANNOT_ADD_FOREIGN:
4496
which are not included into to the list.
4498
case HA_ERR_RECORD_CHANGED:
4499
case HA_ERR_RECORD_DELETED:
4500
case HA_ERR_KEY_NOT_FOUND:
4501
case HA_ERR_END_OF_FILE:
4502
case HA_ERR_FOUND_DUPP_KEY:
4503
case HA_ERR_FOUND_DUPP_UNIQUE:
4504
case HA_ERR_FOREIGN_DUPLICATE_KEY:
4505
case HA_ERR_NO_REFERENCED_ROW:
4506
case HA_ERR_ROW_IS_REFERENCED:
4507
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
4509
if (global_system_variables.log_warnings)
4510
slave_rows_error_report(WARNING_LEVEL, error, rli, session, table,
4512
RPL_LOG_NAME, (ulong) log_pos);
4518
session->is_slave_error= 1;
4523
If m_curr_row_end was not set during event execution (e.g., because
4524
of errors) we can't proceed to the next row. If the error is transient
4525
(i.e., error==0 at this point) we must call unpack_current_row() to set
4528
if (!m_curr_row_end && !error)
4529
unpack_current_row(rli, &m_cols);
4531
// at this moment m_curr_row_end should be set
4532
assert(error || m_curr_row_end != NULL);
4533
assert(error || m_curr_row < m_curr_row_end);
4534
assert(error || m_curr_row_end <= m_rows_end);
4536
m_curr_row= m_curr_row_end;
4538
} // row processing loop
4540
error= do_after_row_operations(rli, error);
4543
session->options|= OPTION_KEEP_LOG;
4548
We need to delay this clear until here bacause unpack_current_row() uses
4549
master-side table definitions stored in rli.
4551
if (rli->tables_to_lock && get_flags(STMT_END_F))
4552
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4553
/* reset OPTION_ALLOW_BATCH as not affect later events */
4554
session->options&= ~OPTION_ALLOW_BATCH;
4557
{ /* error has occurred during the transaction */
4558
slave_rows_error_report(ERROR_LEVEL, error, rli, session, table,
4559
get_type_str(), RPL_LOG_NAME, (ulong) log_pos);
4564
If one day we honour --skip-slave-errors in row-based replication, and
4565
the error should be skipped, then we would clear mappings, rollback,
4566
close tables, but the slave SQL thread would not stop and then may
4567
assume the mapping is still available, the tables are still open...
4568
So then we should clear mappings/rollback/close here only if this is a
4570
For now we code, knowing that error is not skippable and so slave SQL
4571
thread is certainly going to stop.
4572
rollback at the caller along with sbr.
4574
const_cast<Relay_log_info*>(rli)->cleanup_context(session, error);
4575
session->is_slave_error= 1;
4580
This code would ideally be placed in do_update_pos() instead, but
4581
since we have no access to table there, we do the setting of
4582
last_event_start_time here instead.
4584
if (table && (table->s->primary_key == MAX_KEY) &&
4585
!cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS)
4588
------------ Temporary fix until WL#2975 is implemented ---------
4590
This event is not the last one (no STMT_END_F). If we stop now
4591
(in case of terminate_slave_thread()), how will we restart? We
4592
have to restart from Table_map_log_event, but as this table is
4593
not transactional, the rows already inserted will still be
4594
present, and idempotency is not guaranteed (no PK) so we risk
4595
that repeating leads to double insert. So we desperately try to
4596
continue, hope we'll eventually leave this buggy situation (by
4597
executing the final Rows_log_event). If we are in a hopeless
4598
wait (reached end of last relay log and nothing gets appended
4599
there), we timeout after one minute, and notify DBA about the
4600
problem. When WL#2975 is implemented, just remove the member
4601
Relay_log_info::last_event_start_time and all its occurrences.
4605
/* don't trust time() all the time */
4606
if (t == (time_t)-1)
4608
/*
  NOTE(review): the value stored here comes from a fresh time(0) call, not
  from the 't' that was tested just above -- confirm this is intended.
*/
const_cast<Relay_log_info*>(rli)->last_event_start_time= time(0);
4614
/**
  Decide whether the slave should skip this rows event.

  @param rli  Relay log info holding the slave skip counter.

  @return EVENT_SKIP_IGNORE when the skip counter is exactly 1 and this
          event does not end a statement, so that execution does not
          start on the event that follows; otherwise whatever the generic
          Log_event::do_shall_skip() decides.
*/
Log_event::enum_skip_reason
Rows_log_event::do_shall_skip(Relay_log_info *rli)
{
  /* Anything but "last skip pending in the middle of a statement" is
     handled by the normal skipping logic. */
  if (rli->slave_skip_counter != 1 || get_flags(STMT_END_F))
    return Log_event::do_shall_skip(rli);

  return Log_event::EVENT_SKIP_IGNORE;
}
4629
/**
  Advance the relay log position after this rows event has been applied.

  The visible statements show two paths, selected on STMT_END_F:
  - with the flag set, ha_autocommit_or_rollback(session, 0) and
    rli->cleanup_context(session, 0) are called, followed by
    rli->stmt_done(log_pos, when) and session->clear_error(), or by an
    rli->report(ERROR_LEVEL, ...) naming the event type and table;
  - otherwise only rli->inc_event_relay_log_pos() is called.

  NOTE(review): the conditions choosing between stmt_done() and report(),
  and the return statement, are not part of this listing -- presumably the
  split is on the value of 'error', which is also the result (TODO confirm).

  @param rli  Relay log info whose positions are updated.
*/
Rows_log_event::do_update_pos(Relay_log_info *rli)
4633
if (get_flags(STMT_END_F))
4636
If this event is not in a transaction, the call below will, if some
4637
transactional storage engines are involved, commit the statement into
4638
them and flush the pending event to binlog.
4639
If this event is in a transaction, the call will do nothing, but a
4640
Xid_log_event will come next which will, if some transactional engines
4641
are involved, commit the transaction and flush the pending event to the
4644
error= ha_autocommit_or_rollback(session, 0);
4647
Now what if this is not a transactional engine? we still need to
4648
flush the pending event to the binlog; we did it with
4649
session->binlog_flush_pending_rows_event(). Note that we imitate
4650
what is done for real queries: a call to
4651
ha_autocommit_or_rollback() (sometimes only if involves a
4652
transactional engine), and a call to be sure to have the pending
4656
rli->cleanup_context(session, 0);
4660
Indicate that a statement is finished.
4661
Step the group log position if we are not in a transaction,
4662
otherwise increase the event log position.
4664
rli->stmt_done(log_pos, when);
4667
Clear any errors pushed in session->net.last_err* if for example "no key
4668
found" (as this is allowed). This is a safety measure; apparently
4669
those errors (e.g. when executing a Delete_rows_log_event of a
4670
non-existing row, like in rpl_row_mystery22.test,
4671
session->net.last_error = "Can't find record in 't1'" and last_errno=1032)
4672
do not become visible. We still prefer to wipe them out.
4674
session->clear_error();
4677
rli->report(ERROR_LEVEL, error,
4678
_("Error in %s event: commit of row events failed, "
4680
get_type_str(), m_table->s->db.str,
4681
m_table->s->table_name.str);
4685
rli->inc_event_relay_log_pos();
4691
/**
  Write the fixed-size post-header of a rows event.

  The header holds the table id as a 6-byte integer at RW_MAPID_OFFSET and
  the event flags as a 2-byte integer at RW_FLAGS_OFFSET.

  @param file  Cache the header is written to.

  @return the result of my_b_safe_write() converted to bool.
*/
bool Rows_log_event::write_data_header(IO_CACHE *file)
{
  assert(m_table_id != UINT32_MAX);

  unsigned char header[ROWS_HEADER_LEN];        // No need to init the buffer
  unsigned char *const table_id_pos= header + RW_MAPID_OFFSET;
  unsigned char *const flags_pos= header + RW_FLAGS_OFFSET;

  int6store(table_id_pos, (uint64_t)m_table_id);
  int2store(flags_pos, m_flags);

  return (my_b_safe_write(file, header, ROWS_HEADER_LEN));
}
4700
/**
  Write the variable-size body of a rows event.

  The parts are written in this order: the width as a length-encoded
  integer, the m_cols bitmap, the m_cols_ai bitmap (UPDATE_ROWS_EVENT
  only) and finally the row data between m_rows_buf and m_rows_cur.

  @param file  Cache the body is written to.

  @return true as soon as one my_b_safe_write() call returns non-zero (the
          remaining parts are then not written), false otherwise.
*/
bool Rows_log_event::write_data_body(IO_CACHE*file)
{
  my_ptrdiff_t const rows_length= m_rows_cur - m_rows_buf;

  /* The width is a number of *bits*, not a number of bytes */
  unsigned char width_buf[sizeof(m_width)];
  unsigned char *const width_end= net_store_length(width_buf, (size_t) m_width);
  assert(static_cast<size_t>(width_end - width_buf) <= sizeof(width_buf));

  if (my_b_safe_write(file, width_buf, (size_t) (width_end - width_buf)))
    return true;

  if (my_b_safe_write(file, (unsigned char*) m_cols.bitmap,
                      no_bytes_in_map(&m_cols)))
    return true;

  /*
    TODO[refactor write]: Remove the "down cast" here (and elsewhere).
  */
  if (get_type_code() == UPDATE_ROWS_EVENT &&
      my_b_safe_write(file, (unsigned char*) m_cols_ai.bitmap,
                      no_bytes_in_map(&m_cols_ai)))
    return true;

  return my_b_safe_write(file, m_rows_buf, (size_t) rows_length);
}
4731
/**
  Store a one-line description of this event in @c protocol.

  The text is "table_id: <id>", followed by " flags: STMT_END_F" when that
  flag is set on the event; it is stored with the binary character set.

  NOTE(review): the declaration of 'buf' is not part of this listing, so
  its size cannot be checked here. snprintf() is bounded by sizeof(buf),
  but its return value is passed on as the length unchanged -- confirm
  the buffer can never be too small for the formatted text.

  @param protocol  Receiver of the formatted text.
*/
void Rows_log_event::pack_info(Protocol *protocol)
4734
char const *const flagstr=
4735
get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
4736
size_t bytes= snprintf(buf, sizeof(buf),
4737
"table_id: %lu%s", m_table_id, flagstr);
4738
protocol->store(buf, bytes, &my_charset_bin);
4742
/**************************************************************************
4743
Table_map_log_event member functions and support functions
4744
**************************************************************************/
4747
@page How replication of field metadata works.
4749
When a table map is created, the master first calls
4750
Table_map_log_event::save_field_metadata() which calculates how many
4751
values will be in the field metadata. Only those fields that require the
4752
extra data are added. The method also loops through all of the fields in
4753
the table calling the method Field::save_field_metadata() which returns the
4754
values for the field that will be saved in the metadata and replicated to
4755
the slave. Once all fields have been processed, the table map is written to
4756
the binlog adding the size of the field metadata and the field metadata to
4757
the end of the body of the table map.
4759
When a table map is read on the slave, the field metadata is read from the
4760
table map and passed to the table_def class constructor which saves the
4761
field metadata from the table map into an array based on the type of the
4762
field. Field metadata values not present (those fields that do not use extra
4763
data) in the table map are initialized as zero (0). The array size is the
4764
same as the columns for the table on the slave.
4766
Additionally, values saved for field metadata on the master are saved as a
4767
string of bytes (unsigned char) in the binlog. A field may require 1 or more bytes
4768
to store the information. In cases where values require multiple bytes
4769
(e.g. values > 255), the endian-safe methods are used to properly encode
4770
the values on the master and decode them on the slave. When the field
4771
metadata values are captured on the slave, they are stored in an array of
4772
type uint16_t. This allows the least number of casts to prevent casting bugs
4773
when the field metadata is used in comparisons of field attributes. When
4774
the field metadata is used for calculating addresses in pointer math, the
4775
type used is uint32_t.
4779
Save the field metadata based on the real_type of the field.
4780
The metadata saved depends on the type of the field. Some fields
4781
store a single byte for pack_length() while others store two bytes
4782
for field_length (max length).
4787
We may want to consider changing the encoding of the information.
4788
Currently, the code attempts to minimize the number of bytes written to
4789
the tablemap. There are at least two other alternatives; 1) using
4790
net_store_length() to store the data allowing it to choose the number of
4791
bytes that are appropriate thereby making the code much easier to
4792
maintain (only 1 place to change the encoding), or 2) use a fixed number
4793
of bytes for each field. The problem with option 1 is that net_store_length()
4794
will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
4795
for fields like CHAR which can be no larger than 255 characters, the method
4796
will use 3 bytes when the value is > 250. Further, every value that is
4797
encoded using 2 parts (e.g., pack_length, field_length) will be numerically
4798
> 250 therefore will use 3 bytes for each value. The problem with option 2
4799
is less wasteful for space but does waste 1 byte for every field that does
4802
int Table_map_log_event::save_field_metadata()
4805
for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
4806
index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
4811
Constructor used to build an event for writing to the binary log.
4812
Mats says tbl->s lives longer than this event so it's ok to copy pointers
4813
(tbl->s->db etc) and not pointer content.
4815
/**
  Build a table map event for @c tbl that is going to be written to the
  binary log.

  The database and table names are taken over as pointers into tbl->s (not
  copied). The constructor then
  - computes m_data_size from the header, both names, the column count and
    one type byte per column;
  - allocates m_memory and fills m_coltype with the type() of each field;
  - allocates the null-bit array and the field metadata array in one
    my_multi_malloc() call, fills the metadata via save_field_metadata()
    and adds its size (plus one or two length bytes) to m_data_size;
  - sets one bit in m_null_bits for every field whose maybe_null() is true.

  NOTE(review): several member initializers are not part of this listing;
  the description above covers the visible ones only.

  @param session  Forwarded to the Log_event base.
  @param tbl      Table the map describes.
  @param tid      Table id; presumably what m_table_id is initialised
                  from (that initializer is not visible here).
  @param flags    Event flags; presumably what m_flags is initialised
                  from (that initializer is not visible here).
*/
Table_map_log_event::Table_map_log_event(Session *session, Table *tbl,
4816
ulong tid, bool, uint16_t flags)
4817
: Log_event(session, 0, true),
4819
m_dbnam(tbl->s->db.str),
4820
m_dblen(m_dbnam ? tbl->s->db.length : 0),
4821
m_tblnam(tbl->s->table_name.str),
4822
m_tbllen(tbl->s->table_name.length),
4823
m_colcnt(tbl->s->fields),
4828
m_field_metadata(0),
4829
m_field_metadata_size(0),
4833
assert(m_table_id != UINT32_MAX);
4835
In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
4836
table.cc / alloc_table_share():
4837
Use the fact the key is db/0/table_name/0
4838
As we rely on this let's assert it.
4840
assert((tbl->s->db.str == 0) ||
4841
(tbl->s->db.str[tbl->s->db.length] == 0));
4842
assert(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
4845
m_data_size= TABLE_MAP_HEADER_LEN;
4846
m_data_size+= m_dblen + 2; // Include length and terminating \0
4847
m_data_size+= m_tbllen + 2; // Include length and terminating \0
4848
m_data_size+= 1 + m_colcnt; // COLCNT and column types
4850
/* If malloc fails, caught in is_valid() */
4851
if ((m_memory= (unsigned char*) malloc(m_colcnt)))
4853
m_coltype= reinterpret_cast<unsigned char*>(m_memory);
4854
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
4855
m_coltype[i]= m_table->field[i]->type();
4859
Calculate a bitmap for the results of maybe_null() for all columns.
4860
The bitmap is used to determine when there is a column from the master
4861
that is not on the slave and is null and thus not in the row data during
4864
uint32_t num_null_bytes= (m_table->s->fields + 7) / 8;
4865
m_data_size+= num_null_bytes;
4866
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
4867
&m_null_bits, num_null_bytes,
4868
&m_field_metadata, (m_colcnt * 2),
4871
/*
  NOTE(review): no check of the my_multi_malloc() result is visible before
  the arrays are written to -- confirm an allocation failure cannot reach
  this memset with a null m_field_metadata.
*/
memset(m_field_metadata, 0, (m_colcnt * 2));
4874
Create an array for the field metadata and store it.
4876
m_field_metadata_size= save_field_metadata();
4877
assert(m_field_metadata_size <= (m_colcnt * 2));
4880
Now set the size of the data to the size of the field metadata array
4881
plus one or two bytes for number of elements in the field metadata array.
4883
if (m_field_metadata_size > 255)
4884
m_data_size+= m_field_metadata_size + 2;
4886
m_data_size+= m_field_metadata_size + 1;
4888
memset(m_null_bits, 0, num_null_bytes);
4889
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
4890
if (m_table->field[i]->maybe_null())
4891
m_null_bits[(i / 8)]+= 1 << (i % 8);
4897
Constructor used by slave to read the event from the binary log.
4899
/**
  Build a table map event from its serialized form in @c buf.

  The post-header yields the table id (4 bytes when the post-header length
  is 6, 6 bytes otherwise) and the flags. The variable part yields, in
  order: the database name length and name, the table name length and
  name, the length-encoded column count and one type byte per column. If
  bytes remain before @c event_len, the field metadata size, the field
  metadata and the null bits are read as well.

  @param buf        Start of the serialized event.
  @param event_len  Length of the event; in the visible code it only
                    decides whether field metadata is read.
  Further parameter(s) are not part of this listing; 'description_event'
  supplies common_header_len and post_header_len.
*/
Table_map_log_event::Table_map_log_event(const char *buf, uint32_t event_len,
4900
const Format_description_log_event
4903
: Log_event(buf, description_event),
4905
m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
4906
m_colcnt(0), m_coltype(0),
4907
m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
4908
m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
4909
m_null_bits(0), m_meta_memory(NULL)
4911
unsigned int bytes_read= 0;
4913
uint8_t common_header_len= description_event->common_header_len;
4914
uint8_t post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
4916
/* Read the post-header */
4917
const char *post_start= buf + common_header_len;
4919
post_start+= TM_MAPID_OFFSET;
4920
if (post_header_len == 6)
4922
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
4923
m_table_id= uint4korr(post_start);
4928
assert(post_header_len == TABLE_MAP_HEADER_LEN);
4929
m_table_id= (ulong) uint6korr(post_start);
4930
post_start+= TM_FLAGS_OFFSET;
4933
assert(m_table_id != UINT32_MAX);
4935
m_flags= uint2korr(post_start);
4937
/* Read the variable part of the event */
4938
const char *const vpart= buf + common_header_len + post_header_len;
4940
/* Extract the length of the various parts from the buffer */
4941
unsigned char const *const ptr_dblen= (unsigned char const*)vpart + 0;
4942
m_dblen= *(unsigned char*) ptr_dblen;
4944
/* Length of database name + counter + terminating null */
4945
unsigned char const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
4946
m_tbllen= *(unsigned char*) ptr_tbllen;
4948
/* Length of table name + counter + terminating null */
4949
unsigned char const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
4950
unsigned char *ptr_after_colcnt= (unsigned char*) ptr_colcnt;
4951
m_colcnt= net_field_length(&ptr_after_colcnt);
4953
/* Allocate mem for all fields in one go. If fails, caught in is_valid() */
4954
m_memory= (unsigned char*) my_multi_malloc(MYF(MY_WME),
4955
&m_dbnam, (uint) m_dblen + 1,
4956
&m_tblnam, (uint) m_tbllen + 1,
4957
&m_coltype, (uint) m_colcnt,
4962
/* Copy the different parts into their memory */
/*
  NOTE(review): m_dblen, m_tbllen and m_colcnt come straight from the
  event buffer; no comparison against event_len is visible before these
  copies -- confirm a truncated or corrupt event cannot cause reads past
  the end of buf.
*/
4963
strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
4964
strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
4965
memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
4967
ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
4968
bytes_read= ptr_after_colcnt - (unsigned char *)buf;
4969
/* Field metadata is only present when the event is longer than the
   part read so far */
if (bytes_read < event_len)
4971
m_field_metadata_size= net_field_length(&ptr_after_colcnt);
4972
assert(m_field_metadata_size <= (m_colcnt * 2));
4973
uint32_t num_null_bytes= (m_colcnt + 7) / 8;
4974
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
4975
&m_null_bits, num_null_bytes,
4976
&m_field_metadata, m_field_metadata_size,
4978
memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
4979
ptr_after_colcnt= (unsigned char*)ptr_after_colcnt + m_field_metadata_size;
4980
memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
4987
/**
  Release the memory owned by the event.

  m_meta_memory (the block holding the null bits and the field metadata)
  is released with free().

  NOTE(review): a release of m_memory is not part of this listing --
  confirm it is freed as well.
*/
Table_map_log_event::~Table_map_log_event()
4989
free(m_meta_memory);
4994
Return value is an error code, one of:
4996
-1 Failure to open table [from open_tables()]
4998
1 No room for more tables [from set_table()]
4999
2 Out of memory [from set_table()]
5000
3 Wrong table definition
5001
4 Daisy-chaining RBR with SBR not possible
5004
/**
  Apply a table map event on the slave: open the table it names and queue
  it for locking.

  Visible steps:
  - the session query id is stepped;
  - an RPL_TableList plus two name buffers of NAME_LEN + 1 bytes are
    allocated in one my_multi_malloc() call (HA_ERR_OUT_OF_MEM on failure);
  - the list element is filled in (TL_WRITE lock, table id, names) and
    session->reset_for_next_command() is called;
  - the table is opened with open_tables(); a failure is reported through
    rli->report() and session->is_slave_error is set;
  - m_table is taken from the list element, a table_def is constructed in
    place inside it, and the element is pushed at the head of
    rli->tables_to_lock (tables_to_lock_count is incremented).

  @param rli  Relay log info; modified through const_cast.

  @return HA_ERR_OUT_OF_MEM when the allocation fails; the remaining return
          statements are not part of this listing (see the list of error
          codes in the comment preceding this function).
*/
int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
5006
RPL_TableList *table_list;
5007
char *db_mem, *tname_mem;
5008
Query_id &query_id= Query_id::get_query_id();
5010
assert(rli->sql_session == session);
5012
/* Step the query id to mark what columns that are actually used. */
5013
session->query_id= query_id.next();
5015
if (!(memory= my_multi_malloc(MYF(MY_WME),
5016
&table_list, (uint) sizeof(RPL_TableList),
5017
&db_mem, (uint) NAME_LEN + 1,
5018
&tname_mem, (uint) NAME_LEN + 1,
5020
return(HA_ERR_OUT_OF_MEM);
5022
memset(table_list, 0, sizeof(*table_list));
5023
table_list->db = db_mem;
5024
table_list->alias= table_list->table_name = tname_mem;
5025
table_list->lock_type= TL_WRITE;
5026
table_list->next_global= table_list->next_local= 0;
5027
table_list->table_id= m_table_id;
5028
table_list->updating= 1;
5029
/*
  NOTE(review): db_mem and tname_mem hold NAME_LEN + 1 bytes each; these
  unbounded strcpy() calls assume m_dbnam and m_tblnam are no longer than
  NAME_LEN -- TODO confirm, as the names originate from the event.
*/
strcpy(table_list->db, m_dbnam);
5030
strcpy(table_list->table_name, m_tblnam);
5036
open_tables() reads the contents of session->lex, so they must be
5037
initialized, so we should call lex_start(); to be even safer, we
5038
call mysql_init_query() which does a more complete set of inits.
5041
session->reset_for_next_command();
5044
Open the table if it is not already open and add the table to
5045
table map. Note that for any table that should not be
5046
replicated, a filter is needed.
5048
The creation of a new TableList is used to up-cast the
5049
table_list consisting of RPL_TableList items. This will work
5050
since the only case where the argument to open_tables() is
5051
changed, is when session->lex->query_tables == table_list, i.e.,
5052
when the statement requires prelocking. Since this is not
5053
executed when a statement is executed, this case will not occur.
5054
As a precaution, an assertion is added to ensure that the bad
5057
Either way, the memory in the list is *never* released
5058
internally in the open_tables() function, hence we take a copy
5059
of the pointer to make sure that it's not lost.
5062
assert(session->lex->query_tables != table_list);
5063
TableList *tmp_table_list= table_list;
5064
if ((error= open_tables(session, &tmp_table_list, &count, 0)))
5066
if (session->is_slave_error || session->is_fatal_error)
5069
Error reporting borrowed from Query_log_event with many excessive
5070
simplifications (we don't honour --slave-skip-errors)
5072
uint32_t actual_error= session->main_da.sql_errno();
5073
rli->report(ERROR_LEVEL, actual_error,
5074
_("Error '%s' on opening table `%s`.`%s`"),
5076
? session->main_da.message()
5077
: _("unexpected success or fatal error")),
5078
table_list->db, table_list->table_name);
5079
session->is_slave_error= 1;
5084
m_table= table_list->table;
5087
This will fail later otherwise, the 'in_use' field should be
5088
set to the current thread.
5090
assert(m_table->in_use);
5093
Use placement new to construct the table_def instance in the
5094
memory allocated for it inside table_list.
5096
The memory allocated by the table_def structure (i.e., not the
5097
memory allocated *for* the table_def structure) is released
5098
inside Relay_log_info::clear_tables_to_lock() by calling the
5099
table_def destructor explicitly.
5101
new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt,
5102
m_field_metadata, m_field_metadata_size, m_null_bits);
5103
table_list->m_tabledef_valid= true;
5106
We record in the slave's information that the table should be
5107
locked by linking the table into the list of tables to lock.
5109
table_list->next_global= table_list->next_local= rli->tables_to_lock;
5110
const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
5111
const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
5112
/* 'memory' is freed in clear_tables_to_lock */
5122
/**
  Decide whether the slave should skip this table map event.

  @param rli  Relay log info holding the slave skip counter.

  @return the verdict of continue_group() for @c rli.
*/
Log_event::enum_skip_reason
Table_map_log_event::do_shall_skip(Relay_log_info *rli)
{
  /*
    With a slave skip counter of 1 execution should not start here; that
    decision is left entirely to continue_group().
  */
  return continue_group(rli);
}
5132
/**
  Advance the relay log position past this table map event.

  Only the event position is stepped, through
  rli->inc_event_relay_log_pos().

  @param rli  Relay log info whose event position is incremented.

  @return 0 always.
*/
int Table_map_log_event::do_update_pos(Relay_log_info *rli)
{
  rli->inc_event_relay_log_pos();
  return 0;
}
5139
/**
  Write the fixed-size post-header of a table map event.

  The header holds the table id as a 6-byte integer at TM_MAPID_OFFSET and
  the event flags as a 2-byte integer at TM_FLAGS_OFFSET.

  @param file  Cache the header is written to.

  @return the result of my_b_safe_write() converted to bool.
*/
bool Table_map_log_event::write_data_header(IO_CACHE *file)
{
  assert(m_table_id != UINT32_MAX);

  unsigned char header[TABLE_MAP_HEADER_LEN];
  unsigned char *const table_id_pos= header + TM_MAPID_OFFSET;
  unsigned char *const flags_pos= header + TM_FLAGS_OFFSET;

  int6store(table_id_pos, (uint64_t)m_table_id);
  int2store(flags_pos, m_flags);

  return (my_b_safe_write(file, header, TABLE_MAP_HEADER_LEN));
}
5148
/**
  Write the variable-size body of a table map event.

  The parts are written in this order:
  - one length byte and the database name including its terminating \0;
  - one length byte and the table name including its terminating \0;
  - the column count as a length-encoded integer;
  - one type byte per column;
  - the field metadata size as a length-encoded integer;
  - the field metadata;
  - the null bits, (m_colcnt + 7) / 8 bytes.

  All writes are chained with ||, so the first failing write stops the
  sequence and its failure is reported. (The field metadata write used to
  be followed by a comma operator, which threw away the status of every
  write before the null bits and returned only the status of the last
  one.)

  @param file  Cache the body is written to.

  @return true as soon as one my_b_safe_write() call returns non-zero,
          false when all of them return zero.
*/
bool Table_map_log_event::write_data_body(IO_CACHE *file)
{
  assert(m_dbnam != NULL);
  assert(m_tblnam != NULL);
  /* We use only one byte per length for storage in event: */
  assert(m_dblen < 128);
  assert(m_tbllen < 128);

  unsigned char const dbuf[]= { (unsigned char) m_dblen };
  unsigned char const tbuf[]= { (unsigned char) m_tbllen };

  unsigned char cbuf[sizeof(m_colcnt)];
  unsigned char *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
  assert(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));

  /*
    Store the size of the field metadata.
  */
  unsigned char mbuf[sizeof(m_field_metadata_size)];
  unsigned char *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);

  return (my_b_safe_write(file, dbuf,      sizeof(dbuf)) ||
          my_b_safe_write(file, (const unsigned char*)m_dbnam,   m_dblen+1) ||
          my_b_safe_write(file, tbuf,      sizeof(tbuf)) ||
          my_b_safe_write(file, (const unsigned char*)m_tblnam,  m_tbllen+1) ||
          my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
          my_b_safe_write(file, m_coltype, m_colcnt) ||
          my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
          my_b_safe_write(file, m_field_metadata, m_field_metadata_size) ||
          my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
}
5182
Print some useful information for the SHOW BINARY LOG information
5186
/**
  Store a one-line description of this event in @c protocol.

  The text has the form "table_id: <id> (<database>.<table>)" and is
  stored with the binary character set.

  NOTE(review): the declaration of 'buf' is not part of this listing.
  snprintf() is bounded by sizeof(buf), but its return value is passed on
  as the length unchanged -- confirm the buffer is large enough for the
  longest database and table names.

  @param protocol  Receiver of the formatted text.
*/
void Table_map_log_event::pack_info(Protocol *protocol)
5189
size_t bytes= snprintf(buf, sizeof(buf),
5190
"table_id: %lu (%s.%s)",
5191
m_table_id, m_dbnam, m_tblnam);
5192
protocol->store(buf, bytes, &my_charset_bin);
5196
/**************************************************************************
5197
Write_rows_log_event member functions
5198
**************************************************************************/
5201
Constructor used to build an event for writing to the binary log.
5203
/**
  Build a write-rows event for the binary log.

  Everything is forwarded to the Rows_log_event base; the column bitmap
  handed over is the table's write_set.

  @param session_arg       Session the event belongs to.
  @param tbl_arg           Table the rows are written to.
  @param is_transactional  Forwarded unchanged to the base class.
  A further parameter, tid_arg, is used below but its declaration is not
  part of this listing.
*/
Write_rows_log_event::Write_rows_log_event(Session *session_arg, Table *tbl_arg,
5205
bool is_transactional)
5206
: Rows_log_event(session_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
5211
Constructor used by slave to read the event from the binary log.
5213
/**
  Build a write-rows event from its serialized form.

  The buffer is handed to the Rows_log_event base together with the event
  type WRITE_ROWS_EVENT.

  @param buf        Start of the serialized event.
  @param event_len  Length of the serialized event.
  The name of the Format_description_log_event parameter is not part of
  this listing; 'description_event' is what gets forwarded.
*/
Write_rows_log_event::Write_rows_log_event(const char *buf, uint32_t event_len,
5214
const Format_description_log_event
5216
: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
5221
Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
5226
todo: to introduce a property for the event (handler?) which forces
5227
applying the event in the replace (idempotent) fashion.
5229
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5232
We are using REPLACE semantics and not INSERT IGNORE semantics
5233
when writing rows, that is: new rows replace old rows. We need to
5234
inform the storage engine that it should use this behaviour.
5237
/* Tell the storage engine that we are using REPLACE semantics. */
5238
session->lex->duplicates= DUP_REPLACE;
5241
Pretend we're executing a REPLACE command: this is needed for
5242
InnoDB since it is not (properly) checking the
5243
lex->duplicates flag.
5245
session->lex->sql_command= SQLCOM_REPLACE;
5247
Do not raise the error flag in case of hitting to an unique attribute
5249
m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
5252
m_table->file->ha_start_bulk_insert(0);
5254
We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
5255
any TIMESTAMP column with data from the row but instead will use
5256
the event's current time.
5257
As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
5258
columns, we know that all TIMESTAMP columns on slave will receive explicit
5259
data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
5260
When we allow a table without TIMESTAMP to be replicated to a table having
5261
more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
5262
column to be replicated into a BIGINT column and the slave's table has a
5263
TIMESTAMP column, then the slave's TIMESTAMP column will take its value
5264
from set_time() which we called earlier (consistent with SBR). And then in
5265
some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
5266
analyze if explicit data is provided for slave's TIMESTAMP columns).
5268
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
5274
Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
5278
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5280
m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
5281
m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
5283
resetting the extra with
5284
table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
5286
explanation: file->reset() performs this duty
5287
ultimately. Still todo: fix
5290
if ((local_error= m_table->file->ha_end_bulk_insert()))
5292
m_table->file->print_error(local_error, MYF(0));
5294
return error? error : local_error;
5299
Check if there are more UNIQUE keys after the given key.
5302
last_uniq_key(Table *table, uint32_t keyno)
5304
while (++keyno < table->s->keys)
5305
if (table->key_info[keyno].flags & HA_NOSAME)
5311
Check if an error is a duplicate key error.
5313
This function is used to check if an error code is one of the
5314
duplicate key errors, i.e., an error code for which it is sensible
5315
to do a <code>get_dup_key()</code> to retrieve the duplicate key.
5317
@param errcode The error code to check.
5319
@return <code>true</code> if the error code is such that
5320
<code>get_dup_key()</code> will return true, <code>false</code>
5324
is_duplicate_key_error(int errcode)
5328
case HA_ERR_FOUND_DUPP_KEY:
5329
case HA_ERR_FOUND_DUPP_UNIQUE:
5336
Write the current row into event's table.
5338
The row is located in the row buffer, pointed by @c m_curr_row member.
5339
Number of columns of the row is stored in @c m_width member (it can be
5340
different from the number of columns in the table to which we insert).
5341
Bitmap @c m_cols indicates which columns are present in the row. It is assumed
5342
that event's table is already open and pointed by @c m_table.
5344
If the same record already exists in the table it can be either overwritten
5345
or an error is reported depending on the value of @c overwrite flag
5346
(error reporting not yet implemented). Note that the matching record can be
5347
different from the row we insert if we use primary keys to identify records in
5350
The row to be inserted can contain values only for selected columns. The
5351
missing columns are filled with default values using @c prepare_record()
5352
function. If a matching record is found in the table and @c overwrite is
5353
true, the missing columns are taken from it.
5355
@param rli Relay log info (needed for row unpacking).
5357
Shall we overwrite if the row already exists or signal
5358
error (currently ignored).
5360
@returns Error code on failure, 0 on success.
5362
This method, if successful, sets @c m_curr_row_end pointer to point at the
5363
next row in the rows buffer. This is done when unpacking the row to be
5366
@note If a matching record is found, it is either updated using
5367
@c ha_update_row() or first deleted and then new record written.
5371
Rows_log_event::write_row(const Relay_log_info *const rli,
5372
const bool overwrite)
5374
assert(m_table != NULL && session != NULL);
5376
Table *table= m_table; // pointer to event's table
5379
basic_string<unsigned char> key;
5381
/* fill table->record[0] with default values */
5384
We only check if the columns have default values for non-NDB
5385
engines, for NDB we ignore the check since updates are sent as
5386
writes, causing errors when trying to prepare the record.
5388
TODO[ndb]: Eliminate this hard-coded dependency on NDB. Ideally,
5389
the engine should be able to set a flag that it wants the default
5390
values filled in and one flag to handle the case that the default
5391
values should be checked. Maybe these two flags can be combined.
5393
if ((error= prepare_record(table, &m_cols, m_width, true)))
5396
/* unpack row into table->record[0] */
5397
error= unpack_current_row(rli, &m_cols);
5399
// Temporary fix to find out why it fails [/Matz]
5400
memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
5403
Try to write record. If a corresponding record already exists in the table,
5404
we try to change it using ha_update_row() if possible. Otherwise we delete
5405
it and repeat the whole process again.
5407
TODO: Add safety measures against infinite looping.
5410
while ((error= table->file->ha_write_row(table->record[0])))
5412
if (error == HA_ERR_LOCK_DEADLOCK ||
5413
error == HA_ERR_LOCK_WAIT_TIMEOUT ||
5414
(keynum= table->file->get_dup_key(error)) < 0 ||
5418
Deadlock, waiting for lock or just an error from the handler
5419
such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
5420
Retrieval of the duplicate key number may fail
5421
- either because the error was not "duplicate key" error
5422
- or because the information about which key was duplicated is not available
5424
table->file->print_error(error, MYF(0));
5428
We need to retrieve the old row into record[1] to be able to
5429
either update or delete the offending record. We either:
5431
- use rnd_pos() with a row-id (available as dupp_row) to the
5432
offending row, if that is possible (MyISAM and Blackhole), or else
5434
- use index_read_idx() with the key that is duplicated, to
5435
retrieve the offending row.
5437
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
5439
if (table->file->inited && (error= table->file->ha_index_end()))
5441
if ((error= table->file->ha_rnd_init(false)))
5444
error= table->file->rnd_pos(table->record[1], table->file->dup_ref);
5445
table->file->ha_rnd_end();
5448
table->file->print_error(error, MYF(0));
5454
if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
5459
key.reserve(table->s->max_unique_length);
5461
key_copy(key, table->record[0], table->key_info + keynum, 0);
5462
error= table->file->index_read_idx_map(table->record[1], keynum,
5468
table->file->print_error(error, MYF(0));
5474
Now, record[1] should contain the offending row. That
5475
will enable us to update it or, alternatively, delete it (so
5476
that we can insert the new row afterwards).
5480
If row is incomplete we will use the record found to fill
5483
if (!get_flags(COMPLETE_ROWS_F))
5485
restore_record(table,record[1]);
5486
error= unpack_current_row(rli, &m_cols);
5490
REPLACE is defined as either INSERT or DELETE + INSERT. If
5491
possible, we can replace it with an UPDATE, but that will not
5492
work on InnoDB if FOREIGN KEY checks are necessary.
5494
I (Matz) am not sure of the reason for the last_uniq_key()
5495
check as, but I'm guessing that it's something along the
5498
Suppose that we got the duplicate key to be a key that is not
5499
the last unique key for the table and we perform an update:
5500
then there might be another key for which the unique check will
5501
fail, so we're better off just deleting the row and inserting
5504
if (last_uniq_key(table, keynum) &&
5505
!table->file->referenced_by_foreign_key())
5507
error=table->file->ha_update_row(table->record[1],
5511
case HA_ERR_RECORD_IS_THE_SAME:
5518
table->file->print_error(error, MYF(0));
5525
if ((error= table->file->ha_delete_row(table->record[1])))
5527
table->file->print_error(error, MYF(0));
5530
/* Will retry ha_write_row() with the offending row removed. */
5538
int Rows_log_event::unpack_current_row(const Relay_log_info *const rli,
5539
MY_BITMAP const *cols)
5542
ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT);
5543
int const result= ::unpack_row(rli, m_table, m_width, m_curr_row, cols,
5544
&m_curr_row_end, &m_master_reclength);
5545
if (m_curr_row_end > m_rows_end)
5546
my_error(ER_SLAVE_CORRUPT_EVENT, MYF(0));
5547
ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT);
5553
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
5555
assert(m_table != NULL);
5557
write_row(rli, /* if 1 then overwrite */
5558
bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
5560
if (error && !session->is_error())
5563
my_error(ER_UNKNOWN_ERROR, MYF(0));
5570
/**************************************************************************
5571
Delete_rows_log_event member functions
5572
**************************************************************************/
5575
Compares table->record[0] and table->record[1]
5577
Returns TRUE if different.
5579
static bool record_compare(Table *table)
5582
Need to set the X bit and the filler bits in both records since
5583
there are engines that do not set it correctly.
5585
In addition, since MyISAM checks that one hasn't tampered with the
5586
record, it is necessary to restore the old bytes into the record
5587
after doing the comparison.
5589
TODO[record format ndb]: Remove it once NDB returns correct
5590
records. Check that the other engines also return correct records.
5593
unsigned char saved_x[2], saved_filler[2];
5595
if (table->s->null_bytes > 0)
5597
for (int i = 0 ; i < 2 ; ++i)
5599
saved_x[i]= table->record[i][0];
5600
saved_filler[i]= table->record[i][table->s->null_bytes - 1];
5601
table->record[i][0]|= 1U;
5602
table->record[i][table->s->null_bytes - 1]|=
5603
256U - (1U << table->s->last_null_bit_pos);
5607
if (table->s->blob_fields + table->s->varchar_fields == 0)
5609
result= cmp_record(table,record[1]);
5610
goto record_compare_exit;
5613
/* Compare null bits */
5614
if (memcmp(table->null_flags,
5615
table->null_flags+table->s->rec_buff_length,
5616
table->s->null_bytes))
5618
result= true; // Diff in NULL value
5619
goto record_compare_exit;
5622
/* Compare updated fields */
5623
for (Field **ptr=table->field ; *ptr ; ptr++)
5625
if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
5628
goto record_compare_exit;
5632
record_compare_exit:
5634
Restore the saved bytes.
5636
TODO[record format ndb]: Remove this code once NDB returns the
5637
correct record format.
5639
if (table->s->null_bytes > 0)
5641
for (int i = 0 ; i < 2 ; ++i)
5643
table->record[i][0]= saved_x[i];
5644
table->record[i][table->s->null_bytes - 1]= saved_filler[i];
5652
Locate the current row in event's table.
5654
The current row is pointed by @c m_curr_row. Member @c m_width tells how many
5655
columns are there in the row (this can be different from the number of columns
5656
in the table). It is assumed that event's table is already open and pointed
5659
If a corresponding record is found in the table it is stored in
5660
@c m_table->record[0]. Note that when record is located based on a primary
5661
key, it is possible that the record found differs from the row being located.
5663
If no key is specified or table does not have keys, a table scan is used to
5664
find the row. In that case the row should be complete and contain values for
5665
all columns. However, it can still be shorter than the table, i.e. the table
5666
can contain extra columns not present in the row. It is also possible that
5667
the table has fewer columns than the row being located.
5669
@returns Error code on failure, 0 on success.
5671
@post In case of success @c m_table->record[0] contains the record found.
5672
Also, the internal "cursor" of the table is positioned at the record found.
5674
@note If the engine allows random access of the records, a combination of
5675
@c position() and @c rnd_pos() will be used.
5678
int Rows_log_event::find_row(const Relay_log_info *rli)
5680
assert(m_table && m_table->in_use != NULL);
5682
Table *table= m_table;
5685
/* unpack row - missing fields get default values */
5686
prepare_record(table, &m_cols, m_width, false/* don't check errors */);
5687
error= unpack_current_row(rli, &m_cols);
5689
// Temporary fix to find out why it fails [/Matz]
5690
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
5692
if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
5693
table->s->primary_key < MAX_KEY)
5696
Use a more efficient method to fetch the record given by
5697
table->record[0] if the engine allows it. We first compute a
5698
row reference using the position() member function (it will be
5699
stored in table->file->ref) and then use rnd_pos() to position
5700
the "cursor" (i.e., record[0] in this case) at the correct row.
5702
TODO: Add a check that the correct record has been fetched by
5703
comparing with the original record. Take into account that the
5704
record on the master and slave can be of different
5705
length. Something along these lines should work:
5707
ADD>>> store_record(table,record[1]);
5708
int error= table->file->rnd_pos(table->record[0], table->file->ref);
5709
ADD>>> assert(memcmp(table->record[1], table->record[0],
5710
table->s->reclength) == 0);
5713
int error= table->file->rnd_pos_by_record(table->record[0]);
5714
table->file->ha_rnd_end();
5717
table->file->print_error(error, MYF(0));
5722
// We can't use position() - try other methods.
5725
Save copy of the record in table->record[1]. It might be needed
5726
later if linear search is used to find exact match.
5728
store_record(table,record[1]);
5730
if (table->s->keys > 0)
5732
/* We have a key: search the table using the index */
5733
if (!table->file->inited && (error= table->file->ha_index_init(0, false)))
5735
table->file->print_error(error, MYF(0));
5739
/* Fill key data for the row */
5742
key_copy(m_key, table->record[0], table->key_info, 0);
5745
We need to set the null bytes to ensure that the filler bits are
5746
all set when returning. There are storage engines that just set
5747
the necessary bits on the bytes and don't set the filler bits
5750
my_ptrdiff_t const pos=
5751
table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
5752
table->record[0][pos]= 0xFF;
5754
if ((error= table->file->index_read_map(table->record[0], m_key,
5756
HA_READ_KEY_EXACT)))
5758
table->file->print_error(error, MYF(0));
5759
table->file->ha_index_end();
5764
Below is a minor "optimization". If the key (i.e., key number
5765
0) has the HA_NOSAME flag set, we know that we have found the
5766
correct record (since there can be no duplicates); otherwise, we
5767
have to compare the record with the one found to see if it is
5770
CAVEAT! This behaviour is essential for the replication of,
5771
e.g., the mysql.proc table since the correct record *shall* be
5772
found using the primary key *only*. There shall be no
5773
comparison of non-PK columns to decide if the correct record is
5774
found. I can see no scenario where it would be incorrect to
5775
choose the row to change using only a PK or a unique NOT NULL index (UNNI).
5777
if (table->key_info->flags & HA_NOSAME)
5779
table->file->ha_index_end();
5784
In case key is not unique, we still have to iterate over records found
5785
and find the one which is identical to the row given. A copy of the
5786
record we are looking for is stored in record[1].
5788
while (record_compare(table))
5791
We need to set the null bytes to ensure that the filler bits
5792
are all set when returning. There are storage engines that
5793
just set the necessary bits on the bytes and don't set the
5794
filler bits correctly.
5796
TODO[record format ndb]: Remove this code once NDB returns the
5797
correct record format.
5799
if (table->s->null_bytes > 0)
5801
table->record[0][table->s->null_bytes - 1]|=
5802
256U - (1U << table->s->last_null_bit_pos);
5805
if ((error= table->file->index_next(table->record[0])))
5807
table->file->print_error(error, MYF(0));
5808
table->file->ha_index_end();
5814
Have to restart the scan to be able to fetch the next row.
5816
table->file->ha_index_end();
5820
int restart_count= 0; // Number of times scanning has restarted from top
5822
/* We don't have a key: search the table using rnd_next() */
5823
if ((error= table->file->ha_rnd_init(1)))
5825
table->file->print_error(error, MYF(0));
5829
/* Continue until we find the right record or have made a full loop */
5832
error= table->file->rnd_next(table->record[0]);
5837
case HA_ERR_RECORD_DELETED:
5840
case HA_ERR_END_OF_FILE:
5841
if (++restart_count < 2)
5842
table->file->ha_rnd_init(1);
5846
table->file->print_error(error, MYF(0));
5847
table->file->ha_rnd_end();
5851
while (restart_count < 2 && record_compare(table));
5854
Note: above record_compare will take into account all record fields
5855
which might be incorrect in case a partial row was given in the event
5857
table->file->ha_rnd_end();
5859
assert(error == HA_ERR_END_OF_FILE || error == HA_ERR_RECORD_DELETED || error == 0);
5863
table->default_column_bitmaps();
5866
table->default_column_bitmaps();
5872
Constructor used to build an event for writing to the binary log.
5875
Delete_rows_log_event::Delete_rows_log_event(Session *session_arg, Table *tbl_arg,
5877
bool is_transactional)
5878
: Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
5883
Constructor used by slave to read the event from the binary log.
5885
Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint32_t event_len,
5886
const Format_description_log_event
5888
: Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
5894
Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
5896
if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
5897
m_table->s->primary_key < MAX_KEY)
5900
We don't need to allocate any memory for m_key since it is not used.
5905
if (m_table->s->keys > 0)
5907
// Allocate buffer for key searches
5908
m_key= (unsigned char*)malloc(m_table->key_info->key_length);
5910
return HA_ERR_OUT_OF_MEM;
5917
Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
5920
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
5921
m_table->file->ha_index_or_rnd_end();
5928
int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
5931
assert(m_table != NULL);
5933
if (!(error= find_row(rli)))
5936
Delete the record found, located in record[0]
5938
error= m_table->file->ha_delete_row(m_table->record[0]);
5944
/**************************************************************************
5945
Update_rows_log_event member functions
5946
**************************************************************************/
5949
Constructor used to build an event for writing to the binary log.
5951
Update_rows_log_event::Update_rows_log_event(Session *session_arg, Table *tbl_arg,
5953
bool is_transactional)
5954
: Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
5956
init(tbl_arg->write_set);
5959
void Update_rows_log_event::init(MY_BITMAP const *cols)
5961
/* if bitmap_init fails, caught in is_valid() */
5962
if (likely(!bitmap_init(&m_cols_ai,
5963
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
5967
/* Cols can be zero if this is a dummy binrows event */
5968
if (likely(cols != NULL))
5970
memcpy(m_cols_ai.bitmap, cols->bitmap, no_bytes_in_map(cols));
5971
create_last_word_mask(&m_cols_ai);
5977
/**
  Release the after-image column bitmap.

  When the bitmap words are the event's own @c m_bitbuf_ai buffer, no
  allocation took place, so the pointer is cleared before calling
  bitmap_free() to keep it from freeing that buffer.
*/
Update_rows_log_event::~Update_rows_log_event()
{
  const bool uses_inline_buffer= (m_cols_ai.bitmap == m_bitbuf_ai);
  if (uses_inline_buffer)
    m_cols_ai.bitmap= 0;              // no malloc happened, so nothing to free

  bitmap_free(&m_cols_ai);            // counterpart of bitmap_init()
}
5986
Constructor used by slave to read the event from the binary log.
5988
Update_rows_log_event::Update_rows_log_event(const char *buf, uint32_t event_len,
5990
Format_description_log_event
5992
: Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
5998
Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6000
if (m_table->s->keys > 0)
6002
// Allocate buffer for key searches
6003
m_key= (unsigned char*)malloc(m_table->key_info->key_length);
6005
return HA_ERR_OUT_OF_MEM;
6008
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
6014
Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6017
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
6018
m_table->file->ha_index_or_rnd_end();
6019
free(m_key); // Free for multi_malloc
6026
Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6028
assert(m_table != NULL);
6030
int error= find_row(rli);
6034
We need to read the second image in the event of error to be
6035
able to skip to the next pair of updates
6037
m_curr_row= m_curr_row_end;
6038
unpack_current_row(rli, &m_cols_ai);
6043
This is the situation after locating BI:
6045
===|=== before image ====|=== after image ===|===
6047
m_curr_row m_curr_row_end
6049
BI found in the table is stored in record[0]. We copy it to record[1]
6050
and unpack AI to record[0].
6053
store_record(m_table,record[1]);
6055
m_curr_row= m_curr_row_end;
6056
error= unpack_current_row(rli, &m_cols_ai); // this also updates m_curr_row_end
6059
Now we have the right row to update. The old row (the one we're
6060
looking for) is in record[1] and the new row is in record[0].
6063
// Temporary fix to find out why it fails [/Matz]
6064
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
6065
memcpy(m_table->write_set->bitmap, m_cols_ai.bitmap, (m_table->write_set->n_bits + 7) / 8);
6067
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
6068
if (error == HA_ERR_RECORD_IS_THE_SAME)
6075
Incident_log_event::Incident_log_event(const char *buf, uint32_t event_len,
6076
const Format_description_log_event *descr_event)
6077
: Log_event(buf, descr_event)
6079
uint8_t const common_header_len=
6080
descr_event->common_header_len;
6081
uint8_t const post_header_len=
6082
descr_event->post_header_len[INCIDENT_EVENT-1];
6084
m_incident= static_cast<Incident>(uint2korr(buf + common_header_len));
6085
char const *ptr= buf + common_header_len + post_header_len;
6086
char const *const str_end= buf + event_len;
6087
uint8_t len= 0; // Assignment to keep compiler happy
6088
const char *str= NULL; // Assignment to keep compiler happy
6089
read_str(&ptr, str_end, &str, &len);
6090
m_message.str= const_cast<char*>(str);
6091
m_message.length= len;
6096
Incident_log_event::~Incident_log_event()
6102
Incident_log_event::description() const
6104
static const char *const description[]= {
6105
"NOTHING", // Not used
6109
assert(0 <= m_incident);
6110
assert((size_t) m_incident <= sizeof(description)/sizeof(*description));
6112
return description[m_incident];
6116
/**
  Store a one-line textual summary of the incident in @c protocol.

  The summary has the form "#<number> (<description>)", followed by
  ": <message>" when the event carries a non-empty message.

  @param protocol  Destination the formatted text is stored into.
*/
void Incident_log_event::pack_info(Protocol *protocol)
{
  char buf[256];
  int written;

  /*
    Append the message only when there is one. The branches used to be
    swapped, which dropped the message when it was present and passed a
    possibly-NULL m_message.str to "%s" when it was absent.

    The message is printed with an explicit length because it points
    into the raw event data and is not guaranteed to be NUL-terminated.
  */
  if (m_message.length > 0)
    written= snprintf(buf, sizeof(buf), "#%d (%s): %.*s",
                      m_incident, description(),
                      (int) m_message.length, m_message.str);
  else
    written= snprintf(buf, sizeof(buf), "#%d (%s)",
                      m_incident, description());

  /*
    snprintf() returns the length the full output would have had, which
    can exceed the buffer on truncation (or be negative on error), so
    clamp it to what was actually written before handing it on.
  */
  size_t bytes= 0;
  if (written > 0)
    bytes= (size_t) written < sizeof(buf) ? (size_t) written
                                          : sizeof(buf) - 1;

  protocol->store(buf, bytes, &my_charset_bin);
}
6131
Incident_log_event::do_apply_event(Relay_log_info const *rli)
6133
rli->report(ERROR_LEVEL, ER_SLAVE_INCIDENT,
6134
ER(ER_SLAVE_INCIDENT),
6136
m_message.length > 0 ? m_message.str : "<none>");
6142
Incident_log_event::write_data_header(IO_CACHE *file)
6144
unsigned char buf[sizeof(int16_t)];
6145
int2store(buf, (int16_t) m_incident);
6146
return(my_b_safe_write(file, buf, sizeof(buf)));
6150
Incident_log_event::write_data_body(IO_CACHE *file)
6152
return(write_str(file, m_message.str, m_message.length));
6155
Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint32_t event_len,
6156
const Format_description_log_event* description_event)
6157
:Log_event(buf, description_event)
6159
uint8_t header_size= description_event->common_header_len;
6160
ident_len = event_len - header_size;
6161
set_if_smaller(ident_len,FN_REFLEN-1);
6162
log_ident= buf + header_size;