1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008 Sun Microsystems
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; version 2 of the License.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20
#include <drizzled/server_includes.h>
21
#include <drizzled/log_event.h>
22
#include <drizzled/replication/rli.h>
23
#include <drizzled/replication/mi.h>
24
#include <libdrizzle/libdrizzle.h>
25
#include <mysys/hash.h>
26
#include <drizzled/replication/utility.h>
27
#include <drizzled/replication/record.h>
28
#include <mysys/my_dir.h>
29
#include <drizzled/error.h>
30
#include <libdrizzle/pack.h>
31
#include <drizzled/sql_parse.h>
32
#include <drizzled/sql_base.h>
33
#include <drizzled/sql_load.h>
38
#include <mysys/base64.h>
39
#include <mysys/my_bitmap.h>
41
#include <drizzled/gettext.h>
42
#include <libdrizzle/libdrizzle.h>
43
#include <drizzled/error.h>
44
#include <drizzled/query_id.h>
45
#include <drizzled/tztime.h>
46
#include <drizzled/slave.h>
50
static const char *HA_ERR(int i)
53
case HA_ERR_KEY_NOT_FOUND: return "HA_ERR_KEY_NOT_FOUND";
54
case HA_ERR_FOUND_DUPP_KEY: return "HA_ERR_FOUND_DUPP_KEY";
55
case HA_ERR_RECORD_CHANGED: return "HA_ERR_RECORD_CHANGED";
56
case HA_ERR_WRONG_INDEX: return "HA_ERR_WRONG_INDEX";
57
case HA_ERR_CRASHED: return "HA_ERR_CRASHED";
58
case HA_ERR_WRONG_IN_RECORD: return "HA_ERR_WRONG_IN_RECORD";
59
case HA_ERR_OUT_OF_MEM: return "HA_ERR_OUT_OF_MEM";
60
case HA_ERR_NOT_A_TABLE: return "HA_ERR_NOT_A_TABLE";
61
case HA_ERR_WRONG_COMMAND: return "HA_ERR_WRONG_COMMAND";
62
case HA_ERR_OLD_FILE: return "HA_ERR_OLD_FILE";
63
case HA_ERR_NO_ACTIVE_RECORD: return "HA_ERR_NO_ACTIVE_RECORD";
64
case HA_ERR_RECORD_DELETED: return "HA_ERR_RECORD_DELETED";
65
case HA_ERR_RECORD_FILE_FULL: return "HA_ERR_RECORD_FILE_FULL";
66
case HA_ERR_INDEX_FILE_FULL: return "HA_ERR_INDEX_FILE_FULL";
67
case HA_ERR_END_OF_FILE: return "HA_ERR_END_OF_FILE";
68
case HA_ERR_UNSUPPORTED: return "HA_ERR_UNSUPPORTED";
69
case HA_ERR_TO_BIG_ROW: return "HA_ERR_TO_BIG_ROW";
70
case HA_WRONG_CREATE_OPTION: return "HA_WRONG_CREATE_OPTION";
71
case HA_ERR_FOUND_DUPP_UNIQUE: return "HA_ERR_FOUND_DUPP_UNIQUE";
72
case HA_ERR_UNKNOWN_CHARSET: return "HA_ERR_UNKNOWN_CHARSET";
73
case HA_ERR_WRONG_MRG_TABLE_DEF: return "HA_ERR_WRONG_MRG_TABLE_DEF";
74
case HA_ERR_CRASHED_ON_REPAIR: return "HA_ERR_CRASHED_ON_REPAIR";
75
case HA_ERR_CRASHED_ON_USAGE: return "HA_ERR_CRASHED_ON_USAGE";
76
case HA_ERR_LOCK_WAIT_TIMEOUT: return "HA_ERR_LOCK_WAIT_TIMEOUT";
77
case HA_ERR_LOCK_TABLE_FULL: return "HA_ERR_LOCK_TABLE_FULL";
78
case HA_ERR_READ_ONLY_TRANSACTION: return "HA_ERR_READ_ONLY_TRANSACTION";
79
case HA_ERR_LOCK_DEADLOCK: return "HA_ERR_LOCK_DEADLOCK";
80
case HA_ERR_CANNOT_ADD_FOREIGN: return "HA_ERR_CANNOT_ADD_FOREIGN";
81
case HA_ERR_NO_REFERENCED_ROW: return "HA_ERR_NO_REFERENCED_ROW";
82
case HA_ERR_ROW_IS_REFERENCED: return "HA_ERR_ROW_IS_REFERENCED";
83
case HA_ERR_NO_SAVEPOINT: return "HA_ERR_NO_SAVEPOINT";
84
case HA_ERR_NON_UNIQUE_BLOCK_SIZE: return "HA_ERR_NON_UNIQUE_BLOCK_SIZE";
85
case HA_ERR_NO_SUCH_TABLE: return "HA_ERR_NO_SUCH_TABLE";
86
case HA_ERR_TABLE_EXIST: return "HA_ERR_TABLE_EXIST";
87
case HA_ERR_NO_CONNECTION: return "HA_ERR_NO_CONNECTION";
88
case HA_ERR_NULL_IN_SPATIAL: return "HA_ERR_NULL_IN_SPATIAL";
89
case HA_ERR_TABLE_DEF_CHANGED: return "HA_ERR_TABLE_DEF_CHANGED";
90
case HA_ERR_NO_PARTITION_FOUND: return "HA_ERR_NO_PARTITION_FOUND";
91
case HA_ERR_RBR_LOGGING_FAILED: return "HA_ERR_RBR_LOGGING_FAILED";
92
case HA_ERR_DROP_INDEX_FK: return "HA_ERR_DROP_INDEX_FK";
93
case HA_ERR_FOREIGN_DUPLICATE_KEY: return "HA_ERR_FOREIGN_DUPLICATE_KEY";
94
case HA_ERR_TABLE_NEEDS_UPGRADE: return "HA_ERR_TABLE_NEEDS_UPGRADE";
95
case HA_ERR_TABLE_READONLY: return "HA_ERR_TABLE_READONLY";
96
case HA_ERR_AUTOINC_READ_FAILED: return "HA_ERR_AUTOINC_READ_FAILED";
97
case HA_ERR_AUTOINC_ERANGE: return "HA_ERR_AUTOINC_ERANGE";
98
case HA_ERR_GENERIC: return "HA_ERR_GENERIC";
99
case HA_ERR_RECORD_IS_THE_SAME: return "HA_ERR_RECORD_IS_THE_SAME";
100
case HA_ERR_LOGGING_IMPOSSIBLE: return "HA_ERR_LOGGING_IMPOSSIBLE";
101
case HA_ERR_CORRUPT_EVENT: return "HA_ERR_CORRUPT_EVENT";
102
case HA_ERR_ROWS_EVENT_APPLY : return "HA_ERR_ROWS_EVENT_APPLY";
108
Error reporting facility for Rows_log_event::do_apply_event
110
@param level error, warning or info
111
@param ha_error HA_ERR_ code
112
@param rli pointer to the active Relay_log_info instance
113
@param session pointer to the slave thread's session
114
@param table pointer to the event's table object
115
@param type the type of the event
116
@param log_name the master binlog file name
117
@param pos the master binlog file pos (the next after the event)
120
static void inline slave_rows_error_report(enum loglevel level, int ha_error,
121
Relay_log_info const *rli, Session *session,
122
Table *table, const char * type,
123
const char *log_name, ulong pos)
125
const char *handler_error= HA_ERR(ha_error);
126
char buff[MAX_SLAVE_ERRMSG], *slider;
127
const char *buff_end= buff + sizeof(buff);
129
List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
133
for (err= it++, slider= buff; err && slider < buff_end - 1;
134
slider += len, err= it++)
136
len= snprintf(slider, buff_end - slider,
137
_(" %s, Error_code: %d;"), err->msg, err->code);
140
rli->report(level, session->is_error()? session->main_da.sql_errno() : 0,
141
_("Could not execute %s event on table %s.%s;"
142
"%s handler error %s; "
143
"the event's master log %s, end_log_pos %lu"),
144
type, table->s->db.str,
145
table->s->table_name.str,
147
handler_error == NULL? _("<unknown>") : handler_error,
153
Cache that will automatically be written to a dedicated file on
159
class Write_on_release_cache
167
typedef unsigned short flag_set;
173
Write_on_release_cache
174
cache Pointer to cache to use
175
file File to write cache to upon destruction
176
flags Flags for the cache
180
Class used to guarantee copy of cache to file before exiting the
181
current block. On successful copy of the cache, the cache will
182
be reinited as a WRITE_CACHE.
184
Currently, a pointer to the cache is provided in the
185
constructor, but it would be possible to create a subclass
186
holding the IO_CACHE itself.
188
Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0)
189
: m_cache(cache), m_file(file), m_flags(flags)
191
reinit_io_cache(m_cache, WRITE_CACHE, 0L, false, true);
194
~Write_on_release_cache()
196
copy_event_cache_to_file_and_reinit(m_cache, m_file);
197
if (m_flags | FLUSH_F)
202
Return a pointer to the internal IO_CACHE.
209
Function to return a pointer to the internal cache, so that the
210
object can be treated as a IO_CACHE and used with the my_b_*
214
A pointer to the internal IO_CACHE.
216
IO_CACHE *operator&()
222
// Hidden, to prevent usage.
223
Write_on_release_cache(Write_on_release_cache const&);
230
uint32_t debug_not_change_ts_if_art_event= 1; // bug#29309 simulation
236
static void clear_all_errors(Session *session, Relay_log_info *rli)
238
session->is_slave_error = 0;
239
session->clear_error();
245
Ignore error code specified on command line.
248
inline int ignored_error_code(int err_code)
250
return ((err_code == ER_SLAVE_IGNORED_TABLE) ||
251
(use_slave_mask && bitmap_is_set(&slave_error_mask, err_code)));
259
static char *pretty_print_str(char *packet, const char *str, int len)
261
const char *end= str + len;
267
switch ((c=*str++)) {
268
case '\n': *pos++= '\\'; *pos++= 'n'; break;
269
case '\r': *pos++= '\\'; *pos++= 'r'; break;
270
case '\\': *pos++= '\\'; *pos++= '\\'; break;
271
case '\b': *pos++= '\\'; *pos++= 'b'; break;
272
case '\t': *pos++= '\\'; *pos++= 't'; break;
273
case '\'': *pos++= '\\'; *pos++= '\''; break;
274
case 0 : *pos++= '\\'; *pos++= '0'; break;
286
Creates a temporary name for LOAD DATA INFILE.
288
@param buf Store new filename here
289
@param file_id File_id (part of file name)
290
@param event_server_id Event_id (part of file name)
291
@param ext Extension for file name
294
Pointer to start of extension
297
static char *slave_load_file_stem(char *buf, uint32_t file_id,
298
int event_server_id, const char *ext)
301
fn_format(buf,"SQL_LOAD-",slave_load_tmpdir, "", MY_UNPACK_FILENAME);
304
buf= strchr(buf, '\0');
305
buf= int10_to_str(::server_id, buf, 10);
307
buf= int10_to_str(event_server_id, buf, 10);
309
res= int10_to_str(file_id, buf, 10);
310
strcpy(res, ext); // Add extension last
311
return res; // Pointer to extension
316
Delete all temporary files used for SQL_LOAD.
319
static void cleanup_load_tmpdir()
324
char fname[FN_REFLEN], prefbuf[31], *p;
326
if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
330
When we are deleting temporary files, we should only remove
331
the files associated with the server id of our server.
332
We don't use event_server_id here because since we've disabled
333
direct binlogging of Create_file/Append_file/Exec_load events
334
we cannot meet Start_log event in the middle of events from one
337
p= strncpy(prefbuf, STRING_WITH_LEN("SQL_LOAD-")) + 9;
338
p= int10_to_str(::server_id, p, 10);
342
for (i=0 ; i < (uint)dirp->number_off_files; i++)
344
file=dirp->dir_entry+i;
345
if (is_prefix(file->name, prefbuf))
347
fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME);
348
my_delete(fname, MYF(0));
360
/**
  Write a length-prefixed string to an IO_CACHE.

  A single byte holding the length is written first, followed by
  'length' bytes taken from 'str'.

  @param file    cache to write to
  @param str     bytes to write
  @param length  number of bytes in str

  @return true when either my_b_safe_write() call returns non-zero,
          false otherwise. The string bytes are not written if writing
          the prefix already failed.

  NOTE(review): the prefix is one byte, so a length above 255 would be
  stored truncated while all 'length' bytes are still written; confirm
  that callers never pass longer strings.
*/
static bool write_str(IO_CACHE *file, const char *str, uint32_t length)
{
  unsigned char length_prefix= (unsigned char) length;

  if (my_b_safe_write(file, &length_prefix, sizeof(length_prefix)))
    return true;
  if (my_b_safe_write(file, (unsigned char*) str, length))
    return true;
  return false;
}
373
static inline int read_str(const char **buf, const char *buf_end,
374
const char **str, uint8_t *len)
376
if (*buf + ((uint) (unsigned char) **buf) >= buf_end)
378
*len= (uint8_t) **buf;
380
(*buf)+= (uint) *len+1;
386
Transforms a string into "" or its expression in 0x... form.
389
char *str_to_hex(char *to, const char *from, uint32_t len)
395
to= octet2hex(to, from, len);
398
to= strcpy(to, "\"\"")+2;
399
return to; // pointer to end 0 of 'to'
404
Append a version of the 'from' string suitable for use in a query to
405
the 'to' string. To generate a correct escaping, the character set
406
information in 'csinfo' is used.
410
append_query_string(const CHARSET_INFO * const csinfo,
411
String const *from, String *to)
414
uint32_t const orig_len= to->length();
415
if (to->reserve(orig_len + from->length()*2+3))
418
beg= to->c_ptr_quick() + to->length();
420
if (csinfo->escape_with_backslash_is_dangerous)
421
ptr= str_to_hex(ptr, from->ptr(), from->length());
425
ptr+= drizzle_escape_string(ptr, from->ptr(), from->length());
428
to->length(orig_len + ptr - beg);
433
/**************************************************************************
434
Log_event methods (= the parent class of all events)
435
**************************************************************************/
439
returns the human readable name of the event's type
442
/**
  Translate an event type code into a short human readable name.

  @param type  event type code

  @return A constant string naming the type; "Unknown" for any code that
          is not listed here.
*/
const char* Log_event::get_type_str(Log_event_type type)
{
  const char *type_name;

  switch (type) {
  case START_EVENT_V3:           type_name= "Start_v3";           break;
  case STOP_EVENT:               type_name= "Stop";               break;
  case QUERY_EVENT:              type_name= "Query";              break;
  case ROTATE_EVENT:             type_name= "Rotate";             break;
  case LOAD_EVENT:               type_name= "Load";               break;
  case NEW_LOAD_EVENT:           type_name= "New_load";           break;
  case SLAVE_EVENT:              type_name= "Slave";              break;
  case CREATE_FILE_EVENT:        type_name= "Create_file";        break;
  case APPEND_BLOCK_EVENT:       type_name= "Append_block";       break;
  case DELETE_FILE_EVENT:        type_name= "Delete_file";        break;
  case EXEC_LOAD_EVENT:          type_name= "Exec_load";          break;
  case XID_EVENT:                type_name= "Xid";                break;
  case FORMAT_DESCRIPTION_EVENT: type_name= "Format_desc";        break;
  case TABLE_MAP_EVENT:          type_name= "Table_map";          break;
  case WRITE_ROWS_EVENT:         type_name= "Write_rows";         break;
  case UPDATE_ROWS_EVENT:        type_name= "Update_rows";        break;
  case DELETE_ROWS_EVENT:        type_name= "Delete_rows";        break;
  case BEGIN_LOAD_QUERY_EVENT:   type_name= "Begin_load_query";   break;
  case EXECUTE_LOAD_QUERY_EVENT: type_name= "Execute_load_query"; break;
  case INCIDENT_EVENT:           type_name= "Incident";           break;
  default:                       type_name= "Unknown";            break; /* impossible */
  }

  return type_name;
}
469
const char* Log_event::get_type_str()
471
return get_type_str(get_type_code());
476
Log_event::Log_event()
479
Log_event::Log_event(Session* session_arg, uint16_t flags_arg, bool using_trans)
480
:log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), session(session_arg)
482
server_id= session->server_id;
483
when= session->start_time;
484
cache_stmt= using_trans;
489
This minimal constructor is for when you are not even sure that there
490
is a valid Session. For example in the server when we are shutting down or
491
flushing logs after receiving a SIGHUP (then we must write a Rotate to
492
the binlog but we have no Session, so we need this minimal constructor).
495
Log_event::Log_event()
496
:temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
499
server_id= ::server_id;
501
We can't call my_time() here as this would cause a call before
510
Log_event::Log_event()
513
Log_event::Log_event(const char* buf,
514
const Format_description_log_event* description_event)
515
:temp_buf(0), cache_stmt(0)
518
when= uint4korr(buf);
519
server_id= uint4korr(buf + SERVER_ID_OFFSET);
520
data_written= uint4korr(buf + EVENT_LEN_OFFSET);
521
if (description_event->binlog_version==1)
528
log_pos= uint4korr(buf + LOG_POS_OFFSET);
530
If the log is 4.0 (so here it can only be a 4.0 relay log read by
531
the SQL thread or a 4.0 master binlog read by the I/O thread),
532
log_pos is the beginning of the event: we transform it into the end
533
of the event, which is more useful.
534
But how do you know that the log is 4.0: you know it if
535
description_event is version 3 *and* you are not reading a
536
Format_desc (remember that mysqlbinlog starts by assuming that 5.0
537
logs are in 4.0 format, until it finds a Format_desc).
539
if (description_event->binlog_version==3 &&
540
buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
543
If log_pos=0, don't change it. log_pos==0 is a marker to mean
544
"don't change rli->group_master_log_pos" (see
545
inc_group_relay_log_pos()). As it is unreal log_pos, adding the
546
event len's is nonsense. For example, a fake Rotate event should
547
not have its log_pos (which is 0) changed or it will modify
548
Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense
549
value of (a non-zero offset which does not exist in the master's
550
binlog, so which will cause problems if the user uses this value
553
log_pos+= data_written; /* purecov: inspected */
556
flags= uint2korr(buf + FLAGS_OFFSET);
557
if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
558
(buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
561
These events always have a header which stops here (i.e. their
565
Initialization to zero of all other Log_event members as they're
566
not specified. Currently there are no such members; in the future
567
there will be an event UID (but Format_description and Rotate
568
don't need this UID, as they are not propagated through
569
--log-slave-updates (remember the UID is used to not play a query
570
twice when you have two masters which are slaves of a 3rd master).
575
/* otherwise, go on with reading the header from buf (nothing now) */
579
int Log_event::do_update_pos(Relay_log_info *rli)
582
rli is null when (as far as I (Guilhem) know) the caller is
583
Load_log_event::do_apply_event *and* that one is called from
584
Execute_load_log_event::do_apply_event. In this case, we don't
585
do anything here ; Execute_load_log_event::do_apply_event will
586
call Log_event::do_apply_event again later with the proper rli.
587
Strictly speaking, if we were sure that rli is null only in the
588
case discussed above, 'if (rli)' is useless here. But as we are
589
not 100% sure, keep it for now.
591
Matz: I don't think we will need this check with this refactoring.
596
bug#29309 simulation: resetting the flag to force
597
wrong behaviour of artificial event to update
598
rli->last_master_timestamp for only one time -
599
the first FLUSH LOGS in the test.
601
if (debug_not_change_ts_if_art_event == 1
602
&& is_artificial_event())
603
debug_not_change_ts_if_art_event= 0;
604
rli->stmt_done(log_pos,
605
is_artificial_event() &&
606
debug_not_change_ts_if_art_event > 0 ? 0 : when);
607
if (debug_not_change_ts_if_art_event == 0)
608
debug_not_change_ts_if_art_event= 2;
610
return 0; // Cannot fail currently
614
/**
  Decide whether this event should be skipped.

  @param rli  relay log info; its replicate_same_server_id,
              slave_skip_counter and is_in_group() are consulted

  @retval EVENT_SKIP_IGNORE  the event's server_id equals ::server_id and
                             replicate_same_server_id is off, or
                             slave_skip_counter is 1 while is_in_group()
                             is true
  @retval EVENT_SKIP_COUNT   otherwise, when slave_skip_counter is above 0
  @retval EVENT_SKIP_NOT     in all remaining cases
*/
Log_event::enum_skip_reason
Log_event::do_shall_skip(Relay_log_info *rli)
{
  const bool from_this_server=
    (server_id == ::server_id) && !rli->replicate_same_server_id;

  if (from_this_server)
    return EVENT_SKIP_IGNORE;

  /* is_in_group() is only asked when the counter is exactly 1. */
  if (rli->slave_skip_counter == 1 && rli->is_in_group())
    return EVENT_SKIP_IGNORE;

  if (rli->slave_skip_counter > 0)
    return EVENT_SKIP_COUNT;

  return EVENT_SKIP_NOT;
}
627
Log_event::pack_info()
630
/**
  Store this event's descriptive text through 'protocol'.

  This implementation stores an empty string, tagged with
  my_charset_bin.

  @param protocol  protocol object that receives the value
*/
void Log_event::pack_info(Protocol *protocol)
{
  const char *no_info= "";
  protocol->store(no_info, &my_charset_bin);
}
636
const char* Log_event::get_db()
638
return session ? session->db : 0;
643
init_show_field_list() prepares the column names and types for the
644
output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
648
void Log_event::init_show_field_list(List<Item>* field_list)
650
field_list->push_back(new Item_empty_string("Log_name", 20));
651
field_list->push_back(new Item_return_int("Pos", MY_INT32_NUM_DECIMAL_DIGITS,
652
DRIZZLE_TYPE_LONGLONG));
653
field_list->push_back(new Item_empty_string("Event_type", 20));
654
field_list->push_back(new Item_return_int("Server_id", 10,
656
field_list->push_back(new Item_return_int("End_log_pos",
657
MY_INT32_NUM_DECIMAL_DIGITS,
658
DRIZZLE_TYPE_LONGLONG));
659
field_list->push_back(new Item_empty_string("Info", 20));
666
bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
668
unsigned char header[LOG_EVENT_HEADER_LEN];
671
/* Store number of bytes that will be written by this event */
672
data_written= event_data_length + sizeof(header);
675
log_pos != 0 if this is relay-log event. In this case we should not
679
if (is_artificial_event())
682
We should not do any cleanup on slave when reading this. We
683
mark this by setting log_pos to 0. Start_log_event_v3() will
684
detect this on reading and set artificial_event=1 for the event.
691
Calculate position of end of event
693
Note that with a SEQ_READ_APPEND cache, my_b_tell() does not
694
work well. So this will give slightly wrong positions for the
695
Format_desc/Rotate/Stop events which the slave writes to its
696
relay log. For example, the initial Format_desc will have
697
end_log_pos=91 instead of 95. Because after writing the first 4
698
bytes of the relay log, my_b_tell() still reports 0. Because
699
my_b_append() does not update the counter which my_b_tell()
700
later uses (one should probably use my_b_append_tell() to work
701
around this). To get right positions even when writing to the
702
relay log, we use the (new) my_b_safe_tell().
704
Note that this raises a question on the correctness of all these
705
assert(my_b_tell()=rli->event_relay_log_pos).
707
If in a transaction, the log_pos which we calculate below is not
708
very good (because then my_b_safe_tell() returns start position
709
of the BEGIN, so it's like the statement was at the BEGIN's
710
place), but it's not a very serious problem (as the slave, when
711
it is in a transaction, does not take those end_log_pos into
712
account (as it calls inc_event_relay_log_pos()). To be fixed
713
later, so that it looks less strange. But it is not a bug.
716
log_pos= my_b_safe_tell(file)+data_written;
719
now= (ulong) get_time(); // Query start time
722
Header will be of size LOG_EVENT_HEADER_LEN for all events, except for
723
FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT, where it will be
724
LOG_EVENT_MINIMAL_HEADER_LEN (remember these 2 have a frozen header,
725
because we read them before knowing the format).
728
int4store(header, now); // timestamp
729
header[EVENT_TYPE_OFFSET]= get_type_code();
730
int4store(header+ SERVER_ID_OFFSET, server_id);
731
int4store(header+ EVENT_LEN_OFFSET, data_written);
732
int4store(header+ LOG_POS_OFFSET, log_pos);
733
int2store(header+ FLAGS_OFFSET, flags);
735
return(my_b_safe_write(file, header, sizeof(header)) != 0);
739
time_t Log_event::get_time()
741
Session *tmp_session;
745
return session->start_time;
746
if ((tmp_session= current_session))
747
return tmp_session->start_time;
753
This needn't be format-tolerant, because we only read
754
LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length).
757
int Log_event::read_log_event(IO_CACHE* file, String* packet,
758
pthread_mutex_t* log_lock)
762
char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
765
pthread_mutex_lock(log_lock);
766
if (my_b_read(file, (unsigned char*) buf, sizeof(buf)))
769
If the read hits eof, we must report it as eof so the caller
770
will know it can go into cond_wait to be woken up on the next
774
result= LOG_READ_EOF;
776
result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
779
data_len= uint4korr(buf + EVENT_LEN_OFFSET);
780
if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
781
data_len > current_session->variables.max_allowed_packet)
783
result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
788
/* Append the log event header to packet */
789
if (packet->append(buf, sizeof(buf)))
791
/* Failed to allocate packet */
792
result= LOG_READ_MEM;
795
data_len-= LOG_EVENT_MINIMAL_HEADER_LEN;
798
/* Append rest of event, read directly from file into packet */
799
if (packet->append(file, data_len))
802
Fatal error occurred when appending rest of the event
803
to packet, possible failures:
804
1. EOF occurred when reading from file, it's really an error
805
as data_len is >=0 there's supposed to be more bytes available.
806
file->error will have been set to number of bytes left to read
807
2. Read was interrupted, file->error would normally be set to -1
808
3. Failed to allocate memory for packet, my_errno
809
will be ENOMEM (file->error should be 0, but since the
810
memory allocation occurs before the call to read it might
813
result= (my_errno == ENOMEM ? LOG_READ_MEM :
814
(file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
815
/* Implicit goto end; */
821
pthread_mutex_unlock(log_lock);
825
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
826
#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
830
Allocates memory; The caller is responsible for clean-up.
832
Log_event* Log_event::read_log_event(IO_CACHE* file,
833
pthread_mutex_t* log_lock,
834
const Format_description_log_event
837
assert(description_event != 0);
838
char head[LOG_EVENT_MINIMAL_HEADER_LEN];
840
First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
841
check the event for sanity and to know its length; no need to really parse
842
it. We say "at most" because this could be a 3.23 master, which has header
843
of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's
844
"minimal" over the set {MySQL >=4.0}).
846
uint32_t header_size= cmin(description_event->common_header_len,
847
LOG_EVENT_MINIMAL_HEADER_LEN);
850
if (my_b_read(file, (unsigned char *) head, header_size))
854
No error here; it could be that we are at the file's end. However
855
if the next my_b_read() fails (below), it will be an error as we
856
were able to read the first bytes.
860
uint32_t data_len = uint4korr(head + EVENT_LEN_OFFSET);
862
const char *error= 0;
864
#ifndef max_allowed_packet
865
Session *session=current_session;
866
uint32_t max_allowed_packet= session ? session->variables.max_allowed_packet : ~(ulong)0;
869
if (data_len > max_allowed_packet)
871
error = "Event too big";
875
if (data_len < header_size)
877
error = "Event too small";
881
// some events use the extra byte to null-terminate strings
882
if (!(buf = (char*) malloc(data_len+1)))
884
error = "Out of memory";
888
memcpy(buf, head, header_size);
889
if (my_b_read(file, (unsigned char*) buf + header_size, data_len - header_size))
891
error = "read error";
894
if ((res= read_log_event(buf, data_len, &error, description_event)))
895
res->register_temp_buf(buf);
902
sql_print_error(_("Error in Log_event::read_log_event(): "
903
"'%s', data_len: %d, event_type: %d"),
904
error,data_len,head[EVENT_TYPE_OFFSET]);
907
The SQL slave thread will check if file->error<0 to know
908
if there was an I/O error. Even if there is no "low-level" I/O errors
909
with 'file', any of the high-level above errors is worrying
910
enough to stop the SQL thread now ; as we are skipping the current event,
911
going on with reading and successfully executing other events can
912
only corrupt the slave's databases. So stop.
921
Binlog format tolerance is in (buf, event_len, description_event)
925
Log_event* Log_event::read_log_event(const char* buf, uint32_t event_len,
927
const Format_description_log_event *description_event)
930
assert(description_event != 0);
932
/* Check the integrity */
933
if (event_len < EVENT_LEN_OFFSET ||
934
buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
935
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
937
*error="Sanity check failed"; // Needed to free buffer
938
return(NULL); // general sanity check - will fail on a partial read
941
uint32_t event_type= buf[EVENT_TYPE_OFFSET];
942
if (event_type > description_event->number_of_event_types &&
943
event_type != FORMAT_DESCRIPTION_EVENT)
946
It is unsafe to use the description_event if its post_header_len
947
array does not include the event type.
954
In some previous versions (see comment in
955
Format_description_log_event::Format_description_log_event(char*,...)),
956
event types were assigned different id numbers than in the
957
present version. In order to replicate from such versions to the
958
present version, we must map those event type id's to our event
959
type id's. The mapping is done with the event_type_permutation
960
array, which was set up when the Format_description_log_event
963
if (description_event->event_type_permutation)
964
event_type= description_event->event_type_permutation[event_type];
968
ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
971
ev = new Load_log_event(buf, event_len, description_event);
974
ev = new Load_log_event(buf, event_len, description_event);
977
ev = new Rotate_log_event(buf, event_len, description_event);
979
case CREATE_FILE_EVENT:
980
ev = new Create_file_log_event(buf, event_len, description_event);
982
case APPEND_BLOCK_EVENT:
983
ev = new Append_block_log_event(buf, event_len, description_event);
985
case DELETE_FILE_EVENT:
986
ev = new Delete_file_log_event(buf, event_len, description_event);
988
case EXEC_LOAD_EVENT:
989
ev = new Execute_load_log_event(buf, event_len, description_event);
991
case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
992
ev = new Start_log_event_v3(buf, description_event);
995
ev = new Stop_log_event(buf, description_event);
998
ev = new Xid_log_event(buf, description_event);
1000
case FORMAT_DESCRIPTION_EVENT:
1001
ev = new Format_description_log_event(buf, event_len, description_event);
1003
case WRITE_ROWS_EVENT:
1004
ev = new Write_rows_log_event(buf, event_len, description_event);
1006
case UPDATE_ROWS_EVENT:
1007
ev = new Update_rows_log_event(buf, event_len, description_event);
1009
case DELETE_ROWS_EVENT:
1010
ev = new Delete_rows_log_event(buf, event_len, description_event);
1012
case TABLE_MAP_EVENT:
1013
ev = new Table_map_log_event(buf, event_len, description_event);
1015
case BEGIN_LOAD_QUERY_EVENT:
1016
ev = new Begin_load_query_log_event(buf, event_len, description_event);
1018
case EXECUTE_LOAD_QUERY_EVENT:
1019
ev= new Execute_load_query_log_event(buf, event_len, description_event);
1021
case INCIDENT_EVENT:
1022
ev = new Incident_log_event(buf, event_len, description_event);
1031
is_valid() are small event-specific sanity tests which are
1032
important; for example there are some malloc() in constructors
1033
(e.g. Query_log_event::Query_log_event(char*...)); when these
1034
malloc() fail we can't return an error out of the constructor
1035
(because constructor is "void") ; so instead we leave the pointer we
1036
wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
1037
Same for Format_description_log_event, member 'post_header_len'.
1039
if (!ev || !ev->is_valid())
1042
*error= "Found invalid event in binary log";
1048
/**
  Skip decision for an event that continues an existing group.

  @param rli  relay log info; its slave_skip_counter is consulted

  @return EVENT_SKIP_IGNORE when slave_skip_counter is exactly 1;
          otherwise whatever Log_event::do_shall_skip() returns for rli.
*/
inline Log_event::enum_skip_reason
Log_event::continue_group(Relay_log_info *rli)
{
  /* The qualified call always uses the Log_event implementation. */
  return (rli->slave_skip_counter == 1)
    ? Log_event::EVENT_SKIP_IGNORE
    : Log_event::do_shall_skip(rli);
}
1056
/**************************************************************************
1057
Query_log_event methods
1058
**************************************************************************/
1061
This (which is used only for SHOW BINLOG EVENTS) could be updated to
1062
print SET @@session_var=. But this is not urgent, as SHOW BINLOG EVENTS is
1063
only an information, it does not produce suitable queries to replay (for
1064
example it does not print LOAD DATA INFILE).
1069
void Query_log_event::pack_info(Protocol *protocol)
1071
// TODO: show the catalog ??
1073
if (!(buf= (char*) malloc(9 + db_len + q_len)))
1076
if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
1079
pos= strcpy(buf, "use `")+5;
1080
memcpy(pos, db, db_len);
1081
pos= strcpy(pos+db_len, "`; ")+3;
1085
memcpy(pos, query, q_len);
1088
protocol->store(buf, pos-buf, &my_charset_bin);
1094
Query_log_event::write().
1097
In this event we have to modify the header to have the correct
1098
EVENT_LEN_OFFSET as we don't yet know how many status variables we
1102
/**
  Serialise this query event to the binary log.

  Layout written, in order: common header (write_header()), the fixed
  QUERY_HEADER_LEN post-header, any post-header of a derived class, the
  status variables, the database name including its terminating NUL, and
  finally the query text.

  @param file  IO_CACHE to append to.

  @return false on success; true if the event carries no query text or if
          any of the writes fails.
*/
bool Query_log_event::write(IO_CACHE* file)
{
  /**
    @todo if catalog can be of length FN_REFLEN==512, then we are not
    replicating it correctly, since the length is stored in a byte
  */
  unsigned char buf[QUERY_HEADER_LEN+
                    1+4+           // code of flags2 and flags2
                    1+8+           // code of sql_mode and sql_mode
                    1+1+FN_REFLEN+ // code of catalog and catalog length and catalog
                    1+4+           // code of autoinc and the 2 autoinc variables
                    1+6+           // code of charset and charset
                    1+1+MAX_TIME_ZONE_NAME_LENGTH+ // code of tz and tz length and tz name
                    1+2+           // code of lc_time_names and lc_time_names_number
                    1+2            // code of charset_database and charset_database_number
                    ], *start, *start_of_status;
  ulong event_length;

  if (!query)
    return 1;                                   // Something wrong with event

  /*
    We want to store the thread id:
    (- as an information for the user when he reads the binlog)
    - if the query uses temporary table: for the slave SQL thread to know to
    which master connection the temp table belongs.
    Now imagine we (write()) are called by the slave SQL thread (we are
    logging a query executed by this thread; the slave runs with
    --log-slave-updates). Then this query will be logged with
    thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of
    the same name were created simultaneously on the master (in the master
    binlog you have
    CREATE TEMPORARY TABLE t; (thread 1)
    CREATE TEMPORARY TABLE t; (thread 2)
    ...)
    then in the slave's binlog there will be
    CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
    CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
    which is bad (same thread id!).

    To avoid this, we log the thread's thread id EXCEPT for the SQL
    slave thread for which we log the original (master's) thread id.
    Now this moves the bug: what happens if the thread id on the
    master was 10 and when the slave replicates the query, a
    connection number 10 is opened by a normal client on the slave,
    and updates a temp table of the same name? We get a problem
    again. To avoid this, in the handling of temp tables (sql_base.cc)
    we use thread_id AND server_id.  TODO when this is merged into
    4.1: in 4.1, slave_proxy_id has been renamed to pseudo_thread_id
    and is a session variable: that's to make mysqlbinlog work with
    temp tables. We probably need to introduce

    SET PSEUDO_SERVER_ID
    for mysqlbinlog in 4.1. mysqlbinlog would print:
    SET PSEUDO_SERVER_ID=
    SET PSEUDO_THREAD_ID=
    for each query using temp tables.
  */
  int4store(buf + Q_THREAD_ID_OFFSET, slave_proxy_id);
  int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
  buf[Q_DB_LEN_OFFSET] = (char) db_len;
  int2store(buf + Q_ERR_CODE_OFFSET, error_code);

  /*
    You MUST always write status vars in increasing order of code. This
    guarantees that a slightly older slave will be able to parse those he
    knows about.
  */
  start_of_status= start= buf+QUERY_HEADER_LEN;
  if (flags2_inited)
  {
    *start++= Q_FLAGS2_CODE;
    int4store(start, flags2);
    start+= 4;                  /* step over the value just stored */
  }
  if (lc_time_names_number)
  {
    assert(lc_time_names_number <= 0xFFFF);
    *start++= Q_LC_TIME_NAMES_CODE;
    int2store(start, lc_time_names_number);
    start+= 2;
  }
  if (charset_database_number)
  {
    assert(charset_database_number <= 0xFFFF);
    *start++= Q_CHARSET_DATABASE_CODE;
    int2store(start, charset_database_number);
    start+= 2;
  }
  /*
    Here there could be code like
    if (command-line-option-which-says-"log_this_variable" && inited)
    {
    *start++= Q_THIS_VARIABLE_CODE;
    int4store(start, this_variable);
    start+= 4;
    }
  */

  /* Store length of status variables */
  status_vars_len= (uint) (start-start_of_status);
  assert(status_vars_len <= MAX_SIZE_LOG_EVENT_STATUS);
  int2store(buf + Q_STATUS_VARS_LEN_OFFSET, status_vars_len);

  /*
    Calculate length of whole event
    The "1" below is the \0 in the db's length
  */
  event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;

  return (write_header(file, event_length) ||
          my_b_safe_write(file, (unsigned char*) buf, QUERY_HEADER_LEN) ||
          write_post_header_for_derived(file) ||
          my_b_safe_write(file, (unsigned char*) start_of_status,
                          (uint) (start-start_of_status)) ||
          my_b_safe_write(file, (db) ? (unsigned char*) db : (unsigned char*)"", db_len + 1) ||
          my_b_safe_write(file, (unsigned char*) query, q_len)) ? 1 : 0;
}
1223
The simplest constructor that could possibly work. This is used for
1224
creating static objects that have a special meaning and are invisible
1227
/**
  Minimal constructor: default-constructs the Log_event base and leaves
  data_buf NULL. No other member is set here.
*/
Query_log_event::Query_log_event()
  :Log_event(), data_buf(0)
{
}
1235
Query_log_event::Query_log_event()
1236
session_arg - thread handle
1237
query_arg - array of char representing the query
1238
query_length - size of the `query_arg' array
1239
using_trans - there is a modified transactional table
1240
suppress_use - suppress the generation of 'USE' statements
1241
killed_status_arg - an optional with default to Session::KILLED_NO_VALUE
1242
if the value is different from the default, the arg
1243
is set to the current session->killed value.
1244
A caller might need to masquerade session->killed with
1245
Session::NOT_KILLED.
1247
Creates an event for binlogging
1248
The value for local `killed_status' can be supplied by caller.
1250
// Builds a Query_log_event for binlogging from the state of session_arg.
//
// What the visible code does:
//  - passes to the Log_event base a flag word made of
//    LOG_EVENT_THREAD_SPECIFIC_F (when session_arg->thread_specific_used) and
//    LOG_EVENT_SUPPRESS_USE_F (when suppress_use);
//  - stores query_arg, session_arg->catalog and session_arg->db as plain
//    pointers: no copy is made in the lines shown;
//  - records thread_id and the session's pseudo_thread_id (as slave_proxy_id),
//    the auto-increment settings and the lc_time_names number;
//  - resolves killed_status_arg from session_arg->killed when the caller
//    passed Session::KILLED_NO_VALUE;
//  - derives exec_time, catalog_len, db_len, charset_database_number, flags2
//    and the 6-byte charset array as commented below.
//
// NOTE(review): this listing has lost every short source line (for example
// the declaration of the `suppress_use` parameter, the opening brace, and
// the left-hand side of the killed/error-code expression). Confirm against
// the full file before relying on details not shown here.
Query_log_event::Query_log_event(Session* session_arg, const char* query_arg,
1251
ulong query_length, bool using_trans,
1253
Session::killed_state killed_status_arg)
1254
:Log_event(session_arg,
1255
(session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0) |
1256
(suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1258
data_buf(0), query(query_arg), catalog(session_arg->catalog),
1259
db(session_arg->db), q_len((uint32_t) query_length),
1260
thread_id(session_arg->thread_id),
1261
/* save the original thread id; we already know the server id */
1262
slave_proxy_id(session_arg->variables.pseudo_thread_id),
1263
flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1265
auto_increment_increment(session_arg->variables.auto_increment_increment),
1266
auto_increment_offset(session_arg->variables.auto_increment_offset),
1267
lc_time_names_number(session_arg->variables.lc_time_names->number),
1268
charset_database_number(0)
1272
// KILLED_NO_VALUE means "caller did not override": use the session's state.
if (killed_status_arg == Session::KILLED_NO_VALUE)
1273
killed_status_arg= session_arg->killed;
1276
// Not killed: the session's current error number (0 if none); killed: the
// error number associated with the kill.
(killed_status_arg == Session::NOT_KILLED) ?
1277
(session_arg->is_error() ? session_arg->main_da.sql_errno() : 0) :
1278
(session_arg->killed_errno());
1281
// NOTE(review): end_time is declared and set on lines missing from this
// listing.
exec_time = (ulong) (end_time - session_arg->start_time);
1283
@todo this means that if we have no catalog, then it is replicated
1284
as an existing catalog of length zero. is that safe? /sven
1286
catalog_len = (catalog) ? (uint32_t) strlen(catalog) : 0;
1287
/* status_vars_len is set just before writing the event */
1288
db_len = (db) ? (uint32_t) strlen(db) : 0;
1289
// The database collation is recorded only when it differs from the
// session's db_charset; otherwise the initializer's 0 is kept.
if (session_arg->variables.collation_database != session_arg->db_charset)
1290
charset_database_number= session_arg->variables.collation_database->number;
1293
If we don't use flags2 for anything else than options contained in
1294
session_arg->options, it would be more efficient to flags2=session_arg->options
1295
(OPTIONS_WRITTEN_TO_BIN_LOG would be used only at reading time).
1296
But it's likely that we don't want to use 32 bits for 3 bits; in the future
1297
we will probably want to reclaim the 29 bits. So we need the &.
1299
flags2= (uint32_t) (session_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
1300
// Each charset/collation number is stored in 2 bytes below, hence < 65536.
assert(session_arg->variables.character_set_client->number < 256*256);
1301
assert(session_arg->variables.collation_connection->number < 256*256);
1302
assert(session_arg->variables.collation_server->number < 256*256);
1303
assert(session_arg->variables.character_set_client->mbminlen == 1);
1304
// charset[0..1] = client charset, [2..3] = connection collation,
// [4..5] = server collation.
int2store(charset, session_arg->variables.character_set_client->number);
1305
int2store(charset+2, session_arg->variables.collation_connection->number);
1306
int2store(charset+4, session_arg->variables.collation_server->number);
1310
static void copy_str_and_move(const char **src,
1311
Log_event::Byte **dst,
1314
memcpy(*dst, *src, len);
1315
*src= (const char *)*dst;
1322
Macro to check that there is enough space to read from memory.
1324
@param PTR Pointer to memory
1325
@param END End of memory
1326
@param CNT Number of bytes that should be read.
1328
/*
  Expands to a bounds check for reading CNT bytes at PTR from a buffer that
  ends at END. On failure it marks the event invalid by setting `query' to 0
  (is_valid() tests that pointer) and returns from the enclosing void
  function, so it may only be used where `query' is in scope.

  The remaining space is computed as END - PTR rather than PTR + CNT, so no
  pointer beyond the end of the buffer is ever formed. The assert keeps
  debug builds loud about corrupt input, as before.
*/
#define CHECK_SPACE(PTR,END,CNT)                                      \
  do {                                                                \
    assert((PTR) <= (END) &&                                          \
           (size_t) (CNT) <= (size_t) ((END) - (PTR)));               \
    if ((PTR) > (END) ||                                              \
        (size_t) (CNT) > (size_t) ((END) - (PTR))) {                  \
      query= 0;                                                       \
      return;                                                         \
    }                                                                 \
  } while (0)
1339
This is used by the SQL slave thread to prepare the event before execution.
1341
// Decodes a Query_log_event from the raw event image in buf.
//
// What the visible code does:
//  - takes the common-header and post-header lengths for event_type from
//    description_event and checks event_len against their sum;
//  - reads thread id (also used as slave_proxy_id), exec time, db length and
//    error code at the fixed Q_* offsets after the common header;
//  - reads status_vars_len and compares it against
//    min(data_len, MAX_SIZE_LOG_EVENT_STATUS);
//  - walks the status-variable area [start, end), decoding flags2,
//    lc_time_names_number and charset_database_number; an unrecognised code
//    ends the walk by moving pos to end;
//  - allocates data_buf and copies the catalog, the time zone string, then
//    the db name and query text into it; query ends up db_len + 1 bytes past
//    the copy position and q_len is data_len - db_len - 1.
//
// NOTE(review): this listing has lost every short source line (local
// declarations such as data_len/tmp/catalog_nz, the early `return`s, the
// switch/case scaffolding of the loop, the rest of the malloc size
// expression). Confirm against the full file before changing anything here.
Query_log_event::Query_log_event(const char* buf, uint32_t event_len,
1342
const Format_description_log_event
1344
Log_event_type event_type)
1345
:Log_event(buf, description_event), data_buf(0), query(NULL),
1346
db(NULL), catalog_len(0), status_vars_len(0),
1347
flags2_inited(0), sql_mode_inited(0), charset_inited(0),
1348
auto_increment_increment(1), auto_increment_offset(1),
1349
time_zone_len(0), lc_time_names_number(0), charset_database_number(0)
1353
uint8_t common_header_len, post_header_len;
1354
Log_event::Byte *start;
1355
const Log_event::Byte *end;
1358
common_header_len= description_event->common_header_len;
1359
post_header_len= description_event->post_header_len[event_type-1];
1362
We test if the event's length is sensible, and if so we compute data_len.
1363
We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
1364
We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
1366
if (event_len < (uint)(common_header_len + post_header_len))
1368
data_len = event_len - (common_header_len + post_header_len);
1369
buf+= common_header_len;
1371
slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
1372
exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
1373
// NOTE(review): buf is `const char*`; where plain char is signed, a length
// byte >= 0x80 is sign-extended by this conversion and becomes a very large
// value. Confirm whether an unsigned char read is intended.
db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
1374
error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
1377
5.0 format starts here.
1378
Depending on the format, we may or not have affected/warnings etc
1379
The remnent post-header to be parsed has length:
1381
tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
1384
status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
1386
Check if status variable length is corrupt and will lead to very
1387
wrong data. We could be even more strict and require data_len to
1388
be even bigger, but this will suffice to catch most corruption
1389
errors that can lead to a crash.
1391
if (status_vars_len > cmin(data_len, (uint32_t)MAX_SIZE_LOG_EVENT_STATUS))
1396
data_len-= status_vars_len;
1400
We have parsed everything we know in the post header for QUERY_EVENT,
1401
the rest of post header is either comes from older version MySQL or
1402
dedicated to derived events (e.g. Execute_load_query...)
1405
/* variable-part: the status vars; only in MySQL 5.0 */
1407
// The status variables occupy [start, end) directly after the post-header.
start= (Log_event::Byte*) (buf+post_header_len);
1408
end= (const Log_event::Byte*) (start+status_vars_len);
1409
for (const Log_event::Byte* pos= start; pos < end;)
1413
CHECK_SPACE(pos, end, 4);
1415
flags2= uint4korr(pos);
1418
case Q_LC_TIME_NAMES_CODE:
1419
CHECK_SPACE(pos, end, 2);
1420
lc_time_names_number= uint2korr(pos);
1423
case Q_CHARSET_DATABASE_CODE:
1424
CHECK_SPACE(pos, end, 2);
1425
charset_database_number= uint2korr(pos);
1429
/* That's why you must write status vars in growing order of code */
1430
pos= (const unsigned char*) end; // Break loop
1434
if (!(start= data_buf = (Log_event::Byte*) malloc(catalog_len + 1 +
1438
if (catalog_len) // If catalog is given
1441
@todo we should clean up and do only copy_str_and_move; it
1442
works for both cases. Then we can remove the catalog_nz
1445
if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
1446
copy_str_and_move(&catalog, &start, catalog_len);
1449
memcpy(start, catalog, catalog_len+1); // copy end 0
1450
catalog= (const char *)start;
1451
start+= catalog_len+1;
1455
copy_str_and_move(&time_zone_str, &start, time_zone_len);
1458
if time_zone_len or catalog_len are 0, then time_zone and catalog
1459
are uninitialized at this point. shouldn't they point to the
1460
zero-length null-terminated strings we allocated space for in the
1461
my_alloc call above? /sven
1464
/* A 2nd variable part; this is common to all versions */
1465
memcpy(start, end, data_len); // Copy db and query
1466
start[data_len]= '\0'; // End query with \0 (For safetly)
1468
query= (char *)(start + db_len + 1);
1469
// NOTE(review): no check that data_len >= db_len + 1 is visible in this
// listing; if it is smaller this unsigned subtraction wraps. Confirm.
q_len= data_len - db_len -1;
1475
Query_log_event::do_apply_event()
1477
/**
  Apply this event using the query text stored in the event itself.

  @param rli  Relay log state, forwarded unchanged.

  @return The result of the three-argument do_apply_event() called with
          this event's own query and q_len.
*/
int Query_log_event::do_apply_event(Relay_log_info const *rli)
{
  return do_apply_event(rli, query, q_len);
}
1485
Compare the values of "affected rows" around here. Something
1488
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1490
sql_print_error("Slave: did not get the expected number of affected \
1491
rows running query from master - expected %d, got %d (this numbers \
1492
should have matched modulo 4294967296).", 0, ...);
1493
session->query_error = 1;
1496
We may also want an option to tell the slave to ignore "affected"
1497
mismatch. This mismatch could be implemented with a new ER_ code, and
1498
to ignore it you would use --slave-skip-errors...
1500
// Applies this query event on the slave using an explicitly supplied text.
//
// Parameters:
//   rli        relay-log state; it is const here but is const_cast below to
//              record future_group_master_log_pos, clear errors and clear
//              the tables-to-lock list.
//   query_arg  query text; becomes session->query for the execution.
//   q_len_arg  length of query_arg; becomes session->query_length.
//
// Returns session->is_slave_error.
//
// Visible flow: point the session at this event's catalog/db and
// auto-increment settings, install time, query, query id and pseudo thread
// id, restore options / time zone / locale / database collation from the
// event, run the query through mysql_parse(), then compare the error
// obtained here (actual_error) with the one recorded in the event
// (expected_error) and report mismatches through rli->report(). Finally the
// session's catalog/db/query pointers are reset under LOCK_thread_count,
// thread tables are closed and the session mem_root is freed.
//
// NOTE(review): this listing has lost every short source line (braces,
// `else` branches, labels such as compare_errors/end, the db_ok() test that
// the closing comment refers to). Control flow must be confirmed against the
// full file.
int Query_log_event::do_apply_event(Relay_log_info const *rli,
1501
const char *query_arg, uint32_t q_len_arg)
1503
int expected_error,actual_error= 0;
1504
Query_id &query_id= Query_id::get_query_id();
1506
Colleagues: please never free(session->catalog) in MySQL. This would
1507
lead to bugs as here session->catalog is a part of an alloced block,
1508
not an entire alloced block (see
1509
Query_log_event::do_apply_event()). Same for session->db. Thank
1512
session->catalog= catalog_len ? (char *) catalog : (char *)"";
1513
// NOTE(review): strlen(db) requires db to be non-NULL; the decoding
// constructor starts with db(NULL). Confirm db is always set before this
// point.
session->set_db(db, strlen(db)); /* allocates a copy of 'db' */
1514
session->variables.auto_increment_increment= auto_increment_increment;
1515
session->variables.auto_increment_offset= auto_increment_offset;
1518
InnoDB internally stores the master log position it has executed so far,
1519
i.e. the position just after the COMMIT event.
1520
When InnoDB will want to store, the positions in rli won't have
1521
been updated yet, so group_master_log_* will point to old BEGIN
1522
and event_master_log* will point to the beginning of current COMMIT.
1523
But log_pos of the COMMIT Query event is what we want, i.e. the pos of the
1524
END of the current log event (COMMIT). We save it in rli so that InnoDB can
1527
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
1529
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
1530
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
1533
Note: We do not need to execute reset_one_shot_variables() if this
1535
Reason: The db stored in binlog events is the same for SET and for
1536
its companion query. If the SET is ignored because of
1537
db_ok(), the companion query will also be ignored, and if
1538
the companion query is ignored in the db_ok() test of
1539
::do_apply_event(), then the companion SET also have so
1540
we don't need to reset_one_shot_variables().
1544
session->set_time((time_t)when);
1545
session->query_length= q_len_arg;
1546
session->query= (char*)query_arg;
1547
session->query_id= query_id.next();
1548
session->variables.pseudo_thread_id= thread_id; // for temp tables
1550
if (ignored_error_code((expected_error= error_code)) ||
1551
!check_expected_error(session,rli,expected_error))
1555
all bits of session->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
1556
must take their value from flags2.
1558
session->options= flags2|(session->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
1561
String tmp(time_zone_str, time_zone_len, &my_charset_bin);
1562
if (!(session->variables.time_zone= my_tz_find(session, &tmp)))
1564
my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr());
1565
session->variables.time_zone= global_system_variables.time_zone;
1566
goto compare_errors;
1569
if (lc_time_names_number)
1571
if (!(session->variables.lc_time_names=
1572
my_locale_by_number(lc_time_names_number)))
1574
my_printf_error(ER_UNKNOWN_ERROR,
1575
"Unknown locale: '%d'", MYF(0), lc_time_names_number);
1576
session->variables.lc_time_names= &my_locale_en_US;
1577
goto compare_errors;
1581
session->variables.lc_time_names= &my_locale_en_US;
1582
if (charset_database_number)
1584
const CHARSET_INFO *cs;
1585
if (!(cs= get_charset(charset_database_number, MYF(0))))
1588
int10_to_str((int) charset_database_number, buf, -10);
1589
my_error(ER_UNKNOWN_COLLATION, MYF(0), buf);
1590
goto compare_errors;
1592
session->variables.collation_database= cs;
1595
session->variables.collation_database= session->db_charset;
1597
/* Execute the query (note that we bypass dispatch_command()) */
1598
const char* found_semicolon= NULL;
1599
mysql_parse(session, session->query, session->query_length, &found_semicolon);
1600
log_slow_statement(session);
1605
The query got a really bad error on the master (thread killed etc),
1606
which could be inconsistent. Parse it to test the table names: if the
1607
replicate-*-do|ignore-table rules say "this query must be ignored" then
1608
we exit gracefully; otherwise we warn about the bad error and tell DBA
1611
if (mysql_test_parse_for_slave(session, session->query, session->query_length))
1612
clear_all_errors(session, const_cast<Relay_log_info*>(rli)); /* Can ignore query */
1615
rli->report(ERROR_LEVEL, expected_error,
1616
_("Query partially completed on the master "
1617
"(error on master: %d) and was aborted. There is a "
1618
"chance that your master is inconsistent at this "
1619
"point. If you are sure that your master is ok, run "
1620
"this query manually on the slave and then restart the "
1621
"slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; "
1622
"START SLAVE; . Query: '%s'"),
1623
expected_error, session->query);
1624
session->is_slave_error= 1;
1632
If we expected a non-zero error code, and we don't get the same error
1633
code, and none of them should be ignored.
1635
actual_error= session->is_error() ? session->main_da.sql_errno() : 0;
1636
if ((expected_error != actual_error) &&
1638
!ignored_error_code(actual_error) &&
1639
!ignored_error_code(expected_error))
1641
rli->report(ERROR_LEVEL, 0,
1642
_("Query caused differenxt errors on master and slave.\n"
1643
"Error on master: '%s' (%d), Error on slave: '%s' (%d).\n"
1644
"Default database: '%s'. Query: '%s'"),
1647
actual_error ? session->main_da.message() : _("no error"),
1649
print_slave_db_safe(db), query_arg);
1650
session->is_slave_error= 1;
1653
If we get the same error code as expected, or they should be ignored.
1655
else if (expected_error == actual_error ||
1656
ignored_error_code(actual_error))
1658
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
1659
session->killed= Session::NOT_KILLED;
1662
Other cases: mostly we expected no error and get one.
1664
else if (session->is_slave_error || session->is_fatal_error)
1666
rli->report(ERROR_LEVEL, actual_error,
1667
_("Error '%s' on query. Default database: '%s'. Query: '%s'"),
1668
(actual_error ? session->main_da.message() :
1669
_("unexpected success or fatal error")),
1670
print_slave_db_safe(session->db), query_arg);
1671
session->is_slave_error= 1;
1675
TODO: compare the values of "affected rows" around here. Something
1677
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1679
sql_print_error("Slave: did not get the expected number of affected \
1680
rows running query from master - expected %d, got %d (this numbers \
1681
should have matched modulo 4294967296).", 0, ...);
1682
session->is_slave_error = 1;
1684
We may also want an option to tell the slave to ignore "affected"
1685
mismatch. This mismatch could be implemented with a new ER_ code, and
1686
to ignore it you would use --slave-skip-errors...
1688
To do the comparison we need to know the value of "affected" which the
1689
above mysql_parse() computed. And we need to know the value of
1690
"affected" in the master's binlog. Both will be implemented later. The
1691
important thing is that we now have the format ready to log the values
1692
of "affected" in the binlog. So we can release 5.0.0 before effectively
1693
logging "affected" and effectively comparing it.
1695
} /* End of if (db_ok(... */
1698
pthread_mutex_lock(&LOCK_thread_count);
1700
Probably we have set session->query, session->db, session->catalog to point to places
1701
in the data_buf of this event. Now the event is going to be deleted
1702
probably, so data_buf will be freed, so the session->... listed above will be
1703
pointers to freed memory.
1704
So we must set them to 0, so that those bad pointers values are not later
1705
used. Note that "cleanup" queries like automatic DROP TEMPORARY Table
1706
don't suffer from these assignments to 0 as DROP TEMPORARY
1707
Table uses the db.table syntax.
1709
session->catalog= 0;
1710
session->set_db(NULL, 0); /* will free the current database */
1711
session->query= 0; // just to be sure
1712
session->query_length= 0;
1713
pthread_mutex_unlock(&LOCK_thread_count);
1714
close_thread_tables(session);
1715
session->first_successful_insert_id_in_prev_stmt= 0;
1716
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
1717
return session->is_slave_error;
1720
/**
  Advance the relay-log position after this event.

  @param rli  Relay log state, forwarded unchanged.

  @return Whatever the generic Log_event::do_update_pos() returns; no
          query-specific handling is added here.
*/
int Query_log_event::do_update_pos(Relay_log_info *rli)
{
  return Log_event::do_update_pos(rli);
}
1726
/**
  Decide whether this query event must be skipped.

  While rli->slave_skip_counter is positive, transaction delimiters get
  special treatment so that OPTION_BEGIN in session->options keeps tracking
  whether we are inside a transaction:
    - "BEGIN" sets OPTION_BEGIN and defers to Log_event::continue_group();
    - "COMMIT" / "ROLLBACK" clear OPTION_BEGIN and return EVENT_SKIP_COUNT.
  Everything else falls through to the generic Log_event::do_shall_skip().

  @param rli  Relay log state; slave_skip_counter is read here.

  @return The skip reason as described above.

  The comparisons use strcmp(), so `query' must be NUL-terminated.
*/
Log_event::enum_skip_reason
Query_log_event::do_shall_skip(Relay_log_info *rli)
{
  assert(query && q_len > 0);

  if (rli->slave_skip_counter > 0)
  {
    if (strcmp("BEGIN", query) == 0)
    {
      session->options|= OPTION_BEGIN;
      return(Log_event::continue_group(rli));
    }

    if (strcmp("COMMIT", query) == 0 || strcmp("ROLLBACK", query) == 0)
    {
      session->options&= ~OPTION_BEGIN;
      return(Log_event::EVENT_SKIP_COUNT);
    }
  }
  return(Log_event::do_shall_skip(rli));
}
1749
/**************************************************************************
1750
Start_log_event_v3 methods
1751
**************************************************************************/
1753
/**
  Create a start event describing the running server: `created' is 0,
  binlog_version is BINLOG_VERSION, both artificial_event and
  dont_set_created are cleared, and server_version is copied from the global
  ::server_version (ST_SERVER_VER_LEN bytes).
*/
Start_log_event_v3::Start_log_event_v3()
  :Log_event(), created(0), binlog_version(BINLOG_VERSION),
   artificial_event(0), dont_set_created(0)
{
  memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
}
1761
Start_log_event_v3::pack_info()
1764
/**
  Store the text "Server ver: <server_version>, Binlog ver: <binlog_version>"
  into the protocol.

  @param protocol  Receives the text through Protocol::store(), tagged with
                   my_charset_bin.
*/
void Start_log_event_v3::pack_info(Protocol *protocol)
{
  /*
    12 and 14 are the lengths of the two literal pieces below; the trailing
    22 leaves room for the decimal version number.
  */
  char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos;
  pos= strcpy(buf, "Server ver: ")+12;
  pos= strcpy(pos, server_version)+strlen(server_version);
  pos= strcpy(pos, ", Binlog ver: ")+14;
  pos= int10_to_str(binlog_version, pos, 10);
  protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
}
1776
Start_log_event_v3::Start_log_event_v3()
1779
/**
  Decode a start event from its raw image.

  @param buf                Start of the event, common header included.
  @param description_event  Format description used to locate the
                            post-header (its common_header_len is skipped).

  Reads the binlog version, the server version string and the creation
  timestamp at their ST_* offsets. An event whose log_pos is 0 is flagged as
  artificial. dont_set_created is set to 1.
*/
Start_log_event_v3::Start_log_event_v3(const char* buf,
                                       const Format_description_log_event
                                       *description_event)
  :Log_event(buf, description_event)
{
  buf+= description_event->common_header_len;
  binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET);
  memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
         ST_SERVER_VER_LEN);
  // prevent overrun if log is corrupted on disk
  server_version[ST_SERVER_VER_LEN-1]= 0;
  created= uint4korr(buf+ST_CREATED_OFFSET);
  /* We use log_pos to mark if this was an artificial event or not */
  artificial_event= (log_pos == 0);
  dont_set_created= 1;
}
1798
Start_log_event_v3::write()
1801
/**
  Write this start event: common header followed by the
  START_V3_HEADER_LEN-byte post-header (binlog version, server version,
  creation time).

  @param file  IO_CACHE to append to.

  @return false on success, true if either write fails.

  Unless dont_set_created is set, both `created' and `when' are refreshed
  from get_time() just before writing.
*/
bool Start_log_event_v3::write(IO_CACHE* file)
{
  char buff[START_V3_HEADER_LEN];
  int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
  memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
  if (!dont_set_created)
    created= when= get_time();
  int4store(buff + ST_CREATED_OFFSET,created);
  return (write_header(file, sizeof(buff)) ||
          my_b_safe_write(file, (unsigned char*) buff, sizeof(buff)));
}
1815
Start_log_event_v3::do_apply_event() .
1819
- To handle the case where the master died without having time to write
1820
DROP TEMPORARY Table, DO RELEASE_LOCK (prepared statements' deletion is
1821
TODO), we clean up all temporary tables that we got, if we are sure we
1825
- Remove all active user locks.
1826
Guilhem 2003-06: this is true but not urgent: the worst it can cause is
1827
the use of a bit of memory for a user lock which will not be used
1828
anymore. If the user lock is later used, the old one will be released. In
1829
other words, no deadlock problem.
1832
/**
  Apply a start event on the slave: drop the temporary tables (and, for the
  newer formats, clean the load tmpdir) when the event tells us the master
  has restarted.

  @param rli  Relay log state; only the server_version of its
              description_event_for_exec is read, and only for binlog
              version 1.

  @return 0 on success, 1 for an unknown binlog_version.
*/
int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
{
  switch (binlog_version)
  {
  case 3:
  case 4:
    /*
      This can either be 4.x (then a Start_log_event_v3 is only at master
      startup so we are sure the master has restarted and cleared his temp
      tables; the event always has 'created'>0) or 5.0 (then we have to test
      'created').
    */
    if (created)
    {
      close_temporary_tables(session);
      cleanup_load_tmpdir();
    }
    break;

    /*
      Now the older formats; in that case load_tmpdir is cleaned up by the I/O
      thread.
    */
  case 1:
    if (strncmp(rli->relay_log.description_event_for_exec->server_version,
                "3.23.57",7) >= 0 && created)
    {
      /*
        Can distinguish, based on the value of 'created': this event was
        generated at master startup.
      */
      close_temporary_tables(session);
    }
    /*
      Otherwise, can't distinguish a Start_log_event generated at
      master startup and one generated by master FLUSH LOGS, so cannot
      be sure temp tables have to be dropped. So do nothing.
    */
    break;
  default:
    /* this case is impossible */
    return(1);
  }
  return(0);
}
1878
/***************************************************************************
1879
Format_description_log_event methods
1880
****************************************************************************/
1883
Format_description_log_event 1st ctor.
1885
Ctor. Can be used to create the event to write to the binary log (when the
1886
server starts or when FLUSH LOGS), or to create artificial events to parse
1887
binlogs from MySQL 3.23 or 4.x.
1888
When in a client, only the 2nd use is possible.
1890
@param binlog_version the binlog version for which we want to build
1891
an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x
1892
x>=2 and 4.1) or 4 (MySQL 5.0). Note that the
1893
old 4.0 (binlog version 2) is not supported;
1894
it should not be used for replication with
1898
/**
  Build a format description for a given binlog version.

  @param binlog_ver  Binlog version to describe. Only version 4 is populated
                     here; any other value leaves post_header_len NULL so
                     that is_valid() rejects the event.
  (The second, unnamed parameter is not used.)

  For version 4 the server version, common header length and the per-event
  post-header length table are filled in. If the table cannot be allocated
  post_header_len stays NULL, which is_valid() is expected to catch.
*/
Format_description_log_event::
Format_description_log_event(uint8_t binlog_ver, const char*)
  :Start_log_event_v3(), event_type_permutation(0)
{
  binlog_version= binlog_ver;
  switch (binlog_ver) {
  case 4: /* MySQL 5.0 */
    memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
    common_header_len= LOG_EVENT_HEADER_LEN;
    number_of_event_types= LOG_EVENT_TYPES;
    /* we'll catch malloc() error in is_valid() */
    post_header_len=(uint8_t*) malloc(number_of_event_types*sizeof(uint8_t));
    /*
      This long list of assignments is not beautiful, but I see no way to
      make it nicer, as the right members are #defines, not array members, so
      it's impossible to write a loop.
    */
    if (post_header_len)
    {
      /*
        Zero the table only once we know the allocation succeeded; event
        types not listed below keep a post-header length of 0.
      */
      memset(post_header_len, 0, number_of_event_types*sizeof(uint8_t));
      post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
      post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN;
      post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
      post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
      post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
      post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
      post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
      post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
      post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
      post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
      post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN;
      post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
      post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
      post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
      post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1];
      post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
      post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
      post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
    }
    break;

  default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
    /* Unsupported version: a NULL table makes is_valid() fail. */
    post_header_len= 0;
    break;
  }
  calc_server_version_split();
}
1947
The problem with this constructor is that the fixed header may have a
1948
length different from this version, but we don't know this length as we
1949
have not read the Format_description_log_event which says it, yet. This
1950
length is in the post-header of the event, but we don't know where the
1953
So this type of event HAS to:
1954
- either have the header's length at the beginning (in the header, at a
1955
fixed position which will never be changed), not in the post-header. That
1956
would make the header be "shifted" compared to other events.
1957
- or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future
1958
versions, so that we know for sure.
1960
I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because
1961
it is sent before Format_description_log_event).
1964
// Decodes a Format_description_log_event from the raw event image in buf.
//
// What the visible code does:
//  - lets Start_log_event_v3 decode the start-event part, then skips
//    LOG_EVENT_MINIMAL_HEADER_LEN bytes of buf;
//  - reads common_header_len and returns early if it is below
//    OLD_HEADER_LEN (post_header_len is not assigned on that path in the
//    lines shown);
//  - derives number_of_event_types from event_len and copies that many
//    post-header lengths out of the event into a malloc'ed array;
//  - for server versions matching the "-a_drop5"/"-a_drop6" patterns
//    described below, requires exactly 22 event types (otherwise frees the
//    array so is_valid() fails), installs the static perm[] table as
//    event_type_permutation and permutes post_header_len accordingly.
//
// NOTE(review): this listing has lost every short source line (parts of the
// parameter list, braces, several entries of perm[]). Confirm against the
// full file before changing anything here.
Format_description_log_event::
1965
Format_description_log_event(const char* buf,
1968
Format_description_log_event*
1970
:Start_log_event_v3(buf, description_event), event_type_permutation(0)
1972
buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
1973
if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
1974
return; /* sanity check */
1975
// NOTE(review): no lower bound on event_len is visible here; if event_len is
// smaller than the subtracted constant this count would wrap. Confirm.
number_of_event_types=
1976
event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1);
1977
/* If alloc fails, we'll detect it in is_valid() */
1978
post_header_len= (uint8_t*) malloc(number_of_event_types*
1979
sizeof(*post_header_len));
1980
// NOTE(review): the malloc() result is passed to memcpy() without a NULL
// check in the lines shown, so an allocation failure would be dereferenced
// before is_valid() gets a chance to detect it. Confirm.
memcpy(post_header_len, buf+ST_COMMON_HEADER_LEN_OFFSET+1,
1981
number_of_event_types* sizeof(*post_header_len));
1982
calc_server_version_split();
1985
In some previous versions, the events were given other event type
1986
id numbers than in the present version. When replicating from such
1987
a version, we therefore set up an array that maps those id numbers
1988
to the id numbers of the present server.
1990
If post_header_len is null, it means malloc failed, and is_valid
1991
will fail, so there is no need to do anything.
1993
The trees in which events have wrong id's are:
1995
mysql-5.1-wl1012.old mysql-5.1-wl2325-5.0-drop6p13-alpha
1996
mysql-5.1-wl2325-5.0-drop6 mysql-5.1-wl2325-5.0
1997
mysql-5.1-wl2325-no-dd
1999
(this was found by grepping for two lines in sequence where the
2000
first matches "FORMAT_DESCRIPTION_EVENT," and the second matches
2001
"TABLE_MAP_EVENT," in log_event.h in all trees)
2003
In these trees, the following server_versions existed since
2004
TABLE_MAP_EVENT was introduced:
2006
5.1.1-a_drop5p3 5.1.1-a_drop5p4 5.1.1-alpha
2007
5.1.2-a_drop5p10 5.1.2-a_drop5p11 5.1.2-a_drop5p12
2008
5.1.2-a_drop5p13 5.1.2-a_drop5p14 5.1.2-a_drop5p15
2009
5.1.2-a_drop5p16 5.1.2-a_drop5p16b 5.1.2-a_drop5p16c
2010
5.1.2-a_drop5p17 5.1.2-a_drop5p4 5.1.2-a_drop5p5
2011
5.1.2-a_drop5p6 5.1.2-a_drop5p7 5.1.2-a_drop5p8
2012
5.1.2-a_drop5p9 5.1.3-a_drop5p17 5.1.3-a_drop5p17b
2013
5.1.3-a_drop5p17c 5.1.4-a_drop5p18 5.1.4-a_drop5p19
2014
5.1.4-a_drop5p20 5.1.4-a_drop6p0 5.1.4-a_drop6p1
2015
5.1.4-a_drop6p2 5.1.5-a_drop5p20 5.2.0-a_drop6p3
2016
5.2.0-a_drop6p4 5.2.0-a_drop6p5 5.2.0-a_drop6p6
2017
5.2.1-a_drop6p10 5.2.1-a_drop6p11 5.2.1-a_drop6p12
2018
5.2.1-a_drop6p6 5.2.1-a_drop6p7 5.2.1-a_drop6p8
2019
5.2.2-a_drop6p13 5.2.2-a_drop6p13-alpha 5.2.2-a_drop6p13b
2022
(this was found by grepping for "mysql," in all historical
2023
versions of configure.in in the trees listed above).
2025
There are 5.1.1-alpha versions that use the new event id's, so we
2026
do not test that version string. So replication from 5.1.1-alpha
2027
with the other event id's to a new version does not work.
2028
Moreover, we can safely ignore the part after drop[56]. This
2029
allows us to simplify the big list above to the following regexes:
2031
5\.1\.[1-5]-a_drop5.*
2033
5\.2\.[0-2]-a_drop6.*
2035
This is what we test for in the 'if' below.
2037
if (post_header_len &&
2038
server_version[0] == '5' && server_version[1] == '.' &&
2039
server_version[3] == '.' &&
2040
strncmp(server_version + 5, "-a_drop", 7) == 0 &&
2041
((server_version[2] == '1' &&
2042
server_version[4] >= '1' && server_version[4] <= '5' &&
2043
server_version[12] == '5') ||
2044
(server_version[2] == '1' &&
2045
server_version[4] == '4' &&
2046
server_version[12] == '6') ||
2047
(server_version[2] == '2' &&
2048
server_version[4] >= '0' && server_version[4] <= '2' &&
2049
server_version[12] == '6')))
2051
if (number_of_event_types != 22)
2053
/* this makes is_valid() return false. */
2054
free(post_header_len);
2055
post_header_len= NULL;
2058
static const uint8_t perm[23]=
2060
UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
2061
LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
2062
APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
2064
FORMAT_DESCRIPTION_EVENT,
2067
BEGIN_LOAD_QUERY_EVENT,
2068
EXECUTE_LOAD_QUERY_EVENT,
2070
event_type_permutation= perm;
2072
Since we use (permuted) event id's to index the post_header_len
2073
array, we need to permute the post_header_len array too.
2075
uint8_t post_header_len_temp[23];
2076
// Scatter through the permutation into a temporary first, then copy back,
// so no entry is overwritten before it has been read.
for (int i= 1; i < 23; i++)
2077
post_header_len_temp[perm[i] - 1]= post_header_len[i - 1];
2078
for (int i= 0; i < 22; i++)
2079
post_header_len[i] = post_header_len_temp[i];
2084
bool Format_description_log_event::write(IO_CACHE* file)
2087
We don't call Start_log_event_v3::write() because this would make 2
2090
unsigned char buff[FORMAT_DESCRIPTION_HEADER_LEN];
2091
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
2092
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
2093
if (!dont_set_created)
2094
created= when= get_time();
2095
int4store(buff + ST_CREATED_OFFSET,created);
2096
buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
2097
memcpy(buff+ST_COMMON_HEADER_LEN_OFFSET+1, post_header_len,
2099
return (write_header(file, sizeof(buff)) ||
2100
my_b_safe_write(file, buff, sizeof(buff)));
2104
int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
2107
As a transaction NEVER spans on 2 or more binlogs:
2108
if we have an active transaction at this point, the master died
2109
while writing the transaction to the binary log, i.e. while
2110
flushing the binlog cache to the binlog. XA guarantees that master has
2111
rolled back. So we roll back.
2112
Note: this event could be sent by the master to inform us of the
2113
format of its binlog; in other words maybe it is not at its
2114
original place when it comes to us; we'll know this by checking
2115
log_pos ("artificial" events have log_pos == 0).
2117
if (!artificial_event && created && session->transaction.all.ha_list)
2119
/* This is not an error (XA is safe), just an information */
2120
rli->report(INFORMATION_LEVEL, 0,
2121
_("Rolling back unfinished transaction (no COMMIT "
2122
"or ROLLBACK in relay log). A probable cause is that "
2123
"the master died while writing the transaction to "
2124
"its binary log, thus rolled back too."));
2125
const_cast<Relay_log_info*>(rli)->cleanup_context(session, 1);
2128
If this event comes from ourselves, there is no cleaning task to
2129
perform, we don't call Start_log_event_v3::do_apply_event()
2130
(this was just to update the log's description event).
2132
if (server_id != ::server_id)
2135
If the event was not requested by the slave i.e. the master sent
2136
it while the slave asked for a position >4, the event will make
2137
rli->group_master_log_pos advance. Say that the slave asked for
2138
position 1000, and the Format_desc event's end is 96. Then in
2139
the beginning of replication rli->group_master_log_pos will be
2140
0, then 96, then jump to first really asked event (which is
2141
>96). So this is ok.
2143
return(Start_log_event_v3::do_apply_event(rli));
2148
int Format_description_log_event::do_update_pos(Relay_log_info *rli)
2150
/* save the information describing this binlog */
2151
delete rli->relay_log.description_event_for_exec;
2152
rli->relay_log.description_event_for_exec= this;
2154
if (server_id == ::server_id)
2157
We only increase the relay log position if we are skipping
2158
events and do not touch any group_* variables, nor flush the
2159
relay log info. If there is a crash, we will have to re-skip
2160
the events again, but that is a minor issue.
2162
If we do not skip stepping the group log position (and the
2163
server id was changed when restarting the server), it might well
2164
be that we start executing at a position that is invalid, e.g.,
2165
at a Rows_log_event or a Query_log_event preceeded by a
2166
Intvar_log_event instead of starting at a Table_map_log_event or
2167
the Intvar_log_event respectively.
2169
rli->inc_event_relay_log_pos();
2174
return Log_event::do_update_pos(rli);
2178
Log_event::enum_skip_reason
2179
Format_description_log_event::do_shall_skip(Relay_log_info *)
2181
return Log_event::EVENT_SKIP_NOT;
2186
Splits the event's 'server_version' string into three numeric pieces stored
2187
into 'server_version_split':
2188
X.Y.Zabc (X,Y,Z numbers, a not a digit) -> {X,Y,Z}
2191
'server_version_split' is then used for lookups to find if the server which
2192
created this event has some known bug.
2194
void Format_description_log_event::calc_server_version_split()
2196
char *p= server_version, *r;
2198
for (uint32_t i= 0; i<=2; i++)
2200
number= strtoul(p, &r, 10);
2201
server_version_split[i]= (unsigned char)number;
2202
assert(number < 256); // fit in unsigned char
2204
assert(!((i == 0) && (*r != '.'))); // should be true in practice
2206
p++; // skip the dot
/**************************************************************************
        Load_log_event methods
   General note about Load_log_event: the binlogging of LOAD DATA INFILE is
   going to be changed in 5.0 (or maybe in 5.1; not decided yet).
   However, the 5.0 slave could still have to read such events (from a 4.x
   master), convert them (which just means maybe expand the header, when 5.0
   servers have a UID in events) (remember that whatever is after the header
   will be like in 4.x, as this event's format is not modified in 5.0 as we
   will use new types of events to log the new LOAD DATA INFILE features).
   To be able to read/convert, we just need to not assume that the common
   header is of length LOG_EVENT_HEADER_LEN (we must use the description
   event).
   Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE
   between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the
   positions displayed in SHOW SLAVE STATUS then are fine too).
**************************************************************************/
2229
Load_log_event::pack_info()
2232
uint32_t Load_log_event::get_query_buffer_length()
2235
5 + db_len + 3 + // "use DB; "
2236
18 + fname_len + 2 + // "LOAD DATA INFILE 'file''"
2238
9 + // " REPLACE or IGNORE "
2239
13 + table_name_len*2 + // "INTO Table `table`"
2240
21 + sql_ex.field_term_len*4 + 2 + // " FIELDS TERMINATED BY 'str'"
2241
23 + sql_ex.enclosed_len*4 + 2 + // " OPTIONALLY ENCLOSED BY 'str'"
2242
12 + sql_ex.escaped_len*4 + 2 + // " ESCAPED BY 'str'"
2243
21 + sql_ex.line_term_len*4 + 2 + // " LINES TERMINATED BY 'str'"
2244
19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'"
2245
15 + 22 + // " IGNORE xxx LINES"
2246
3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)"
2250
void Load_log_event::print_query(bool need_db, char *buf,
2251
char **end, char **fn_start, char **fn_end)
2255
if (need_db && db && db_len)
2257
pos= strcpy(pos, "use `")+5;
2258
memcpy(pos, db, db_len);
2259
pos= strcpy(pos+db_len, "`; ")+3;
2262
pos= strcpy(pos, "LOAD DATA ")+10;
2267
if (check_fname_outside_temp_buf())
2268
pos= strcpy(pos, "LOCAL ")+6;
2269
pos= strcpy(pos, "INFILE '")+8;
2270
memcpy(pos, fname, fname_len);
2271
pos= strcpy(pos+fname_len, "' ")+2;
2273
if (sql_ex.opt_flags & REPLACE_FLAG)
2274
pos= strcpy(pos, " REPLACE ")+9;
2275
else if (sql_ex.opt_flags & IGNORE_FLAG)
2276
pos= strcpy(pos, " IGNORE ")+8;
2278
pos= strcpy(pos ,"INTO")+4;
2283
pos= strcpy(pos ," Table `")+8;
2284
memcpy(pos, table_name, table_name_len);
2285
pos+= table_name_len;
2287
/* We have to create all optinal fields as the default is not empty */
2288
pos= strcpy(pos, "` FIELDS TERMINATED BY ")+23;
2289
pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len);
2290
if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
2291
pos= strcpy(pos, " OPTIONALLY ")+12;
2292
pos= strcpy(pos, " ENCLOSED BY ")+13;
2293
pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len);
2295
pos= strcpy(pos, " ESCAPED BY ")+12;
2296
pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
2298
pos= strcpy(pos, " LINES TERMINATED BY ")+21;
2299
pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
2300
if (sql_ex.line_start_len)
2302
pos= strcpy(pos, " STARTING BY ")+13;
2303
pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
2306
if ((long) skip_lines > 0)
2308
pos= strcpy(pos, " IGNORE ")+8;
2309
pos= int64_t10_to_str((int64_t) skip_lines, pos, 10);
2310
pos= strcpy(pos," LINES ")+7;
2316
const char *field= fields;
2317
pos= strcpy(pos, " (")+2;
2318
for (i = 0; i < num_fields; i++)
2325
memcpy(pos, field, field_lens[i]);
2326
pos+= field_lens[i];
2327
field+= field_lens[i] + 1;
2336
void Load_log_event::pack_info(Protocol *protocol)
2340
if (!(buf= (char*) malloc(get_query_buffer_length())))
2342
print_query(true, buf, &end, 0, 0);
2343
protocol->store(buf, end-buf, &my_charset_bin);
2349
Load_log_event::write_data_header()
2352
bool Load_log_event::write_data_header(IO_CACHE* file)
2354
char buf[LOAD_HEADER_LEN];
2355
int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
2356
int4store(buf + L_EXEC_TIME_OFFSET, exec_time);
2357
int4store(buf + L_SKIP_LINES_OFFSET, skip_lines);
2358
buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
2359
buf[L_DB_LEN_OFFSET] = (char)db_len;
2360
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
2361
return my_b_safe_write(file, (unsigned char*)buf, LOAD_HEADER_LEN) != 0;
2366
Load_log_event::write_data_body()
2369
bool Load_log_event::write_data_body(IO_CACHE* file)
2371
if (sql_ex.write_data(file))
2373
if (num_fields && fields && field_lens)
2375
if (my_b_safe_write(file, (unsigned char*)field_lens, num_fields) ||
2376
my_b_safe_write(file, (unsigned char*)fields, field_block_len))
2379
return (my_b_safe_write(file, (unsigned char*)table_name, table_name_len + 1) ||
2380
my_b_safe_write(file, (unsigned char*)db, db_len + 1) ||
2381
my_b_safe_write(file, (unsigned char*)fname, fname_len));
2386
Load_log_event::Load_log_event()
2389
Load_log_event::Load_log_event(Session *session_arg, sql_exchange *ex,
2390
const char *db_arg, const char *table_name_arg,
2391
List<Item> &fields_arg,
2392
enum enum_duplicates handle_dup,
2393
bool ignore, bool using_trans)
2394
:Log_event(session_arg,
2395
session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
2397
thread_id(session_arg->thread_id),
2398
slave_proxy_id(session_arg->variables.pseudo_thread_id),
2399
num_fields(0),fields(0),
2400
field_lens(0),field_block_len(0),
2401
table_name(table_name_arg ? table_name_arg : ""),
2402
db(db_arg), fname(ex->file_name), local_fname(false)
2406
exec_time = (ulong) (end_time - session_arg->start_time);
2407
/* db can never be a zero pointer in 4.0 */
2408
db_len = (uint32_t) strlen(db);
2409
table_name_len = (uint32_t) strlen(table_name);
2410
fname_len = (fname) ? (uint) strlen(fname) : 0;
2411
sql_ex.field_term = (char*) ex->field_term->ptr();
2412
sql_ex.field_term_len = (uint8_t) ex->field_term->length();
2413
sql_ex.enclosed = (char*) ex->enclosed->ptr();
2414
sql_ex.enclosed_len = (uint8_t) ex->enclosed->length();
2415
sql_ex.line_term = (char*) ex->line_term->ptr();
2416
sql_ex.line_term_len = (uint8_t) ex->line_term->length();
2417
sql_ex.line_start = (char*) ex->line_start->ptr();
2418
sql_ex.line_start_len = (uint8_t) ex->line_start->length();
2419
sql_ex.escaped = (char*) ex->escaped->ptr();
2420
sql_ex.escaped_len = (uint8_t) ex->escaped->length();
2421
sql_ex.opt_flags = 0;
2422
sql_ex.cached_new_format = -1;
2425
sql_ex.opt_flags|= DUMPFILE_FLAG;
2426
if (ex->opt_enclosed)
2427
sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
2429
sql_ex.empty_flags= 0;
2431
switch (handle_dup) {
2433
sql_ex.opt_flags|= REPLACE_FLAG;
2435
case DUP_UPDATE: // Impossible here
2440
sql_ex.opt_flags|= IGNORE_FLAG;
2442
if (!ex->field_term->length())
2443
sql_ex.empty_flags |= FIELD_TERM_EMPTY;
2444
if (!ex->enclosed->length())
2445
sql_ex.empty_flags |= ENCLOSED_EMPTY;
2446
if (!ex->line_term->length())
2447
sql_ex.empty_flags |= LINE_TERM_EMPTY;
2448
if (!ex->line_start->length())
2449
sql_ex.empty_flags |= LINE_START_EMPTY;
2450
if (!ex->escaped->length())
2451
sql_ex.empty_flags |= ESCAPED_EMPTY;
2453
skip_lines = ex->skip_lines;
2455
List_iterator<Item> li(fields_arg);
2456
field_lens_buf.length(0);
2457
fields_buf.length(0);
2459
while ((item = li++))
2462
unsigned char len = (unsigned char) strlen(item->name);
2463
field_block_len += len + 1;
2464
fields_buf.append(item->name, len + 1);
2465
field_lens_buf.append((char*)&len, 1);
2468
field_lens = (const unsigned char*)field_lens_buf.ptr();
2469
fields = fields_buf.ptr();
2475
The caller must do buf[event_len] = 0 before he starts using the
2478
Load_log_event::Load_log_event(const char *buf, uint32_t event_len,
2479
const Format_description_log_event *description_event)
2480
:Log_event(buf, description_event), num_fields(0), fields(0),
2481
field_lens(0),field_block_len(0),
2482
table_name(0), db(0), fname(0), local_fname(false)
2485
I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0,
2486
4.0->5.0 and 5.0->5.0 and it works.
2489
copy_log_event(buf, event_len,
2490
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
2492
description_event->common_header_len :
2493
LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
2495
/* otherwise it's a derived class, will call copy_log_event() itself */
2501
Load_log_event::copy_log_event()
2504
int Load_log_event::copy_log_event(const char *buf, ulong event_len,
2506
const Format_description_log_event *description_event)
2509
char* buf_end = (char*)buf + event_len;
2510
/* this is the beginning of the post-header */
2511
const char* data_head = buf + description_event->common_header_len;
2512
slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
2513
exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
2514
skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
2515
table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
2516
db_len = (uint)data_head[L_DB_LEN_OFFSET];
2517
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
2519
if ((int) event_len < body_offset)
2522
Sql_ex.init() on success returns the pointer to the first byte after
2523
the sql_ex structure, which is the start of field lengths array.
2525
if (!(field_lens= (unsigned char*)sql_ex.init((char*)buf + body_offset,
2527
buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
2530
data_len = event_len - body_offset;
2531
if (num_fields > data_len) // simple sanity check against corruption
2533
for (uint32_t i = 0; i < num_fields; i++)
2534
field_block_len += (uint)field_lens[i] + 1;
2536
fields = (char*)field_lens + num_fields;
2537
table_name = fields + field_block_len;
2538
db = table_name + table_name_len + 1;
2539
fname = db + db_len + 1;
2540
fname_len = strlen(fname);
2541
// null termination is accomplished by the caller doing buf[event_len]=0
2548
Load_log_event::set_fields()
2551
This function can not use the member variable
2552
for the database, since LOAD DATA INFILE on the slave
2553
can be for a different database than the current one.
2554
This is the reason for the affected_db argument to this method.
2557
void Load_log_event::set_fields(const char* affected_db,
2558
List<Item> &field_list,
2559
Name_resolution_context *context)
2562
const char* field = fields;
2563
for (i= 0; i < num_fields; i++)
2565
field_list.push_back(new Item_field(context,
2566
affected_db, table_name, field));
2567
field+= field_lens[i] + 1;
2573
Does the data loading job when executing a LOAD DATA on the slave.
2577
@param use_rli_only_for_errors If set to 1, rli is provided to
2578
Load_log_event::exec_event only for this
2579
function to have RPL_LOG_NAME and
2580
rli->last_slave_error, both being used by
2581
error reports. rli's position advancing
2582
is skipped (done by the caller which is
2583
Execute_load_log_event::exec_event).
2584
If set to 0, rli is provided for full use,
2585
i.e. for error reports and position
2589
fix this; this can be done by testing rules in
2590
Create_file_log_event::exec_event() and then discarding Append_block and
2593
this is a bug - this needs to be moved to the I/O thread
2601
int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
2602
bool use_rli_only_for_errors)
2604
Query_id &query_id= Query_id::get_query_id();
2605
session->set_db(db, strlen(db));
2606
assert(session->query == 0);
2607
session->query_length= 0; // Should not be needed
2608
session->is_slave_error= 0;
2609
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
2611
/* see Query_log_event::do_apply_event() and BUG#13360 */
2612
assert(!rli->m_table_map.count());
2614
Usually lex_start() is called by mysql_parse(), but we need it here
2615
as the present method does not call mysql_parse().
2618
mysql_reset_session_for_next_command(session);
2620
if (!use_rli_only_for_errors)
2623
Saved for InnoDB, see comment in
2624
Query_log_event::do_apply_event()
2626
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
2630
We test replicate_*_db rules. Note that we have already prepared
2631
the file to load, even if we are going to ignore and delete it
2632
now. So it is possible that we did a lot of disk writes for
2633
nothing. In other words, a big LOAD DATA INFILE on the master will
2634
still consume a lot of space on the slave (space in the relay log
2635
+ space of temp files: twice the space of the file to load...)
2636
even if it will finally be ignored. TODO: fix this; this can be
2637
done by testing rules in Create_file_log_event::do_apply_event()
2638
and then discarding Append_block and al. Another way is do the
2639
filtering in the I/O thread (more efficient: no disk writes at
2643
Note: We do not need to execute reset_one_shot_variables() if this
2645
Reason: The db stored in binlog events is the same for SET and for
2646
its companion query. If the SET is ignored because of
2647
db_ok(), the companion query will also be ignored, and if
2648
the companion query is ignored in the db_ok() test of
2649
::do_apply_event(), then the companion SET also have so
2650
we don't need to reset_one_shot_variables().
2654
session->set_time((time_t)when);
2655
session->query_id = query_id.next();
2657
Initing session->row_count is not necessary in theory as this variable has no
2658
influence in the case of the slave SQL thread (it is used to generate a
2659
"data truncated" warning but which is absorbed and never gets to the
2660
error log); still we init it to avoid a Valgrind message.
2662
drizzle_reset_errors(session, 0);
2665
memset(&tables, 0, sizeof(tables));
2666
tables.db= session->strmake(session->db, session->db_length);
2667
tables.alias = tables.table_name = (char*) table_name;
2668
tables.lock_type = TL_WRITE;
2671
// the table will be opened in mysql_load
2675
enum enum_duplicates handle_dup;
2677
char *load_data_query;
2680
Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST
2681
and written to slave's binlog if binlogging is on.
2683
if (!(load_data_query= (char *)session->alloc(get_query_buffer_length() + 1)))
2686
This will set session->fatal_error in case of OOM. So we surely will notice
2687
that something is wrong.
2692
print_query(false, load_data_query, &end, (char **)&session->lex->fname_start,
2693
(char **)&session->lex->fname_end);
2695
session->query_length= end - load_data_query;
2696
session->query= load_data_query;
2698
if (sql_ex.opt_flags & REPLACE_FLAG)
2700
handle_dup= DUP_REPLACE;
2702
else if (sql_ex.opt_flags & IGNORE_FLAG)
2705
handle_dup= DUP_ERROR;
2710
When replication is running fine, if it was DUP_ERROR on the
2711
master then we could choose IGNORE here, because if DUP_ERROR
2712
suceeded on master, and data is identical on the master and slave,
2713
then there should be no uniqueness errors on slave, so IGNORE is
2714
the same as DUP_ERROR. But in the unlikely case of uniqueness errors
2715
(because the data on the master and slave happen to be different
2716
(user error or bug), we want LOAD DATA to print an error message on
2717
the slave to discover the problem.
2719
If reading from net (a 3.23 master), mysql_load() will change this
2722
handle_dup= DUP_ERROR;
2725
We need to set session->lex->sql_command and session->lex->duplicates
2726
since InnoDB tests these variables to decide if this is a LOAD
2727
DATA ... REPLACE INTO ... statement even though mysql_parse()
2728
is not called. This is not needed in 5.0 since there the LOAD
2729
DATA ... statement is replicated using mysql_parse(), which
2730
sets the session->lex fields correctly.
2732
session->lex->sql_command= SQLCOM_LOAD;
2733
session->lex->duplicates= handle_dup;
2735
sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
2736
String field_term(sql_ex.field_term,sql_ex.field_term_len,&my_charset_utf8_general_ci);
2737
String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,&my_charset_utf8_general_ci);
2738
String line_term(sql_ex.line_term,sql_ex.line_term_len,&my_charset_utf8_general_ci);
2739
String line_start(sql_ex.line_start,sql_ex.line_start_len,&my_charset_utf8_general_ci);
2740
String escaped(sql_ex.escaped,sql_ex.escaped_len, &my_charset_utf8_general_ci);
2741
ex.field_term= &field_term;
2742
ex.enclosed= &enclosed;
2743
ex.line_term= &line_term;
2744
ex.line_start= &line_start;
2745
ex.escaped= &escaped;
2747
ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
2748
if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
2749
ex.field_term->length(0);
2751
ex.skip_lines = skip_lines;
2752
List<Item> field_list;
2753
session->lex->select_lex.context.resolve_in_table_list_only(&tables);
2754
set_fields(tables.db, field_list, &session->lex->select_lex.context);
2755
session->variables.pseudo_thread_id= thread_id;
2758
// mysql_load will use session->net to read the file
2759
session->net.vio = net->vio;
2761
Make sure the client does not get confused about the packet sequence
2763
session->net.pkt_nr = net->pkt_nr;
2766
It is safe to use tmp_list twice because we are not going to
2767
update it inside mysql_load().
2769
List<Item> tmp_list;
2770
if (mysql_load(session, &ex, &tables, field_list, tmp_list, tmp_list,
2771
handle_dup, ignore, net != 0))
2772
session->is_slave_error= 1;
2773
if (session->cuted_fields)
2775
/* log_pos is the position of the LOAD event in the master log */
2776
sql_print_warning(_("Slave: load data infile on table '%s' at "
2777
"log position %s in log '%s' produced %ld "
2778
"warning(s). Default database: '%s'"),
2780
llstr(log_pos,llbuff), RPL_LOG_NAME,
2781
(ulong) session->cuted_fields,
2782
print_slave_db_safe(session->db));
2785
net->pkt_nr= session->net.pkt_nr;
2791
We will just ask the master to send us /dev/null if we do not
2792
want to load the data.
2793
TODO: this a bug - needs to be done in I/O thread
2796
skip_load_data_infile(net);
2800
session->net.vio = 0;
2801
const char *remember_db= session->db;
2802
pthread_mutex_lock(&LOCK_thread_count);
2803
session->catalog= 0;
2804
session->set_db(NULL, 0); /* will free the current database */
2806
session->query_length= 0;
2807
pthread_mutex_unlock(&LOCK_thread_count);
2808
close_thread_tables(session);
2810
if (session->is_slave_error)
2812
/* this err/sql_errno code is copy-paste from net_send_error() */
2815
if (session->is_error())
2817
err= session->main_da.message();
2818
sql_errno= session->main_da.sql_errno();
2822
sql_errno=ER_UNKNOWN_ERROR;
2825
rli->report(ERROR_LEVEL, sql_errno,
2826
_("Error '%s' running LOAD DATA INFILE on table '%s'. "
2827
"Default database: '%s'"),
2828
err, (char*)table_name, print_slave_db_safe(remember_db));
2829
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
2832
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
2834
if (session->is_fatal_error)
2837
snprintf(buf, sizeof(buf),
2838
_("Running LOAD DATA INFILE on table '%-.64s'."
2839
" Default database: '%-.64s'"),
2841
print_slave_db_safe(remember_db));
2843
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2844
ER(ER_SLAVE_FATAL_ERROR), buf);
2848
return ( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) );
/**************************************************************************
  Rotate_log_event methods
**************************************************************************/
2857
Rotate_log_event::pack_info()
2860
void Rotate_log_event::pack_info(Protocol *protocol)
2862
char buf1[256], buf[22];
2863
String tmp(buf1, sizeof(buf1), &my_charset_utf8_general_ci);
2865
tmp.append(new_log_ident, ident_len);
2866
tmp.append(STRING_WITH_LEN(";pos="));
2867
tmp.append(llstr(pos,buf));
2868
protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
2873
Rotate_log_event::Rotate_log_event() (2 constructors)
2877
Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
2878
uint32_t ident_len_arg, uint64_t pos_arg,
2880
:Log_event(), new_log_ident(new_log_ident_arg),
2881
pos(pos_arg),ident_len(ident_len_arg ? ident_len_arg :
2882
(uint) strlen(new_log_ident_arg)), flags(flags_arg)
2884
if (flags & DUP_NAME)
2885
new_log_ident= strndup(new_log_ident_arg, ident_len);
2890
Rotate_log_event::Rotate_log_event(const char* buf, uint32_t event_len,
2891
const Format_description_log_event* description_event)
2892
:Log_event(buf, description_event) ,new_log_ident(0), flags(DUP_NAME)
2894
// The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
2895
uint8_t header_size= description_event->common_header_len;
2896
uint8_t post_header_len= description_event->post_header_len[ROTATE_EVENT-1];
2897
uint32_t ident_offset;
2898
if (event_len < header_size)
2901
pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
2902
ident_len = (uint)(event_len -
2903
(header_size+post_header_len));
2904
ident_offset = post_header_len;
2905
set_if_smaller(ident_len,FN_REFLEN-1);
2906
new_log_ident= strndup(buf + ident_offset, ident_len);
2912
Rotate_log_event::write()
2915
bool Rotate_log_event::write(IO_CACHE* file)
2917
char buf[ROTATE_HEADER_LEN];
2918
int8store(buf + R_POS_OFFSET, pos);
2919
return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
2920
my_b_safe_write(file, (unsigned char*)buf, ROTATE_HEADER_LEN) ||
2921
my_b_safe_write(file, (unsigned char*)new_log_ident, (uint) ident_len));
2926
Got a rotate log event from the master.
2928
This is mainly used so that we can later figure out the logname and
2929
position for the master.
2931
We can't rotate the slave's BINlog as this will cause infinitive rotations
2932
in a A -> B -> A setup.
2933
The NOTES below is a wrong comment which will disappear when 4.1 is merged.
2938
int Rotate_log_event::do_update_pos(Relay_log_info *rli)
2940
pthread_mutex_lock(&rli->data_lock);
2941
rli->event_relay_log_pos= my_b_tell(rli->cur_log);
2943
If we are in a transaction or in a group: the only normal case is
2944
when the I/O thread was copying a big transaction, then it was
2945
stopped and restarted: we have this in the relay log:
2953
In that case, we don't want to touch the coordinates which
2954
correspond to the beginning of the transaction. Starting from
2955
5.0.0, there also are some rotates from the slave itself, in the
2956
relay log, which shall not change the group positions.
2958
if ((server_id != ::server_id || rli->replicate_same_server_id) &&
2959
!rli->is_in_group())
2961
rli->group_master_log_name.assign(new_log_ident, ident_len+1);
2962
rli->notify_group_master_log_name_update();
2963
rli->group_master_log_pos= pos;
2964
rli->group_relay_log_name.assign(rli->event_relay_log_name);
2965
rli->notify_group_relay_log_name_update();
2966
rli->group_relay_log_pos= rli->event_relay_log_pos;
2968
Reset session->options and sql_mode etc, because this could be the signal of
2969
a master's downgrade from 5.0 to 4.0.
2970
However, no need to reset description_event_for_exec: indeed, if the next
2971
master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
2972
master is 4.0 then the events are in the slave's format (conversion).
2974
set_slave_thread_options(session);
2975
session->variables.auto_increment_increment=
2976
session->variables.auto_increment_offset= 1;
2978
pthread_mutex_unlock(&rli->data_lock);
2979
pthread_cond_broadcast(&rli->data_cond);
2980
flush_relay_log_info(rli);
2986
Log_event::enum_skip_reason
2987
Rotate_log_event::do_shall_skip(Relay_log_info *rli)
2989
enum_skip_reason reason= Log_event::do_shall_skip(rli);
2992
case Log_event::EVENT_SKIP_NOT:
2993
case Log_event::EVENT_SKIP_COUNT:
2994
return Log_event::EVENT_SKIP_NOT;
2996
case Log_event::EVENT_SKIP_IGNORE:
2997
return Log_event::EVENT_SKIP_IGNORE;
3000
return Log_event::EVENT_SKIP_NOT; // To keep compiler happy
/**************************************************************************
  Xid_log_event methods
**************************************************************************/
3008
void Xid_log_event::pack_info(Protocol *protocol)
3010
char buf[128], *pos;
3011
pos= strcpy(buf, "COMMIT /* xid=")+14;
3012
pos= int64_t10_to_str(xid, pos, 10);
3013
pos= strcpy(pos, " */")+3;
3014
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3019
It's ok not to use int8store here,
3020
as long as XID::set(uint64_t) and
3021
XID::get_my_xid doesn't do it either.
3022
We don't care about actual values of xids as long as
3023
identical numbers compare identically
3027
Xid_log_event(const char* buf,
3028
const Format_description_log_event *description_event)
3029
:Log_event(buf, description_event)
3031
buf+= description_event->common_header_len;
3032
memcpy(&xid, buf, sizeof(xid));
3036
bool Xid_log_event::write(IO_CACHE* file)
3038
return write_header(file, sizeof(xid)) ||
3039
my_b_safe_write(file, (unsigned char*) &xid, sizeof(xid));
3043
int Xid_log_event::do_apply_event(const Relay_log_info *)
3045
return end_trans(session, COMMIT);
3048
Log_event::enum_skip_reason
3049
Xid_log_event::do_shall_skip(Relay_log_info *rli)
3051
if (rli->slave_skip_counter > 0) {
3052
session->options&= ~OPTION_BEGIN;
3053
return(Log_event::EVENT_SKIP_COUNT);
3055
return(Log_event::do_shall_skip(rli));
/**************************************************************************
  Slave_log_event methods
**************************************************************************/
3063
void Slave_log_event::pack_info(Protocol *protocol)
3065
char buf[256+HOSTNAME_LENGTH], *pos;
3066
pos= strcpy(buf, "host=")+5;
3067
pos= my_stpncpy(pos, master_host.c_str(), HOSTNAME_LENGTH);
3068
pos= strcpy(pos, ",port=")+6;
3069
pos= int10_to_str((long) master_port, pos, 10);
3070
pos= strcpy(pos, ",log=")+5;
3071
pos= strcpy(pos, master_log.c_str())+master_log.length();
3072
pos= strcpy(pos, ",pos=")+5;
3073
pos= int64_t10_to_str(master_pos, pos, 10);
3074
protocol->store(buf, pos-buf, &my_charset_bin);
3080
re-write this better without holding both locks at the same time
3082
Slave_log_event::Slave_log_event(Session* session_arg,
3083
Relay_log_info* rli)
3084
:Log_event(session_arg, 0, 0) , mem_pool(0), master_host(0)
3086
if (!rli->inited) // QQ When can this happen ?
3089
Master_info* mi = rli->mi;
3090
// TODO: re-write this better without holding both locks at the same time
3091
pthread_mutex_lock(&mi->data_lock);
3092
pthread_mutex_lock(&rli->data_lock);
3093
// on OOM, just do not initialize the structure and print the error
3094
if ((mem_pool = (char*)malloc(get_data_size() + 1)))
3096
master_host.assign(mi->getHostname());
3097
master_log.assign(rli->group_master_log_name);
3098
master_port = mi->getPort();
3099
master_pos = rli->group_master_log_pos;
3102
sql_print_error(_("Out of memory while recording slave event"));
3103
pthread_mutex_unlock(&rli->data_lock);
3104
pthread_mutex_unlock(&mi->data_lock);
3109
Slave_log_event::~Slave_log_event()
3115
int Slave_log_event::get_data_size()
3117
return master_host.length() + master_log.length() + 1 + SL_MASTER_HOST_OFFSET;
3121
/**
  Write the event (header plus mem_pool as body) to the given cache.

  The position and port are stored into mem_pool here; the host and log
  names are expected to be in the pool already.

  @param file  Destination IO cache.
  @return true on failure, false on success. An event whose pool was
          never allocated is reported as a failure instead of being
          dereferenced.
*/
bool Slave_log_event::write(IO_CACHE* file)
{
  if (!mem_pool)
    return true;

  ulong event_length= get_data_size();
  int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
  int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
  // log and host are already there

  return (write_header(file, event_length) ||
          my_b_safe_write(file, (unsigned char*) mem_pool, event_length));
}
3133
void Slave_log_event::init_from_mem_pool()
3135
master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
3136
master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
3138
/* Assign these correctly */
3139
master_host.assign(mem_pool + SL_MASTER_HOST_OFFSET);
3140
master_log.assign();
3145
/*
  Apply the event: when the binary log is open, this event is written to
  it. The Relay_log_info argument is unnamed and unused.
*/
int Slave_log_event::do_apply_event(const Relay_log_info *)
3147
if (drizzle_bin_log.is_open())
3148
drizzle_bin_log.write(this);
3153
/**************************************************************************
3154
Stop_log_event methods
3155
**************************************************************************/
3158
The master stopped. We used to clean up all temporary tables but
3159
this is useless as, as the master has shut down properly, it has
3160
written all DROP TEMPORARY Table (prepared statements' deletion is
3161
TODO only when we binlog prep stmts). We used to clean up
3162
slave_load_tmpdir, but this is useless as it has been cleared at the
3163
end of LOAD DATA INFILE. So we have nothing to do here. The place
3164
where we must do this cleaning is in
3165
Start_log_event_v3::do_apply_event(), not here. Because if we come
3166
here, the master was sane.
3168
/**
  Advance the relay log position after a Stop event.

  The master log position is deliberately left alone: a Rotate event
  precedes Stop, so group_master_log_name already names the next log, and
  updating the coordinates here could make MASTER_POS_WAIT() report a
  position that has not really been reached.

  @param rli  Relay log info to advance.
  @return 0 always.
*/
int Stop_log_event::do_update_pos(Relay_log_info *rli)
{
  const bool inside_transaction= (session->options & OPTION_BEGIN) != 0;

  if (!inside_transaction)
  {
    rli->inc_group_relay_log_pos(0);
    flush_relay_log_info(rli);
  }
  else
    rli->inc_event_relay_log_pos();

  return 0;
}
3188
/**************************************************************************
3189
Create_file_log_event methods
3190
**************************************************************************/
3193
Create_file_log_event ctor
3196
/**
  Build a Create_file event on the master side.

  Everything describing the load is forwarded to Load_log_event; this
  class adds the first data block and a file id freshly taken from the
  binary log, which is also stored in the session.
*/
Create_file_log_event::
Create_file_log_event(Session* session_arg, sql_exchange* ex,
                      const char* db_arg, const char* table_name_arg,
                      List<Item>& fields_arg, enum enum_duplicates handle_dup,
                      bool ignore,
                      unsigned char* block_arg, uint32_t block_len_arg, bool using_trans)
  :Load_log_event(session_arg, ex, db_arg, table_name_arg, fields_arg,
                  handle_dup, ignore, using_trans),
   fake_base(0),
   block(block_arg),
   event_buf(0),
   block_len(block_len_arg),
   file_id(session_arg->file_id = drizzle_bin_log.next_file_id())
{
  sql_ex.force_new_format();
}
3213
Create_file_log_event::write_data_body()
3216
/**
  Write the event body: the Load_log_event body, then a single zero byte
  and the data block. When fake_base is set only the Load part is written.

  @param file  Destination IO cache.
  @return Non-zero (true) on failure.
*/
bool Create_file_log_event::write_data_body(IO_CACHE* file)
{
  const bool base_failed= Load_log_event::write_data_body(file);
  if (base_failed || fake_base)
    return base_failed;

  if (my_b_safe_write(file, (unsigned char*) "", 1))
    return true;
  return my_b_safe_write(file, (unsigned char*) block, block_len) != 0;
}
3227
Create_file_log_event::write_data_header()
3230
/**
  Write the post-header: the Load_log_event header followed by the file
  id. When fake_base is set only the Load part is written.

  @param file  Destination IO cache.
  @return Non-zero (true) on failure.
*/
bool Create_file_log_event::write_data_header(IO_CACHE* file)
{
  const bool base_failed= Load_log_event::write_data_header(file);
  if (base_failed || fake_base)
    return base_failed;

  unsigned char id_header[CREATE_FILE_HEADER_LEN];
  int4store(id_header + CF_FILE_ID_OFFSET, file_id);
  return my_b_safe_write(file, id_header, CREATE_FILE_HEADER_LEN) != 0;
}
3242
Create_file_log_event::write_base()
3245
/*
  Write this event in the shape of a plain Load event. fake_base is
  raised first; write_data_header() and write_data_body() check that flag
  and then emit only the Load_log_event part.
*/
bool Create_file_log_event::write_base(IO_CACHE* file)
3248
fake_base= 1; // pretend we are Load event
3255
Create_file_log_event ctor
3258
/**
  Build a Create_file event from its binary representation.

  A private copy of the raw event is kept in event_buf and parsed with
  copy_log_event(). The copy is now made as its own statement: memcpy()
  returns its (non-null) destination, so having it inside the `||` chain
  made the constructor return before copy_log_event() ever ran.

  On any failure the constructor returns early and block stays 0.

  @param buf                Raw event bytes.
  @param len                Number of bytes in buf.
  @param description_event  Format description giving the header sizes.
*/
Create_file_log_event::Create_file_log_event(const char* buf, uint32_t len,
                                             const Format_description_log_event* description_event)
  :Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0)
{
  uint32_t block_offset;
  uint32_t header_len= description_event->common_header_len;
  uint8_t load_header_len= description_event->post_header_len[LOAD_EVENT-1];
  uint8_t create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];

  if (!(event_buf= (const char*) malloc(len)))
    return;
  memcpy((char *) event_buf, buf, len);

  if (copy_log_event(event_buf,len,
                     ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
                      load_header_len + header_len :
                      (fake_base ? (header_len+load_header_len) :
                       (header_len+load_header_len) +
                       create_file_header_len)),
                     description_event))
    return;

  if (description_event->binlog_version!=1)
  {
    file_id= uint4korr(buf +
                       header_len +
                       load_header_len + CF_FILE_ID_OFFSET);
    /*
      Note that it's ok to use get_data_size() below, because it is computed
      with values we have already read from this event (because we called
      copy_log_event()); we are not using slave's format info to decode
      master's format, we are really using master's format info.
      Anyway, both formats should be identical (except the common_header_len)
      as these Load events are not changed between 4.0 and 5.0 (as logging of
      LOAD DATA INFILE does not use Load_log_event in 5.0).

      The + 1 is for \0 terminating fname
    */
    block_offset= (description_event->common_header_len +
                   Load_log_event::get_data_size() +
                   create_file_header_len + 1);
    if (len < block_offset)
      return;
    block = (unsigned char*)buf + block_offset;
    block_len = len - block_offset;
  }
  else
  {
    sql_ex.force_new_format();
    inited_from_old = 1;
  }
}
3310
Create_file_log_event::pack_info()
3313
/**
  Describe this event as
  "db=<db>;table=<table>;file_id=<id>;block_len=<n>" and hand the text to
  the protocol.

  @param protocol  Receiver of the formatted description.
*/
void Create_file_log_event::pack_info(Protocol *protocol)
{
  char text[NAME_LEN*2 + 30 + 21*2];
  char *cursor= text;

  strcpy(cursor, "db=");
  cursor+= 3;
  memcpy(cursor, db, db_len);
  cursor+= db_len;

  strcpy(cursor, ";table=");
  cursor+= 7;
  memcpy(cursor, table_name, table_name_len);
  cursor+= table_name_len;

  strcpy(cursor, ";file_id=");
  cursor+= 9;
  cursor= int10_to_str((long) file_id, cursor, 10);

  strcpy(cursor, ";block_len=");
  cursor+= 11;
  cursor= int10_to_str((long) block_len, cursor, 10);

  protocol->store(text, (uint) (cursor-text), &my_charset_bin);
}
3329
Create_file_log_event::do_apply_event()
3332
/*
  Apply a Create_file event on the slave.

  Two temporary files are produced from the stem that
  slave_load_file_stem() builds from file_id and server_id:
    - "<stem>.info": this event written via write_base(), i.e. in the
      shape of a Load event;
    - "<stem>.data": the first data block carried by this event.
  The same name buffer is reused for both files by rewriting the extension
  in place. Every failure is reported through rli->report().
*/
int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
3334
char proc_info[17+FN_REFLEN+10], *fname_buf;
3340
memset(&file, 0, sizeof(file));
3341
/* The file name is built right after the 17-character status prefix. */
fname_buf= strcpy(proc_info, "Making temp file ")+17;
3342
ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
3343
session->set_proc_info(proc_info);
3344
my_delete(fname_buf, MYF(0)); // old copy may exist already
3345
if ((fd= my_create(fname_buf, CREATE_MODE,
3347
MYF(MY_WME))) < 0 ||
3348
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
3349
MYF(MY_WME|MY_NABP)))
3351
rli->report(ERROR_LEVEL, my_errno,
3352
_("Error in Create_file event: could not open file '%s'"),
3357
// a trick to avoid allocating another buffer
3359
fname_len= (uint) ((strcpy(ext, ".data") + 5) - fname);
3360
if (write_base(&file))
3362
strcpy(ext, ".info"); // to have it right in the error message
3363
rli->report(ERROR_LEVEL, my_errno,
3364
_("Error in Create_file event: could not write to file '%s'"),
3368
end_io_cache(&file);
3369
my_close(fd, MYF(0));
3371
// fname_buf now already has .data, not .info, because we did our trick
3372
my_delete(fname_buf, MYF(0)); // old copy may exist already
3373
if ((fd= my_create(fname_buf, CREATE_MODE,
3377
rli->report(ERROR_LEVEL, my_errno,
3378
_("Error in Create_file event: could not open file '%s'"),
3382
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
3384
rli->report(ERROR_LEVEL, my_errno,
3385
_("Error in Create_file event: write to '%s' failed"),
3389
error=0; // Everything is ok
3393
end_io_cache(&file);
3395
my_close(fd, MYF(0));
3396
session->set_proc_info(0);
3401
/**************************************************************************
3402
Append_block_log_event methods
3403
**************************************************************************/
3406
Append_block_log_event ctor
3409
/**
  Build an Append_block event on the master side.

  The block is not copied: the event keeps block_arg itself. The file id
  is the one currently stored in the session.
*/
Append_block_log_event::Append_block_log_event(Session *session_arg,
                                               const char *db_arg,
                                               unsigned char *block_arg,
                                               uint32_t block_len_arg,
                                               bool using_trans)
  :Log_event(session_arg, 0, using_trans),
   block(block_arg),
   block_len(block_len_arg),
   file_id(session_arg->file_id),
   db(db_arg)
{
}
3421
Append_block_log_event ctor
3424
/**
  Build an Append_block event from its binary representation.

  If the buffer is shorter than the two headers, block stays 0 and nothing
  else is decoded. Otherwise block points into buf, right after the
  headers; the data is not copied.
*/
Append_block_log_event::Append_block_log_event(const char* buf, uint32_t len,
                                               const Format_description_log_event* description_event)
  :Log_event(buf, description_event),block(0)
{
  const uint8_t common_len= description_event->common_header_len;
  const uint8_t post_len=
    description_event->post_header_len[APPEND_BLOCK_EVENT-1];
  const uint32_t headers_len= common_len+post_len;

  if (headers_len <= len)
  {
    file_id= uint4korr(buf + common_len + AB_FILE_ID_OFFSET);
    block= (unsigned char*)buf + headers_len;
    block_len= len - headers_len;
  }
}
3442
Append_block_log_event::write()
3445
/**
  Write the event: common header, the post-header holding the file id,
  then the data block.

  @param file  Destination IO cache.
  @return true as soon as one of the writes fails, false otherwise.
*/
bool Append_block_log_event::write(IO_CACHE* file)
{
  unsigned char id_header[APPEND_BLOCK_HEADER_LEN];
  int4store(id_header + AB_FILE_ID_OFFSET, file_id);

  if (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len))
    return true;
  if (my_b_safe_write(file, id_header, APPEND_BLOCK_HEADER_LEN))
    return true;
  return my_b_safe_write(file, (unsigned char*) block, block_len) != 0;
}
3456
Append_block_log_event::pack_info()
3459
/**
  Describe this event as ";file_id=<id>;block_len=<n>" and hand the text
  to the protocol.

  @param protocol  Receiver of the formatted description.
*/
void Append_block_log_event::pack_info(Protocol *protocol)
{
  char text[256];
  const uint32_t text_len= (uint) sprintf(text, ";file_id=%u;block_len=%u",
                                          file_id, block_len);
  protocol->store(text, text_len, &my_charset_bin);
}
3470
Append_block_log_event::get_create_or_append()
3473
int Append_block_log_event::get_create_or_append() const
3475
return 0; /* append to the file, fail if not exists */
3479
Append_block_log_event::do_apply_event()
3482
/*
  Apply an Append_block event on the slave: write the carried block to
  the "<stem>.data" temporary file of this load.

  get_create_or_append() selects the mode: non-zero deletes any old copy
  and creates the file, zero opens the existing file with
  O_WRONLY | O_APPEND. Failures are reported through rli->report() with
  the event type name from get_type_str().
*/
int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
3484
char proc_info[17+FN_REFLEN+10], *fname= proc_info+17;
3488
/* The file name is built right after the 17-character status prefix. */
fname= strcpy(proc_info, "Making temp file ")+17;
3489
slave_load_file_stem(fname, file_id, server_id, ".data");
3490
session->set_proc_info(proc_info);
3491
if (get_create_or_append())
3493
my_delete(fname, MYF(0)); // old copy may exist already
3494
if ((fd= my_create(fname, CREATE_MODE,
3498
rli->report(ERROR_LEVEL, my_errno,
3499
_("Error in %s event: could not create file '%s'"),
3500
get_type_str(), fname);
3504
else if ((fd = my_open(fname, O_WRONLY | O_APPEND,
3507
rli->report(ERROR_LEVEL, my_errno,
3508
_("Error in %s event: could not open file '%s'"),
3509
get_type_str(), fname);
3512
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
3514
rli->report(ERROR_LEVEL, my_errno,
3515
_("Error in %s event: write to '%s' failed"),
3516
get_type_str(), fname);
3523
my_close(fd, MYF(0));
3524
session->set_proc_info(0);
3529
/**************************************************************************
3530
Delete_file_log_event methods
3531
**************************************************************************/
3534
Delete_file_log_event ctor
3537
/**
  Build a Delete_file event on the master side; the file id is the one
  currently stored in the session.
*/
Delete_file_log_event::Delete_file_log_event(Session *session_arg, const char* db_arg,
                                             bool using_trans)
  :Log_event(session_arg, 0, using_trans),
   file_id(session_arg->file_id),
   db(db_arg)
{
}
3544
Delete_file_log_event ctor
3547
/**
  Build a Delete_file event from its binary representation.

  file_id stays 0 when the buffer is shorter than the two headers.
*/
Delete_file_log_event::Delete_file_log_event(const char* buf, uint32_t len,
                                             const Format_description_log_event* description_event)
  :Log_event(buf, description_event),file_id(0)
{
  const uint8_t common_len= description_event->common_header_len;
  const uint8_t post_len= description_event->post_header_len[DELETE_FILE_EVENT-1];

  if (len >= (uint)(common_len + post_len))
    file_id= uint4korr(buf + common_len + DF_FILE_ID_OFFSET);
}
3560
Delete_file_log_event::write()
3563
/**
  Write the event: common header followed by the post-header holding the
  file id.

  @param file  Destination IO cache.
  @return true if either write fails, false otherwise.
*/
bool Delete_file_log_event::write(IO_CACHE* file)
{
  unsigned char id_header[DELETE_FILE_HEADER_LEN];
  int4store(id_header + DF_FILE_ID_OFFSET, file_id);

  if (write_header(file, sizeof(id_header)))
    return true;
  return my_b_safe_write(file, id_header, sizeof(id_header)) != 0;
}
3573
Delete_file_log_event::pack_info()
3576
/**
  Describe this event as ";file_id=<id>" and hand the text to the
  protocol.

  @param protocol  Receiver of the formatted description.
*/
void Delete_file_log_event::pack_info(Protocol *protocol)
{
  char text[64];
  const uint32_t text_len= (uint) sprintf(text, ";file_id=%u", (uint) file_id);
  protocol->store(text, (int32_t) text_len, &my_charset_bin);
}
3585
Delete_file_log_event::do_apply_event()
3588
int Delete_file_log_event::do_apply_event(const Relay_log_info *)
3590
char fname[FN_REFLEN+10];
3591
char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
3592
(void) my_delete(fname, MYF(MY_WME));
3593
strcpy(ext, ".info");
3594
(void) my_delete(fname, MYF(MY_WME));
3599
/**************************************************************************
3600
Execute_load_log_event methods
3601
**************************************************************************/
3604
Execute_load_log_event ctor
3607
/**
  Build an Execute_load event on the master side; the file id is the one
  currently stored in the session.
*/
Execute_load_log_event::Execute_load_log_event(Session *session_arg,
                                               const char* db_arg,
                                               bool using_trans)
  :Log_event(session_arg, 0, using_trans),
   file_id(session_arg->file_id),
   db(db_arg)
{
}
3616
Execute_load_log_event ctor
3619
/**
  Build an Execute_load event from its binary representation.

  file_id stays 0 when the buffer is shorter than the two headers.
*/
Execute_load_log_event::Execute_load_log_event(const char* buf, uint32_t len,
                                               const Format_description_log_event* description_event)
  :Log_event(buf, description_event), file_id(0)
{
  const uint8_t common_len= description_event->common_header_len;
  const uint8_t post_len= description_event->post_header_len[EXEC_LOAD_EVENT-1];

  if (len >= (uint)(common_len+post_len))
    file_id= uint4korr(buf + common_len + EL_FILE_ID_OFFSET);
}
3632
Execute_load_log_event::write()
3635
/**
  Write the event: common header followed by the post-header holding the
  file id.

  @param file  Destination IO cache.
  @return true if either write fails, false otherwise.
*/
bool Execute_load_log_event::write(IO_CACHE* file)
{
  unsigned char id_header[EXEC_LOAD_HEADER_LEN];
  int4store(id_header + EL_FILE_ID_OFFSET, file_id);

  if (write_header(file, sizeof(id_header)))
    return true;
  return my_b_safe_write(file, id_header, sizeof(id_header)) != 0;
}
3645
Execute_load_log_event::pack_info()
3648
/**
  Describe this event as ";file_id=<id>" and hand the text to the
  protocol.

  @param protocol  Receiver of the formatted description.
*/
void Execute_load_log_event::pack_info(Protocol *protocol)
{
  char text[64];
  const uint32_t text_len= (uint) sprintf(text, ";file_id=%u", (uint) file_id);
  protocol->store(text, (int32_t) text_len, &my_charset_bin);
}
3658
Execute_load_log_event::do_apply_event()
3661
/*
  Apply an Execute_load event on the slave.

  Steps visible here:
    - open "<stem>.info" and read a log event back from it; anything that
      is not a NEW_LOAD_EVENT is reported as a corrupted file;
    - attach the current session to that event, set
      rli->future_group_master_log_pos to this event's log_pos, and run
      the Load event's own do_apply_event();
    - on failure, the previous error message is duplicated with strdup()
      and reported again with the file name appended;
    - the descriptor is closed and the IO cache ended, and both
      "<stem>.info" and "<stem>.data" are deleted.

  NOTE(review): my_close(fd) is called before end_io_cache(&file) in both
  places below -- confirm that this order is intended.
*/
int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
3663
char fname[FN_REFLEN+10];
3668
Load_log_event *lev= 0;
3670
ext= slave_load_file_stem(fname, file_id, server_id, ".info");
3671
if ((fd = my_open(fname, O_RDONLY,
3672
MYF(MY_WME))) < 0 ||
3673
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
3674
MYF(MY_WME|MY_NABP)))
3676
rli->report(ERROR_LEVEL, my_errno,
3677
_("Error in Exec_load event: could not open file '%s'"),
3681
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
3682
(pthread_mutex_t*)0,
3683
rli->relay_log.description_event_for_exec)) ||
3684
lev->get_type_code() != NEW_LOAD_EVENT)
3686
rli->report(ERROR_LEVEL, 0,
3687
_("Error in Exec_load event: "
3688
"file '%s' appears corrupted"),
3693
lev->session = session;
3695
lev->do_apply_event should use rli only for errors i.e. should
3696
not advance rli's position.
3698
lev->do_apply_event is the place where the table is loaded (it
3699
calls mysql_load()).
3702
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
3703
if (lev->do_apply_event(0,rli,1))
3706
We want to indicate the name of the file that could not be loaded
3708
But as we are here we are sure the error is in rli->last_slave_error and
3709
rli->last_slave_errno (example of error: duplicate entry for key), so we
3710
don't want to overwrite it with the filename.
3711
What we want instead is add the filename to the current error message.
3713
char *tmp= strdup(rli->last_error().message);
3716
rli->report(ERROR_LEVEL, rli->last_error().number,
3717
_("%s. Failed executing load from '%s'"),
3724
We have an open file descriptor to the .info file; we need to close it
3725
or Windows will refuse to delete the file in my_delete().
3729
my_close(fd, MYF(0));
3730
end_io_cache(&file);
3733
(void) my_delete(fname, MYF(MY_WME));
3734
memcpy(ext, ".data", 6);
3735
(void) my_delete(fname, MYF(MY_WME));
3742
my_close(fd, MYF(0));
3743
end_io_cache(&file);
3749
/**************************************************************************
3750
Begin_load_query_log_event methods
3751
**************************************************************************/
3753
/**
  Build a Begin_load_query event on the master side.

  Unlike a plain Append_block event, this one takes a fresh file id from
  the binary log and stores it in both the session and the event.
*/
Begin_load_query_log_event::
Begin_load_query_log_event(Session* session_arg, const char* db_arg, unsigned char* block_arg,
                           uint32_t block_len_arg, bool using_trans)
  :Append_block_log_event(session_arg, db_arg, block_arg, block_len_arg,
                          using_trans)
{
  session_arg->file_id= drizzle_bin_log.next_file_id();
  file_id= session_arg->file_id;
}
3763
/**
  Build a Begin_load_query event from its binary representation; all
  decoding is done by the Append_block_log_event constructor.
*/
Begin_load_query_log_event::
Begin_load_query_log_event(const char* buf,
                           uint32_t len,
                           const Format_description_log_event* desc_event)
  :Append_block_log_event(buf, len, desc_event)
{
}
3771
int Begin_load_query_log_event::get_create_or_append() const
3773
return 1; /* create the file */
3777
/**
  Decide whether this event has to be skipped.

  The decision is delegated to continue_group(), so the slave skip
  counter is handled for this event as part of its group.

  @param rli  Relay log info passed on to continue_group().
  @return The skip reason computed by continue_group().
*/
Log_event::enum_skip_reason
Begin_load_query_log_event::do_shall_skip(Relay_log_info *rli)
{
  const Log_event::enum_skip_reason reason= continue_group(rli);
  return reason;
}
3788
/**************************************************************************
3789
Execute_load_query_log_event methods
3790
**************************************************************************/
3793
/**
  Build an Execute_load_query event on the master side.

  The query itself is handled by Query_log_event. This class records the
  session's current file id, the start and end offsets of the file name
  inside the query text, and how duplicates are handled.
*/
Execute_load_query_log_event::
Execute_load_query_log_event(Session *session_arg, const char* query_arg,
                             ulong query_length_arg, uint32_t fn_pos_start_arg,
                             uint32_t fn_pos_end_arg,
                             enum_load_dup_handling dup_handling_arg,
                             bool using_trans, bool suppress_use,
                             Session::killed_state killed_err_arg)
  :Query_log_event(session_arg, query_arg, query_length_arg, using_trans,
                   suppress_use, killed_err_arg),
   file_id(session_arg->file_id),
   fn_pos_start(fn_pos_start_arg),
   fn_pos_end(fn_pos_end_arg),
   dup_handling(dup_handling_arg)
{
}
3808
/**
  Build an Execute_load_query event from its binary representation.

  The extra header fields are decoded and validated. file_id is set only
  when they are consistent; otherwise the constructor returns early and
  file_id keeps its initial value 0.

  Besides the existing range checks, an inverted file-name range
  (fn_pos_start > fn_pos_end) is rejected: do_apply_event() sizes its
  query buffer from (fn_pos_end - fn_pos_start), which would wrap around
  for such an event.
*/
Execute_load_query_log_event::
Execute_load_query_log_event(const char* buf, uint32_t event_len,
                             const Format_description_log_event* desc_event):
  Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT),
  file_id(0), fn_pos_start(0), fn_pos_end(0)
{
  if (!Query_log_event::is_valid())
    return;

  buf+= desc_event->common_header_len;

  fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET);
  fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET);
  dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET));

  if (fn_pos_start > q_len || fn_pos_end > q_len ||
      fn_pos_start > fn_pos_end ||
      dup_handling > LOAD_DUP_REPLACE)
    return;

  file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET);
}
3831
/**
  Size of the header bytes this class adds to the Query_log_event
  post-header.

  @return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN.
*/
ulong Execute_load_query_log_event::get_post_header_size_for_derived()
{
  const ulong extra_header_size= EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN;
  return extra_header_size;
}
3838
Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
3840
unsigned char buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
3841
int4store(buf, file_id);
3842
int4store(buf + 4, fn_pos_start);
3843
int4store(buf + 4 + 4, fn_pos_end);
3844
*(buf + 4 + 4 + 4)= (unsigned char) dup_handling;
3845
return my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
3849
/*
  Describe this event for the protocol. The text is built in a heap
  buffer sized for "use `<db>`; ", the query text, " ;file_id=" and the
  decimal file id, then passed to protocol->store().
*/
void Execute_load_query_log_event::pack_info(Protocol *protocol)
3852
if (!(buf= (char*) malloc(9 + db_len + q_len + 10 + 21)))
3857
pos= strcpy(buf, "use `")+5;
3858
memcpy(pos, db, db_len);
3859
pos= strcpy(pos+db_len, "`; ")+3;
3863
memcpy(pos, query, q_len);
3866
pos= strcpy(pos, " ;file_id=")+10;
3867
pos= int10_to_str((long) file_id, pos, 10);
3868
protocol->store(buf, pos-buf, &my_charset_bin);
3874
/*
  Apply an Execute_load_query event on the slave.

  The logged query is rewritten into a new buffer: the text before
  fn_pos_start is kept, then " INFILE '" and the slave's "<stem>.data"
  name are inserted, then " IGNORE" or " REPLACE" according to
  dup_handling, then " INTO" and the rest of the query from fn_pos_end.
  The rewritten text is run through Query_log_event::do_apply_event().
  The temporary file is deleted afterwards; per the note below, it is
  kept when there was an error so the event can be re-executed.

  The buffer size depends on (fn_pos_end - fn_pos_start), so the event
  must carry fn_pos_start <= fn_pos_end.
*/
Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli)
3882
buf= (char*) malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
3883
(FN_REFLEN + 10) + 10 + 8 + 5);
3885
/* Replace filename and LOCAL keyword in query before executing it */
3888
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
3889
ER(ER_SLAVE_FATAL_ERROR),
3890
_("Not enough memory"));
3895
memcpy(p, query, fn_pos_start);
3897
fname= (p= strncpy(p, STRING_WITH_LEN(" INFILE \'")) + 9);
3898
p= slave_load_file_stem(p, file_id, server_id, ".data");
3899
fname_end= p= strchr(p, '\0'); // Safer than p=p+5
3901
switch (dup_handling) {
3902
case LOAD_DUP_IGNORE:
3903
p= strncpy(p, STRING_WITH_LEN(" IGNORE")) + 7;
3905
case LOAD_DUP_REPLACE:
3906
p= strncpy(p, STRING_WITH_LEN(" REPLACE")) + 8;
3909
/* Ordinary load data */
3912
size_t end_len = q_len-fn_pos_end;
3913
p= strncpy(p, STRING_WITH_LEN(" INTO")) + 5;
3914
p= strncpy(p, query+fn_pos_end, end_len);
3917
error= Query_log_event::do_apply_event(rli, buf, p-buf);
3919
/* Forging file name for deletion in same buffer */
3923
If there was an error the slave is going to stop, leave the
3924
file so that we can re-execute this event at START SLAVE.
3927
(void) my_delete(fname, MYF(MY_WME));
3934
/**************************************************************************
3936
**************************************************************************/
3939
sql_ex_info::write_data()
3942
/*
  Write the LOAD DATA format options to the given cache: the five
  delimiter strings (field terminator, enclosure, line terminator, line
  start, escape) via write_str(), followed by the single opt_flags byte.
  The `||` chain stops at the first write that fails.
*/
bool sql_ex_info::write_data(IO_CACHE* file)
3946
return (write_str(file, field_term, (uint) field_term_len) ||
3947
write_str(file, enclosed, (uint) enclosed_len) ||
3948
write_str(file, line_term, (uint) line_term_len) ||
3949
write_str(file, line_start, (uint) line_start_len) ||
3950
write_str(file, escaped, (uint) escaped_len) ||
3951
my_b_safe_write(file,(unsigned char*) &opt_flags,1));
3963
/*
  Decode the LOAD DATA format options from [buf, buf_end).

  use_new_format is remembered in cached_new_format. Two decodings are
  visible below:
    - the five delimiter strings are read with read_str();
    - every delimiter is a single byte (all lengths set to 1), followed
      by an empty_flags byte whose *_EMPTY bits are tested per delimiter.
  The decoded pointers refer into buf itself, so buf must outlive this
  object (see the note below).
*/
const char *sql_ex_info::init(const char *buf, const char *buf_end,
3964
bool use_new_format)
3966
cached_new_format = use_new_format;
3971
The code below assumes that buf will not disappear from
3972
under our feet during the lifetime of the event. This assumption
3973
holds true in the slave thread if the log is in new format, but is not
3974
the case when we have old format because we will be reusing net buffer
3975
to read the actual file before we write out the Create_file event.
3977
if (read_str(&buf, buf_end, &field_term, &field_term_len) ||
3978
read_str(&buf, buf_end, &enclosed, &enclosed_len) ||
3979
read_str(&buf, buf_end, &line_term, &line_term_len) ||
3980
read_str(&buf, buf_end, &line_start, &line_start_len) ||
3981
read_str(&buf, buf_end, &escaped, &escaped_len))
3987
field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
3988
field_term = buf++; // Use first byte in string
3994
empty_flags= *buf++;
3995
if (empty_flags & FIELD_TERM_EMPTY)
3997
if (empty_flags & ENCLOSED_EMPTY)
3999
if (empty_flags & LINE_TERM_EMPTY)
4001
if (empty_flags & LINE_START_EMPTY)
4003
if (empty_flags & ESCAPED_EMPTY)
4010
/**************************************************************************
4011
Rows_log_event member functions
4012
**************************************************************************/
4014
/*
  Build a rows event on the master side.

  m_width is the number of fields of the table, or 1 when no table is
  given. The session's OPTION_NO_FOREIGN_KEY_CHECKS and
  OPTION_RELAXED_UNIQUE_CHECKS bits are mirrored into the event flags.
  The column bitmap uses the inline m_bitbuf when m_width fits into it
  (otherwise NULL is passed to bitmap_init()), and is filled from cols
  unless this is the dummy form of the event described below.
*/
Rows_log_event::Rows_log_event(Session *session_arg, Table *tbl_arg, ulong tid,
4015
MY_BITMAP const *cols, bool is_transactional)
4016
: Log_event(session_arg, 0, is_transactional),
4020
m_width(tbl_arg ? tbl_arg->s->fields : 1),
4021
m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
4022
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4025
We allow a special form of dummy event when the table, and cols
4026
are null and the table id is UINT32_MAX. This is a temporary
4027
solution, to be able to terminate a started statement in the
4028
binary log: the extraneous events will be removed in the future.
4030
assert((tbl_arg && tbl_arg->s && tid != UINT32_MAX) || (!tbl_arg && !cols && tid == UINT32_MAX));
4032
if (session_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
4033
set_flags(NO_FOREIGN_KEY_CHECKS_F);
4034
if (session_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
4035
set_flags(RELAXED_UNIQUE_CHECKS_F);
4036
/* if bitmap_init fails, caught in is_valid() */
4037
if (likely(!bitmap_init(&m_cols,
4038
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4042
/* Cols can be zero if this is a dummy binrows event */
4043
if (likely(cols != NULL))
4045
memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
4046
create_last_word_mask(&m_cols);
4051
// Needed because bitmap_init() does not set it to null on failure
4057
/*
  Build a rows event from its binary representation.

  Decoding order, as visible below:
    - post-header: the table id (4 bytes when the post-header is 6 bytes
      long, otherwise 6 bytes) and the 2-byte flags;
    - the column count m_width, read with net_field_length();
    - the column bitmap, (m_width + 7) / 8 bytes;
    - for UPDATE_ROWS_EVENT only, a second bitmap of the same size
      (m_cols_ai);
    - the rest of the event is row data and is copied into a freshly
      allocated m_rows_buf.
  Both bitmaps use their inline buffer (m_bitbuf / m_bitbuf_ai) when
  m_width fits into it; otherwise NULL is passed to bitmap_init().
*/
Rows_log_event::Rows_log_event(const char *buf, uint32_t event_len,
4058
Log_event_type event_type,
4059
const Format_description_log_event
4061
: Log_event(buf, description_event),
4064
m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
4065
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4067
uint8_t const common_header_len= description_event->common_header_len;
4068
uint8_t const post_header_len= description_event->post_header_len[event_type-1];
4070
const char *post_start= buf + common_header_len;
4071
post_start+= RW_MAPID_OFFSET;
4072
if (post_header_len == 6)
4074
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
4075
m_table_id= uint4korr(post_start);
4080
m_table_id= (ulong) uint6korr(post_start);
4081
post_start+= RW_FLAGS_OFFSET;
4084
m_flags= uint2korr(post_start);
4086
unsigned char const *const var_start=
4087
(const unsigned char *)buf + common_header_len + post_header_len;
4088
unsigned char const *const ptr_width= var_start;
4089
unsigned char *ptr_after_width= (unsigned char*) ptr_width;
4090
m_width = net_field_length(&ptr_after_width);
4091
/* if bitmap_init fails, caught in is_valid() */
4092
if (likely(!bitmap_init(&m_cols,
4093
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4097
memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
4098
create_last_word_mask(&m_cols);
4099
ptr_after_width+= (m_width + 7) / 8;
4103
// Needed because bitmap_init() does not set it to null on failure
4104
m_cols.bitmap= NULL;
4108
m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
4110
if (event_type == UPDATE_ROWS_EVENT)
4112
/* if bitmap_init fails, caught in is_valid() */
4113
if (likely(!bitmap_init(&m_cols_ai,
4114
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
4118
memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
4119
create_last_word_mask(&m_cols_ai);
4120
ptr_after_width+= (m_width + 7) / 8;
4124
// Needed because bitmap_init() does not set it to null on failure
4125
m_cols_ai.bitmap= 0;
4130
const unsigned char* const ptr_rows_data= (const unsigned char*) ptr_after_width;
4132
size_t const data_size= event_len - (ptr_rows_data - (const unsigned char *) buf);
4134
m_rows_buf= (unsigned char*) malloc(data_size);
4135
if (likely((bool)m_rows_buf))
4137
m_curr_row= m_rows_buf;
4138
m_rows_end= m_rows_buf + data_size;
4139
m_rows_cur= m_rows_end;
4140
memcpy(m_rows_buf, ptr_rows_data, data_size);
4143
m_cols.bitmap= 0; // to not free it
4148
/**
  Release the column bitmap and the row buffer.

  When the bitmap lives in the inline m_bitbuf nothing was allocated for
  it, so the pointer is cleared first to keep bitmap_free() from freeing
  storage that belongs to this object.
*/
Rows_log_event::~Rows_log_event()
{
  const bool bitmap_is_inline= (m_cols.bitmap == m_bitbuf);
  if (bitmap_is_inline)
    m_cols.bitmap= 0;

  bitmap_free(&m_cols);                 // To pair with bitmap_init().
  free((unsigned char*)m_rows_buf);
}
4156
int Rows_log_event::get_data_size()
4158
int const type_code= get_type_code();
4160
unsigned char buf[sizeof(m_width)+1];
4161
unsigned char *end= net_store_length(buf, (m_width + 7) / 8);
4163
int data_size= ROWS_HEADER_LEN;
4164
data_size+= no_bytes_in_map(&m_cols);
4165
data_size+= end - buf;
4167
if (type_code == UPDATE_ROWS_EVENT)
4168
data_size+= no_bytes_in_map(&m_cols_ai);
4170
data_size+= (m_rows_cur - m_rows_buf);
4175
/*
  Append `length` bytes of row data to the event's row buffer.

  When the free space at the end of the buffer is not larger than
  `length`, the buffer is grown to the next multiple of 1024 bytes that
  holds the current data plus the new row (realloc() when a buffer
  already exists, malloc() otherwise). m_rows_cur is rebased if the
  memory moved and m_rows_end is set to the new end. The row is then
  copied at m_rows_cur, which is advanced by `length`.

  Returns HA_ERR_OUT_OF_MEM when the allocation fails.
*/
int Rows_log_event::do_add_row_data(unsigned char *row_data, size_t length)
4178
When the table has a primary key, we would probably want, by default, to
4179
log only the primary key value instead of the entire "before image". This
4180
would save binlog space. TODO
4184
If length is zero, there is nothing to write, so we just
4185
return. Note that this is not an optimization, since calling
4186
realloc() with size 0 means free().
4194
assert(m_rows_buf <= m_rows_cur);
4195
assert(!m_rows_buf || (m_rows_end && m_rows_buf <= m_rows_end));
4196
assert(m_rows_cur <= m_rows_end);
4198
/* The cast will always work since m_rows_cur <= m_rows_end */
4199
if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
4201
size_t const block_size= 1024;
4202
const size_t cur_size= m_rows_cur - m_rows_buf;
4203
const size_t new_alloc=
4204
block_size * ((cur_size + length + block_size - 1) / block_size);
4206
unsigned char* const new_buf=
4207
(m_rows_buf) ? (unsigned char*)realloc(m_rows_buf, new_alloc)
4208
: (unsigned char*)malloc(new_alloc);
4209
if (unlikely(!new_buf))
4210
return(HA_ERR_OUT_OF_MEM);
4212
/* If the memory moved, we need to move the pointers */
4213
if (new_buf != m_rows_buf)
4215
m_rows_buf= new_buf;
4216
m_rows_cur= m_rows_buf + cur_size;
4220
The end pointer should always be changed to point to the end of
4221
the allocated memory.
4223
m_rows_end= m_rows_buf + new_alloc;
4226
assert(m_rows_cur + length <= m_rows_end);
4227
memcpy(m_rows_cur, row_data, length);
4228
m_rows_cur+= length;
4233
int Rows_log_event::do_apply_event(Relay_log_info const *rli)
4237
If m_table_id == UINT32_MAX, then we have a dummy event that does not
4238
contain any data. In that case, we just remove all tables in the
4239
tables_to_lock list, close the thread tables, and return with
4242
if (m_table_id == UINT32_MAX)
4245
This one is supposed to be set: just an extra check so that
4246
nothing strange has happened.
4248
assert(get_flags(STMT_END_F));
4250
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4251
close_thread_tables(session);
4252
session->clear_error();
4257
'session' has been set by exec_relay_log_event(), just before calling
4258
do_apply_event(). We still check here to prevent future coding
4261
assert(rli->sql_session == session);
4264
If there is no locks taken, this is the first binrow event seen
4265
after the table map events. We should then lock all the tables
4266
used in the transaction and proceed with execution of the actual
4271
bool need_reopen= 1; /* To execute the first lap of the loop below */
4274
lock_tables() reads the contents of session->lex, so they must be
4275
initialized. Contrary to in
4276
Table_map_log_event::do_apply_event() we don't call
4277
mysql_init_query() as that may reset the binlog format.
4282
There are a few flags that are replicated with each row event.
4283
Make sure to set/clear them before executing the main body of
4286
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
4287
session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
4289
session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
4291
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
4292
session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
4294
session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
4295
/* A small test to verify that objects have consistent types */
4296
assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
4299
while ((error= lock_tables(session, rli->tables_to_lock,
4300
rli->tables_to_lock_count, &need_reopen)))
4304
if (session->is_slave_error || session->is_fatal_error)
4307
Error reporting borrowed from Query_log_event with many excessive
4308
simplifications (we don't honour --slave-skip-errors)
4310
uint32_t actual_error= session->main_da.sql_errno();
4311
rli->report(ERROR_LEVEL, actual_error,
4312
_("Error '%s' in %s event: when locking tables"),
4314
? session->main_da.message()
4315
: _("unexpected success or fatal error")),
4317
session->is_fatal_error= 1;
4321
rli->report(ERROR_LEVEL, error,
4322
_("Error in %s event: when locking tables"),
4325
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4330
So we need to reopen the tables.
4332
We need to flush the pending RBR event, since it keeps a
4333
pointer to an open table.
4335
ALTERNATIVE SOLUTION (not implemented): Extract a pointer to
4336
the pending RBR event and reset the table pointer after the
4337
tables has been reopened.
4339
NOTE: For this new scheme there should be no pending event:
4340
need to add code to assert that is the case.
4342
session->binlog_flush_pending_rows_event(false);
4343
TableList *tables= rli->tables_to_lock;
4344
close_tables_for_reopen(session, &tables);
4346
uint32_t tables_count= rli->tables_to_lock_count;
4347
if ((error= open_tables(session, &tables, &tables_count, 0)))
4349
if (session->is_slave_error || session->is_fatal_error)
4352
Error reporting borrowed from Query_log_event with many excessive
4353
simplifications (we don't honour --slave-skip-errors)
4355
uint32_t actual_error= session->main_da.sql_errno();
4356
rli->report(ERROR_LEVEL, actual_error,
4357
_("Error '%s' on reopening tables"),
4359
? session->main_da.message()
4360
: _("unexpected success or fatal error")));
4361
session->is_slave_error= 1;
4363
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4369
When the open and locking succeeded, we check all tables to
4370
ensure that they still have the correct type.
4372
We can use a down cast here since we know that every table added
4373
to the tables_to_lock is a RPL_TableList.
4377
RPL_TableList *ptr= rli->tables_to_lock;
4378
for ( ; ptr ; ptr= static_cast<RPL_TableList*>(ptr->next_global))
4380
if (ptr->m_tabledef.compatible_with(rli, ptr->table))
4382
mysql_unlock_tables(session, session->lock);
4384
session->is_slave_error= 1;
4385
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4386
return(ERR_BAD_TABLE_DEF);
4392
... and then we add all the tables to the table map and remove
4393
them from tables to lock.
4395
We also invalidate the query cache for all the tables, since
4396
they will now be changed.
4398
TODO [/Matz]: Maybe the query cache should not be invalidated
4399
here? It might be that a table is not changed, even though it
4400
was locked for the statement. We do know that each
4401
Rows_log_event contain at least one row, so after processing one
4402
Rows_log_event, we can invalidate the query cache for the
4405
for (TableList *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
4407
const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
4413
m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
4418
table == NULL means that this table should not be replicated
4419
(this was set up by Table_map_log_event::do_apply_event()
4420
which tested replicate-* rules).
4424
It's not needed to set_time() but
4425
1) it continues the property that "Time" in SHOW PROCESSLIST shows how
4426
much slave is behind
4427
2) it will be needed when we allow replication from a table with no
4428
TIMESTAMP column to a table with one.
4429
So we call set_time(), like in SBR. Presently it changes nothing.
4431
session->set_time((time_t)when);
4433
There are a few flags that are replicated with each row event.
4434
Make sure to set/clear them before executing the main body of
4437
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
4438
session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
4440
session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
4442
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
4443
session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
4445
session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
4447
if (slave_allow_batching)
4448
session->options|= OPTION_ALLOW_BATCH;
4450
session->options&= ~OPTION_ALLOW_BATCH;
4452
/* A small test to verify that objects have consistent types */
4453
assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
4456
Now we are in a statement and will stay in a statement until we
4459
We set this flag here, before actually applying any rows, in
4460
case the SQL thread is stopped and we need to detect that we're
4461
inside a statement and halting abruptly might cause problems
4464
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
4466
if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
4467
set_flags(COMPLETE_ROWS_F);
4470
Set tables write and read sets.
4472
Read_set contains all slave columns (in case we are going to fetch
4473
a complete record from slave)
4475
Write_set equals the m_cols bitmap sent from master but it can be
4476
longer if slave has extra columns.
4479
bitmap_set_all(table->read_set);
4480
bitmap_set_all(table->write_set);
4481
if (!get_flags(COMPLETE_ROWS_F))
4482
bitmap_intersect(table->write_set,&m_cols);
4484
this->slave_exec_mode= slave_exec_mode_options; // fix the mode
4486
// Do event specific preparations
4487
error= do_before_row_operations(rli);
4489
// row processing loop
4491
while (error == 0 && m_curr_row < m_rows_end)
4493
/* in_use can have been set to NULL in close_tables_for_reopen */
4494
Session* old_session= table->in_use;
4496
table->in_use= session;
4498
error= do_exec_row(rli);
4500
table->in_use = old_session;
4506
The following list of "idempotent" errors
4507
means that an error from the list might happen
4508
because of idempotent (more than once)
4509
applying of a binlog file.
4510
Notice, that binlog has a ddl operation its
4511
second applying may cause
4513
case HA_ERR_TABLE_DEF_CHANGED:
4514
case HA_ERR_CANNOT_ADD_FOREIGN:
4516
which are not included into to the list.
4518
case HA_ERR_RECORD_CHANGED:
4519
case HA_ERR_RECORD_DELETED:
4520
case HA_ERR_KEY_NOT_FOUND:
4521
case HA_ERR_END_OF_FILE:
4522
case HA_ERR_FOUND_DUPP_KEY:
4523
case HA_ERR_FOUND_DUPP_UNIQUE:
4524
case HA_ERR_FOREIGN_DUPLICATE_KEY:
4525
case HA_ERR_NO_REFERENCED_ROW:
4526
case HA_ERR_ROW_IS_REFERENCED:
4527
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
4529
if (global_system_variables.log_warnings)
4530
slave_rows_error_report(WARNING_LEVEL, error, rli, session, table,
4532
RPL_LOG_NAME, (ulong) log_pos);
4538
session->is_slave_error= 1;
4543
If m_curr_row_end was not set during event execution (e.g., because
4544
of errors) we can't proceed to the next row. If the error is transient
4545
(i.e., error==0 at this point) we must call unpack_current_row() to set
4548
if (!m_curr_row_end && !error)
4549
unpack_current_row(rli, &m_cols);
4551
// at this moment m_curr_row_end should be set
4552
assert(error || m_curr_row_end != NULL);
4553
assert(error || m_curr_row < m_curr_row_end);
4554
assert(error || m_curr_row_end <= m_rows_end);
4556
m_curr_row= m_curr_row_end;
4558
} // row processing loop
4560
error= do_after_row_operations(rli, error);
4563
session->options|= OPTION_KEEP_LOG;
4568
We need to delay this clear until here because unpack_current_row() uses
4569
master-side table definitions stored in rli.
4571
if (rli->tables_to_lock && get_flags(STMT_END_F))
4572
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4573
/* reset OPTION_ALLOW_BATCH as not affect later events */
4574
session->options&= ~OPTION_ALLOW_BATCH;
4577
{ /* error has occured during the transaction */
4578
slave_rows_error_report(ERROR_LEVEL, error, rli, session, table,
4579
get_type_str(), RPL_LOG_NAME, (ulong) log_pos);
4584
If one day we honour --skip-slave-errors in row-based replication, and
4585
the error should be skipped, then we would clear mappings, rollback,
4586
close tables, but the slave SQL thread would not stop and then may
4587
assume the mapping is still available, the tables are still open...
4588
So then we should clear mappings/rollback/close here only if this is a
4590
For now we code, knowing that error is not skippable and so slave SQL
4591
thread is certainly going to stop.
4592
rollback at the caller along with sbr.
4594
const_cast<Relay_log_info*>(rli)->cleanup_context(session, error);
4595
session->is_slave_error= 1;
4600
This code would ideally be placed in do_update_pos() instead, but
4601
since we have no access to table there, we do the setting of
4602
last_event_start_time here instead.
4604
if (table && (table->s->primary_key == MAX_KEY) &&
4605
!cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS)
4608
------------ Temporary fix until WL#2975 is implemented ---------
4610
This event is not the last one (no STMT_END_F). If we stop now
4611
(in case of terminate_slave_thread()), how will we restart? We
4612
have to restart from Table_map_log_event, but as this table is
4613
not transactional, the rows already inserted will still be
4614
present, and idempotency is not guaranteed (no PK) so we risk
4615
that repeating leads to double insert. So we desperately try to
4616
continue, hope we'll eventually leave this buggy situation (by
4617
executing the final Rows_log_event). If we are in a hopeless
4618
wait (reached end of last relay log and nothing gets appended
4619
there), we timeout after one minute, and notify DBA about the
4620
problem. When WL#2975 is implemented, just remove the member
4621
Relay_log_info::last_event_start_time and all its occurrences.
4623
const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
4629
/**
  Decide whether this rows event should be skipped by the slave.

  @param rli  Relay log info whose slave_skip_counter is consulted.

  @return Log_event::EVENT_SKIP_IGNORE when the skip counter is exactly 1
          and this event does not carry STMT_END_F; otherwise whatever the
          generic Log_event::do_shall_skip() decides.
*/
Log_event::enum_skip_reason
Rows_log_event::do_shall_skip(Relay_log_info *rli)
{
  /*
    The normal skipping logic applies unless the skip counter is 1 and
    this event does not end a statement.
  */
  if (rli->slave_skip_counter != 1 || get_flags(STMT_END_F))
    return Log_event::do_shall_skip(rli);

  /*
    Skip counter is 1 and the statement is not finished yet: we should
    not start executing on the next event, so this one is ignored.
  */
  return Log_event::EVENT_SKIP_IGNORE;
}
4644
Rows_log_event::do_update_pos(Relay_log_info *rli)
4648
if (get_flags(STMT_END_F))
4651
This is the end of a statement or transaction, so close (and
4652
unlock) the tables we opened when processing the
4653
Table_map_log_event starting the statement.
4655
OBSERVER. This will clear *all* mappings, not only those that
4656
are open for the table. There is not good handle for on-close
4659
NOTE. Even if we have no table ('table' == 0) we still need to be
4660
here, so that we increase the group relay log position. If we didn't, we
4661
could have a group relay log position which lags behind "forever"
4662
(assume the last master's transaction is ignored by the slave because of
4663
replicate-ignore rules).
4665
session->binlog_flush_pending_rows_event(true);
4668
If this event is not in a transaction, the call below will, if some
4669
transactional storage engines are involved, commit the statement into
4670
them and flush the pending event to binlog.
4671
If this event is in a transaction, the call will do nothing, but a
4672
Xid_log_event will come next which will, if some transactional engines
4673
are involved, commit the transaction and flush the pending event to the
4676
error= ha_autocommit_or_rollback(session, 0);
4679
Now what if this is not a transactional engine? we still need to
4680
flush the pending event to the binlog; we did it with
4681
session->binlog_flush_pending_rows_event(). Note that we imitate
4682
what is done for real queries: a call to
4683
ha_autocommit_or_rollback() (sometimes only if involves a
4684
transactional engine), and a call to be sure to have the pending
4688
rli->cleanup_context(session, 0);
4692
Indicate that a statement is finished.
4693
Step the group log position if we are not in a transaction,
4694
otherwise increase the event log position.
4696
rli->stmt_done(log_pos, when);
4699
Clear any errors pushed in session->net.last_err* if for example "no key
4700
found" (as this is allowed). This is a safety measure; apparently
4701
those errors (e.g. when executing a Delete_rows_log_event of a
4702
non-existing row, like in rpl_row_mystery22.test,
4703
session->net.last_error = "Can't find record in 't1'" and last_errno=1032)
4704
do not become visible. We still prefer to wipe them out.
4706
session->clear_error();
4709
rli->report(ERROR_LEVEL, error,
4710
_("Error in %s event: commit of row events failed, "
4712
get_type_str(), m_table->s->db.str,
4713
m_table->s->table_name.str);
4717
rli->inc_event_relay_log_pos();
4723
/**
  Write the post-header of a rows event: the table id stored with
  int6store() at RW_MAPID_OFFSET and the flags stored with int2store()
  at RW_FLAGS_OFFSET.

  @param file  IO cache the header is written to.

  @return the result of my_b_safe_write(), converted to bool.
*/
bool Rows_log_event::write_data_header(IO_CACHE *file)
{
  /* UINT32_MAX is the id of a dummy event; those must never be written. */
  assert(m_table_id != UINT32_MAX);

  /* Deliberately left uninitialised: the stores below fill it in. */
  unsigned char header[ROWS_HEADER_LEN];
  int6store(header + RW_MAPID_OFFSET, (uint64_t)m_table_id);
  int2store(header + RW_FLAGS_OFFSET, m_flags);

  const bool write_status= my_b_safe_write(file, header, ROWS_HEADER_LEN);
  return write_status;
}
4732
/**
  Write the body of a rows event, in this order: the length-encoded
  column width, the m_cols bitmap, the m_cols_ai bitmap (only for
  UPDATE_ROWS_EVENT) and finally the buffered row data.

  @param file  IO cache the body is written to.

  @return false when every write returned zero; true as soon as one write
          returns non-zero, in which case the remaining writes are skipped.
*/
bool Rows_log_event::write_data_body(IO_CACHE*file)
{
  /*
    Note that the width stored here should be the number of *bits*, not
    the number of bytes.
  */
  unsigned char width_buf[sizeof(m_width)];
  unsigned char *const width_end= net_store_length(width_buf, (size_t) m_width);
  assert(static_cast<size_t>(width_end - width_buf) <= sizeof(width_buf));

  /* Amount of row data accumulated so far in the rows buffer. */
  my_ptrdiff_t const rows_size= m_rows_cur - m_rows_buf;

  if (my_b_safe_write(file, width_buf, (size_t) (width_end - width_buf)))
    return true;

  if (my_b_safe_write(file, (unsigned char*) m_cols.bitmap,
                      no_bytes_in_map(&m_cols)))
    return true;

  /*
    TODO[refactor write]: Remove the "down cast" here (and elsewhere).
  */
  if (get_type_code() == UPDATE_ROWS_EVENT &&
      my_b_safe_write(file, (unsigned char*) m_cols_ai.bitmap,
                      no_bytes_in_map(&m_cols_ai)))
    return true;

  return my_b_safe_write(file, m_rows_buf, (size_t) rows_size);
}
4763
/**
  Produce the one-line description of this event: the table id, followed
  by " flags: STMT_END_F" when that flag is set.

  @param protocol  Receives the formatted text via Protocol::store().
*/
void Rows_log_event::pack_info(Protocol *protocol)
{
  char text[256];

  /* Only the statement-end flag is reported; anything else is omitted. */
  const char *stmt_end_note= "";
  if (get_flags(STMT_END_F))
    stmt_end_note= " flags: STMT_END_F";

  size_t text_len= snprintf(text, sizeof(text), "table_id: %lu%s",
                            m_table_id, stmt_end_note);
  protocol->store(text, text_len, &my_charset_bin);
}
4774
/**************************************************************************
4775
Table_map_log_event member functions and support functions
4776
**************************************************************************/
4779
@page How replication of field metadata works.
4781
When a table map is created, the master first calls
4782
Table_map_log_event::save_field_metadata() which calculates how many
4783
values will be in the field metadata. Only those fields that require the
4784
extra data are added. The method also loops through all of the fields in
4785
the table calling the method Field::save_field_metadata() which returns the
4786
values for the field that will be saved in the metadata and replicated to
4787
the slave. Once all fields have been processed, the table map is written to
4788
the binlog adding the size of the field metadata and the field metadata to
4789
the end of the body of the table map.
4791
When a table map is read on the slave, the field metadata is read from the
4792
table map and passed to the table_def class constructor which saves the
4793
field metadata from the table map into an array based on the type of the
4794
field. Field metadata values not present (those fields that do not use extra
4795
data) in the table map are initialized as zero (0). The array size is the
4796
same as the columns for the table on the slave.
4798
Additionally, values saved for field metadata on the master are saved as a
4799
string of bytes (unsigned char) in the binlog. A field may require 1 or more bytes
4800
to store the information. In cases where values require multiple bytes
4801
(e.g. values > 255), the endian-safe methods are used to properly encode
4802
the values on the master and decode them on the slave. When the field
4803
metadata values are captured on the slave, they are stored in an array of
4804
type uint16_t. This allows the least number of casts to prevent casting bugs
4805
when the field metadata is used in comparisons of field attributes. When
4806
the field metadata is used for calculating addresses in pointer math, the
4807
type used is uint32_t.
4811
Save the field metadata based on the real_type of the field.
4812
The metadata saved depends on the type of the field. Some fields
4813
store a single byte for pack_length() while others store two bytes
4814
for field_length (max length).
4819
We may want to consider changing the encoding of the information.
4820
Currently, the code attempts to minimize the number of bytes written to
4821
the tablemap. There are at least two other alternatives; 1) using
4822
net_store_length() to store the data allowing it to choose the number of
4823
bytes that are appropriate thereby making the code much easier to
4824
maintain (only 1 place to change the encoding), or 2) use a fixed number
4825
of bytes for each field. The problem with option 1 is that net_store_length()
4826
will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
4827
for fields like CHAR which can be no larger than 255 characters, the method
4828
will use 3 bytes when the value is > 250. Further, every value that is
4829
encoded using 2 parts (e.g., pack_length, field_length) will be numerically
4830
> 250 therefore will use 3 bytes for each value. The problem with option 2
4831
is less wasteful for space but does waste 1 byte for every field that does
4834
int Table_map_log_event::save_field_metadata()
4837
for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
4838
index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
4843
Constructor used to build an event for writing to the binary log.
4844
Mats says tbl->s lives longer than this event so it's ok to copy pointers
4845
(tbl->s->db etc) and not pointer content.
4847
Table_map_log_event::Table_map_log_event(Session *session, Table *tbl,
4848
ulong tid, bool, uint16_t flags)
4849
: Log_event(session, 0, true),
4851
m_dbnam(tbl->s->db.str),
4852
m_dblen(m_dbnam ? tbl->s->db.length : 0),
4853
m_tblnam(tbl->s->table_name.str),
4854
m_tbllen(tbl->s->table_name.length),
4855
m_colcnt(tbl->s->fields),
4860
m_field_metadata(0),
4861
m_field_metadata_size(0),
4865
assert(m_table_id != UINT32_MAX);
4867
In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
4868
table.cc / alloc_table_share():
4869
Use the fact the key is db/0/table_name/0
4870
As we rely on this let's assert it.
4872
assert((tbl->s->db.str == 0) ||
4873
(tbl->s->db.str[tbl->s->db.length] == 0));
4874
assert(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
4877
m_data_size= TABLE_MAP_HEADER_LEN;
4878
m_data_size+= m_dblen + 2; // Include length and terminating \0
4879
m_data_size+= m_tbllen + 2; // Include length and terminating \0
4880
m_data_size+= 1 + m_colcnt; // COLCNT and column types
4882
/* If malloc fails, caught in is_valid() */
4883
if ((m_memory= (unsigned char*) malloc(m_colcnt)))
4885
m_coltype= reinterpret_cast<unsigned char*>(m_memory);
4886
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
4887
m_coltype[i]= m_table->field[i]->type();
4891
Calculate a bitmap for the results of maybe_null() for all columns.
4892
The bitmap is used to determine when there is a column from the master
4893
that is not on the slave and is null and thus not in the row data during
4896
uint32_t num_null_bytes= (m_table->s->fields + 7) / 8;
4897
m_data_size+= num_null_bytes;
4898
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
4899
&m_null_bits, num_null_bytes,
4900
&m_field_metadata, (m_colcnt * 2),
4903
memset(m_field_metadata, 0, (m_colcnt * 2));
4906
Create an array for the field metadata and store it.
4908
m_field_metadata_size= save_field_metadata();
4909
assert(m_field_metadata_size <= (m_colcnt * 2));
4912
Now set the size of the data to the size of the field metadata array
4913
plus one or two bytes for number of elements in the field metadata array.
4915
if (m_field_metadata_size > 255)
4916
m_data_size+= m_field_metadata_size + 2;
4918
m_data_size+= m_field_metadata_size + 1;
4920
memset(m_null_bits, 0, num_null_bytes);
4921
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
4922
if (m_table->field[i]->maybe_null())
4923
m_null_bits[(i / 8)]+= 1 << (i % 8);
4929
Constructor used by slave to read the event from the binary log.
4931
Table_map_log_event::Table_map_log_event(const char *buf, uint32_t event_len,
4932
const Format_description_log_event
4935
: Log_event(buf, description_event),
4937
m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
4938
m_colcnt(0), m_coltype(0),
4939
m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
4940
m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
4941
m_null_bits(0), m_meta_memory(NULL)
4943
unsigned int bytes_read= 0;
4945
uint8_t common_header_len= description_event->common_header_len;
4946
uint8_t post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
4948
/* Read the post-header */
4949
const char *post_start= buf + common_header_len;
4951
post_start+= TM_MAPID_OFFSET;
4952
if (post_header_len == 6)
4954
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
4955
m_table_id= uint4korr(post_start);
4960
assert(post_header_len == TABLE_MAP_HEADER_LEN);
4961
m_table_id= (ulong) uint6korr(post_start);
4962
post_start+= TM_FLAGS_OFFSET;
4965
assert(m_table_id != UINT32_MAX);
4967
m_flags= uint2korr(post_start);
4969
/* Read the variable part of the event */
4970
const char *const vpart= buf + common_header_len + post_header_len;
4972
/* Extract the length of the various parts from the buffer */
4973
unsigned char const *const ptr_dblen= (unsigned char const*)vpart + 0;
4974
m_dblen= *(unsigned char*) ptr_dblen;
4976
/* Length of database name + counter + terminating null */
4977
unsigned char const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
4978
m_tbllen= *(unsigned char*) ptr_tbllen;
4980
/* Length of table name + counter + terminating null */
4981
unsigned char const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
4982
unsigned char *ptr_after_colcnt= (unsigned char*) ptr_colcnt;
4983
m_colcnt= net_field_length(&ptr_after_colcnt);
4985
/* Allocate mem for all fields in one go. If fails, caught in is_valid() */
4986
m_memory= (unsigned char*) my_multi_malloc(MYF(MY_WME),
4987
&m_dbnam, (uint) m_dblen + 1,
4988
&m_tblnam, (uint) m_tbllen + 1,
4989
&m_coltype, (uint) m_colcnt,
4994
/* Copy the different parts into their memory */
4995
strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
4996
strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
4997
memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
4999
ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
5000
bytes_read= ptr_after_colcnt - (unsigned char *)buf;
5001
if (bytes_read < event_len)
5003
m_field_metadata_size= net_field_length(&ptr_after_colcnt);
5004
assert(m_field_metadata_size <= (m_colcnt * 2));
5005
uint32_t num_null_bytes= (m_colcnt + 7) / 8;
5006
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
5007
&m_null_bits, num_null_bytes,
5008
&m_field_metadata, m_field_metadata_size,
5010
memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
5011
ptr_after_colcnt= (unsigned char*)ptr_after_colcnt + m_field_metadata_size;
5012
memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
5019
Table_map_log_event::~Table_map_log_event()
5021
free(m_meta_memory);
5026
Return value is an error code, one of:
5028
-1 Failure to open table [from open_tables()]
5030
1 No room for more tables [from set_table()]
5031
2 Out of memory [from set_table()]
5032
3 Wrong table definition
5033
4 Daisy-chaining RBR with SBR not possible
5036
int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
5038
RPL_TableList *table_list;
5039
char *db_mem, *tname_mem;
5040
Query_id &query_id= Query_id::get_query_id();
5042
assert(rli->sql_session == session);
5044
/* Step the query id to mark what columns that are actually used. */
5045
session->query_id= query_id.next();
5047
if (!(memory= my_multi_malloc(MYF(MY_WME),
5048
&table_list, (uint) sizeof(RPL_TableList),
5049
&db_mem, (uint) NAME_LEN + 1,
5050
&tname_mem, (uint) NAME_LEN + 1,
5052
return(HA_ERR_OUT_OF_MEM);
5054
memset(table_list, 0, sizeof(*table_list));
5055
table_list->db = db_mem;
5056
table_list->alias= table_list->table_name = tname_mem;
5057
table_list->lock_type= TL_WRITE;
5058
table_list->next_global= table_list->next_local= 0;
5059
table_list->table_id= m_table_id;
5060
table_list->updating= 1;
5061
strcpy(table_list->db, m_dbnam);
5062
strcpy(table_list->table_name, m_tblnam);
5068
open_tables() reads the contents of session->lex, so they must be
5069
initialized, so we should call lex_start(); to be even safer, we
5070
call mysql_init_query() which does a more complete set of inits.
5073
mysql_reset_session_for_next_command(session);
5076
Open the table if it is not already open and add the table to
5077
table map. Note that for any table that should not be
5078
replicated, a filter is needed.
5080
The creation of a new TableList is used to up-cast the
5081
table_list consisting of RPL_TableList items. This will work
5082
since the only case where the argument to open_tables() is
5083
changed, is when session->lex->query_tables == table_list, i.e.,
5084
when the statement requires prelocking. Since this is not
5085
executed when a statement is executed, this case will not occur.
5086
As a precaution, an assertion is added to ensure that the bad
5089
Either way, the memory in the list is *never* released
5090
internally in the open_tables() function, hence we take a copy
5091
of the pointer to make sure that it's not lost.
5094
assert(session->lex->query_tables != table_list);
5095
TableList *tmp_table_list= table_list;
5096
if ((error= open_tables(session, &tmp_table_list, &count, 0)))
5098
if (session->is_slave_error || session->is_fatal_error)
5101
Error reporting borrowed from Query_log_event with many excessive
5102
simplifications (we don't honour --slave-skip-errors)
5104
uint32_t actual_error= session->main_da.sql_errno();
5105
rli->report(ERROR_LEVEL, actual_error,
5106
_("Error '%s' on opening table `%s`.`%s`"),
5108
? session->main_da.message()
5109
: _("unexpected success or fatal error")),
5110
table_list->db, table_list->table_name);
5111
session->is_slave_error= 1;
5116
m_table= table_list->table;
5119
This will fail later otherwise, the 'in_use' field should be
5120
set to the current thread.
5122
assert(m_table->in_use);
5125
Use placement new to construct the table_def instance in the
5126
memory allocated for it inside table_list.
5128
The memory allocated by the table_def structure (i.e., not the
5129
memory allocated *for* the table_def structure) is released
5130
inside Relay_log_info::clear_tables_to_lock() by calling the
5131
table_def destructor explicitly.
5133
new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt,
5134
m_field_metadata, m_field_metadata_size, m_null_bits);
5135
table_list->m_tabledef_valid= true;
5138
We record in the slave's information that the table should be
5139
locked by linking the table into the list of tables to lock.
5141
table_list->next_global= table_list->next_local= rli->tables_to_lock;
5142
const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
5143
const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
5144
/* 'memory' is freed in clear_tables_to_lock */
5154
/**
  Decide whether this table map event should be skipped by the slave.

  @param rli  Relay log info handed on to continue_group().

  @return the skip reason chosen by continue_group().
*/
Log_event::enum_skip_reason
Table_map_log_event::do_shall_skip(Relay_log_info *rli)
{
  /*
    If the slave skip counter is 1, then we should not start executing;
    continue_group() is asked to make that call.
  */
  const Log_event::enum_skip_reason verdict= continue_group(rli);
  return verdict;
}
5164
/**
  Advance the relay log position past this table map event.

  Only the event position is stepped, through
  Relay_log_info::inc_event_relay_log_pos().

  @param rli  Relay log info whose event position is incremented.

  @return 0 always.
*/
int Table_map_log_event::do_update_pos(Relay_log_info *rli)
{
  rli->inc_event_relay_log_pos();

  return 0;
}
5171
/**
  Write the post-header of a table map event: the table id stored with
  int6store() at TM_MAPID_OFFSET and the flags stored with int2store()
  at TM_FLAGS_OFFSET.

  @param file  IO cache the header is written to.

  @return the result of my_b_safe_write(), converted to bool.
*/
bool Table_map_log_event::write_data_header(IO_CACHE *file)
{
  /* An event still carrying the UINT32_MAX id must never be written. */
  assert(m_table_id != UINT32_MAX);

  unsigned char header[TABLE_MAP_HEADER_LEN];
  int6store(header + TM_MAPID_OFFSET, (uint64_t)m_table_id);
  int2store(header + TM_FLAGS_OFFSET, m_flags);

  const bool write_status= my_b_safe_write(file, header, TABLE_MAP_HEADER_LEN);
  return write_status;
}
5180
/**
  Write the body of a table map event.

  The pieces are written in this order:
    - one byte holding the database name length,
    - the database name plus one extra byte (m_dblen + 1),
    - one byte holding the table name length,
    - the table name plus one extra byte (m_tbllen + 1),
    - the length-encoded column count,
    - one type byte per column,
    - the length-encoded size of the field metadata,
    - the field metadata itself,
    - the null bits, (m_colcnt + 7) / 8 bytes.

  @param file  IO cache the body is written to.

  @return false when every write returned zero; true as soon as one write
          returns non-zero, in which case the remaining writes are skipped.
*/
bool Table_map_log_event::write_data_body(IO_CACHE *file)
{
  assert(m_dbnam != NULL);
  assert(m_tblnam != NULL);
  /* We use only one byte per length for storage in event: */
  assert(m_dblen < 128);
  assert(m_tbllen < 128);

  unsigned char const dbuf[]= { (unsigned char) m_dblen };
  unsigned char const tbuf[]= { (unsigned char) m_tbllen };

  unsigned char cbuf[sizeof(m_colcnt)];
  unsigned char *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
  assert(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));

  /*
    Store the size of the field metadata.
  */
  unsigned char mbuf[sizeof(m_field_metadata_size)];
  unsigned char *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
  /* Same guard as for cbuf: the encoded length must fit in its buffer. */
  assert(static_cast<size_t>(mbuf_end - mbuf) <= sizeof(mbuf));

  /*
    Every step is chained with ||. The field metadata write used to be
    followed by a comma operator, which threw away the outcome of all
    earlier writes and always went on to write the null bits.
  */
  return (my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
          my_b_safe_write(file, (const unsigned char*)m_dbnam, m_dblen+1) ||
          my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
          my_b_safe_write(file, (const unsigned char*)m_tblnam, m_tbllen+1) ||
          my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
          my_b_safe_write(file, m_coltype, m_colcnt) ||
          my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
          my_b_safe_write(file, m_field_metadata, m_field_metadata_size) ||
          my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
}
5214
Print some useful information for the SHOW BINARY LOG information
5218
/**
  Produce the one-line description of this event, in the form
  "table_id: <id> (<db>.<table>)".

  The text is truncated to fit the local buffer if the names are long.

  @param protocol  Receives the formatted text via Protocol::store().
*/
void Table_map_log_event::pack_info(Protocol *protocol)
{
  char buf[256];
  const int wanted= snprintf(buf, sizeof(buf),
                             "table_id: %lu (%s.%s)",
                             m_table_id, m_dbnam, m_tblnam);

  /*
    snprintf() returns the length the text would have had without
    truncation, or a negative value on error. Hand on only what is
    really in the buffer, so store() never reads past its end.
  */
  size_t bytes= 0;
  if (wanted > 0)
  {
    bytes= static_cast<size_t>(wanted);
    if (bytes >= sizeof(buf))
      bytes= sizeof(buf) - 1;
  }

  protocol->store(buf, bytes, &my_charset_bin);
}
5228
/**************************************************************************
5229
Write_rows_log_event member functions
5230
**************************************************************************/
5233
Constructor used to build an event for writing to the binary log.
5235
Write_rows_log_event::Write_rows_log_event(Session *session_arg, Table *tbl_arg,
5237
bool is_transactional)
5238
: Rows_log_event(session_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
5243
Constructor used by slave to read the event from the binary log.
5245
// Constructor for decoding the event from a raw buffer: delegates to
// Rows_log_event, tagging the event as WRITE_ROWS_EVENT.
// NOTE(review): the name of the Format_description_log_event parameter
// (used below as description_event) is on a line not visible here.
Write_rows_log_event::Write_rows_log_event(const char *buf, uint32_t event_len,
5246
const Format_description_log_event
5248
: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
5253
// Prepares m_table's handler before the rows of this event are written:
// in idempotent slave mode it switches the session and handler to REPLACE
// semantics, then starts a bulk insert and disables automatic TIMESTAMP
// setting on the table.
// NOTE(review): braces, the return type and the return statement of this
// function are not visible in this excerpt.
Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
5258
todo: to introduce a property for the event (handler?) which forces
5259
applying the event in the replace (idempotent) fashion.
5261
// Only taken when the IDEMPOTENT bit is set in slave_exec_mode.
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5264
We are using REPLACE semantics and not INSERT IGNORE semantics
5265
when writing rows, that is: new rows replace old rows. We need to
5266
inform the storage engine that it should use this behaviour.
5269
/* Tell the storage engine that we are using REPLACE semantics. */
5270
session->lex->duplicates= DUP_REPLACE;
5273
Pretend we're executing a REPLACE command: this is needed for
5274
InnoDB since it is not (properly) checking the
5275
lex->duplicates flag.
5277
session->lex->sql_command= SQLCOM_REPLACE;
5279
Do not raise the error flag in case of hitting to an unique attribute
5281
m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
5284
// Argument 0 is passed as-is to the handler; presumably "row count
// unknown" -- TODO confirm against the handler API.
m_table->file->ha_start_bulk_insert(0);
5286
We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
5287
any TIMESTAMP column with data from the row but instead will use
5288
the event's current time.
5289
As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
5290
columns, we know that all TIMESTAMP columns on slave will receive explicit
5291
data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
5292
When we allow a table without TIMESTAMP to be replicated to a table having
5293
more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
5294
column to be replicated into a BIGINT column and the slave's table has a
5295
TIMESTAMP column, then the slave's TIMESTAMP column will take its value
5296
from set_time() which we called earlier (consistent with SBR). And then in
5297
some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
5298
analyze if explicit data is provided for slave's TIMESTAMP columns).
5300
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
5306
// Cleans up after the rows of this event were written: in idempotent
// slave mode the duplicate-key handler flags are reset, then the bulk
// insert is ended. Returns the incoming error if it is non-zero, otherwise
// the result of ending the bulk insert.
// NOTE(review): the second parameter (used below as `error`) and the
// declaration of local_error are on lines not visible in this excerpt.
Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
5310
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5312
m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
5313
m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
5315
resetting the extra with
5316
table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
5318
explanation: file->reset() performs this duty
5319
ultimately. Still todo: fix
5322
// A failure to end the bulk insert is reported through the handler.
if ((local_error= m_table->file->ha_end_bulk_insert()))
5324
m_table->file->print_error(local_error, MYF(0));
5326
// An error passed in by the caller takes precedence over local_error.
return error? error : local_error;
5331
Check if there are more UNIQUE keys after the given key.
5334
// Walks the keys that follow keyno in table->key_info and tests each one
// for the HA_NOSAME flag.
// NOTE(review): the return type and both return statements are on lines
// not visible in this excerpt; the caller in write_row() treats a non-zero
// result as permission to update the row in place -- confirm the values.
last_uniq_key(Table *table, uint32_t keyno)
5336
while (++keyno < table->s->keys)
5337
if (table->key_info[keyno].flags & HA_NOSAME)
5343
Check if an error is a duplicate key error.
5345
This function is used to check if an error code is one of the
5346
duplicate key errors, i.e., an error code for which it is sensible
5347
to do a <code>get_dup_key()</code> to retrieve the duplicate key.
5349
@param errcode The error code to check.
5351
@return <code>true</code> if the error code is such that
5352
<code>get_dup_key()</code> will return true, <code>false</code>
5356
// Classifies errcode: HA_ERR_FOUND_DUPP_KEY and HA_ERR_FOUND_DUPP_UNIQUE
// are the two codes listed as case labels.
// NOTE(review): the switch statement, return type and return values are
// on lines not visible in this excerpt.
is_duplicate_key_error(int errcode)
5360
case HA_ERR_FOUND_DUPP_KEY:
5361
case HA_ERR_FOUND_DUPP_UNIQUE:
5368
Write the current row into event's table.
5370
The row is located in the row buffer, pointed by @c m_curr_row member.
5371
Number of columns of the row is stored in @c m_width member (it can be
5372
different from the number of columns in the table to which we insert).
5373
Bitmap @c m_cols indicates which columns are present in the row. It is assumed
5374
that event's table is already open and pointed by @c m_table.
5376
If the same record already exists in the table it can be either overwritten
5377
or an error is reported depending on the value of @c overwrite flag
5378
(error reporting not yet implemented). Note that the matching record can be
5379
different from the row we insert if we use primary keys to identify records in
5382
The row to be inserted can contain values only for selected columns. The
5383
missing columns are filled with default values using @c prepare_record()
5384
function. If a matching record is found in the table and @c overwrite is
5385
true, the missing columns are taken from it.
5387
@param rli Relay log info (needed for row unpacking).
5389
Shall we overwrite if the row already exists or signal
5390
error (currently ignored).
5392
@returns Error code on failure, 0 on success.
5394
This method, if successful, sets @c m_curr_row_end pointer to point at the
5395
next row in the rows buffer. This is done when unpacking the row to be
5398
@note If a matching record is found, it is either updated using
5399
@c ha_update_row() or first deleted and then new record written.
5403
// Writes the current row of the event into m_table.
//
// Steps visible here: record[0] is filled with defaults by
// prepare_record(), the row is unpacked over it, and ha_write_row() is
// attempted in a loop. When the write fails on a duplicate key, the
// conflicting row is fetched into record[1] and then either updated in
// place or deleted so the write can be retried.
//
// rli        Relay log info, forwarded to unpack_current_row().
// overwrite  Overwrite flag; its use in the loop condition is on a line
//            not visible in this excerpt.
//
// NOTE(review): braces, several declarations (error, keynum) and the
// return statements are not visible in this excerpt.
Rows_log_event::write_row(const Relay_log_info *const rli,
5404
const bool overwrite)
5406
assert(m_table != NULL && session != NULL);
5408
Table *table= m_table; // pointer to event's table
5411
basic_string<unsigned char> key;
5413
/* fill table->record[0] with default values */
5416
We only check if the columns have default values for non-NDB
5417
engines, for NDB we ignore the check since updates are sent as
5418
writes, causing errors when trying to prepare the record.
5420
TODO[ndb]: Elimiate this hard-coded dependency on NDB. Ideally,
5421
the engine should be able to set a flag that it want the default
5422
values filled in and one flag to handle the case that the default
5423
values should be checked. Maybe these two flags can be combined.
5425
if ((error= prepare_record(table, &m_cols, m_width, true)))
5428
/* unpack row into table->record[0] */
5429
// NOTE(review): no check of this result is visible before the write loop
// below reassigns `error` -- confirm against the full file.
error= unpack_current_row(rli, &m_cols);
5431
// Temporary fix to find out why it fails [/Matz]
5432
// Copies the event's column bitmap over the table's write_set, byte-wise.
memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
5435
Try to write record. If a corresponding record already exists in the table,
5436
we try to change it using ha_update_row() if possible. Otherwise we delete
5437
it and repeat the whole process again.
5439
TODO: Add safety measures against infinite looping.
5442
// The loop body runs only while ha_write_row() keeps returning an error.
while ((error= table->file->ha_write_row(table->record[0])))
5444
if (error == HA_ERR_LOCK_DEADLOCK ||
5445
error == HA_ERR_LOCK_WAIT_TIMEOUT ||
5446
(keynum= table->file->get_dup_key(error)) < 0 ||
5450
Deadlock, waiting for lock or just an error from the handler
5451
such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
5452
Retrieval of the duplicate key number may fail
5453
- either because the error was not "duplicate key" error
5454
- or because the information which key is not available
5456
table->file->print_error(error, MYF(0));
5460
We need to retrieve the old row into record[1] to be able to
5461
either update or delete the offending record. We either:
5463
- use rnd_pos() with a row-id (available as dupp_row) to the
5464
offending row, if that is possible (MyISAM and Blackhole), or else
5466
- use index_read_idx() with the key that is duplicated, to
5467
retrieve the offending row.
5469
// First strategy: the engine exposes the duplicate's position in dup_ref.
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
5471
if (table->file->inited && (error= table->file->ha_index_end()))
5473
if ((error= table->file->ha_rnd_init(false)))
5476
error= table->file->rnd_pos(table->record[1], table->file->dup_ref);
5477
table->file->ha_rnd_end();
5480
table->file->print_error(error, MYF(0));
5486
// Second strategy: look the conflicting row up through the duplicated key.
if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
5491
key.reserve(table->s->max_unique_length);
5493
key_copy(key, table->record[0], table->key_info + keynum, 0);
5494
error= table->file->index_read_idx_map(table->record[1], keynum,
5500
table->file->print_error(error, MYF(0));
5506
Now, record[1] should contain the offending row. That
5507
will enable us to update it or, alternatively, delete it (so
5508
that we can insert the new row afterwards).
5512
If row is incomplete we will use the record found to fill
5515
if (!get_flags(COMPLETE_ROWS_F))
5517
restore_record(table,record[1]);
5518
error= unpack_current_row(rli, &m_cols);
5522
REPLACE is defined as either INSERT or DELETE + INSERT. If
5523
possible, we can replace it with an UPDATE, but that will not
5524
work on InnoDB if FOREIGN KEY checks are necessary.
5526
I (Matz) am not sure of the reason for the last_uniq_key()
5527
check as, but I'm guessing that it's something along the
5530
Suppose that we got the duplicate key to be a key that is not
5531
the last unique key for the table and we perform an update:
5532
then there might be another key for which the unique check will
5533
fail, so we're better off just deleting the row and inserting
5536
// In-place update is attempted only when both conditions hold.
if (last_uniq_key(table, keynum) &&
5537
!table->file->referenced_by_foreign_key())
5539
error=table->file->ha_update_row(table->record[1],
5543
case HA_ERR_RECORD_IS_THE_SAME:
5550
table->file->print_error(error, MYF(0));
5557
// Otherwise the conflicting row is deleted and the write is retried.
if ((error= table->file->ha_delete_row(table->record[1])))
5559
table->file->print_error(error, MYF(0));
5562
/* Will retry ha_write_row() with the offending row removed. */
5570
// Unpacks the row at m_curr_row into m_table through ::unpack_row(), which
// also receives &m_curr_row_end and &m_master_reclength as outputs.
//
// rli   Relay log info, forwarded to ::unpack_row().
// cols  Bitmap of columns, forwarded to ::unpack_row().
//
// Both ASSERT_OR_RETURN_ERROR checks name HA_ERR_CORRUPT_EVENT as the error
// for a row pointer outside the rows buffer.
// NOTE(review): the final return statement is not visible in this excerpt.
int Rows_log_event::unpack_current_row(const Relay_log_info *const rli,
5571
MY_BITMAP const *cols)
5574
ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT);
5575
int const result= ::unpack_row(rli, m_table, m_width, m_curr_row, cols,
5576
&m_curr_row_end, &m_master_reclength);
5577
// Report an overrun of the rows buffer before the check below fires.
if (m_curr_row_end > m_rows_end)
5578
my_error(ER_SLAVE_CORRUPT_EVENT, MYF(0));
5579
ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT);
5585
// Applies one row of the event by calling write_row(); the overwrite
// argument is true exactly when the IDEMPOTENT bit is set in
// slave_exec_mode. If an error occurred but the session has none recorded,
// ER_UNKNOWN_ERROR is raised.
// NOTE(review): the declaration that receives write_row()'s result and the
// return statement are on lines not visible in this excerpt.
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
5587
assert(m_table != NULL);
5589
write_row(rli, /* if 1 then overwrite */
5590
bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
5592
if (error && !session->is_error())
5595
my_error(ER_UNKNOWN_ERROR, MYF(0));
5602
/**************************************************************************
5603
Delete_rows_log_event member functions
5604
**************************************************************************/
5607
Compares table->record[0] and table->record[1]
5609
Returns TRUE if different.
5611
/**
  Compare table->record[0] with table->record[1].

  @param table  Table whose two record buffers are compared.

  @return true if the records differ, false if they are equal.
*/
static bool record_compare(Table *table)
{
  /*
    Some engines do not set the X bit and the filler bits of the null
    bytes correctly, so both are forced on in each record before the
    comparison.

    MyISAM checks that the record has not been tampered with, so the
    original bytes are remembered here and put back before returning.

    TODO[record format ndb]: Remove it once NDB returns correct
    records. Check that the other engines also return correct records.
  */
  bool differs= false;
  unsigned char orig_first[2], orig_last[2];

  if (table->s->null_bytes > 0)
  {
    for (int rec= 0; rec < 2; ++rec)
    {
      orig_first[rec]= table->record[rec][0];
      orig_last[rec]= table->record[rec][table->s->null_bytes - 1];
      table->record[rec][0]|= 1U;
      table->record[rec][table->s->null_bytes - 1]|=
        256U - (1U << table->s->last_null_bit_pos);
    }
  }

  if (table->s->blob_fields + table->s->varchar_fields == 0)
  {
    /* No blob or varchar fields: compare through cmp_record(). */
    differs= cmp_record(table,record[1]);
  }
  else if (memcmp(table->null_flags,
                  table->null_flags+table->s->rec_buff_length,
                  table->s->null_bytes))
  {
    /* The null bits already differ. */
    differs= true;
  }
  else
  {
    /* Null bits match: compare field by field, stop at the first diff. */
    for (Field **fld= table->field; *fld; fld++)
    {
      if ((*fld)->cmp_binary_offset(table->s->rec_buff_length))
      {
        differs= true;
        break;
      }
    }
  }

  /*
    Put the remembered bytes back.

    TODO[record format ndb]: Remove this code once NDB returns the
    correct record format.
  */
  if (table->s->null_bytes > 0)
  {
    for (int rec= 0; rec < 2; ++rec)
    {
      table->record[rec][0]= orig_first[rec];
      table->record[rec][table->s->null_bytes - 1]= orig_last[rec];
    }
  }

  return differs;
}
5684
Locate the current row in event's table.
5686
The current row is pointed by @c m_curr_row. Member @c m_width tells how many
5687
columns are there in the row (this can be different from the number of columns
5688
in the table). It is assumed that event's table is already open and pointed
5691
If a corresponding record is found in the table it is stored in
5692
@c m_table->record[0]. Note that when record is located based on a primary
5693
key, it is possible that the record found differs from the row being located.
5695
If no key is specified or table does not have keys, a table scan is used to
5696
find the row. In that case the row should be complete and contain values for
5697
all columns. However, it can still be shorter than the table, i.e. the table
5698
can contain extra columns not present in the row. It is also possible that
5699
the table has fewer columns than the row being located.
5701
@returns Error code on failure, 0 on success.
5703
@post In case of success @c m_table->record[0] contains the record found.
5704
Also, the internal "cursor" of the table is positioned at the record found.
5706
@note If the engine allows random access of the records, a combination of
5707
@c position() and @c rnd_pos() will be used.
5710
// Locates in m_table the row described by the current row of the event.
//
// The row is first unpacked into record[0]. Three lookup strategies are
// visible below:
//   1. rnd_pos_by_record(), when the engine reports
//      HA_PRIMARY_KEY_REQUIRED_FOR_POSITION and the table has a primary key;
//   2. an index read on key number 0, followed by index_next() while
//      record_compare() reports a difference, when the table has keys;
//   3. a scan with rnd_next(), restarted at most once from the top.
//
// rli  Relay log info, forwarded to unpack_current_row().
//
// NOTE(review): braces, the declaration of `error` and all return
// statements are on lines not visible in this excerpt.
int Rows_log_event::find_row(const Relay_log_info *rli)
5712
assert(m_table && m_table->in_use != NULL);
5714
Table *table= m_table;
5717
/* unpack row - missing fields get default values */
5718
prepare_record(table, &m_cols, m_width, false/* don't check errors */);
5719
// NOTE(review): no check of this result is visible before `error` is
// reassigned further down -- confirm against the full file.
error= unpack_current_row(rli, &m_cols);
5721
// Temporary fix to find out why it fails [/Matz]
5722
// Copies the event's column bitmap over the table's read_set, byte-wise.
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
5724
// Strategy 1: position lookup.
if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
5725
table->s->primary_key < MAX_KEY)
5728
Use a more efficient method to fetch the record given by
5729
table->record[0] if the engine allows it. We first compute a
5730
row reference using the position() member function (it will be
5731
stored in table->file->ref) and the use rnd_pos() to position
5732
the "cursor" (i.e., record[0] in this case) at the correct row.
5734
TODO: Add a check that the correct record has been fetched by
5735
comparing with the original record. Take into account that the
5736
record on the master and slave can be of different
5737
length. Something along these lines should work:
5739
ADD>>> store_record(table,record[1]);
5740
int error= table->file->rnd_pos(table->record[0], table->file->ref);
5741
ADD>>> assert(memcmp(table->record[1], table->record[0],
5742
table->s->reclength) == 0);
5745
// NOTE(review): this declares a new `error` although `error` is already
// assigned above; it looks like it shadows an outer variable -- confirm.
int error= table->file->rnd_pos_by_record(table->record[0]);
5746
table->file->ha_rnd_end();
5749
table->file->print_error(error, MYF(0));
5754
// We can't use position() - try other methods.
5757
Save copy of the record in table->record[1]. It might be needed
5758
later if linear search is used to find exact match.
5760
store_record(table,record[1]);
5762
// Strategy 2: index lookup on key number 0.
if (table->s->keys > 0)
5764
/* We have a key: search the table using the index */
5765
if (!table->file->inited && (error= table->file->ha_index_init(0, false)))
5767
table->file->print_error(error, MYF(0));
5771
/* Fill key data for the row */
5774
key_copy(m_key, table->record[0], table->key_info, 0);
5777
We need to set the null bytes to ensure that the filler bit are
5778
all set when returning. There are storage engines that just set
5779
the necessary bits on the bytes and don't set the filler bits
5782
// pos is the index of the last null byte, or 0 when there are none.
my_ptrdiff_t const pos=
5783
table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
5784
table->record[0][pos]= 0xFF;
5786
if ((error= table->file->index_read_map(table->record[0], m_key,
5788
HA_READ_KEY_EXACT)))
5790
table->file->print_error(error, MYF(0));
5791
table->file->ha_index_end();
5796
Below is a minor "optimization". If the key (i.e., key number
5797
0) has the HA_NOSAME flag set, we know that we have found the
5798
correct record (since there can be no duplicates); otherwise, we
5799
have to compare the record with the one found to see if it is
5802
CAVEAT! This behaviour is essential for the replication of,
5803
e.g., the mysql.proc table since the correct record *shall* be
5804
found using the primary key *only*. There shall be no
5805
comparison of non-PK columns to decide if the correct record is
5806
found. I can see no scenario where it would be incorrect to
5807
chose the row to change only using a PK or an UNNI.
5809
if (table->key_info->flags & HA_NOSAME)
5811
table->file->ha_index_end();
5816
In case key is not unique, we still have to iterate over records found
5817
and find the one which is identical to the row given. A copy of the
5818
record we are looking for is stored in record[1].
5820
// record_compare() returns true while record[0] and record[1] differ.
while (record_compare(table))
5823
We need to set the null bytes to ensure that the filler bit
5824
are all set when returning. There are storage engines that
5825
just set the necessary bits on the bytes and don't set the
5826
filler bits correctly.
5828
TODO[record format ndb]: Remove this code once NDB returns the
5829
correct record format.
5831
if (table->s->null_bytes > 0)
5833
table->record[0][table->s->null_bytes - 1]|=
5834
256U - (1U << table->s->last_null_bit_pos);
5837
if ((error= table->file->index_next(table->record[0])))
5839
table->file->print_error(error, MYF(0));
5840
table->file->ha_index_end();
5846
Have to restart the scan to be able to fetch the next row.
5848
table->file->ha_index_end();
5852
// Strategy 3: table scan.
int restart_count= 0; // Number of times scanning has restarted from top
5854
/* We don't have a key: search the table using rnd_next() */
5855
if ((error= table->file->ha_rnd_init(1)))
5857
table->file->print_error(error, MYF(0));
5861
/* Continue until we find the right record or have made a full loop */
5864
error= table->file->rnd_next(table->record[0]);
5869
case HA_ERR_RECORD_DELETED:
5872
case HA_ERR_END_OF_FILE:
5873
if (++restart_count < 2)
5874
// NOTE(review): the result of this re-initialisation is ignored, unlike
// the checked ha_rnd_init(1) call that starts the scan.
table->file->ha_rnd_init(1);
5878
table->file->print_error(error, MYF(0));
5879
table->file->ha_rnd_end();
5883
while (restart_count < 2 && record_compare(table));
5886
Note: above record_compare will take into accout all record fields
5887
which might be incorrect in case a partial row was given in the event
5889
table->file->ha_rnd_end();
5891
assert(error == HA_ERR_END_OF_FILE || error == HA_ERR_RECORD_DELETED || error == 0);
5895
table->default_column_bitmaps();
5898
table->default_column_bitmaps();
5904
Constructor used to build an event for writing to the binary log.
5907
// Constructor for building the event on the logging side: all work is
// delegated to Rows_log_event, with the table's read_set passed as the
// column bitmap.
// NOTE(review): the declaration of tid is not visible in this excerpt.
Delete_rows_log_event::Delete_rows_log_event(Session *session_arg, Table *tbl_arg,
5909
bool is_transactional)
5910
: Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
5915
Constructor used by slave to read the event from the binary log.
5917
// Constructor for decoding the event from a raw buffer: delegates to
// Rows_log_event, tagging the event as DELETE_ROWS_EVENT.
// NOTE(review): the name of the Format_description_log_event parameter
// (used below as description_event) is on a line not visible here.
Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint32_t event_len,
5918
const Format_description_log_event
5920
: Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
5926
// Prepares for row lookups: when the table has keys, a buffer of
// key_info->key_length bytes is allocated into m_key. No buffer is needed
// when the engine reports HA_PRIMARY_KEY_REQUIRED_FOR_POSITION and the
// table has a primary key.
// NOTE(review): the return type, the allocation-failure test and the
// success returns are on lines not visible in this excerpt.
Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
5928
if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
5929
m_table->s->primary_key < MAX_KEY)
5932
We don't need to allocate any memory for m_key since it is not used.
5937
if (m_table->s->keys > 0)
5939
// Allocate buffer for key searches
5940
// Sized from the first entry of key_info.
m_key= (unsigned char*)malloc(m_table->key_info->key_length);
5942
return HA_ERR_OUT_OF_MEM;
5949
// Ends whatever index or table scan is active on m_table's handler once
// the rows of this event have been processed.
// NOTE(review): the second parameter, any release of m_key and the return
// statement are on lines not visible in this excerpt.
Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
5952
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
5953
m_table->file->ha_index_or_rnd_end();
5960
int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
5963
assert(m_table != NULL);
5965
if (!(error= find_row(rli)))
5968
Delete the record found, located in record[0]
5970
error= m_table->file->ha_delete_row(m_table->record[0]);
5976
/**************************************************************************
5977
Update_rows_log_event member functions
5978
**************************************************************************/
5981
Constructor used to build an event for writing to the binary log.
5983
// Constructor for building the event on the logging side: Rows_log_event
// receives the table's read_set as the column bitmap, and init() is then
// called with the table's write_set.
// NOTE(review): the declaration of tid is not visible in this excerpt.
Update_rows_log_event::Update_rows_log_event(Session *session_arg, Table *tbl_arg,
5985
bool is_transactional)
5986
: Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
5988
init(tbl_arg->write_set);
5991
// Initialises the m_cols_ai bitmap. The inline buffer m_bitbuf_ai is used
// when m_width bits fit in it, otherwise NULL is passed to bitmap_init().
// When cols is non-NULL its bitmap bytes are copied into m_cols_ai.
//
// cols  Source column bitmap; may be NULL.
//
// NOTE(review): the remaining bitmap_init() arguments are on lines not
// visible in this excerpt.
void Update_rows_log_event::init(MY_BITMAP const *cols)
5993
/* if bitmap_init fails, caught in is_valid() */
5994
if (likely(!bitmap_init(&m_cols_ai,
5995
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
5999
/* Cols can be zero if this is a dummy binrows event */
6000
if (likely(cols != NULL))
6002
memcpy(m_cols_ai.bitmap, cols->bitmap, no_bytes_in_map(cols));
6003
create_last_word_mask(&m_cols_ai);
6009
/**
  Release the m_cols_ai bitmap set up by init().
*/
Update_rows_log_event::~Update_rows_log_event()
{
  /*
    When the bitmap points at the inline m_bitbuf_ai storage no malloc
    happened, so clear the pointer first to keep bitmap_free() from
    freeing it.
  */
  const bool uses_inline_buffer= (m_cols_ai.bitmap == m_bitbuf_ai);
  if (uses_inline_buffer)
    m_cols_ai.bitmap= 0;

  bitmap_free(&m_cols_ai);                      // To pair with bitmap_init().
}
6018
Constructor used by slave to read the event from the binary log.
6020
// Constructor for decoding the event from a raw buffer: delegates to
// Rows_log_event, tagging the event as UPDATE_ROWS_EVENT.
// NOTE(review): part of the parameter list (including the name
// description_event used below) is on lines not visible in this excerpt.
Update_rows_log_event::Update_rows_log_event(const char *buf, uint32_t event_len,
6022
Format_description_log_event
6024
: Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
6030
// Prepares for row lookups: when the table has keys, a buffer of
// key_info->key_length bytes is allocated into m_key. Automatic TIMESTAMP
// setting is then disabled on the table.
// NOTE(review): the return type, the allocation-failure test and the
// success return are on lines not visible in this excerpt.
Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6032
if (m_table->s->keys > 0)
6034
// Allocate buffer for key searches
6035
// Sized from the first entry of key_info.
m_key= (unsigned char*)malloc(m_table->key_info->key_length);
6037
return HA_ERR_OUT_OF_MEM;
6040
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
6046
// Ends whatever index or table scan is active on m_table's handler and
// releases the m_key buffer with free().
// NOTE(review): the second parameter and the return statement are on
// lines not visible in this excerpt.
Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6049
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
6050
m_table->file->ha_index_or_rnd_end();
6051
free(m_key); // Free for multi_malloc
6058
// Applies one update pair of the event: find_row() locates the before
// image, the found row is saved to record[1], the after image is unpacked
// into record[0], and ha_update_row() is called with the old and new rows.
//
// rli  Relay log info, forwarded to find_row() and unpack_current_row().
//
// NOTE(review): braces, the return type, the test on find_row()'s result
// and the return statements are on lines not visible in this excerpt.
Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6060
assert(m_table != NULL);
6062
int error= find_row(rli);
6066
We need to read the second image in the event of error to be
6067
able to skip to the next pair of updates
6069
// The after image is unpacked only to advance m_curr_row_end; the result
// of this call is not used.
m_curr_row= m_curr_row_end;
6070
unpack_current_row(rli, &m_cols_ai);
6075
This is the situation after locating BI:
6077
===|=== before image ====|=== after image ===|===
6079
m_curr_row m_curr_row_end
6081
BI found in the table is stored in record[0]. We copy it to record[1]
6082
and unpack AI to record[0].
6085
store_record(m_table,record[1]);
6087
m_curr_row= m_curr_row_end;
6088
// NOTE(review): no check of this result is visible before ha_update_row()
// below reassigns `error` -- confirm against the full file.
error= unpack_current_row(rli, &m_cols_ai); // this also updates m_curr_row_end
6091
Now we have the right row to update. The old row (the one we're
6092
looking for) is in record[1] and the new row is in record[0].
6095
// Temporary fix to find out why it fails [/Matz]
6096
// read_set takes the before-image columns, write_set the after-image ones.
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
6097
memcpy(m_table->write_set->bitmap, m_cols_ai.bitmap, (m_table->write_set->n_bits + 7) / 8);
6099
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
6100
if (error == HA_ERR_RECORD_IS_THE_SAME)
6107
/**
  Constructor used to decode an incident event from a raw buffer.

  @param buf          Start of the event.
  @param event_len    Total length of the event in bytes.
  @param descr_event  Format description supplying the header lengths.
*/
Incident_log_event::Incident_log_event(const char *buf, uint32_t event_len,
                                       const Format_description_log_event *descr_event)
  : Log_event(buf, descr_event)
{
  const uint8_t header_bytes= descr_event->common_header_len;
  const uint8_t post_header_bytes=
    descr_event->post_header_len[INCIDENT_EVENT-1];

  /* The incident number is read from right after the common header. */
  m_incident= static_cast<Incident>(uint2korr(buf + header_bytes));

  /* The message is read from what follows the post header. */
  const char *cursor= buf + header_bytes + post_header_bytes;
  const char *const event_end= buf + event_len;
  const char *text= NULL;                       // Initialised to keep compiler happy
  uint8_t text_len= 0;                          // Initialised to keep compiler happy
  read_str(&cursor, event_end, &text, &text_len);

  m_message.str= const_cast<char*>(text);
  m_message.length= text_len;
}
6128
// Destructor. NOTE(review): its body is not visible in this excerpt.
Incident_log_event::~Incident_log_event()
6134
Incident_log_event::description() const
6136
static const char *const description[]= {
6137
"NOTHING", // Not used
6141
assert(0 <= m_incident);
6142
assert((size_t) m_incident <= sizeof(description)/sizeof(*description));
6144
return description[m_incident];
6148
/**
  Format a one-line summary of the incident -- "#<number> (<name>)",
  followed by ": <message>" when a message is present -- and pass it to
  the protocol.

  @param protocol  Receives the formatted text through store().
*/
void Incident_log_event::pack_info(Protocol *protocol)
{
  /*
    NOTE(review): the declaration of buf is not visible in this excerpt;
    a 256-byte stack buffer is assumed -- confirm against the full file.
  */
  char buf[256];
  int written;
  if (m_message.length > 0)
  {
    /*
      The message is printed only when there is one. Its length is passed
      explicitly because m_message.str points into the event buffer and
      is not shown to be NUL-terminated.
    */
    written= snprintf(buf, sizeof(buf), "#%d (%s): %.*s",
                      m_incident, description(),
                      (int) m_message.length, m_message.str);
  }
  else
  {
    written= snprintf(buf, sizeof(buf), "#%d (%s)",
                      m_incident, description());
  }
  /*
    snprintf() returns the untruncated length (or a negative value on
    error); clamp it to what was really stored in buf.
  */
  size_t bytes= 0;
  if (written > 0)
    bytes= (size_t) written < sizeof(buf) ? (size_t) written : sizeof(buf) - 1;
  protocol->store(buf, bytes, &my_charset_bin);
}
6163
// Reports the incident through rli->report() at ERROR_LEVEL with error
// code ER_SLAVE_INCIDENT; "<none>" is substituted when the message is
// empty.
// NOTE(review): the return type, one report() argument and the return
// statement are on lines not visible in this excerpt.
Incident_log_event::do_apply_event(Relay_log_info const *rli)
6165
rli->report(ERROR_LEVEL, ER_SLAVE_INCIDENT,
6166
ER(ER_SLAVE_INCIDENT),
6168
m_message.length > 0 ? m_message.str : "<none>");
6174
// Writes the event's data header: m_incident, cast to int16_t and stored
// with int2store() into a buffer of sizeof(int16_t) bytes. Returns the
// result of my_b_safe_write().
// NOTE(review): the return type is on a line not visible in this excerpt.
Incident_log_event::write_data_header(IO_CACHE *file)
6176
unsigned char buf[sizeof(int16_t)];
6177
int2store(buf, (int16_t) m_incident);
6178
return(my_b_safe_write(file, buf, sizeof(buf)));
6182
// Writes the event's data body: the incident message, through
// write_str(). Returns the result of write_str().
// NOTE(review): the return type is on a line not visible in this excerpt.
Incident_log_event::write_data_body(IO_CACHE *file)
6184
return(write_str(file, m_message.str, m_message.length));
6187
Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint32_t event_len,
6188
const Format_description_log_event* description_event)
6189
:Log_event(buf, description_event)
6191
uint8_t header_size= description_event->common_header_len;
6192
ident_len = event_len - header_size;
6193
set_if_smaller(ident_len,FN_REFLEN-1);
6194
log_ident= buf + header_size;