1
/* Copyright (C) 2000-2004 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
#include <drizzled/server_includes.h>
20
#include "rpl_filter.h"
21
#include "rpl_utility.h"
22
#include "rpl_record.h"
23
#include <mysys/my_dir.h>
24
#include <drizzled/drizzled_error_messages.h>
28
#include <mysys/base64.h>
29
#include <mysys/my_bitmap.h>
31
#include <libdrizzle/gettext.h>
32
#include <libdrizzle/libdrizzle.h>
34
#define log_cs &my_charset_utf8_general_ci
36
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
40
Size of buffer for printing a double in format %.<PREC>g
42
optional '-' + optional zero + '.' + PREC digits + 'e' + sign +
43
exponent digits + '\0'
45
#define FMT_G_BUFSIZE(PREC) (3 + (PREC) + 5 + 1)
48
static const char *HA_ERR(int i)
51
case HA_ERR_KEY_NOT_FOUND: return "HA_ERR_KEY_NOT_FOUND";
52
case HA_ERR_FOUND_DUPP_KEY: return "HA_ERR_FOUND_DUPP_KEY";
53
case HA_ERR_RECORD_CHANGED: return "HA_ERR_RECORD_CHANGED";
54
case HA_ERR_WRONG_INDEX: return "HA_ERR_WRONG_INDEX";
55
case HA_ERR_CRASHED: return "HA_ERR_CRASHED";
56
case HA_ERR_WRONG_IN_RECORD: return "HA_ERR_WRONG_IN_RECORD";
57
case HA_ERR_OUT_OF_MEM: return "HA_ERR_OUT_OF_MEM";
58
case HA_ERR_NOT_A_TABLE: return "HA_ERR_NOT_A_TABLE";
59
case HA_ERR_WRONG_COMMAND: return "HA_ERR_WRONG_COMMAND";
60
case HA_ERR_OLD_FILE: return "HA_ERR_OLD_FILE";
61
case HA_ERR_NO_ACTIVE_RECORD: return "HA_ERR_NO_ACTIVE_RECORD";
62
case HA_ERR_RECORD_DELETED: return "HA_ERR_RECORD_DELETED";
63
case HA_ERR_RECORD_FILE_FULL: return "HA_ERR_RECORD_FILE_FULL";
64
case HA_ERR_INDEX_FILE_FULL: return "HA_ERR_INDEX_FILE_FULL";
65
case HA_ERR_END_OF_FILE: return "HA_ERR_END_OF_FILE";
66
case HA_ERR_UNSUPPORTED: return "HA_ERR_UNSUPPORTED";
67
case HA_ERR_TO_BIG_ROW: return "HA_ERR_TO_BIG_ROW";
68
case HA_WRONG_CREATE_OPTION: return "HA_WRONG_CREATE_OPTION";
69
case HA_ERR_FOUND_DUPP_UNIQUE: return "HA_ERR_FOUND_DUPP_UNIQUE";
70
case HA_ERR_UNKNOWN_CHARSET: return "HA_ERR_UNKNOWN_CHARSET";
71
case HA_ERR_WRONG_MRG_TABLE_DEF: return "HA_ERR_WRONG_MRG_TABLE_DEF";
72
case HA_ERR_CRASHED_ON_REPAIR: return "HA_ERR_CRASHED_ON_REPAIR";
73
case HA_ERR_CRASHED_ON_USAGE: return "HA_ERR_CRASHED_ON_USAGE";
74
case HA_ERR_LOCK_WAIT_TIMEOUT: return "HA_ERR_LOCK_WAIT_TIMEOUT";
75
case HA_ERR_LOCK_TABLE_FULL: return "HA_ERR_LOCK_TABLE_FULL";
76
case HA_ERR_READ_ONLY_TRANSACTION: return "HA_ERR_READ_ONLY_TRANSACTION";
77
case HA_ERR_LOCK_DEADLOCK: return "HA_ERR_LOCK_DEADLOCK";
78
case HA_ERR_CANNOT_ADD_FOREIGN: return "HA_ERR_CANNOT_ADD_FOREIGN";
79
case HA_ERR_NO_REFERENCED_ROW: return "HA_ERR_NO_REFERENCED_ROW";
80
case HA_ERR_ROW_IS_REFERENCED: return "HA_ERR_ROW_IS_REFERENCED";
81
case HA_ERR_NO_SAVEPOINT: return "HA_ERR_NO_SAVEPOINT";
82
case HA_ERR_NON_UNIQUE_BLOCK_SIZE: return "HA_ERR_NON_UNIQUE_BLOCK_SIZE";
83
case HA_ERR_NO_SUCH_TABLE: return "HA_ERR_NO_SUCH_TABLE";
84
case HA_ERR_TABLE_EXIST: return "HA_ERR_TABLE_EXIST";
85
case HA_ERR_NO_CONNECTION: return "HA_ERR_NO_CONNECTION";
86
case HA_ERR_NULL_IN_SPATIAL: return "HA_ERR_NULL_IN_SPATIAL";
87
case HA_ERR_TABLE_DEF_CHANGED: return "HA_ERR_TABLE_DEF_CHANGED";
88
case HA_ERR_NO_PARTITION_FOUND: return "HA_ERR_NO_PARTITION_FOUND";
89
case HA_ERR_RBR_LOGGING_FAILED: return "HA_ERR_RBR_LOGGING_FAILED";
90
case HA_ERR_DROP_INDEX_FK: return "HA_ERR_DROP_INDEX_FK";
91
case HA_ERR_FOREIGN_DUPLICATE_KEY: return "HA_ERR_FOREIGN_DUPLICATE_KEY";
92
case HA_ERR_TABLE_NEEDS_UPGRADE: return "HA_ERR_TABLE_NEEDS_UPGRADE";
93
case HA_ERR_TABLE_READONLY: return "HA_ERR_TABLE_READONLY";
94
case HA_ERR_AUTOINC_READ_FAILED: return "HA_ERR_AUTOINC_READ_FAILED";
95
case HA_ERR_AUTOINC_ERANGE: return "HA_ERR_AUTOINC_ERANGE";
96
case HA_ERR_GENERIC: return "HA_ERR_GENERIC";
97
case HA_ERR_RECORD_IS_THE_SAME: return "HA_ERR_RECORD_IS_THE_SAME";
98
case HA_ERR_LOGGING_IMPOSSIBLE: return "HA_ERR_LOGGING_IMPOSSIBLE";
99
case HA_ERR_CORRUPT_EVENT: return "HA_ERR_CORRUPT_EVENT";
100
case HA_ERR_ROWS_EVENT_APPLY : return "HA_ERR_ROWS_EVENT_APPLY";
106
Error reporting facility for Rows_log_event::do_apply_event
108
@param level error, warning or info
109
@param ha_error HA_ERR_ code
110
@param rli pointer to the active Relay_log_info instance
111
@param thd pointer to the slave thread's thd
112
@param table pointer to the event's table object
113
@param type the type of the event
114
@param log_name the master binlog file name
115
@param pos the master binlog file pos (the next after the event)
118
static void inline slave_rows_error_report(enum loglevel level, int ha_error,
119
Relay_log_info const *rli, THD *thd,
120
Table *table, const char * type,
121
const char *log_name, ulong pos)
123
const char *handler_error= HA_ERR(ha_error);
124
char buff[MAX_SLAVE_ERRMSG], *slider;
125
const char *buff_end= buff + sizeof(buff);
127
List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
131
for (err= it++, slider= buff; err && slider < buff_end - 1;
132
slider += len, err= it++)
134
len= snprintf(slider, buff_end - slider,
135
_(" %s, Error_code: %d;"), err->msg, err->code);
138
rli->report(level, thd->is_error()? thd->main_da.sql_errno() : 0,
139
_("Could not execute %s event on table %s.%s;"
140
"%s handler error %s; "
141
"the event's master log %s, end_log_pos %lu"),
142
type, table->s->db.str,
143
table->s->table_name.str,
145
handler_error == NULL? _("<unknown>") : handler_error,
151
Cache that will automatically be written to a dedicated file on
157
class Write_on_release_cache
165
typedef unsigned short flag_set;
171
Write_on_release_cache
172
cache Pointer to cache to use
173
file File to write cache to upon destruction
174
flags Flags for the cache
178
Class used to guarantee copy of cache to file before exiting the
179
current block. On successful copy of the cache, the cache will
180
be reinited as a WRITE_CACHE.
182
Currently, a pointer to the cache is provided in the
183
constructor, but it would be possible to create a subclass
184
holding the IO_CACHE itself.
186
Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0)
187
: m_cache(cache), m_file(file), m_flags(flags)
189
reinit_io_cache(m_cache, WRITE_CACHE, 0L, false, true);
192
// Destructor: hands the cache and the target file to
// copy_event_cache_to_file_and_reinit() before the object goes away.
~Write_on_release_cache()
194
copy_event_cache_to_file_and_reinit(m_cache, m_file);
195
// NOTE(review): this is a bitwise OR, so the condition is non-zero whenever
// either m_flags or FLUSH_F is non-zero; it does not test whether the
// FLUSH_F bit is set in m_flags. Confirm against the definition of FLUSH_F
// (not visible here) whether a mask test was intended.
if (m_flags | FLUSH_F)
200
Return a pointer to the internal IO_CACHE.
207
Function to return a pointer to the internal cache, so that the
208
object can be treated as a IO_CACHE and used with the my_b_*
212
A pointer to the internal IO_CACHE.
214
IO_CACHE *operator&()
220
// Hidden, to prevent usage.
221
Write_on_release_cache(Write_on_release_cache const&);
228
uint32_t debug_not_change_ts_if_art_event= 1; // bug#29309 simulation
234
static void clear_all_errors(THD *thd, Relay_log_info *rli)
236
thd->is_slave_error = 0;
243
Ignore error code specified on command line.
246
// Returns non-zero when err_code should be ignored: either it equals
// ER_SLAVE_IGNORED_TABLE, or use_slave_mask is set and the bit for err_code
// is set in slave_error_mask.
inline int ignored_error_code(int err_code)
248
return ((err_code == ER_SLAVE_IGNORED_TABLE) ||
249
(use_slave_mask && bitmap_is_set(&slave_error_mask, err_code)));
257
static char *pretty_print_str(char *packet, const char *str, int len)
259
const char *end= str + len;
265
switch ((c=*str++)) {
266
case '\n': *pos++= '\\'; *pos++= 'n'; break;
267
case '\r': *pos++= '\\'; *pos++= 'r'; break;
268
case '\\': *pos++= '\\'; *pos++= '\\'; break;
269
case '\b': *pos++= '\\'; *pos++= 'b'; break;
270
case '\t': *pos++= '\\'; *pos++= 't'; break;
271
case '\'': *pos++= '\\'; *pos++= '\''; break;
272
case 0 : *pos++= '\\'; *pos++= '0'; break;
284
Creates a temporary name for LOAD DATA INFILE.
286
@param buf Store new filename here
287
@param file_id File_id (part of file name)
288
@param event_server_id Event_id (part of file name)
289
@param ext Extension for file name
292
Pointer to start of extension
295
static char *slave_load_file_stem(char *buf, uint32_t file_id,
296
int event_server_id, const char *ext)
299
fn_format(buf,"SQL_LOAD-",slave_load_tmpdir, "", MY_UNPACK_FILENAME);
302
buf= strchr(buf, '\0');
303
buf= int10_to_str(::server_id, buf, 10);
305
buf= int10_to_str(event_server_id, buf, 10);
307
res= int10_to_str(file_id, buf, 10);
308
my_stpcpy(res, ext); // Add extension last
309
return res; // Pointer to extension
314
Delete all temporary files used for SQL_LOAD.
317
static void cleanup_load_tmpdir()
322
char fname[FN_REFLEN], prefbuf[31], *p;
324
if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
328
When we are deleting temporary files, we should only remove
329
the files associated with the server id of our server.
330
We don't use event_server_id here because, since we've disabled
331
direct binlogging of Create_file/Append_file/Exec_load events
332
we cannot meet Start_log event in the middle of events from one
335
p= strmake(prefbuf, STRING_WITH_LEN("SQL_LOAD-"));
336
p= int10_to_str(::server_id, p, 10);
340
for (i=0 ; i < (uint)dirp->number_off_files; i++)
342
file=dirp->dir_entry+i;
343
if (is_prefix(file->name, prefbuf))
345
fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME);
346
my_delete(fname, MYF(0));
358
// Writes a length-prefixed string to 'file': first a single byte holding
// the length, then 'length' bytes taken from 'str'.
// Returns true when either my_b_safe_write() call returns non-zero.
static bool write_str(IO_CACHE *file, const char *str, uint32_t length)
360
unsigned char tmp[1];
361
// The cast keeps only the low 8 bits of 'length', while the second write
// below still emits the full 'length' bytes.
// NOTE(review): presumably callers only pass lengths below 256 - confirm.
tmp[0]= (unsigned char) length;
362
return (my_b_safe_write(file, tmp, sizeof(tmp)) ||
363
my_b_safe_write(file, (unsigned char*) str, length));
371
static inline int read_str(const char **buf, const char *buf_end,
372
const char **str, uint8_t *len)
374
if (*buf + ((uint) (unsigned char) **buf) >= buf_end)
376
*len= (uint8_t) **buf;
378
(*buf)+= (uint) *len+1;
384
Transforms a string into "" or its expression in 0x... form.
387
char *str_to_hex(char *to, const char *from, uint32_t len)
393
to= octet2hex(to, from, len);
396
to= my_stpcpy(to, "\"\"");
397
return to; // pointer to end 0 of 'to'
402
Append a version of the 'from' string suitable for use in a query to
403
the 'to' string. To generate a correct escaping, the character set
404
information in 'csinfo' is used.
408
append_query_string(const CHARSET_INFO * const csinfo,
409
String const *from, String *to)
412
uint32_t const orig_len= to->length();
413
if (to->reserve(orig_len + from->length()*2+3))
416
beg= to->c_ptr_quick() + to->length();
418
if (csinfo->escape_with_backslash_is_dangerous)
419
ptr= str_to_hex(ptr, from->ptr(), from->length());
423
ptr+= drizzle_escape_string(ptr, from->ptr(), from->length());
426
to->length(orig_len + ptr - beg);
431
/**************************************************************************
432
Log_event methods (= the parent class of all events)
433
**************************************************************************/
437
returns the human readable name of the event's type
440
const char* Log_event::get_type_str(Log_event_type type)
443
case START_EVENT_V3: return "Start_v3";
444
case STOP_EVENT: return "Stop";
445
case QUERY_EVENT: return "Query";
446
case ROTATE_EVENT: return "Rotate";
447
case INTVAR_EVENT: return "Intvar";
448
case LOAD_EVENT: return "Load";
449
case NEW_LOAD_EVENT: return "New_load";
450
case SLAVE_EVENT: return "Slave";
451
case CREATE_FILE_EVENT: return "Create_file";
452
case APPEND_BLOCK_EVENT: return "Append_block";
453
case DELETE_FILE_EVENT: return "Delete_file";
454
case EXEC_LOAD_EVENT: return "Exec_load";
455
case RAND_EVENT: return "RAND";
456
case XID_EVENT: return "Xid";
457
case USER_VAR_EVENT: return "User var";
458
case FORMAT_DESCRIPTION_EVENT: return "Format_desc";
459
case TABLE_MAP_EVENT: return "Table_map";
460
case PRE_GA_WRITE_ROWS_EVENT: return "Write_rows_event_old";
461
case PRE_GA_UPDATE_ROWS_EVENT: return "Update_rows_event_old";
462
case PRE_GA_DELETE_ROWS_EVENT: return "Delete_rows_event_old";
463
case WRITE_ROWS_EVENT: return "Write_rows";
464
case UPDATE_ROWS_EVENT: return "Update_rows";
465
case DELETE_ROWS_EVENT: return "Delete_rows";
466
case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
467
case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
468
case INCIDENT_EVENT: return "Incident";
469
default: return "Unknown"; /* impossible */
473
// Convenience overload: returns the name for this event's own type by
// passing get_type_code() to get_type_str(Log_event_type).
const char* Log_event::get_type_str()
475
return get_type_str(get_type_code());
480
Log_event::Log_event()
483
Log_event::Log_event(THD* thd_arg, uint16_t flags_arg, bool using_trans)
484
:log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), thd(thd_arg)
486
server_id= thd->server_id;
487
when= thd->start_time;
488
cache_stmt= using_trans;
493
This minimal constructor is for when you are not even sure that there
494
is a valid THD. For example in the server when we are shutting down or
495
flushing logs after receiving a SIGHUP (then we must write a Rotate to
496
the binlog but we have no THD, so we need this minimal constructor).
499
Log_event::Log_event()
500
:temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
503
server_id= ::server_id;
505
We can't call my_time() here as this would cause a call before
514
Log_event::Log_event()
517
Log_event::Log_event(const char* buf,
518
const Format_description_log_event* description_event)
519
:temp_buf(0), cache_stmt(0)
522
when= uint4korr(buf);
523
server_id= uint4korr(buf + SERVER_ID_OFFSET);
524
data_written= uint4korr(buf + EVENT_LEN_OFFSET);
525
if (description_event->binlog_version==1)
532
log_pos= uint4korr(buf + LOG_POS_OFFSET);
534
If the log is 4.0 (so here it can only be a 4.0 relay log read by
535
the SQL thread or a 4.0 master binlog read by the I/O thread),
536
log_pos is the beginning of the event: we transform it into the end
537
of the event, which is more useful.
538
But how do you know that the log is 4.0: you know it if
539
description_event is version 3 *and* you are not reading a
540
Format_desc (remember that mysqlbinlog starts by assuming that 5.0
541
logs are in 4.0 format, until it finds a Format_desc).
543
if (description_event->binlog_version==3 &&
544
buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
547
If log_pos=0, don't change it. log_pos==0 is a marker to mean
548
"don't change rli->group_master_log_pos" (see
549
inc_group_relay_log_pos()). As it is unreal log_pos, adding the
550
event len's is nonsense. For example, a fake Rotate event should
551
not have its log_pos (which is 0) changed or it will modify
552
Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense
553
value of (a non-zero offset which does not exist in the master's
554
binlog, so which will cause problems if the user uses this value
557
log_pos+= data_written; /* purecov: inspected */
560
flags= uint2korr(buf + FLAGS_OFFSET);
561
if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
562
(buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
565
These events always have a header which stops here (i.e. their
569
Initialization to zero of all other Log_event members as they're
570
not specified. Currently there are no such members; in the future
571
there will be an event UID (but Format_description and Rotate
572
don't need this UID, as they are not propagated through
573
--log-slave-updates (remember the UID is used to not play a query
574
twice when you have two masters which are slaves of a 3rd master).
579
/* otherwise, go on with reading the header from buf (nothing now) */
583
int Log_event::do_update_pos(Relay_log_info *rli)
586
rli is null when (as far as I (Guilhem) know) the caller is
587
Load_log_event::do_apply_event *and* that one is called from
588
Execute_load_log_event::do_apply_event. In this case, we don't
589
do anything here ; Execute_load_log_event::do_apply_event will
590
call Log_event::do_apply_event again later with the proper rli.
591
Strictly speaking, if we were sure that rli is null only in the
592
case discussed above, 'if (rli)' is useless here. But as we are
593
not 100% sure, keep it for now.
595
Matz: I don't think we will need this check with this refactoring.
600
bug#29309 simulation: resetting the flag to force
601
wrong behaviour of artificial event to update
602
rli->last_master_timestamp for only one time -
603
the first FLUSH LOGS in the test.
605
if (debug_not_change_ts_if_art_event == 1
606
&& is_artificial_event())
607
debug_not_change_ts_if_art_event= 0;
608
rli->stmt_done(log_pos,
609
is_artificial_event() &&
610
debug_not_change_ts_if_art_event > 0 ? 0 : when);
611
if (debug_not_change_ts_if_art_event == 0)
612
debug_not_change_ts_if_art_event= 2;
614
return 0; // Cannot fail currently
618
// Decides whether this event should be skipped, based on the event's
// server_id and the state held in 'rli'.
//
// Returns EVENT_SKIP_IGNORE, EVENT_SKIP_COUNT or EVENT_SKIP_NOT as
// described on the individual branches below.
Log_event::enum_skip_reason
619
Log_event::do_shall_skip(Relay_log_info *rli)
621
// Ignore the event when it carries this server's own id and
// rli->replicate_same_server_id is not set, or when slave_skip_counter is
// exactly 1 and rli->is_in_group() is true.
if ((server_id == ::server_id && !rli->replicate_same_server_id) || (rli->slave_skip_counter == 1 && rli->is_in_group()))
622
return EVENT_SKIP_IGNORE;
623
// Any other positive skip counter yields EVENT_SKIP_COUNT.
else if (rli->slave_skip_counter > 0)
624
return EVENT_SKIP_COUNT;
626
return EVENT_SKIP_NOT;
631
Log_event::pack_info()
634
void Log_event::pack_info(Protocol *protocol)
636
protocol->store("", &my_charset_bin);
641
Only called by SHOW BINLOG EVENTS
643
int Log_event::net_send(Protocol *protocol, const char* log_name, my_off_t pos)
645
const char *p= strrchr(log_name, FN_LIBCHAR);
646
const char *event_type;
650
protocol->prepare_for_resend();
651
protocol->store(log_name, &my_charset_bin);
652
protocol->store((uint64_t) pos);
653
event_type = get_type_str();
654
protocol->store(event_type, strlen(event_type), &my_charset_bin);
655
protocol->store((uint32_t) server_id);
656
protocol->store((uint64_t) log_pos);
658
return protocol->write();
663
init_show_field_list() prepares the column names and types for the
664
output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
668
void Log_event::init_show_field_list(List<Item>* field_list)
670
field_list->push_back(new Item_empty_string("Log_name", 20));
671
field_list->push_back(new Item_return_int("Pos", MY_INT32_NUM_DECIMAL_DIGITS,
672
DRIZZLE_TYPE_LONGLONG));
673
field_list->push_back(new Item_empty_string("Event_type", 20));
674
field_list->push_back(new Item_return_int("Server_id", 10,
676
field_list->push_back(new Item_return_int("End_log_pos",
677
MY_INT32_NUM_DECIMAL_DIGITS,
678
DRIZZLE_TYPE_LONGLONG));
679
field_list->push_back(new Item_empty_string("Info", 20));
686
bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
688
unsigned char header[LOG_EVENT_HEADER_LEN];
691
/* Store number of bytes that will be written by this event */
692
data_written= event_data_length + sizeof(header);
695
log_pos != 0 if this is relay-log event. In this case we should not
699
if (is_artificial_event())
702
We should not do any cleanup on slave when reading this. We
703
mark this by setting log_pos to 0. Start_log_event_v3() will
704
detect this on reading and set artificial_event=1 for the event.
711
Calculate position of end of event
713
Note that with a SEQ_READ_APPEND cache, my_b_tell() does not
714
work well. So this will give slightly wrong positions for the
715
Format_desc/Rotate/Stop events which the slave writes to its
716
relay log. For example, the initial Format_desc will have
717
end_log_pos=91 instead of 95. Because after writing the first 4
718
bytes of the relay log, my_b_tell() still reports 0. Because
719
my_b_append() does not update the counter which my_b_tell()
720
later uses (one should probably use my_b_append_tell() to work
721
around this). To get right positions even when writing to the
722
relay log, we use the (new) my_b_safe_tell().
724
Note that this raises a question on the correctness of all these
725
assert(my_b_tell()=rli->event_relay_log_pos).
727
If in a transaction, the log_pos which we calculate below is not
728
very good (because then my_b_safe_tell() returns start position
729
of the BEGIN, so it's like the statement was at the BEGIN's
730
place), but it's not a very serious problem (as the slave, when
731
it is in a transaction, does not take those end_log_pos into
732
account (as it calls inc_event_relay_log_pos()). To be fixed
733
later, so that it looks less strange. But it is not a bug.
736
log_pos= my_b_safe_tell(file)+data_written;
739
now= (ulong) get_time(); // Query start time
742
Header will be of size LOG_EVENT_HEADER_LEN for all events, except for
743
FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT, where it will be
744
LOG_EVENT_MINIMAL_HEADER_LEN (remember these 2 have a frozen header,
745
because we read them before knowing the format).
748
int4store(header, now); // timestamp
749
header[EVENT_TYPE_OFFSET]= get_type_code();
750
int4store(header+ SERVER_ID_OFFSET, server_id);
751
int4store(header+ EVENT_LEN_OFFSET, data_written);
752
int4store(header+ LOG_POS_OFFSET, log_pos);
753
int2store(header+ FLAGS_OFFSET, flags);
755
return(my_b_safe_write(file, header, sizeof(header)) != 0);
760
This needn't be format-tolerant, because we only read
761
LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length).
764
int Log_event::read_log_event(IO_CACHE* file, String* packet,
765
pthread_mutex_t* log_lock)
769
char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
772
pthread_mutex_lock(log_lock);
773
if (my_b_read(file, (unsigned char*) buf, sizeof(buf)))
776
If the read hits eof, we must report it as eof so the caller
777
will know it can go into cond_wait to be woken up on the next
781
result= LOG_READ_EOF;
783
result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
786
data_len= uint4korr(buf + EVENT_LEN_OFFSET);
787
if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
788
data_len > current_thd->variables.max_allowed_packet)
790
result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
795
/* Append the log event header to packet */
796
if (packet->append(buf, sizeof(buf)))
798
/* Failed to allocate packet */
799
result= LOG_READ_MEM;
802
data_len-= LOG_EVENT_MINIMAL_HEADER_LEN;
805
/* Append rest of event, read directly from file into packet */
806
if (packet->append(file, data_len))
809
Fatal error occurred when appending rest of the event
810
to packet, possible failures:
811
1. EOF occurred when reading from file, it's really an error
812
as data_len is >=0 there's supposed to be more bytes available.
813
file->error will have been set to number of bytes left to read
814
2. Read was interrupted, file->error would normally be set to -1
815
3. Failed to allocate memory for packet, my_errno
816
will be ENOMEM (file->error should be 0, but since the
817
memory allocation occurs before the call to read it might
820
result= (my_errno == ENOMEM ? LOG_READ_MEM :
821
(file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
822
/* Implicit goto end; */
828
pthread_mutex_unlock(log_lock);
832
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
833
#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
837
Allocates memory; The caller is responsible for clean-up.
839
Log_event* Log_event::read_log_event(IO_CACHE* file,
840
pthread_mutex_t* log_lock,
841
const Format_description_log_event
844
assert(description_event != 0);
845
char head[LOG_EVENT_MINIMAL_HEADER_LEN];
847
First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
848
check the event for sanity and to know its length; no need to really parse
849
it. We say "at most" because this could be a 3.23 master, which has header
850
of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's
851
"minimal" over the set {MySQL >=4.0}).
853
uint32_t header_size= cmin(description_event->common_header_len,
854
LOG_EVENT_MINIMAL_HEADER_LEN);
857
if (my_b_read(file, (unsigned char *) head, header_size))
861
No error here; it could be that we are at the file's end. However
862
if the next my_b_read() fails (below), it will be an error as we
863
were able to read the first bytes.
867
uint32_t data_len = uint4korr(head + EVENT_LEN_OFFSET);
869
const char *error= 0;
871
#ifndef max_allowed_packet
872
THD *thd=current_thd;
873
uint32_t max_allowed_packet= thd ? thd->variables.max_allowed_packet : ~(ulong)0;
876
if (data_len > max_allowed_packet)
878
error = "Event too big";
882
if (data_len < header_size)
884
error = "Event too small";
888
// some events use the extra byte to null-terminate strings
889
if (!(buf = (char*) my_malloc(data_len+1, MYF(MY_WME))))
891
error = "Out of memory";
895
memcpy(buf, head, header_size);
896
if (my_b_read(file, (unsigned char*) buf + header_size, data_len - header_size))
898
error = "read error";
901
if ((res= read_log_event(buf, data_len, &error, description_event)))
902
res->register_temp_buf(buf);
909
sql_print_error(_("Error in Log_event::read_log_event(): "
910
"'%s', data_len: %d, event_type: %d"),
911
error,data_len,head[EVENT_TYPE_OFFSET]);
914
The SQL slave thread will check if file->error<0 to know
915
if there was an I/O error. Even if there is no "low-level" I/O errors
916
with 'file', any of the high-level above errors is worrying
917
enough to stop the SQL thread now ; as we are skipping the current event,
918
going on with reading and successfully executing other events can
919
only corrupt the slave's databases. So stop.
928
Binlog format tolerance is in (buf, event_len, description_event)
932
Log_event* Log_event::read_log_event(const char* buf, uint32_t event_len,
934
const Format_description_log_event *description_event)
937
assert(description_event != 0);
939
/* Check the integrity */
940
if (event_len < EVENT_LEN_OFFSET ||
941
buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
942
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
944
*error="Sanity check failed"; // Needed to free buffer
945
return(NULL); // general sanity check - will fail on a partial read
948
uint32_t event_type= buf[EVENT_TYPE_OFFSET];
949
if (event_type > description_event->number_of_event_types &&
950
event_type != FORMAT_DESCRIPTION_EVENT)
953
It is unsafe to use the description_event if its post_header_len
954
array does not include the event type.
961
In some previous versions (see comment in
962
Format_description_log_event::Format_description_log_event(char*,...)),
963
event types were assigned different id numbers than in the
964
present version. In order to replicate from such versions to the
965
present version, we must map those event type id's to our event
966
type id's. The mapping is done with the event_type_permutation
967
array, which was set up when the Format_description_log_event
970
if (description_event->event_type_permutation)
971
event_type= description_event->event_type_permutation[event_type];
975
ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
978
ev = new Load_log_event(buf, event_len, description_event);
981
ev = new Load_log_event(buf, event_len, description_event);
984
ev = new Rotate_log_event(buf, event_len, description_event);
986
case SLAVE_EVENT: /* can never happen (unused event) */
987
ev = new Slave_log_event(buf, event_len);
989
case CREATE_FILE_EVENT:
990
ev = new Create_file_log_event(buf, event_len, description_event);
992
case APPEND_BLOCK_EVENT:
993
ev = new Append_block_log_event(buf, event_len, description_event);
995
case DELETE_FILE_EVENT:
996
ev = new Delete_file_log_event(buf, event_len, description_event);
998
case EXEC_LOAD_EVENT:
999
ev = new Execute_load_log_event(buf, event_len, description_event);
1001
case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
1002
ev = new Start_log_event_v3(buf, description_event);
1005
ev = new Stop_log_event(buf, description_event);
1008
ev = new Intvar_log_event(buf, description_event);
1011
ev = new Xid_log_event(buf, description_event);
1014
ev = new Rand_log_event(buf, description_event);
1016
case USER_VAR_EVENT:
1017
ev = new User_var_log_event(buf, description_event);
1019
case FORMAT_DESCRIPTION_EVENT:
1020
ev = new Format_description_log_event(buf, event_len, description_event);
1022
case WRITE_ROWS_EVENT:
1023
ev = new Write_rows_log_event(buf, event_len, description_event);
1025
case UPDATE_ROWS_EVENT:
1026
ev = new Update_rows_log_event(buf, event_len, description_event);
1028
case DELETE_ROWS_EVENT:
1029
ev = new Delete_rows_log_event(buf, event_len, description_event);
1031
case TABLE_MAP_EVENT:
1032
ev = new Table_map_log_event(buf, event_len, description_event);
1034
case BEGIN_LOAD_QUERY_EVENT:
1035
ev = new Begin_load_query_log_event(buf, event_len, description_event);
1037
case EXECUTE_LOAD_QUERY_EVENT:
1038
ev= new Execute_load_query_log_event(buf, event_len, description_event);
1040
case INCIDENT_EVENT:
1041
ev = new Incident_log_event(buf, event_len, description_event);
1050
is_valid() are small event-specific sanity tests which are
1051
important; for example there are some my_malloc() in constructors
1052
(e.g. Query_log_event::Query_log_event(char*...)); when these
1053
my_malloc() fail we can't return an error out of the constructor
1054
(because constructor is "void") ; so instead we leave the pointer we
1055
wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
1056
Same for Format_description_log_event, member 'post_header_len'.
1058
if (!ev || !ev->is_valid())
1061
*error= "Found invalid event in binary log";
1067
// Skip decision helper: returns EVENT_SKIP_IGNORE when
// rli->slave_skip_counter is exactly 1, and otherwise returns whatever
// Log_event::do_shall_skip(rli) decides.
inline Log_event::enum_skip_reason
1068
Log_event::continue_group(Relay_log_info *rli)
1070
if (rli->slave_skip_counter == 1)
1071
return Log_event::EVENT_SKIP_IGNORE;
1072
// The call is class-qualified, so it names Log_event's own do_shall_skip.
return Log_event::do_shall_skip(rli);
1075
/**************************************************************************
1076
Query_log_event methods
1077
**************************************************************************/
1080
This (which is used only for SHOW BINLOG EVENTS) could be updated to
1081
print SET @@session_var=. But this is not urgent, as SHOW BINLOG EVENTS is
1082
only informational; it does not produce suitable queries to replay (for
1083
example it does not print LOAD DATA INFILE).
1088
void Query_log_event::pack_info(Protocol *protocol)
1090
// TODO: show the catalog ??
1092
if (!(buf= (char*) my_malloc(9 + db_len + q_len, MYF(MY_WME))))
1095
if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
1098
pos= my_stpcpy(buf, "use `");
1099
memcpy(pos, db, db_len);
1100
pos= my_stpcpy(pos+db_len, "`; ");
1104
memcpy(pos, query, q_len);
1107
protocol->store(buf, pos-buf, &my_charset_bin);
1113
Utility function for the next method (Query_log_event::write()) .
1115
static void write_str_with_code_and_len(char **dst, const char *src,
1116
int len, uint32_t code)
1120
*((*dst)++)= (unsigned char) len;
1121
memcpy(*dst, src, len);
1127
Query_log_event::write().
1130
In this event we have to modify the header to have the correct
1131
EVENT_LEN_OFFSET as we don't yet know how many status variables we
1135
// Query_log_event::write: serializes this query event to `file`.
// From the visible code:
//  - buf is sized for QUERY_HEADER_LEN plus the largest possible set of
//    status variables (the per-variable sizes are itemized in the array
//    bound);
//  - the fixed post-header fields are stored at Q_THREAD_ID_OFFSET,
//    Q_EXEC_TIME_OFFSET, Q_DB_LEN_OFFSET and Q_ERR_CODE_OFFSET;
//  - status variables are appended starting at buf + QUERY_HEADER_LEN,
//    each as a code byte followed by its value, and their total length is
//    stored at Q_STATUS_VARS_LEN_OFFSET;
//  - event_length is computed and the common header, the QUERY_HEADER_LEN
//    bytes of buf, the derived-class post-header, the status variables,
//    db (db_len + 1 bytes) and the query text are written in that order.
// Returns 1 if any of the chained write calls reports non-zero, else 0;
// there is also an early `return 1` whose guarding condition is not
// visible here.
// NOTE(review): the embedded line numbers skip in several places, so some
// guards (for example around the flags2 and charset variables) and pointer
// advances are not visible in this extract. Confirm against the complete
// file before modifying.
bool Query_log_event::write(IO_CACHE* file)
1138
@todo if catalog can be of length FN_REFLEN==512, then we are not
1139
replicating it correctly, since the length is stored in a byte
1142
unsigned char buf[QUERY_HEADER_LEN+
1143
1+4+ // code of flags2 and flags2
1144
1+8+ // code of sql_mode and sql_mode
1145
1+1+FN_REFLEN+ // code of catalog and catalog length and catalog
1146
1+4+ // code of autoinc and the 2 autoinc variables
1147
1+6+ // code of charset and charset
1148
1+1+MAX_TIME_ZONE_NAME_LENGTH+ // code of tz and tz length and tz name
1149
1+2+ // code of lc_time_names and lc_time_names_number
1150
1+2 // code of charset_database and charset_database_number
1151
], *start, *start_of_status;
1155
return 1; // Something wrong with event
1158
We want to store the thread id:
1159
(- as an information for the user when he reads the binlog)
1160
- if the query uses temporary table: for the slave SQL thread to know to
1161
which master connection the temp table belongs.
1162
Now imagine we (write()) are called by the slave SQL thread (we are
1163
logging a query executed by this thread; the slave runs with
1164
--log-slave-updates). Then this query will be logged with
1165
thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of
1166
the same name were created simultaneously on the master (in the master
1168
CREATE TEMPORARY TABLE t; (thread 1)
1169
CREATE TEMPORARY TABLE t; (thread 2)
1171
then in the slave's binlog there will be
1172
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1173
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1174
which is bad (same thread id!).
1176
To avoid this, we log the thread's thread id EXCEPT for the SQL
1177
slave thread for which we log the original (master's) thread id.
1178
Now this moves the bug: what happens if the thread id on the
1179
master was 10 and when the slave replicates the query, a
1180
connection number 10 is opened by a normal client on the slave,
1181
and updates a temp table of the same name? We get a problem
1182
again. To avoid this, in the handling of temp tables (sql_base.cc)
1183
we use thread_id AND server_id. TODO when this is merged into
1184
4.1: in 4.1, slave_proxy_id has been renamed to pseudo_thread_id
1185
and is a session variable: that's to make mysqlbinlog work with
1186
temp tables. We probably need to introduce
1188
SET PSEUDO_SERVER_ID
1189
for mysqlbinlog in 4.1. mysqlbinlog would print:
1190
SET PSEUDO_SERVER_ID=
1191
SET PSEUDO_THREAD_ID=
1192
for each query using temp tables.
1194
int4store(buf + Q_THREAD_ID_OFFSET, slave_proxy_id);
1195
int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
1196
buf[Q_DB_LEN_OFFSET] = (char) db_len;
1197
int2store(buf + Q_ERR_CODE_OFFSET, error_code);
1200
You MUST always write status vars in increasing order of code. This
1201
guarantees that a slightly older slave will be able to parse those he
1204
start_of_status= start= buf+QUERY_HEADER_LEN;
1207
*start++= Q_FLAGS2_CODE;
1208
int4store(start, flags2);
1211
if (sql_mode_inited)
1213
*start++= Q_SQL_MODE_CODE;
1214
int8store(start, (uint64_t)sql_mode);
1217
if (catalog_len) // i.e. this var is inited (false for 4.0 events)
1219
write_str_with_code_and_len((char **)(&start),
1220
catalog, catalog_len, Q_CATALOG_NZ_CODE);
1222
In 5.0.x where x<4 masters we used to store the end zero here. This was
1223
a waste of one byte so we don't do it in x>=4 masters. We change code to
1224
Q_CATALOG_NZ_CODE, because re-using the old code would make x<4 slaves
1225
of this x>=4 master segfault (expecting a zero when there is
1226
none). Remaining compatibility problems are: the older slave will not
1227
find the catalog; but it is will not crash, and it's not an issue
1228
that it does not find the catalog as catalogs were not used in these
1229
older MySQL versions (we store it in binlog and read it from relay log
1230
but do nothing useful with it). What is an issue is that the older slave
1231
will stop processing the Q_* blocks (and jumps to the db/query) as soon
1232
as it sees unknown Q_CATALOG_NZ_CODE; so it will not be able to read
1233
Q_AUTO_INCREMENT*, Q_CHARSET and so replication will fail silently in
1234
various ways. Documented that you should not mix alpha/beta versions if
1235
they are not exactly the same version, with example of 5.0.3->5.0.2 and
1236
5.0.4->5.0.3. If replication is from older to new, the new will
1237
recognize Q_CATALOG_CODE and have no problem.
1240
if (auto_increment_increment != 1 || auto_increment_offset != 1)
1242
*start++= Q_AUTO_INCREMENT;
1243
int2store(start, auto_increment_increment);
1244
int2store(start+2, auto_increment_offset);
1249
*start++= Q_CHARSET_CODE;
1250
memcpy(start, charset, 6);
1255
/* In the TZ sys table, column Name is of length 64 so this should be ok */
1256
assert(time_zone_len <= MAX_TIME_ZONE_NAME_LENGTH);
1257
*start++= Q_TIME_ZONE_CODE;
1258
*start++= time_zone_len;
1259
memcpy(start, time_zone_str, time_zone_len);
1260
start+= time_zone_len;
1262
if (lc_time_names_number)
1264
assert(lc_time_names_number <= 0xFFFF);
1265
*start++= Q_LC_TIME_NAMES_CODE;
1266
int2store(start, lc_time_names_number);
1269
if (charset_database_number)
1271
assert(charset_database_number <= 0xFFFF);
1272
*start++= Q_CHARSET_DATABASE_CODE;
1273
int2store(start, charset_database_number);
1277
Here there could be code like
1278
if (command-line-option-which-says-"log_this_variable" && inited)
1280
*start++= Q_THIS_VARIABLE_CODE;
1281
int4store(start, this_variable);
1286
/* Store length of status variables */
1287
status_vars_len= (uint) (start-start_of_status);
1288
assert(status_vars_len <= MAX_SIZE_LOG_EVENT_STATUS);
1289
int2store(buf + Q_STATUS_VARS_LEN_OFFSET, status_vars_len);
1292
Calculate length of whole event
1293
The "1" below is the \0 in the db's length
1295
event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
1297
return (write_header(file, event_length) ||
1298
my_b_safe_write(file, (unsigned char*) buf, QUERY_HEADER_LEN) ||
1299
write_post_header_for_derived(file) ||
1300
my_b_safe_write(file, (unsigned char*) start_of_status,
1301
(uint) (start-start_of_status)) ||
1302
my_b_safe_write(file, (db) ? (unsigned char*) db : (unsigned char*)"", db_len + 1) ||
1303
my_b_safe_write(file, (unsigned char*) query, q_len)) ? 1 : 0;
1307
The simplest constructor that could possibly work. This is used for
1308
creating static objects that have a special meaning and are invisible
1311
// Default constructor: default-constructs the Log_event base and sets
// data_buf to 0. No other member is initialized in the visible code.
Query_log_event::Query_log_event()
1312
:Log_event(), data_buf(0)
1319
Query_log_event::Query_log_event()
1320
thd_arg - thread handle
1321
query_arg - array of char representing the query
1322
query_length - size of the `query_arg' array
1323
using_trans - there is a modified transactional table
1324
suppress_use - suppress the generation of 'USE' statements
1325
killed_status_arg - an optional argument that defaults to THD::KILLED_NO_VALUE
1326
if the value is different from the default, the arg
1327
is set to the current thd->killed value.
1328
A caller might need to masquerade thd->killed with
1331
Creates an event for binlogging
1332
The value for local `killed_status' can be supplied by caller.
1334
// Query_log_event constructor used when creating an event from a running
// statement (thd_arg) in order to log it.
// From the visible code:
//  - query, catalog, db, q_len, thread_id, slave_proxy_id, the two
//    auto_increment settings and lc_time_names_number are taken from the
//    arguments and from thd_arg; charset_database_number starts at 0;
//  - if killed_status_arg is THD::KILLED_NO_VALUE it is replaced by
//    thd_arg->killed; the expression that follows yields the statement's
//    sql_errno (or 0) when not killed and thd_arg->killed_errno()
//    otherwise (presumably assigned to error_code; the assignment target
//    is not visible here);
//  - exec_time is end_time minus thd_arg->start_time;
//  - catalog_len and db_len are strlen() of the respective pointer, or 0
//    when the pointer is null;
//  - charset_database_number is set only when the session's
//    collation_database differs from thd_arg->db_charset;
//  - flags2 keeps only the OPTIONS_WRITTEN_TO_BIN_LOG bits of
//    thd_arg->options;
//  - three 2-byte charset/collation numbers are stored at charset,
//    charset+2 and charset+4;
//  - when thd_arg->time_zone_used is set, time_zone_len and time_zone_str
//    are taken from the session time zone's name.
// NOTE(review): the embedded line numbers skip, so part of the parameter
// list, the base-class initializer and some branches are not visible in
// this extract. Confirm against the complete file before modifying.
Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
1335
ulong query_length, bool using_trans,
1337
THD::killed_state killed_status_arg)
1339
(thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
1341
(suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1343
data_buf(0), query(query_arg), catalog(thd_arg->catalog),
1344
db(thd_arg->db), q_len((uint32_t) query_length),
1345
thread_id(thd_arg->thread_id),
1346
/* save the original thread id; we already know the server id */
1347
slave_proxy_id(thd_arg->variables.pseudo_thread_id),
1348
flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1350
auto_increment_increment(thd_arg->variables.auto_increment_increment),
1351
auto_increment_offset(thd_arg->variables.auto_increment_offset),
1352
lc_time_names_number(thd_arg->variables.lc_time_names->number),
1353
charset_database_number(0)
1357
if (killed_status_arg == THD::KILLED_NO_VALUE)
1358
killed_status_arg= thd_arg->killed;
1361
(killed_status_arg == THD::NOT_KILLED) ?
1362
(thd_arg->is_error() ? thd_arg->main_da.sql_errno() : 0) :
1363
(thd_arg->killed_errno());
1366
exec_time = (ulong) (end_time - thd_arg->start_time);
1368
@todo this means that if we have no catalog, then it is replicated
1369
as an existing catalog of length zero. is that safe? /sven
1371
catalog_len = (catalog) ? (uint32_t) strlen(catalog) : 0;
1372
/* status_vars_len is set just before writing the event */
1373
db_len = (db) ? (uint32_t) strlen(db) : 0;
1374
if (thd_arg->variables.collation_database != thd_arg->db_charset)
1375
charset_database_number= thd_arg->variables.collation_database->number;
1378
If we don't use flags2 for anything else than options contained in
1379
thd_arg->options, it would be more efficient to flags2=thd_arg->options
1380
(OPTIONS_WRITTEN_TO_BIN_LOG would be used only at reading time).
1381
But it's likely that we don't want to use 32 bits for 3 bits; in the future
1382
we will probably want to reclaim the 29 bits. So we need the &.
1384
flags2= (uint32_t) (thd_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
1385
assert(thd_arg->variables.character_set_client->number < 256*256);
1386
assert(thd_arg->variables.collation_connection->number < 256*256);
1387
assert(thd_arg->variables.collation_server->number < 256*256);
1388
assert(thd_arg->variables.character_set_client->mbminlen == 1);
1389
int2store(charset, thd_arg->variables.character_set_client->number);
1390
int2store(charset+2, thd_arg->variables.collation_connection->number);
1391
int2store(charset+4, thd_arg->variables.collation_server->number);
1392
if (thd_arg->time_zone_used)
1395
Note that our event becomes dependent on the Time_zone object
1396
representing the time zone. Fortunately such objects are never deleted
1397
or changed during mysqld's lifetime.
1399
time_zone_len= thd_arg->variables.time_zone->get_name()->length();
1400
time_zone_str= thd_arg->variables.time_zone->get_name()->ptr();
1407
/* 2 utility functions for the next method */
1410
Read a string with length from memory.
1412
This function reads the string-with-length stored at
1413
<code>src</code> and extract the length into <code>*len</code> and
1414
a pointer to the start of the string into <code>*dst</code>. The
1415
string can then be copied using <code>memcpy()</code> with the
1416
number of bytes given in <code>*len</code>.
1418
@param src Pointer to variable holding a pointer to the memory to
1419
read the string from.
1420
@param dst Pointer to variable holding a pointer where the actual
1421
string starts. Starting from this position, the string
1422
can be copied using @c memcpy().
1423
@param len Pointer to variable where the length will be stored.
1424
@param end One-past-the-end of the memory where the string is
1427
@return Zero if the entire string can be copied successfully,
1428
@c UINT_MAX if the length could not be read from memory
1429
(that is, if <code>*src >= end</code>), otherwise the
1430
number of bytes that are missing to read the full
1431
string, which happens if <code>*dst + *len >= end</code>.
1434
// get_str_len_and_pointer: reads a length-prefixed string located at *src.
// From the visible code: the first byte at *src is the string length; if
// *src + length reaches or passes `end`, the function returns how many
// bytes are missing (*src + length - end + 1); otherwise *dst is set to
// the byte following the length byte. There is also an early `return -1`
// whose guarding condition is not visible here.
// NOTE(review): the return type, two parameters and the tail of the body
// are missing from this extract (the embedded line numbers skip). Confirm
// against the complete file before modifying.
get_str_len_and_pointer(const Log_event::Byte **src,
1437
const Log_event::Byte *end)
1440
return -1; // Will be UINT_MAX in two-complement arithmetics
1441
uint32_t length= **src;
1444
if (*src + length >= end)
1445
return *src + length - end + 1; // Number of bytes missing
1446
*dst= (char *)*src + 1; // Will be copied later
1453
// copy_str_and_move: copies len bytes from *src into the buffer at *dst
// and then repoints *src at the copy.
// NOTE(review): the parameter that declares `len` and any statements that
// advance *dst (which the function name suggests) are not visible in this
// extract. Confirm against the complete file before modifying.
static void copy_str_and_move(const char **src,
1454
Log_event::Byte **dst,
1457
memcpy(*dst, *src, len);
1458
*src= (const char *)*dst;
1465
Macro to check that there is enough space to read from memory.
1467
@param PTR Pointer to memory
1468
@param END End of memory
1469
@param CNT Number of bytes that should be read.
1471
// CHECK_SPACE(PTR, END, CNT): bounds check used while decoding status
// variables. It asserts that PTR + CNT does not exceed END and then tests
// the same condition with an ordinary `if`, so the check remains when
// assert() is compiled out.
// NOTE(review): the body of the `if` and the rest of the macro are not
// visible in this extract; what happens on failure cannot be stated here.
#define CHECK_SPACE(PTR,END,CNT) \
1473
assert((PTR) + (CNT) <= (END)); \
1474
if ((PTR) + (CNT) > (END)) { \
1482
This is used by the SQL slave thread to prepare the event before execution.
1484
// Query_log_event constructor that decodes an event image held in `buf`.
// From the visible code:
//  - common_header_len and post_header_len come from description_event
//    (post_header_len is indexed by event_type-1);
//  - an event shorter than the two header lengths is detected by the first
//    `if` (its body is not visible); data_len is what remains after both
//    headers;
//  - thread_id (also copied to slave_proxy_id), exec_time, db_len and
//    error_code are read from the post-header;
//  - tmp is the post-header length beyond QUERY_HEADER_MINIMAL_LEN;
//    status_vars_len is read from Q_STATUS_VARS_LEN_OFFSET, compared with
//    cmin(data_len, MAX_SIZE_LOG_EVENT_STATUS) and subtracted from
//    data_len;
//  - the status variables between start and end are decoded in a loop,
//    using CHECK_SPACE before fixed-size reads; the final visible branch
//    sets pos to end to leave the loop (presumably for unrecognized codes);
//  - a buffer is obtained with my_malloc and stored in data_buf; catalog
//    and time zone name are copied into it, followed by data_len bytes
//    holding db and the query, and a terminating zero byte is written;
//  - query points db_len + 1 bytes into that copy and q_len is
//    data_len - db_len - 1.
// NOTE(review): the embedded line numbers skip throughout, so switch and
// case labels, pointer advances, several guards and error exits are not
// visible in this extract. Confirm against the complete file before
// modifying.
Query_log_event::Query_log_event(const char* buf, uint32_t event_len,
1485
const Format_description_log_event
1487
Log_event_type event_type)
1488
:Log_event(buf, description_event), data_buf(0), query(NULL),
1489
db(NULL), catalog_len(0), status_vars_len(0),
1490
flags2_inited(0), sql_mode_inited(0), charset_inited(0),
1491
auto_increment_increment(1), auto_increment_offset(1),
1492
time_zone_len(0), lc_time_names_number(0), charset_database_number(0)
1496
uint8_t common_header_len, post_header_len;
1497
Log_event::Byte *start;
1498
const Log_event::Byte *end;
1501
common_header_len= description_event->common_header_len;
1502
post_header_len= description_event->post_header_len[event_type-1];
1505
We test if the event's length is sensible, and if so we compute data_len.
1506
We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
1507
We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
1509
if (event_len < (uint)(common_header_len + post_header_len))
1511
data_len = event_len - (common_header_len + post_header_len);
1512
buf+= common_header_len;
1514
slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
1515
exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
1516
db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
1517
error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
1520
5.0 format starts here.
1521
Depending on the format, we may or not have affected/warnings etc
1522
The remnent post-header to be parsed has length:
1524
tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
1527
status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
1529
Check if status variable length is corrupt and will lead to very
1530
wrong data. We could be even more strict and require data_len to
1531
be even bigger, but this will suffice to catch most corruption
1532
errors that can lead to a crash.
1534
if (status_vars_len > cmin(data_len, (uint32_t)MAX_SIZE_LOG_EVENT_STATUS))
1539
data_len-= status_vars_len;
1543
We have parsed everything we know in the post header for QUERY_EVENT,
1544
the rest of post header is either comes from older version MySQL or
1545
dedicated to derived events (e.g. Execute_load_query...)
1548
/* variable-part: the status vars; only in MySQL 5.0 */
1550
start= (Log_event::Byte*) (buf+post_header_len);
1551
end= (const Log_event::Byte*) (start+status_vars_len);
1552
for (const Log_event::Byte* pos= start; pos < end;)
1556
CHECK_SPACE(pos, end, 4);
1558
flags2= uint4korr(pos);
1561
case Q_SQL_MODE_CODE:
1563
CHECK_SPACE(pos, end, 8);
1565
sql_mode= (ulong) uint8korr(pos); // QQ: Fix when sql_mode is uint64_t
1569
case Q_CATALOG_NZ_CODE:
1570
if (get_str_len_and_pointer(&pos, &catalog, &catalog_len, end))
1576
case Q_AUTO_INCREMENT:
1577
CHECK_SPACE(pos, end, 4);
1578
auto_increment_increment= uint2korr(pos);
1579
auto_increment_offset= uint2korr(pos+2);
1582
case Q_TIME_ZONE_CODE:
1584
if (get_str_len_and_pointer(&pos, &time_zone_str, &time_zone_len, end))
1591
case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
1592
CHECK_SPACE(pos, end, 1);
1593
if ((catalog_len= *pos))
1594
catalog= (char*) pos+1; // Will be copied later
1595
CHECK_SPACE(pos, end, catalog_len + 2);
1596
pos+= catalog_len+2; // leap over end 0
1597
catalog_nz= 0; // catalog has end 0 in event
1599
case Q_LC_TIME_NAMES_CODE:
1600
CHECK_SPACE(pos, end, 2);
1601
lc_time_names_number= uint2korr(pos);
1604
case Q_CHARSET_DATABASE_CODE:
1605
CHECK_SPACE(pos, end, 2);
1606
charset_database_number= uint2korr(pos);
1610
/* That's why you must write status vars in growing order of code */
1611
pos= (const unsigned char*) end; // Break loop
1615
if (!(start= data_buf = (Log_event::Byte*) my_malloc(catalog_len + 1 +
1620
if (catalog_len) // If catalog is given
1623
@todo we should clean up and do only copy_str_and_move; it
1624
works for both cases. Then we can remove the catalog_nz
1627
if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
1628
copy_str_and_move(&catalog, &start, catalog_len);
1631
memcpy(start, catalog, catalog_len+1); // copy end 0
1632
catalog= (const char *)start;
1633
start+= catalog_len+1;
1637
copy_str_and_move(&time_zone_str, &start, time_zone_len);
1640
if time_zone_len or catalog_len are 0, then time_zone and catalog
1641
are uninitialized at this point. shouldn't they point to the
1642
zero-length null-terminated strings we allocated space for in the
1643
my_alloc call above? /sven
1646
/* A 2nd variable part; this is common to all versions */
1647
memcpy(start, end, data_len); // Copy db and query
1648
start[data_len]= '\0'; // End query with \0 (For safetly)
1650
query= (char *)(start + db_len + 1);
1651
q_len= data_len - db_len -1;
1657
Query_log_event::do_apply_event()
1659
// Applies this event using its own stored query text: forwards rli together
// with the members query and q_len to the three-argument overload and
// returns that overload's result.
int Query_log_event::do_apply_event(Relay_log_info const *rli)
1661
return do_apply_event(rli, query, q_len);
1667
Compare the values of "affected rows" around here. Something
1670
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1672
sql_print_error("Slave: did not get the expected number of affected \
1673
rows running query from master - expected %d, got %d (this numbers \
1674
should have matched modulo 4294967296).", 0, ...);
1675
thd->query_error = 1;
1678
We may also want an option to tell the slave to ignore "affected"
1679
mismatch. This mismatch could be implemented with a new ER_ code, and
1680
to ignore it you would use --slave-skip-errors...
1682
// Query_log_event::do_apply_event (three-argument form): executes the
// given query text on behalf of this event and returns
// thd->is_slave_error.
// From the visible code:
//  - thd->catalog, the database (after rpl_filter->get_rewrite_db) and the
//    two auto_increment variables are set from the event;
//  - log_pos is saved in rli->future_group_master_log_pos, errors are
//    cleared and rli->clear_tables_to_lock() is called (rli is const here,
//    hence the const_casts);
//  - when rpl_filter->db_ok(thd->db) holds, the time, query pointer and
//    length, a fresh query id (taken under LOCK_thread_count) and the
//    pseudo thread id are installed; session options, time zone, locale
//    and database collation are derived from the event; the query is run
//    through mysql_parse() and log_slow_statement();
//  - the error produced is then compared with the event's expected error
//    and reported through rli->report() unless the codes match or are
//    ignorable;
//  - finally thd->db is reset, thd->query and thd->query_length are
//    zeroed under LOCK_thread_count, the thread's tables are closed, the
//    first_successful_insert_id fields are reset and thd->mem_root is
//    freed with MY_KEEP_PREALLOC.
// NOTE(review): one rli->report() format string below spells "differenxt".
// It is a runtime message, so correcting it is a behaviour change that
// belongs in a separate code edit.
// NOTE(review): the embedded line numbers skip throughout, so labels
// (such as the target of `goto compare_errors`), else-branches and several
// guards are not visible in this extract. Confirm against the complete
// file before modifying.
int Query_log_event::do_apply_event(Relay_log_info const *rli,
1683
const char *query_arg, uint32_t q_len_arg)
1686
int expected_error,actual_error= 0;
1688
Colleagues: please never free(thd->catalog) in MySQL. This would
1689
lead to bugs as here thd->catalog is a part of an alloced block,
1690
not an entire alloced block (see
1691
Query_log_event::do_apply_event()). Same for thd->db. Thank
1694
thd->catalog= catalog_len ? (char *) catalog : (char *)"";
1695
new_db.length= db_len;
1696
new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
1697
thd->set_db(new_db.str, new_db.length); /* allocates a copy of 'db' */
1698
thd->variables.auto_increment_increment= auto_increment_increment;
1699
thd->variables.auto_increment_offset= auto_increment_offset;
1702
InnoDB internally stores the master log position it has executed so far,
1703
i.e. the position just after the COMMIT event.
1704
When InnoDB will want to store, the positions in rli won't have
1705
been updated yet, so group_master_log_* will point to old BEGIN
1706
and event_master_log* will point to the beginning of current COMMIT.
1707
But log_pos of the COMMIT Query event is what we want, i.e. the pos of the
1708
END of the current log event (COMMIT). We save it in rli so that InnoDB can
1711
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
1713
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
1714
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
1717
Note: We do not need to execute reset_one_shot_variables() if this
1719
Reason: The db stored in binlog events is the same for SET and for
1720
its companion query. If the SET is ignored because of
1721
db_ok(), the companion query will also be ignored, and if
1722
the companion query is ignored in the db_ok() test of
1723
::do_apply_event(), then the companion SET also have so
1724
we don't need to reset_one_shot_variables().
1726
if (rpl_filter->db_ok(thd->db))
1728
thd->set_time((time_t)when);
1729
thd->query_length= q_len_arg;
1730
thd->query= (char*)query_arg;
1731
pthread_mutex_lock(&LOCK_thread_count);
1732
thd->query_id = next_query_id();
1733
pthread_mutex_unlock(&LOCK_thread_count);
1734
thd->variables.pseudo_thread_id= thread_id; // for temp tables
1736
if (ignored_error_code((expected_error= error_code)) ||
1737
!check_expected_error(thd,rli,expected_error))
1741
all bits of thd->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
1742
must take their value from flags2.
1744
thd->options= flags2|(thd->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
1747
String tmp(time_zone_str, time_zone_len, &my_charset_bin);
1748
if (!(thd->variables.time_zone= my_tz_find(thd, &tmp)))
1750
my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr());
1751
thd->variables.time_zone= global_system_variables.time_zone;
1752
goto compare_errors;
1755
if (lc_time_names_number)
1757
if (!(thd->variables.lc_time_names=
1758
my_locale_by_number(lc_time_names_number)))
1760
my_printf_error(ER_UNKNOWN_ERROR,
1761
"Unknown locale: '%d'", MYF(0), lc_time_names_number);
1762
thd->variables.lc_time_names= &my_locale_en_US;
1763
goto compare_errors;
1767
thd->variables.lc_time_names= &my_locale_en_US;
1768
if (charset_database_number)
1770
const CHARSET_INFO *cs;
1771
if (!(cs= get_charset(charset_database_number, MYF(0))))
1774
int10_to_str((int) charset_database_number, buf, -10);
1775
my_error(ER_UNKNOWN_COLLATION, MYF(0), buf);
1776
goto compare_errors;
1778
thd->variables.collation_database= cs;
1781
thd->variables.collation_database= thd->db_charset;
1783
/* Execute the query (note that we bypass dispatch_command()) */
1784
const char* found_semicolon= NULL;
1785
mysql_parse(thd, thd->query, thd->query_length, &found_semicolon);
1786
log_slow_statement(thd);
1791
The query got a really bad error on the master (thread killed etc),
1792
which could be inconsistent. Parse it to test the table names: if the
1793
replicate-*-do|ignore-table rules say "this query must be ignored" then
1794
we exit gracefully; otherwise we warn about the bad error and tell DBA
1797
if (mysql_test_parse_for_slave(thd, thd->query, thd->query_length))
1798
clear_all_errors(thd, const_cast<Relay_log_info*>(rli)); /* Can ignore query */
1801
rli->report(ERROR_LEVEL, expected_error,
1802
_("Query partially completed on the master "
1803
"(error on master: %d) and was aborted. There is a "
1804
"chance that your master is inconsistent at this "
1805
"point. If you are sure that your master is ok, run "
1806
"this query manually on the slave and then restart the "
1807
"slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; "
1808
"START SLAVE; . Query: '%s'"),
1809
expected_error, thd->query);
1810
thd->is_slave_error= 1;
1818
If we expected a non-zero error code, and we don't get the same error
1819
code, and none of them should be ignored.
1821
actual_error= thd->is_error() ? thd->main_da.sql_errno() : 0;
1822
if ((expected_error != actual_error) &&
1824
!ignored_error_code(actual_error) &&
1825
!ignored_error_code(expected_error))
1827
rli->report(ERROR_LEVEL, 0,
1828
_("Query caused differenxt errors on master and slave.\n"
1829
"Error on master: '%s' (%d), Error on slave: '%s' (%d).\n"
1830
"Default database: '%s'. Query: '%s'"),
1831
ER_SAFE(expected_error),
1833
actual_error ? thd->main_da.message() : _("no error"),
1835
print_slave_db_safe(db), query_arg);
1836
thd->is_slave_error= 1;
1839
If we get the same error code as expected, or they should be ignored.
1841
else if (expected_error == actual_error ||
1842
ignored_error_code(actual_error))
1844
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
1845
thd->killed= THD::NOT_KILLED;
1848
Other cases: mostly we expected no error and get one.
1850
else if (thd->is_slave_error || thd->is_fatal_error)
1852
rli->report(ERROR_LEVEL, actual_error,
1853
_("Error '%s' on query. Default database: '%s'. Query: '%s'"),
1854
(actual_error ? thd->main_da.message() :
1855
_("unexpected success or fatal error")),
1856
print_slave_db_safe(thd->db), query_arg);
1857
thd->is_slave_error= 1;
1861
TODO: compare the values of "affected rows" around here. Something
1863
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1865
sql_print_error("Slave: did not get the expected number of affected \
1866
rows running query from master - expected %d, got %d (this numbers \
1867
should have matched modulo 4294967296).", 0, ...);
1868
thd->is_slave_error = 1;
1870
We may also want an option to tell the slave to ignore "affected"
1871
mismatch. This mismatch could be implemented with a new ER_ code, and
1872
to ignore it you would use --slave-skip-errors...
1874
To do the comparison we need to know the value of "affected" which the
1875
above mysql_parse() computed. And we need to know the value of
1876
"affected" in the master's binlog. Both will be implemented later. The
1877
important thing is that we now have the format ready to log the values
1878
of "affected" in the binlog. So we can release 5.0.0 before effectively
1879
logging "affected" and effectively comparing it.
1881
} /* End of if (db_ok(... */
1884
pthread_mutex_lock(&LOCK_thread_count);
1886
Probably we have set thd->query, thd->db, thd->catalog to point to places
1887
in the data_buf of this event. Now the event is going to be deleted
1888
probably, so data_buf will be freed, so the thd->... listed above will be
1889
pointers to freed memory.
1890
So we must set them to 0, so that those bad pointers values are not later
1891
used. Note that "cleanup" queries like automatic DROP TEMPORARY Table
1892
don't suffer from these assignments to 0 as DROP TEMPORARY
1893
Table uses the db.table syntax.
1896
thd->set_db(NULL, 0); /* will free the current database */
1897
thd->query= 0; // just to be sure
1898
thd->query_length= 0;
1899
pthread_mutex_unlock(&LOCK_thread_count);
1900
close_thread_tables(thd);
1902
As a disk space optimization, future masters will not log an event for
1903
LAST_INSERT_ID() if that function returned 0 (and thus they will be able
1904
to replace the THD::stmt_depends_on_first_successful_insert_id_in_prev_stmt
1905
variable by (THD->first_successful_insert_id_in_prev_stmt > 0) ; with the
1906
resetting below we are ready to support that.
1908
thd->first_successful_insert_id_in_prev_stmt_for_binlog= 0;
1909
thd->first_successful_insert_id_in_prev_stmt= 0;
1910
thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
1911
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
1912
return thd->is_slave_error;
1915
// Query_log_event::do_update_pos: updates relay-log positions after this
// event. From the visible code: when thd->one_shot_set is set,
// rli->inc_event_relay_log_pos() is called; the remaining visible path
// returns Log_event::do_update_pos(rli).
// NOTE(review): the embedded line numbers skip, so the return statement
// of the one-shot branch (if any) is not visible in this extract. Confirm
// against the complete file before modifying.
int Query_log_event::do_update_pos(Relay_log_info *rli)
1918
Note that we will not increment group* positions if we are just
1919
after a SET ONE_SHOT, because SET ONE_SHOT should not be separated
1920
from its following updating query.
1922
if (thd->one_shot_set)
1924
rli->inc_event_relay_log_pos();
1928
return Log_event::do_update_pos(rli);
1932
/**
  Decide whether this query event must be skipped.

  While rli->slave_skip_counter is positive, transaction delimiters are
  handled specially:
    - "BEGIN" sets OPTION_BEGIN in thd->options and returns the result
      of Log_event::continue_group();
    - "COMMIT" or "ROLLBACK" clears OPTION_BEGIN and returns
      EVENT_SKIP_COUNT.
  In every other case the generic Log_event::do_shall_skip() decides.

  @param rli  Relay log info holding the skip counter.
  @return The skip reason for this event.
*/
Log_event::enum_skip_reason
Query_log_event::do_shall_skip(Relay_log_info *rli)
{
  assert(query && q_len > 0);

  /* Nothing is being skipped: use the generic decision. */
  if (!(rli->slave_skip_counter > 0))
    return Log_event::do_shall_skip(rli);

  const bool opens_transaction= (strcmp("BEGIN", query) == 0);
  if (opens_transaction)
  {
    thd->options|= OPTION_BEGIN;
    return Log_event::continue_group(rli);
  }

  const bool closes_transaction= (strcmp("COMMIT", query) == 0 ||
                                  strcmp("ROLLBACK", query) == 0);
  if (closes_transaction)
  {
    thd->options&= ~OPTION_BEGIN;
    return Log_event::EVENT_SKIP_COUNT;
  }

  return Log_event::do_shall_skip(rli);
}
1955
/**************************************************************************
1956
Start_log_event_v3 methods
1957
**************************************************************************/
1959
// Default constructor: created, artificial_event and dont_set_created
// start at 0, binlog_version is BINLOG_VERSION, and ST_SERVER_VER_LEN
// bytes of the global ::server_version are copied into the member
// server_version.
Start_log_event_v3::Start_log_event_v3()
1960
:Log_event(), created(0), binlog_version(BINLOG_VERSION),
1961
artificial_event(0), dont_set_created(0)
1963
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
1967
Start_log_event_v3::pack_info()
1970
/**
  Store a one-line description of this start event in the protocol.

  The text is "Server ver: " followed by server_version, then
  ", Binlog ver: " followed by binlog_version in decimal. It is handed
  to protocol->store() with my_charset_bin.

  @param protocol  Destination the description is stored into.
*/
void Start_log_event_v3::pack_info(Protocol *protocol)
{
  /* Sized for both labels (12 and 14 bytes), the version string and the
     decimal binlog version. */
  char text[12 + ST_SERVER_VER_LEN + 14 + 22];

  char *tail= my_stpcpy(text, "Server ver: ");
  tail= my_stpcpy(tail, server_version);
  tail= my_stpcpy(tail, ", Binlog ver: ");
  tail= int10_to_str(binlog_version, tail, 10);

  const uint text_len= (uint) (tail - text);
  protocol->store(text, text_len, &my_charset_bin);
}
1982
Start_log_event_v3::Start_log_event_v3()
1985
// Start_log_event_v3 constructor that decodes an event image held in
// `buf`. From the visible code: after skipping
// description_event->common_header_len bytes it reads binlog_version,
// server_version and created from their ST_*_OFFSET positions and forces
// a terminating 0 into the last byte of server_version. artificial_event
// is true when log_pos is 0, and dont_set_created is set to 1.
// NOTE(review): part of the parameter list and the end of the memcpy call
// are not visible in this extract (the embedded line numbers skip).
// Confirm against the complete file before modifying.
Start_log_event_v3::Start_log_event_v3(const char* buf,
1986
const Format_description_log_event
1988
:Log_event(buf, description_event)
1990
buf+= description_event->common_header_len;
1991
binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET);
1992
memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
1994
// prevent overrun if log is corrupted on disk
1995
server_version[ST_SERVER_VER_LEN-1]= 0;
1996
created= uint4korr(buf+ST_CREATED_OFFSET);
1997
/* We use log_pos to mark if this was an artificial event or not */
1998
artificial_event= (log_pos == 0);
1999
dont_set_created= 1;
2004
Start_log_event_v3::write()
2007
/**
  Serialize this start event to the given cache.

  The START_V3_HEADER_LEN-byte post-header holds the binlog version, the
  server version string and the creation timestamp. Unless
  dont_set_created is set, `when` and `created` are first refreshed from
  get_time().

  @param file  Destination IO_CACHE.
  @return true if write_header() or my_b_safe_write() reports a non-zero
          result, false otherwise.
*/
bool Start_log_event_v3::write(IO_CACHE* file)
{
  if (!dont_set_created)
  {
    when= get_time();
    created= when;
  }

  char payload[START_V3_HEADER_LEN];
  int2store(payload + ST_BINLOG_VER_OFFSET, binlog_version);
  memcpy(payload + ST_SERVER_VER_OFFSET, server_version, ST_SERVER_VER_LEN);
  int4store(payload + ST_CREATED_OFFSET, created);

  if (write_header(file, sizeof(payload)))
    return true;
  return my_b_safe_write(file, (unsigned char*) payload,
                         sizeof(payload)) != 0;
}
2021
Start_log_event_v3::do_apply_event() .
2025
- To handle the case where the master died without having time to write
2026
DROP TEMPORARY Table, DO RELEASE_LOCK (prepared statements' deletion is
2027
TODO), we clean up all temporary tables that we got, if we are sure we
2031
- Remove all active user locks.
2032
Guilhem 2003-06: this is true but not urgent: the worst it can cause is
2033
the use of a bit of memory for a user lock which will not be used
2034
anymore. If the user lock is later used, the old one will be released. In
2035
other words, no deadlock problem.
2038
// Start_log_event_v3::do_apply_event: reacts to a start event according to
// binlog_version. From the visible code: one branch calls
// close_temporary_tables(thd) and cleanup_load_tmpdir(); another branch,
// for older formats, calls close_temporary_tables(thd) only when the
// executing description event's server_version compares >= "3.23.57" on
// its first 7 characters and `created` is non-zero.
// NOTE(review): the case labels, any conditions guarding the first branch
// and the return statement are not visible in this extract (the embedded
// line numbers skip). Confirm against the complete file before modifying.
int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
2040
switch (binlog_version)
2045
This can either be 4.x (then a Start_log_event_v3 is only at master
2046
startup so we are sure the master has restarted and cleared his temp
2047
tables; the event always has 'created'>0) or 5.0 (then we have to test
2052
close_temporary_tables(thd);
2053
cleanup_load_tmpdir();
2058
Now the older formats; in that case load_tmpdir is cleaned up by the I/O
2062
if (strncmp(rli->relay_log.description_event_for_exec->server_version,
2063
"3.23.57",7) >= 0 && created)
2066
Can distinguish, based on the value of 'created': this event was
2067
generated at master startup.
2069
close_temporary_tables(thd);
2072
Otherwise, can't distinguish a Start_log_event generated at
2073
master startup and one generated by master FLUSH LOGS, so cannot
2074
be sure temp tables have to be dropped. So do nothing.
2078
/* this case is impossible */
2084
/***************************************************************************
2085
Format_description_log_event methods
2086
****************************************************************************/
2089
Format_description_log_event 1st ctor.
2091
Ctor. Can be used to create the event to write to the binary log (when the
2092
server starts or when FLUSH LOGS), or to create artificial events to parse
2093
binlogs from MySQL 3.23 or 4.x.
2094
When in a client, only the 2nd use is possible.
2096
@param binlog_version the binlog version for which we want to build
2097
an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x
2098
x>=2 and 4.1) or 4 (MySQL 5.0). Note that the
2099
old 4.0 (binlog version 2) is not supported;
2100
it should not be used for replication with
2104
Format_description_log_event::
2105
Format_description_log_event(uint8_t binlog_ver, const char* server_ver)
2106
:Start_log_event_v3(), event_type_permutation(0)
2108
binlog_version= binlog_ver;
2109
switch (binlog_ver) {
2110
case 4: /* MySQL 5.0 */
2111
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
2112
common_header_len= LOG_EVENT_HEADER_LEN;
2113
number_of_event_types= LOG_EVENT_TYPES;
2114
/* we'll catch my_malloc() error in is_valid() */
2115
post_header_len=(uint8_t*) my_malloc(number_of_event_types*sizeof(uint8_t),
2118
This long list of assignments is not beautiful, but I see no way to
2119
make it nicer, as the right members are #defines, not array members, so
2120
it's impossible to write a loop.
2122
if (post_header_len)
2124
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2125
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN;
2126
post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
2127
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2128
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2129
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2130
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2131
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2132
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
2133
post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
2134
post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN;
2135
post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2136
post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2137
post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2138
post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1];
2139
post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
2140
post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
2141
post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
2146
case 3: /* 4.0.x x>=2 */
2148
We build an artificial (i.e. not sent by the master) event, which
2149
describes what those old master versions send.
2152
my_stpcpy(server_version, server_ver ? server_ver : "3.23");
2154
my_stpcpy(server_version, server_ver ? server_ver : "4.0");
2155
common_header_len= binlog_ver==1 ? OLD_HEADER_LEN :
2156
LOG_EVENT_MINIMAL_HEADER_LEN;
2158
The first new event in binlog version 4 is Format_desc. So any event type
2159
after that does not exist in older versions. We use the events known by
2160
version 3, even if version 1 had only a subset of them (this is not a
2161
problem: it uses a few bytes for nothing but unifies code; it does not
2162
make the slave detect fewer corruptions).
2164
number_of_event_types= FORMAT_DESCRIPTION_EVENT - 1;
2165
post_header_len=(uint8_t*) my_malloc(number_of_event_types*sizeof(uint8_t),
2167
if (post_header_len)
2169
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2170
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_MINIMAL_LEN;
2171
post_header_len[STOP_EVENT-1]= 0;
2172
post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN;
2173
post_header_len[INTVAR_EVENT-1]= 0;
2174
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2175
post_header_len[SLAVE_EVENT-1]= 0;
2176
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2177
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2178
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2179
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2180
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
2181
post_header_len[RAND_EVENT-1]= 0;
2182
post_header_len[USER_VAR_EVENT-1]= 0;
2185
default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
2186
post_header_len= 0; /* will make is_valid() fail */
2189
calc_server_version_split();
2194
The problem with this constructor is that the fixed header may have a
2195
length different from this version, but we don't know this length as we
2196
have not read the Format_description_log_event which says it, yet. This
2197
length is in the post-header of the event, but we don't know where the
2200
So this type of event HAS to:
2201
- either have the header's length at the beginning (in the header, at a
2202
fixed position which will never be changed), not in the post-header. That
2203
would make the header be "shifted" compared to other events.
2204
- or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future
2205
versions, so that we know for sure.
2207
I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because
2208
it is sent before Format_description_log_event).
2211
Format_description_log_event::
2212
Format_description_log_event(const char* buf,
2215
Format_description_log_event*
2217
:Start_log_event_v3(buf, description_event), event_type_permutation(0)
2219
buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
2220
if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
2221
return; /* sanity check */
2222
number_of_event_types=
2223
event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1);
2224
/* If alloc fails, we'll detect it in is_valid() */
2225
post_header_len= (uint8_t*) my_memdup((unsigned char*)buf+ST_COMMON_HEADER_LEN_OFFSET+1,
2226
number_of_event_types*
2227
sizeof(*post_header_len), MYF(0));
2228
calc_server_version_split();
2231
In some previous versions, the events were given other event type
2232
id numbers than in the present version. When replicating from such
2233
a version, we therefore set up an array that maps those id numbers
2234
to the id numbers of the present server.
2236
If post_header_len is null, it means malloc failed, and is_valid
2237
will fail, so there is no need to do anything.
2239
The trees in which events have wrong id's are:
2241
mysql-5.1-wl1012.old mysql-5.1-wl2325-5.0-drop6p13-alpha
2242
mysql-5.1-wl2325-5.0-drop6 mysql-5.1-wl2325-5.0
2243
mysql-5.1-wl2325-no-dd
2245
(this was found by grepping for two lines in sequence where the
2246
first matches "FORMAT_DESCRIPTION_EVENT," and the second matches
2247
"TABLE_MAP_EVENT," in log_event.h in all trees)
2249
In these trees, the following server_versions existed since
2250
TABLE_MAP_EVENT was introduced:
2252
5.1.1-a_drop5p3 5.1.1-a_drop5p4 5.1.1-alpha
2253
5.1.2-a_drop5p10 5.1.2-a_drop5p11 5.1.2-a_drop5p12
2254
5.1.2-a_drop5p13 5.1.2-a_drop5p14 5.1.2-a_drop5p15
2255
5.1.2-a_drop5p16 5.1.2-a_drop5p16b 5.1.2-a_drop5p16c
2256
5.1.2-a_drop5p17 5.1.2-a_drop5p4 5.1.2-a_drop5p5
2257
5.1.2-a_drop5p6 5.1.2-a_drop5p7 5.1.2-a_drop5p8
2258
5.1.2-a_drop5p9 5.1.3-a_drop5p17 5.1.3-a_drop5p17b
2259
5.1.3-a_drop5p17c 5.1.4-a_drop5p18 5.1.4-a_drop5p19
2260
5.1.4-a_drop5p20 5.1.4-a_drop6p0 5.1.4-a_drop6p1
2261
5.1.4-a_drop6p2 5.1.5-a_drop5p20 5.2.0-a_drop6p3
2262
5.2.0-a_drop6p4 5.2.0-a_drop6p5 5.2.0-a_drop6p6
2263
5.2.1-a_drop6p10 5.2.1-a_drop6p11 5.2.1-a_drop6p12
2264
5.2.1-a_drop6p6 5.2.1-a_drop6p7 5.2.1-a_drop6p8
2265
5.2.2-a_drop6p13 5.2.2-a_drop6p13-alpha 5.2.2-a_drop6p13b
2268
(this was found by grepping for "mysql," in all historical
2269
versions of configure.in in the trees listed above).
2271
There are 5.1.1-alpha versions that use the new event id's, so we
2272
do not test that version string. So replication from 5.1.1-alpha
2273
with the other event id's to a new version does not work.
2274
Moreover, we can safely ignore the part after drop[56]. This
2275
allows us to simplify the big list above to the following regexes:
2277
5\.1\.[1-5]-a_drop5.*
2279
5\.2\.[0-2]-a_drop6.*
2281
This is what we test for in the 'if' below.
2283
if (post_header_len &&
2284
server_version[0] == '5' && server_version[1] == '.' &&
2285
server_version[3] == '.' &&
2286
strncmp(server_version + 5, "-a_drop", 7) == 0 &&
2287
((server_version[2] == '1' &&
2288
server_version[4] >= '1' && server_version[4] <= '5' &&
2289
server_version[12] == '5') ||
2290
(server_version[2] == '1' &&
2291
server_version[4] == '4' &&
2292
server_version[12] == '6') ||
2293
(server_version[2] == '2' &&
2294
server_version[4] >= '0' && server_version[4] <= '2' &&
2295
server_version[12] == '6')))
2297
if (number_of_event_types != 22)
2299
/* this makes is_valid() return false. */
2300
free(post_header_len);
2301
post_header_len= NULL;
2304
static const uint8_t perm[23]=
2306
UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
2307
INTVAR_EVENT, LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
2308
APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
2310
RAND_EVENT, USER_VAR_EVENT,
2311
FORMAT_DESCRIPTION_EVENT,
2313
PRE_GA_WRITE_ROWS_EVENT,
2314
PRE_GA_UPDATE_ROWS_EVENT,
2315
PRE_GA_DELETE_ROWS_EVENT,
2317
BEGIN_LOAD_QUERY_EVENT,
2318
EXECUTE_LOAD_QUERY_EVENT,
2320
event_type_permutation= perm;
2322
Since we use (permuted) event id's to index the post_header_len
2323
array, we need to permute the post_header_len array too.
2325
uint8_t post_header_len_temp[23];
2326
for (int i= 1; i < 23; i++)
2327
post_header_len_temp[perm[i] - 1]= post_header_len[i - 1];
2328
for (int i= 0; i < 22; i++)
2329
post_header_len[i] = post_header_len_temp[i];
2334
bool Format_description_log_event::write(IO_CACHE* file)
2337
We don't call Start_log_event_v3::write() because this would make 2
2340
unsigned char buff[FORMAT_DESCRIPTION_HEADER_LEN];
2341
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
2342
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
2343
if (!dont_set_created)
2344
created= when= get_time();
2345
int4store(buff + ST_CREATED_OFFSET,created);
2346
buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
2347
memcpy(buff+ST_COMMON_HEADER_LEN_OFFSET+1, post_header_len,
2349
return (write_header(file, sizeof(buff)) ||
2350
my_b_safe_write(file, buff, sizeof(buff)));
2354
int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
2357
As a transaction NEVER spans on 2 or more binlogs:
2358
if we have an active transaction at this point, the master died
2359
while writing the transaction to the binary log, i.e. while
2360
flushing the binlog cache to the binlog. XA guarantees that master has
2361
rolled back. So we roll back.
2362
Note: this event could be sent by the master to inform us of the
2363
format of its binlog; in other words maybe it is not at its
2364
original place when it comes to us; we'll know this by checking
2365
log_pos ("artificial" events have log_pos == 0).
2367
if (!artificial_event && created && thd->transaction.all.ha_list)
2369
/* This is not an error (XA is safe), just an information */
2370
rli->report(INFORMATION_LEVEL, 0,
2371
_("Rolling back unfinished transaction (no COMMIT "
2372
"or ROLLBACK in relay log). A probable cause is that "
2373
"the master died while writing the transaction to "
2374
"its binary log, thus rolled back too."));
2375
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 1);
2378
If this event comes from ourselves, there is no cleaning task to
2379
perform, we don't call Start_log_event_v3::do_apply_event()
2380
(this was just to update the log's description event).
2382
if (server_id != (uint32_t) ::server_id)
2385
If the event was not requested by the slave i.e. the master sent
2386
it while the slave asked for a position >4, the event will make
2387
rli->group_master_log_pos advance. Say that the slave asked for
2388
position 1000, and the Format_desc event's end is 96. Then in
2389
the beginning of replication rli->group_master_log_pos will be
2390
0, then 96, then jump to first really asked event (which is
2391
>96). So this is ok.
2393
return(Start_log_event_v3::do_apply_event(rli));
2398
int Format_description_log_event::do_update_pos(Relay_log_info *rli)
2400
/* save the information describing this binlog */
2401
delete rli->relay_log.description_event_for_exec;
2402
rli->relay_log.description_event_for_exec= this;
2404
if (server_id == (uint32_t) ::server_id)
2407
We only increase the relay log position if we are skipping
2408
events and do not touch any group_* variables, nor flush the
2409
relay log info. If there is a crash, we will have to re-skip
2410
the events again, but that is a minor issue.
2412
If we do not skip stepping the group log position (and the
2413
server id was changed when restarting the server), it might well
2414
be that we start executing at a position that is invalid, e.g.,
2415
at a Rows_log_event or a Query_log_event preceded by an
2416
Intvar_log_event instead of starting at a Table_map_log_event or
2417
the Intvar_log_event respectively.
2419
rli->inc_event_relay_log_pos();
2424
return Log_event::do_update_pos(rli);
2428
/**
  Tell the caller whether this event should be skipped.

  This override does not inspect @c rli and unconditionally answers
  "do not skip".

  @param rli  Relay log info; unused.

  @return Log_event::EVENT_SKIP_NOT, always.
*/
Log_event::enum_skip_reason
Format_description_log_event::do_shall_skip(Relay_log_info *rli __attribute__((unused)))
{
  return Log_event::EVENT_SKIP_NOT;
}
2436
Splits the event's 'server_version' string into three numeric pieces stored
2437
into 'server_version_split':
2438
X.Y.Zabc (X,Y,Z numbers, a not a digit) -> {X,Y,Z}
2441
'server_version_split' is then used for lookups to find if the server which
2442
created this event has some known bug.
2444
void Format_description_log_event::calc_server_version_split()
2446
char *p= server_version, *r;
2448
for (uint32_t i= 0; i<=2; i++)
2450
number= strtoul(p, &r, 10);
2451
server_version_split[i]= (unsigned char)number;
2452
assert(number < 256); // fit in unsigned char
2454
assert(!((i == 0) && (*r != '.'))); // should be true in practice
2456
p++; // skip the dot
2461
/**************************************************************************
2462
Load_log_event methods
2463
General note about Load_log_event: the binlogging of LOAD DATA INFILE is
2464
going to be changed in 5.0 (or maybe in 5.1; not decided yet).
2465
However, the 5.0 slave could still have to read such events (from a 4.x
2466
master), convert them (which just means maybe expand the header, when 5.0
2467
servers have a UID in events) (remember that whatever is after the header
2468
will be like in 4.x, as this event's format is not modified in 5.0 as we
2469
will use new types of events to log the new LOAD DATA INFILE features).
2470
To be able to read/convert, we just need to not assume that the common
2471
header is of length LOG_EVENT_HEADER_LEN (we must use the description
2473
Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE
2474
between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the
2475
positions displayed in SHOW SLAVE STATUS then are fine too).
2476
**************************************************************************/
2479
Load_log_event::pack_info()
2482
uint32_t Load_log_event::get_query_buffer_length()
2485
5 + db_len + 3 + // "use DB; "
2486
18 + fname_len + 2 + // "LOAD DATA INFILE 'file''"
2488
9 + // " REPLACE or IGNORE "
2489
13 + table_name_len*2 + // "INTO Table `table`"
2490
21 + sql_ex.field_term_len*4 + 2 + // " FIELDS TERMINATED BY 'str'"
2491
23 + sql_ex.enclosed_len*4 + 2 + // " OPTIONALLY ENCLOSED BY 'str'"
2492
12 + sql_ex.escaped_len*4 + 2 + // " ESCAPED BY 'str'"
2493
21 + sql_ex.line_term_len*4 + 2 + // " LINES TERMINATED BY 'str'"
2494
19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'"
2495
15 + 22 + // " IGNORE xxx LINES"
2496
3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)"
2500
void Load_log_event::print_query(bool need_db, char *buf,
2501
char **end, char **fn_start, char **fn_end)
2505
if (need_db && db && db_len)
2507
pos= my_stpcpy(pos, "use `");
2508
memcpy(pos, db, db_len);
2509
pos= my_stpcpy(pos+db_len, "`; ");
2512
pos= my_stpcpy(pos, "LOAD DATA ");
2517
if (check_fname_outside_temp_buf())
2518
pos= my_stpcpy(pos, "LOCAL ");
2519
pos= my_stpcpy(pos, "INFILE '");
2520
memcpy(pos, fname, fname_len);
2521
pos= my_stpcpy(pos+fname_len, "' ");
2523
if (sql_ex.opt_flags & REPLACE_FLAG)
2524
pos= my_stpcpy(pos, " REPLACE ");
2525
else if (sql_ex.opt_flags & IGNORE_FLAG)
2526
pos= my_stpcpy(pos, " IGNORE ");
2528
pos= my_stpcpy(pos ,"INTO");
2533
pos= my_stpcpy(pos ," Table `");
2534
memcpy(pos, table_name, table_name_len);
2535
pos+= table_name_len;
2537
/* We have to create all optional fields as the default is not empty */
2538
pos= my_stpcpy(pos, "` FIELDS TERMINATED BY ");
2539
pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len);
2540
if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
2541
pos= my_stpcpy(pos, " OPTIONALLY ");
2542
pos= my_stpcpy(pos, " ENCLOSED BY ");
2543
pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len);
2545
pos= my_stpcpy(pos, " ESCAPED BY ");
2546
pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
2548
pos= my_stpcpy(pos, " LINES TERMINATED BY ");
2549
pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
2550
if (sql_ex.line_start_len)
2552
pos= my_stpcpy(pos, " STARTING BY ");
2553
pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
2556
if ((long) skip_lines > 0)
2558
pos= my_stpcpy(pos, " IGNORE ");
2559
pos= int64_t10_to_str((int64_t) skip_lines, pos, 10);
2560
pos= my_stpcpy(pos," LINES ");
2566
const char *field= fields;
2567
pos= my_stpcpy(pos, " (");
2568
for (i = 0; i < num_fields; i++)
2575
memcpy(pos, field, field_lens[i]);
2576
pos+= field_lens[i];
2577
field+= field_lens[i] + 1;
2586
void Load_log_event::pack_info(Protocol *protocol)
2590
if (!(buf= (char*) my_malloc(get_query_buffer_length(), MYF(MY_WME))))
2592
print_query(true, buf, &end, 0, 0);
2593
protocol->store(buf, end-buf, &my_charset_bin);
2599
Load_log_event::write_data_header()
2602
/**
  Write the fixed-size Load event header fields to @c file.

  The values are laid out in a LOAD_HEADER_LEN-byte buffer at their
  L_*_OFFSET positions: thread id (taken from slave_proxy_id), execution
  time, number of lines to skip and number of fields as 4-byte integers,
  table-name length and database-name length as one byte each.

  @param file  IO_CACHE that receives the header bytes.

  @return true if my_b_safe_write() returned non-zero, false otherwise.
*/
bool Load_log_event::write_data_header(IO_CACHE* file)
{
  char buf[LOAD_HEADER_LEN];
  int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
  int4store(buf + L_EXEC_TIME_OFFSET, exec_time);
  int4store(buf + L_SKIP_LINES_OFFSET, skip_lines);
  /* Lengths are stored in a single byte each, hence the narrowing casts. */
  buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
  buf[L_DB_LEN_OFFSET] = (char)db_len;
  int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
  return my_b_safe_write(file, (unsigned char*)buf, LOAD_HEADER_LEN) != 0;
}
2616
Load_log_event::write_data_body()
2619
/**
  Write the variable-size part of a Load event to @c file.

  Order of the data written:
    1. the sql_ex block (via sql_ex.write_data());
    2. if there are fields: the array of field-name lengths (num_fields
       bytes) followed by the field-name block (field_block_len bytes);
    3. the table name and the database name, each with one extra byte
       after its length (table_name_len + 1, db_len + 1);
    4. the file name, exactly fname_len bytes.

  @param file  IO_CACHE that receives the data.

  @return true if any of the writes reported a failure, false otherwise.

  NOTE(review): the two early failure returns below are taken to be the
  bodies of their `if` statements; confirm against the upstream source.
*/
bool Load_log_event::write_data_body(IO_CACHE* file)
{
  if (sql_ex.write_data(file))
    return true;

  if (num_fields && fields && field_lens)
  {
    if (my_b_safe_write(file, (unsigned char*)field_lens, num_fields) ||
        my_b_safe_write(file, (unsigned char*)fields, field_block_len))
      return true;
  }

  return (my_b_safe_write(file, (unsigned char*)table_name, table_name_len + 1) ||
          my_b_safe_write(file, (unsigned char*)db, db_len + 1) ||
          my_b_safe_write(file, (unsigned char*)fname, fname_len));
}
2636
Load_log_event::Load_log_event()
2639
Load_log_event::Load_log_event(THD *thd_arg, sql_exchange *ex,
2640
const char *db_arg, const char *table_name_arg,
2641
List<Item> &fields_arg,
2642
enum enum_duplicates handle_dup,
2643
bool ignore, bool using_trans)
2645
thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
2647
thread_id(thd_arg->thread_id),
2648
slave_proxy_id(thd_arg->variables.pseudo_thread_id),
2649
num_fields(0),fields(0),
2650
field_lens(0),field_block_len(0),
2651
table_name(table_name_arg ? table_name_arg : ""),
2652
db(db_arg), fname(ex->file_name), local_fname(false)
2656
exec_time = (ulong) (end_time - thd_arg->start_time);
2657
/* db can never be a zero pointer in 4.0 */
2658
db_len = (uint32_t) strlen(db);
2659
table_name_len = (uint32_t) strlen(table_name);
2660
fname_len = (fname) ? (uint) strlen(fname) : 0;
2661
sql_ex.field_term = (char*) ex->field_term->ptr();
2662
sql_ex.field_term_len = (uint8_t) ex->field_term->length();
2663
sql_ex.enclosed = (char*) ex->enclosed->ptr();
2664
sql_ex.enclosed_len = (uint8_t) ex->enclosed->length();
2665
sql_ex.line_term = (char*) ex->line_term->ptr();
2666
sql_ex.line_term_len = (uint8_t) ex->line_term->length();
2667
sql_ex.line_start = (char*) ex->line_start->ptr();
2668
sql_ex.line_start_len = (uint8_t) ex->line_start->length();
2669
sql_ex.escaped = (char*) ex->escaped->ptr();
2670
sql_ex.escaped_len = (uint8_t) ex->escaped->length();
2671
sql_ex.opt_flags = 0;
2672
sql_ex.cached_new_format = -1;
2675
sql_ex.opt_flags|= DUMPFILE_FLAG;
2676
if (ex->opt_enclosed)
2677
sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
2679
sql_ex.empty_flags= 0;
2681
switch (handle_dup) {
2683
sql_ex.opt_flags|= REPLACE_FLAG;
2685
case DUP_UPDATE: // Impossible here
2690
sql_ex.opt_flags|= IGNORE_FLAG;
2692
if (!ex->field_term->length())
2693
sql_ex.empty_flags |= FIELD_TERM_EMPTY;
2694
if (!ex->enclosed->length())
2695
sql_ex.empty_flags |= ENCLOSED_EMPTY;
2696
if (!ex->line_term->length())
2697
sql_ex.empty_flags |= LINE_TERM_EMPTY;
2698
if (!ex->line_start->length())
2699
sql_ex.empty_flags |= LINE_START_EMPTY;
2700
if (!ex->escaped->length())
2701
sql_ex.empty_flags |= ESCAPED_EMPTY;
2703
skip_lines = ex->skip_lines;
2705
List_iterator<Item> li(fields_arg);
2706
field_lens_buf.length(0);
2707
fields_buf.length(0);
2709
while ((item = li++))
2712
unsigned char len = (unsigned char) strlen(item->name);
2713
field_block_len += len + 1;
2714
fields_buf.append(item->name, len + 1);
2715
field_lens_buf.append((char*)&len, 1);
2718
field_lens = (const unsigned char*)field_lens_buf.ptr();
2719
fields = fields_buf.ptr();
2725
The caller must do buf[event_len] = 0 before he starts using the
2728
Load_log_event::Load_log_event(const char *buf, uint32_t event_len,
2729
const Format_description_log_event *description_event)
2730
:Log_event(buf, description_event), num_fields(0), fields(0),
2731
field_lens(0),field_block_len(0),
2732
table_name(0), db(0), fname(0), local_fname(false)
2735
I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0,
2736
4.0->5.0 and 5.0->5.0 and it works.
2739
copy_log_event(buf, event_len,
2740
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
2742
description_event->common_header_len :
2743
LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
2745
/* otherwise it's a derived class, will call copy_log_event() itself */
2751
Load_log_event::copy_log_event()
2754
int Load_log_event::copy_log_event(const char *buf, ulong event_len,
2756
const Format_description_log_event *description_event)
2759
char* buf_end = (char*)buf + event_len;
2760
/* this is the beginning of the post-header */
2761
const char* data_head = buf + description_event->common_header_len;
2762
slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
2763
exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
2764
skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
2765
table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
2766
db_len = (uint)data_head[L_DB_LEN_OFFSET];
2767
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
2769
if ((int) event_len < body_offset)
2772
Sql_ex.init() on success returns the pointer to the first byte after
2773
the sql_ex structure, which is the start of field lengths array.
2775
if (!(field_lens= (unsigned char*)sql_ex.init((char*)buf + body_offset,
2777
buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
2780
data_len = event_len - body_offset;
2781
if (num_fields > data_len) // simple sanity check against corruption
2783
for (uint32_t i = 0; i < num_fields; i++)
2784
field_block_len += (uint)field_lens[i] + 1;
2786
fields = (char*)field_lens + num_fields;
2787
table_name = fields + field_block_len;
2788
db = table_name + table_name_len + 1;
2789
fname = db + db_len + 1;
2790
fname_len = strlen(fname);
2791
// null termination is accomplished by the caller doing buf[event_len]=0
2798
Load_log_event::set_fields()
2801
This function can not use the member variable
2802
for the database, since LOAD DATA INFILE on the slave
2803
can be for a different database than the current one.
2804
This is the reason for the affected_db argument to this method.
2807
void Load_log_event::set_fields(const char* affected_db,
2808
List<Item> &field_list,
2809
Name_resolution_context *context)
2812
const char* field = fields;
2813
for (i= 0; i < num_fields; i++)
2815
field_list.push_back(new Item_field(context,
2816
affected_db, table_name, field));
2817
field+= field_lens[i] + 1;
2823
Does the data loading job when executing a LOAD DATA on the slave.
2827
@param use_rli_only_for_errors If set to 1, rli is provided to
2828
Load_log_event::exec_event only for this
2829
function to have RPL_LOG_NAME and
2830
rli->last_slave_error, both being used by
2831
error reports. rli's position advancing
2832
is skipped (done by the caller which is
2833
Execute_load_log_event::exec_event).
2834
If set to 0, rli is provided for full use,
2835
i.e. for error reports and position
2839
fix this; this can be done by testing rules in
2840
Create_file_log_event::exec_event() and then discarding Append_block and
2843
this is a bug - this needs to be moved to the I/O thread
2851
int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
2852
bool use_rli_only_for_errors)
2855
new_db.length= db_len;
2856
new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
2857
thd->set_db(new_db.str, new_db.length);
2858
assert(thd->query == 0);
2859
thd->query_length= 0; // Should not be needed
2860
thd->is_slave_error= 0;
2861
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
2863
/* see Query_log_event::do_apply_event() and BUG#13360 */
2864
assert(!rli->m_table_map.count());
2866
Usually lex_start() is called by mysql_parse(), but we need it here
2867
as the present method does not call mysql_parse().
2870
mysql_reset_thd_for_next_command(thd);
2872
if (!use_rli_only_for_errors)
2875
Saved for InnoDB, see comment in
2876
Query_log_event::do_apply_event()
2878
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
2882
We test replicate_*_db rules. Note that we have already prepared
2883
the file to load, even if we are going to ignore and delete it
2884
now. So it is possible that we did a lot of disk writes for
2885
nothing. In other words, a big LOAD DATA INFILE on the master will
2886
still consume a lot of space on the slave (space in the relay log
2887
+ space of temp files: twice the space of the file to load...)
2888
even if it will finally be ignored. TODO: fix this; this can be
2889
done by testing rules in Create_file_log_event::do_apply_event()
2890
and then discarding Append_block and al. Another way is do the
2891
filtering in the I/O thread (more efficient: no disk writes at
2895
Note: We do not need to execute reset_one_shot_variables() if this
2897
Reason: The db stored in binlog events is the same for SET and for
2898
its companion query. If the SET is ignored because of
2899
db_ok(), the companion query will also be ignored, and if
2900
the companion query is ignored in the db_ok() test of
2901
::do_apply_event(), then the companion SET also has, so
2902
we don't need to reset_one_shot_variables().
2904
if (rpl_filter->db_ok(thd->db))
2906
thd->set_time((time_t)when);
2907
pthread_mutex_lock(&LOCK_thread_count);
2908
thd->query_id = next_query_id();
2909
pthread_mutex_unlock(&LOCK_thread_count);
2911
Initing thd->row_count is not necessary in theory as this variable has no
2912
influence in the case of the slave SQL thread (it is used to generate a
2913
"data truncated" warning but which is absorbed and never gets to the
2914
error log); still we init it to avoid a Valgrind message.
2916
drizzle_reset_errors(thd, 0);
2919
memset(&tables, 0, sizeof(tables));
2920
tables.db= thd->strmake(thd->db, thd->db_length);
2921
tables.alias = tables.table_name = (char*) table_name;
2922
tables.lock_type = TL_WRITE;
2925
// the table will be opened in mysql_load
2926
if (rpl_filter->is_on() && !rpl_filter->tables_ok(thd->db, &tables))
2928
// TODO: this is a bug - this needs to be moved to the I/O thread
2930
skip_load_data_infile(net);
2936
enum enum_duplicates handle_dup;
2938
char *load_data_query;
2941
Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST
2942
and written to slave's binlog if binlogging is on.
2944
if (!(load_data_query= (char *)thd->alloc(get_query_buffer_length() + 1)))
2947
This will set thd->fatal_error in case of OOM. So we surely will notice
2948
that something is wrong.
2953
print_query(false, load_data_query, &end, (char **)&thd->lex->fname_start,
2954
(char **)&thd->lex->fname_end);
2956
thd->query_length= end - load_data_query;
2957
thd->query= load_data_query;
2959
if (sql_ex.opt_flags & REPLACE_FLAG)
2961
handle_dup= DUP_REPLACE;
2963
else if (sql_ex.opt_flags & IGNORE_FLAG)
2966
handle_dup= DUP_ERROR;
2971
When replication is running fine, if it was DUP_ERROR on the
2972
master then we could choose IGNORE here, because if DUP_ERROR
2973
succeeded on master, and data is identical on the master and slave,
2974
then there should be no uniqueness errors on slave, so IGNORE is
2975
the same as DUP_ERROR. But in the unlikely case of uniqueness errors
2976
(because the data on the master and slave happen to be different
2977
(user error or bug), we want LOAD DATA to print an error message on
2978
the slave to discover the problem.
2980
If reading from net (a 3.23 master), mysql_load() will change this
2983
handle_dup= DUP_ERROR;
2986
We need to set thd->lex->sql_command and thd->lex->duplicates
2987
since InnoDB tests these variables to decide if this is a LOAD
2988
DATA ... REPLACE INTO ... statement even though mysql_parse()
2989
is not called. This is not needed in 5.0 since there the LOAD
2990
DATA ... statement is replicated using mysql_parse(), which
2991
sets the thd->lex fields correctly.
2993
thd->lex->sql_command= SQLCOM_LOAD;
2994
thd->lex->duplicates= handle_dup;
2996
sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
2997
String field_term(sql_ex.field_term,sql_ex.field_term_len,log_cs);
2998
String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,log_cs);
2999
String line_term(sql_ex.line_term,sql_ex.line_term_len,log_cs);
3000
String line_start(sql_ex.line_start,sql_ex.line_start_len,log_cs);
3001
String escaped(sql_ex.escaped,sql_ex.escaped_len, log_cs);
3002
ex.field_term= &field_term;
3003
ex.enclosed= &enclosed;
3004
ex.line_term= &line_term;
3005
ex.line_start= &line_start;
3006
ex.escaped= &escaped;
3008
ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
3009
if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
3010
ex.field_term->length(0);
3012
ex.skip_lines = skip_lines;
3013
List<Item> field_list;
3014
thd->lex->select_lex.context.resolve_in_table_list_only(&tables);
3015
set_fields(tables.db, field_list, &thd->lex->select_lex.context);
3016
thd->variables.pseudo_thread_id= thread_id;
3019
// mysql_load will use thd->net to read the file
3020
thd->net.vio = net->vio;
3022
Make sure the client does not get confused about the packet sequence
3024
thd->net.pkt_nr = net->pkt_nr;
3027
It is safe to use tmp_list twice because we are not going to
3028
update it inside mysql_load().
3030
List<Item> tmp_list;
3031
if (mysql_load(thd, &ex, &tables, field_list, tmp_list, tmp_list,
3032
handle_dup, ignore, net != 0))
3033
thd->is_slave_error= 1;
3034
if (thd->cuted_fields)
3036
/* log_pos is the position of the LOAD event in the master log */
3037
sql_print_warning(_("Slave: load data infile on table '%s' at "
3038
"log position %s in log '%s' produced %ld "
3039
"warning(s). Default database: '%s'"),
3041
llstr(log_pos,llbuff), RPL_LOG_NAME,
3042
(ulong) thd->cuted_fields,
3043
print_slave_db_safe(thd->db));
3046
net->pkt_nr= thd->net.pkt_nr;
3052
We will just ask the master to send us /dev/null if we do not
3053
want to load the data.
3054
TODO: this is a bug - needs to be done in I/O thread
3057
skip_load_data_infile(net);
3062
const char *remember_db= thd->db;
3063
pthread_mutex_lock(&LOCK_thread_count);
3065
thd->set_db(NULL, 0); /* will free the current database */
3067
thd->query_length= 0;
3068
pthread_mutex_unlock(&LOCK_thread_count);
3069
close_thread_tables(thd);
3071
if (thd->is_slave_error)
3073
/* this err/sql_errno code is copy-paste from net_send_error() */
3076
if (thd->is_error())
3078
err= thd->main_da.message();
3079
sql_errno= thd->main_da.sql_errno();
3083
sql_errno=ER_UNKNOWN_ERROR;
3086
rli->report(ERROR_LEVEL, sql_errno,
3087
_("Error '%s' running LOAD DATA INFILE on table '%s'. "
3088
"Default database: '%s'"),
3089
err, (char*)table_name, print_slave_db_safe(remember_db));
3090
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
3093
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
3095
if (thd->is_fatal_error)
3098
snprintf(buf, sizeof(buf),
3099
_("Running LOAD DATA INFILE on table '%-.64s'."
3100
" Default database: '%-.64s'"),
3102
print_slave_db_safe(remember_db));
3104
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
3105
ER(ER_SLAVE_FATAL_ERROR), buf);
3109
return ( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) );
3113
/**************************************************************************
3114
Rotate_log_event methods
3115
**************************************************************************/
3118
Rotate_log_event::pack_info()
3121
void Rotate_log_event::pack_info(Protocol *protocol)
3123
char buf1[256], buf[22];
3124
String tmp(buf1, sizeof(buf1), log_cs);
3126
tmp.append(new_log_ident, ident_len);
3127
tmp.append(STRING_WITH_LEN(";pos="));
3128
tmp.append(llstr(pos,buf));
3129
protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
3134
Rotate_log_event::Rotate_log_event() (2 constructors)
3138
Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
3139
uint32_t ident_len_arg, uint64_t pos_arg,
3141
:Log_event(), new_log_ident(new_log_ident_arg),
3142
pos(pos_arg),ident_len(ident_len_arg ? ident_len_arg :
3143
(uint) strlen(new_log_ident_arg)), flags(flags_arg)
3145
if (flags & DUP_NAME)
3146
new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME));
3151
Rotate_log_event::Rotate_log_event(const char* buf, uint32_t event_len,
3152
const Format_description_log_event* description_event)
3153
:Log_event(buf, description_event) ,new_log_ident(0), flags(DUP_NAME)
3155
// The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
3156
uint8_t header_size= description_event->common_header_len;
3157
uint8_t post_header_len= description_event->post_header_len[ROTATE_EVENT-1];
3158
uint32_t ident_offset;
3159
if (event_len < header_size)
3162
pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
3163
ident_len = (uint)(event_len -
3164
(header_size+post_header_len));
3165
ident_offset = post_header_len;
3166
set_if_smaller(ident_len,FN_REFLEN-1);
3167
new_log_ident= my_strndup(buf + ident_offset, (uint) ident_len, MYF(MY_WME));
3173
Rotate_log_event::write()
3176
/**
  Serialize this rotate event into the given IO cache.

  The event is written as the common header, a post-header of
  ROTATE_HEADER_LEN bytes carrying the 8-byte position, and finally the
  ident_len bytes of the new log name.

  @param file  IO cache to write to

  @return true if any of the writes failed, false otherwise
*/
bool Rotate_log_event::write(IO_CACHE* file)
{
  char post_header[ROTATE_HEADER_LEN];
  int8store(post_header + R_POS_OFFSET, pos);

  if (write_header(file, ROTATE_HEADER_LEN + ident_len))
    return true;
  if (my_b_safe_write(file, (unsigned char*) post_header, ROTATE_HEADER_LEN))
    return true;
  return my_b_safe_write(file, (unsigned char*) new_log_ident,
                         (uint) ident_len) != 0;
}
3187
Got a rotate log event from the master.
3189
This is mainly used so that we can later figure out the logname and
3190
position for the master.
3192
We can't rotate the slave's binlog as this would cause infinite rotations
3193
in a A -> B -> A setup.
3194
The NOTES below is a wrong comment which will disappear when 4.1 is merged.
3199
int Rotate_log_event::do_update_pos(Relay_log_info *rli)
3201
pthread_mutex_lock(&rli->data_lock);
3202
rli->event_relay_log_pos= my_b_tell(rli->cur_log);
3204
If we are in a transaction or in a group: the only normal case is
3205
when the I/O thread was copying a big transaction, then it was
3206
stopped and restarted: we have this in the relay log:
3214
In that case, we don't want to touch the coordinates which
3215
correspond to the beginning of the transaction. Starting from
3216
5.0.0, there also are some rotates from the slave itself, in the
3217
relay log, which shall not change the group positions.
3219
if ((server_id != ::server_id || rli->replicate_same_server_id) &&
3220
!rli->is_in_group())
3222
memcpy(rli->group_master_log_name, new_log_ident, ident_len+1);
3223
rli->notify_group_master_log_name_update();
3224
rli->group_master_log_pos= pos;
3225
strmake(rli->group_relay_log_name, rli->event_relay_log_name,
3226
sizeof(rli->group_relay_log_name) - 1);
3227
rli->notify_group_relay_log_name_update();
3228
rli->group_relay_log_pos= rli->event_relay_log_pos;
3230
Reset thd->options and sql_mode etc, because this could be the signal of
3231
a master's downgrade from 5.0 to 4.0.
3232
However, no need to reset description_event_for_exec: indeed, if the next
3233
master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
3234
master is 4.0 then the events are in the slave's format (conversion).
3236
set_slave_thread_options(thd);
3237
thd->variables.auto_increment_increment=
3238
thd->variables.auto_increment_offset= 1;
3240
pthread_mutex_unlock(&rli->data_lock);
3241
pthread_cond_broadcast(&rli->data_cond);
3242
flush_relay_log_info(rli);
3248
Log_event::enum_skip_reason
3249
Rotate_log_event::do_shall_skip(Relay_log_info *rli)
3251
enum_skip_reason reason= Log_event::do_shall_skip(rli);
3254
case Log_event::EVENT_SKIP_NOT:
3255
case Log_event::EVENT_SKIP_COUNT:
3256
return Log_event::EVENT_SKIP_NOT;
3258
case Log_event::EVENT_SKIP_IGNORE:
3259
return Log_event::EVENT_SKIP_IGNORE;
3262
return Log_event::EVENT_SKIP_NOT; // To keep compiler happy
3266
/**************************************************************************
3267
Intvar_log_event methods
3268
**************************************************************************/
3271
Intvar_log_event::pack_info()
3274
void Intvar_log_event::pack_info(Protocol *protocol)
3276
char buf[256], *pos;
3277
pos= strmake(buf, get_var_type_name(), sizeof(buf)-23);
3279
pos= int64_t10_to_str(val, pos, -10);
3280
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3285
Intvar_log_event::Intvar_log_event()
3288
/**
  Build an Intvar event from its serialized form.

  @param buf                Start of the event, common header included
  @param description_event  Format description giving the common header length
*/
Intvar_log_event::Intvar_log_event(const char* buf,
                                   const Format_description_log_event* description_event)
  :Log_event(buf, description_event)
{
  /* The type byte and the 8-byte value follow the common header. */
  const char *payload= buf + description_event->common_header_len;
  type= payload[I_TYPE_OFFSET];
  val= uint8korr(payload + I_VAL_OFFSET);
}
3299
Intvar_log_event::get_var_type_name()
3302
const char* Intvar_log_event::get_var_type_name()
3305
case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
3306
case INSERT_ID_EVENT: return "INSERT_ID";
3307
default: /* impossible */ return "UNKNOWN";
3313
Intvar_log_event::write()
3316
/**
  Serialize this Intvar event: common header followed by a 9-byte body
  holding the variable type and its 8-byte value.

  @param file  IO cache to write to

  @return true if any of the writes failed, false otherwise
*/
bool Intvar_log_event::write(IO_CACHE* file)
{
  unsigned char payload[9];
  payload[I_TYPE_OFFSET]= (unsigned char) type;
  int8store(payload + I_VAL_OFFSET, val);

  if (write_header(file, sizeof(payload)))
    return true;
  return my_b_safe_write(file, payload, sizeof(payload)) != 0;
}
3327
Intvar_log_event::print()
3331
Intvar_log_event::do_apply_event()
3334
int Intvar_log_event::do_apply_event(Relay_log_info const *rli)
3337
We are now in a statement until the associated query log event has
3340
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3343
case LAST_INSERT_ID_EVENT:
3344
thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1;
3345
thd->first_successful_insert_id_in_prev_stmt= val;
3347
case INSERT_ID_EVENT:
3348
thd->force_one_auto_inc_interval(val);
3354
int Intvar_log_event::do_update_pos(Relay_log_info *rli)
3356
rli->inc_event_relay_log_pos();
3361
/**
  Decide whether this Intvar event is to be skipped; the decision is
  delegated to continue_group().
*/
Log_event::enum_skip_reason
Intvar_log_event::do_shall_skip(Relay_log_info *rli)
{
  /*
    Users often set the slave skip counter to 1 where 2 is needed to get
    past an insert that relied on an auto-increment value, a rand seed or
    a user variable.  When the counter is 1 this event is therefore
    skipped by ignoring it: the counter is left untouched here, since it
    will be decreased by the insert event that follows.
  */
  return continue_group(rli);
}
3376
/**************************************************************************
3377
Rand_log_event methods
3378
**************************************************************************/
3380
/**
  Send a textual description of the event,
  "rand_seed1=<seed1>,rand_seed2=<seed2>", to the protocol.

  @param protocol  Protocol object receiving the text
*/
void Rand_log_event::pack_info(Protocol *protocol)
{
  char text[256];
  char *cursor= my_stpcpy(text, "rand_seed1=");
  cursor= int10_to_str((long) seed1, cursor, 10);
  cursor= my_stpcpy(cursor, ",rand_seed2=");
  cursor= int10_to_str((long) seed2, cursor, 10);
  protocol->store(text, (uint) (cursor - text), &my_charset_bin);
}
3391
/**
  Build a Rand event from its serialized form.

  @param buf                Start of the event, common header included
  @param description_event  Format description giving the common header length
*/
Rand_log_event::Rand_log_event(const char* buf,
                               const Format_description_log_event* description_event)
  :Log_event(buf, description_event)
{
  /* Both 8-byte seeds follow the common header. */
  const char *payload= buf + description_event->common_header_len;
  seed1= uint8korr(payload + RAND_SEED1_OFFSET);
  seed2= uint8korr(payload + RAND_SEED2_OFFSET);
}
3401
/**
  Serialize this Rand event: common header followed by a 16-byte body
  holding the two 8-byte seeds.

  @param file  IO cache to write to

  @return true if any of the writes failed, false otherwise
*/
bool Rand_log_event::write(IO_CACHE* file)
{
  unsigned char seeds[16];
  int8store(seeds + RAND_SEED1_OFFSET, seed1);
  int8store(seeds + RAND_SEED2_OFFSET, seed2);

  if (write_header(file, sizeof(seeds)))
    return true;
  return my_b_safe_write(file, seeds, sizeof(seeds)) != 0;
}
3411
int Rand_log_event::do_apply_event(Relay_log_info const *rli)
3414
We are now in a statement until the associated query log event has
3417
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3419
thd->rand.seed1= (ulong) seed1;
3420
thd->rand.seed2= (ulong) seed2;
3424
int Rand_log_event::do_update_pos(Relay_log_info *rli)
3426
rli->inc_event_relay_log_pos();
3431
/**
  Decide whether this Rand event is to be skipped; the decision is
  delegated to continue_group().
*/
Log_event::enum_skip_reason
Rand_log_event::do_shall_skip(Relay_log_info *rli)
{
  /*
    Users often set the slave skip counter to 1 where 2 is needed to get
    past an insert that relied on an auto-increment value, a rand seed or
    a user variable.  When the counter is 1 this event is therefore
    skipped by ignoring it: the counter is left untouched here, since it
    will be decreased by the insert event that follows.
  */
  return continue_group(rli);
}
3446
/**************************************************************************
3447
Xid_log_event methods
3448
**************************************************************************/
3450
/**
  Send a textual description of the event, "COMMIT /* xid=<xid> *" "/",
  to the protocol.

  @param protocol  Protocol object receiving the text
*/
void Xid_log_event::pack_info(Protocol *protocol)
{
  char text[128];
  char *cursor= my_stpcpy(text, "COMMIT /* xid=");
  cursor= int64_t10_to_str(xid, cursor, 10);
  cursor= my_stpcpy(cursor, " */");
  protocol->store(text, (uint) (cursor - text), &my_charset_bin);
}
3461
It's ok not to use int8store here,
3462
as long as xid_t::set(uint64_t) and
3463
xid_t::get_my_xid doesn't do it either.
3464
We don't care about actual values of xids as long as
3465
identical numbers compare identically
3469
Xid_log_event(const char* buf,
3470
const Format_description_log_event *description_event)
3471
:Log_event(buf, description_event)
3473
buf+= description_event->common_header_len;
3474
memcpy(&xid, buf, sizeof(xid));
3478
/**
  Serialize this Xid event: common header followed by the raw bytes of
  the xid member.

  @param file  IO cache to write to

  @return true if any of the writes failed, false otherwise
*/
bool Xid_log_event::write(IO_CACHE* file)
{
  if (write_header(file, sizeof(xid)))
    return true;
  return my_b_safe_write(file, (unsigned char*) &xid, sizeof(xid)) != 0;
}
3485
/**
  Apply the event by ending the current transaction with COMMIT.

  @param rli  Unused

  @return the result of end_trans()
*/
int Xid_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
{
  const int commit_result= end_trans(thd, COMMIT);
  return commit_result;
}
3490
/**
  Decide whether this Xid event is to be skipped.

  While the slave skip counter is positive, OPTION_BEGIN is cleared from
  the thread options and EVENT_SKIP_COUNT is reported; otherwise the
  generic Log_event decision is used.
*/
Log_event::enum_skip_reason
Xid_log_event::do_shall_skip(Relay_log_info *rli)
{
  if (!(rli->slave_skip_counter > 0))
    return Log_event::do_shall_skip(rli);

  thd->options&= ~OPTION_BEGIN;
  return Log_event::EVENT_SKIP_COUNT;
}
3501
/**************************************************************************
3502
User_var_log_event methods
3503
**************************************************************************/
3505
void User_var_log_event::pack_info(Protocol* protocol)
3508
uint32_t val_offset= 4 + name_len;
3509
uint32_t event_len= val_offset;
3513
if (!(buf= (char*) my_malloc(val_offset + 5, MYF(MY_WME))))
3515
my_stpcpy(buf + val_offset, "NULL");
3516
event_len= val_offset + 4;
3523
float8get(real_val, val);
3524
if (!(buf= (char*) my_malloc(val_offset + MY_GCVT_MAX_FIELD_WIDTH + 1,
3527
event_len+= my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, MY_GCVT_MAX_FIELD_WIDTH,
3528
buf + val_offset, NULL);
3531
if (!(buf= (char*) my_malloc(val_offset + 22, MYF(MY_WME))))
3533
event_len= int64_t10_to_str(uint8korr(val), buf + val_offset,-10)-buf;
3535
case DECIMAL_RESULT:
3537
if (!(buf= (char*) my_malloc(val_offset + DECIMAL_MAX_STR_LENGTH,
3540
String str(buf+val_offset, DECIMAL_MAX_STR_LENGTH, &my_charset_bin);
3542
binary2my_decimal(E_DEC_FATAL_ERROR, (unsigned char*) (val+2), &dec, val[0],
3544
my_decimal2string(E_DEC_FATAL_ERROR, &dec, 0, 0, 0, &str);
3545
event_len= str.length() + val_offset;
3549
/* 15 is for 'COLLATE' and other chars */
3550
buf= (char*) my_malloc(event_len+val_len*2+1+2*MY_CS_NAME_SIZE+15,
3552
const CHARSET_INFO *cs;
3555
if (!(cs= get_charset(charset_number, MYF(0))))
3557
my_stpcpy(buf+val_offset, "???");
3562
char *p= strxmov(buf + val_offset, "_", cs->csname, " ", NULL);
3563
p= str_to_hex(p, val, val_len);
3564
p= strxmov(p, " COLLATE ", cs->name, NULL);
3576
memcpy(buf+2, name, name_len);
3577
buf[2+name_len]= '`';
3578
buf[3+name_len]= '=';
3579
protocol->store(buf, event_len, &my_charset_bin);
3584
User_var_log_event::
3585
User_var_log_event(const char* buf,
3586
const Format_description_log_event* description_event)
3587
:Log_event(buf, description_event)
3589
buf+= description_event->common_header_len;
3590
name_len= uint4korr(buf);
3591
name= (char *) buf + UV_NAME_LEN_SIZE;
3592
buf+= UV_NAME_LEN_SIZE + name_len;
3593
is_null= (bool) *buf;
3596
type= STRING_RESULT;
3597
charset_number= my_charset_bin.number;
3603
type= (Item_result) buf[UV_VAL_IS_NULL];
3604
charset_number= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE);
3605
val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3606
UV_CHARSET_NUMBER_SIZE);
3607
val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3608
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE);
3613
bool User_var_log_event::write(IO_CACHE* file)
3615
char buf[UV_NAME_LEN_SIZE];
3616
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3617
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
3618
unsigned char buf2[(8 > DECIMAL_MAX_FIELD_SIZE + 2) ? 8 : DECIMAL_MAX_FIELD_SIZE +2], *pos= buf2;
3619
uint32_t buf1_length;
3622
int4store(buf, name_len);
3624
if ((buf1[0]= is_null))
3627
val_len= 0; // Length of 'pos'
3632
int4store(buf1 + 2, charset_number);
3636
float8store(buf2, *(double*) val);
3639
int8store(buf2, *(int64_t*) val);
3641
case DECIMAL_RESULT:
3643
my_decimal *dec= (my_decimal *)val;
3644
dec->fix_buffer_pointer();
3645
buf2[0]= (char)(dec->intg + dec->frac);
3646
buf2[1]= (char)dec->frac;
3647
decimal2bin((decimal_t*)val, buf2+2, buf2[0], buf2[1]);
3648
val_len= decimal_bin_size(buf2[0], buf2[1]) + 2;
3652
pos= (unsigned char*) val;
3659
int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
3663
/* Length of the whole event */
3664
event_length= sizeof(buf)+ name_len + buf1_length + val_len;
3666
return (write_header(file, event_length) ||
3667
my_b_safe_write(file, (unsigned char*) buf, sizeof(buf)) ||
3668
my_b_safe_write(file, (unsigned char*) name, name_len) ||
3669
my_b_safe_write(file, (unsigned char*) buf1, buf1_length) ||
3670
my_b_safe_write(file, pos, val_len));
3676
User_var_log_event::do_apply_event()
3679
int User_var_log_event::do_apply_event(Relay_log_info const *rli)
3682
const CHARSET_INFO *charset;
3683
if (!(charset= get_charset(charset_number, MYF(MY_WME))))
3685
LEX_STRING user_var_name;
3686
user_var_name.str= name;
3687
user_var_name.length= name_len;
3692
We are now in a statement until the associated query log event has
3695
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3699
it= new Item_null();
3705
float8get(real_val, val);
3706
it= new Item_float(real_val, 0);
3707
val= (char*) &real_val; // Pointer to value in native format
3711
int_val= (int64_t) uint8korr(val);
3712
it= new Item_int(int_val);
3713
val= (char*) &int_val; // Pointer to value in native format
3716
case DECIMAL_RESULT:
3718
Item_decimal *dec= new Item_decimal((unsigned char*) val+2, val[0], val[1]);
3720
val= (char *)dec->val_decimal(NULL);
3721
val_len= sizeof(my_decimal);
3725
it= new Item_string(val, val_len, charset);
3733
Item_func_set_user_var e(user_var_name, it);
3735
Item_func_set_user_var can't substitute something else on its place =>
3736
0 can be passed as last argument (reference on item)
3738
e.fix_fields(thd, 0);
3740
A variable can just be considered as a table with
3741
a single record and with a single column. Thus, like
3742
a column value, it could always have IMPLICIT derivation.
3744
e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0);
3745
free_root(thd->mem_root,0);
3750
int User_var_log_event::do_update_pos(Relay_log_info *rli)
3752
rli->inc_event_relay_log_pos();
3756
/**
  Decide whether this User_var event is to be skipped; the decision is
  delegated to continue_group().
*/
Log_event::enum_skip_reason
User_var_log_event::do_shall_skip(Relay_log_info *rli)
{
  /*
    Users often set the slave skip counter to 1 where 2 is needed to get
    past an insert that relied on an auto-increment value, a rand seed or
    a user variable.  When the counter is 1 this event is therefore
    skipped by ignoring it: the counter is left untouched here, since it
    will be decreased by the insert event that follows.
  */
  return continue_group(rli);
}
3771
/**************************************************************************
3772
Slave_log_event methods
3773
**************************************************************************/
3775
/**
  Send a textual description of the event,
  "host=<host>,port=<port>,log=<log>,pos=<pos>", to the protocol.

  The text is assembled in a fixed-size stack buffer.  The host name is
  limited to HOSTNAME_LENGTH characters; the log name is limited to
  whatever space is left once room for the trailing ",pos=<number>" has
  been set aside, so an unusually long log name is truncated instead of
  being written past the end of the buffer.

  @param protocol  Protocol object receiving the text
*/
void Slave_log_event::pack_info(Protocol *protocol)
{
  char buf[256+HOSTNAME_LENGTH], *pos;
  /* ",pos=" with its NUL, plus up to 20 digits and a NUL for the position. */
  const size_t tail_reserve= sizeof(",pos=") + 21;

  pos= my_stpcpy(buf, "host=");
  pos= my_stpncpy(pos, master_host, HOSTNAME_LENGTH);
  pos= my_stpcpy(pos, ",port=");
  pos= int10_to_str((long) master_port, pos, 10);
  pos= my_stpcpy(pos, ",log=");
  /* Bounded copy: never let the log name eat into the reserved tail. */
  pos= my_stpncpy(pos, master_log,
                  (size_t) (buf + sizeof(buf) - pos) - tail_reserve);
  pos= my_stpcpy(pos, ",pos=");
  pos= int64_t10_to_str(master_pos, pos, 10);
  protocol->store(buf, pos-buf, &my_charset_bin);
}
3792
re-write this better without holding both locks at the same time
3794
Slave_log_event::Slave_log_event(THD* thd_arg,
3795
Relay_log_info* rli)
3796
:Log_event(thd_arg, 0, 0) , mem_pool(0), master_host(0)
3798
if (!rli->inited) // QQ When can this happen ?
3801
Master_info* mi = rli->mi;
3802
// TODO: re-write this better without holding both locks at the same time
3803
pthread_mutex_lock(&mi->data_lock);
3804
pthread_mutex_lock(&rli->data_lock);
3805
master_host_len = strlen(mi->host);
3806
master_log_len = strlen(rli->group_master_log_name);
3807
// on OOM, just do not initialize the structure and print the error
3808
if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
3811
master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
3812
memcpy(master_host, mi->host, master_host_len + 1);
3813
master_log = master_host + master_host_len + 1;
3814
memcpy(master_log, rli->group_master_log_name, master_log_len + 1);
3815
master_port = mi->port;
3816
master_pos = rli->group_master_log_pos;
3819
sql_print_error(_("Out of memory while recording slave event"));
3820
pthread_mutex_unlock(&rli->data_lock);
3821
pthread_mutex_unlock(&mi->data_lock);
3826
Slave_log_event::~Slave_log_event()
3832
int Slave_log_event::get_data_size()
3834
return master_host_len + master_log_len + 1 + SL_MASTER_HOST_OFFSET;
3838
/**
  Serialize this Slave event: common header followed by the contents of
  mem_pool.

  @param file  IO cache to write to

  @return true if any of the writes failed, false otherwise
*/
bool Slave_log_event::write(IO_CACHE* file)
{
  ulong body_len= get_data_size();

  /* Host and log name already sit in mem_pool; refresh the numeric fields. */
  int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
  int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);

  if (write_header(file, body_len))
    return true;
  return my_b_safe_write(file, (unsigned char*) mem_pool, body_len) != 0;
}
3850
void Slave_log_event::init_from_mem_pool(int data_size)
3852
master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
3853
master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
3854
master_host = mem_pool + SL_MASTER_HOST_OFFSET;
3855
master_host_len = strlen(master_host);
3857
master_log = master_host + master_host_len + 1;
3858
if (master_log > mem_pool + data_size)
3863
master_log_len = strlen(master_log);
3867
/** This code is not used, so has not been updated to be format-tolerant. */
3868
Slave_log_event::Slave_log_event(const char* buf, uint32_t event_len)
3869
:Log_event(buf,0) /*unused event*/ ,mem_pool(0),master_host(0)
3871
if (event_len < LOG_EVENT_HEADER_LEN)
3873
event_len -= LOG_EVENT_HEADER_LEN;
3874
if (!(mem_pool = (char*) my_malloc(event_len + 1, MYF(MY_WME))))
3876
memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len);
3877
mem_pool[event_len] = 0;
3878
init_from_mem_pool(event_len);
3882
int Slave_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
3884
if (mysql_bin_log.is_open())
3885
mysql_bin_log.write(this);
3890
/**************************************************************************
3891
Stop_log_event methods
3892
**************************************************************************/
3895
The master stopped. We used to clean up all temporary tables but
3896
this is useless as, as the master has shut down properly, it has
3897
written all DROP TEMPORARY Table (prepared statements' deletion is
3898
TODO only when we binlog prep stmts). We used to clean up
3899
slave_load_tmpdir, but this is useless as it has been cleared at the
3900
end of LOAD DATA INFILE. So we have nothing to do here. The place
3901
where we must do this cleaning is in
3902
Start_log_event_v3::do_apply_event(), not here. Because if we come
3903
here, the master was sane.
3905
int Stop_log_event::do_update_pos(Relay_log_info *rli)
3908
We do not want to update master_log pos because we get a rotate event
3909
before stop, so by now group_master_log_name is set to the next log.
3910
If we updated it, we will have incorrect master coordinates and this
3911
could give false triggers in MASTER_POS_WAIT() that we have reached
3912
the target position when in fact we have not.
3914
if (thd->options & OPTION_BEGIN)
3915
rli->inc_event_relay_log_pos();
3918
rli->inc_group_relay_log_pos(0);
3919
flush_relay_log_info(rli);
3925
/**************************************************************************
3926
Create_file_log_event methods
3927
**************************************************************************/
3930
Create_file_log_event ctor
3933
Create_file_log_event::
3934
Create_file_log_event(THD* thd_arg, sql_exchange* ex,
3935
const char* db_arg, const char* table_name_arg,
3936
List<Item>& fields_arg, enum enum_duplicates handle_dup,
3938
unsigned char* block_arg, uint32_t block_len_arg, bool using_trans)
3939
:Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
3941
fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg),
3942
file_id(thd_arg->file_id = mysql_bin_log.next_file_id())
3944
sql_ex.force_new_format();
3950
Create_file_log_event::write_data_body()
3953
bool Create_file_log_event::write_data_body(IO_CACHE* file)
3956
if ((res= Load_log_event::write_data_body(file)) || fake_base)
3958
return (my_b_safe_write(file, (unsigned char*) "", 1) ||
3959
my_b_safe_write(file, (unsigned char*) block, block_len));
3964
Create_file_log_event::write_data_header()
3967
bool Create_file_log_event::write_data_header(IO_CACHE* file)
3970
unsigned char buf[CREATE_FILE_HEADER_LEN];
3971
if ((res= Load_log_event::write_data_header(file)) || fake_base)
3973
int4store(buf + CF_FILE_ID_OFFSET, file_id);
3974
return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0;
3979
Create_file_log_event::write_base()
3982
bool Create_file_log_event::write_base(IO_CACHE* file)
3985
fake_base= 1; // pretend we are Load event
3992
Create_file_log_event ctor
3995
Create_file_log_event::Create_file_log_event(const char* buf, uint32_t len,
3996
const Format_description_log_event* description_event)
3997
:Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0)
3999
uint32_t block_offset;
4000
uint32_t header_len= description_event->common_header_len;
4001
uint8_t load_header_len= description_event->post_header_len[LOAD_EVENT-1];
4002
uint8_t create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
4003
if (!(event_buf= (char*) my_memdup(buf, len, MYF(MY_WME))) ||
4004
copy_log_event(event_buf,len,
4005
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
4006
load_header_len + header_len :
4007
(fake_base ? (header_len+load_header_len) :
4008
(header_len+load_header_len) +
4009
create_file_header_len)),
4012
if (description_event->binlog_version!=1)
4014
file_id= uint4korr(buf +
4016
load_header_len + CF_FILE_ID_OFFSET);
4018
Note that it's ok to use get_data_size() below, because it is computed
4019
with values we have already read from this event (because we called
4020
copy_log_event()); we are not using slave's format info to decode
4021
master's format, we are really using master's format info.
4022
Anyway, both formats should be identical (except the common_header_len)
4023
as these Load events are not changed between 4.0 and 5.0 (as logging of
4024
LOAD DATA INFILE does not use Load_log_event in 5.0).
4026
The + 1 is for \0 terminating fname
4028
block_offset= (description_event->common_header_len +
4029
Load_log_event::get_data_size() +
4030
create_file_header_len + 1);
4031
if (len < block_offset)
4033
block = (unsigned char*)buf + block_offset;
4034
block_len = len - block_offset;
4038
sql_ex.force_new_format();
4039
inited_from_old = 1;
4046
Create_file_log_event::pack_info()
4049
/**
  Send a textual description of the event,
  "db=<db>;table=<table>;file_id=<id>;block_len=<len>", to the protocol.

  @param protocol  Protocol object receiving the text
*/
void Create_file_log_event::pack_info(Protocol *protocol)
{
  char text[NAME_LEN*2 + 30 + 21*2];
  char *cursor= my_stpcpy(text, "db=");

  /* db and table_name are copied by explicit length, not as C strings. */
  memcpy(cursor, db, db_len);
  cursor+= db_len;
  cursor= my_stpcpy(cursor, ";table=");
  memcpy(cursor, table_name, table_name_len);
  cursor+= table_name_len;

  cursor= my_stpcpy(cursor, ";file_id=");
  cursor= int10_to_str((long) file_id, cursor, 10);
  cursor= my_stpcpy(cursor, ";block_len=");
  cursor= int10_to_str((long) block_len, cursor, 10);
  protocol->store(text, (uint) (cursor - text), &my_charset_bin);
}
4065
Create_file_log_event::do_apply_event()
4068
int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
4070
char proc_info[17+FN_REFLEN+10], *fname_buf;
4076
memset(&file, 0, sizeof(file));
4077
fname_buf= my_stpcpy(proc_info, "Making temp file ");
4078
ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
4079
thd->set_proc_info(proc_info);
4080
my_delete(fname_buf, MYF(0)); // old copy may exist already
4081
if ((fd= my_create(fname_buf, CREATE_MODE,
4082
O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
4083
MYF(MY_WME))) < 0 ||
4084
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
4085
MYF(MY_WME|MY_NABP)))
4087
rli->report(ERROR_LEVEL, my_errno,
4088
_("Error in Create_file event: could not open file '%s'"),
4093
// a trick to avoid allocating another buffer
4095
fname_len= (uint) (my_stpcpy(ext, ".data") - fname);
4096
if (write_base(&file))
4098
my_stpcpy(ext, ".info"); // to have it right in the error message
4099
rli->report(ERROR_LEVEL, my_errno,
4100
_("Error in Create_file event: could not write to file '%s'"),
4104
end_io_cache(&file);
4105
my_close(fd, MYF(0));
4107
// fname_buf now already has .data, not .info, because we did our trick
4108
my_delete(fname_buf, MYF(0)); // old copy may exist already
4109
if ((fd= my_create(fname_buf, CREATE_MODE,
4110
O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
4113
rli->report(ERROR_LEVEL, my_errno,
4114
_("Error in Create_file event: could not open file '%s'"),
4118
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
4120
rli->report(ERROR_LEVEL, my_errno,
4121
_("Error in Create_file event: write to '%s' failed"),
4125
error=0; // Everything is ok
4129
end_io_cache(&file);
4131
my_close(fd, MYF(0));
4132
thd->set_proc_info(0);
4137
/**************************************************************************
4138
Append_block_log_event methods
4139
**************************************************************************/
4142
Append_block_log_event ctor
4145
Append_block_log_event::Append_block_log_event(THD *thd_arg,
4147
unsigned char *block_arg,
4148
uint32_t block_len_arg,
4150
:Log_event(thd_arg,0, using_trans), block(block_arg),
4151
block_len(block_len_arg), file_id(thd_arg->file_id), db(db_arg)
4157
Append_block_log_event ctor
4160
Append_block_log_event::Append_block_log_event(const char* buf, uint32_t len,
4161
const Format_description_log_event* description_event)
4162
:Log_event(buf, description_event),block(0)
4164
uint8_t common_header_len= description_event->common_header_len;
4165
uint8_t append_block_header_len=
4166
description_event->post_header_len[APPEND_BLOCK_EVENT-1];
4167
uint32_t total_header_len= common_header_len+append_block_header_len;
4168
if (len < total_header_len)
4170
file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET);
4171
block= (unsigned char*)buf + total_header_len;
4172
block_len= len - total_header_len;
4178
Append_block_log_event::write()
4181
/**
  Serialize this Append_block event: common header, a post-header of
  APPEND_BLOCK_HEADER_LEN bytes carrying the file id, then the block data.

  @param file  IO cache to write to

  @return true if any of the writes failed, false otherwise
*/
bool Append_block_log_event::write(IO_CACHE* file)
{
  unsigned char post_header[APPEND_BLOCK_HEADER_LEN];
  int4store(post_header + AB_FILE_ID_OFFSET, file_id);

  if (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len))
    return true;
  if (my_b_safe_write(file, post_header, APPEND_BLOCK_HEADER_LEN))
    return true;
  return my_b_safe_write(file, (unsigned char*) block, block_len) != 0;
}
4192
Append_block_log_event::pack_info()
4195
void Append_block_log_event::pack_info(Protocol *protocol)
4199
length= (uint) sprintf(buf, ";file_id=%u;block_len=%u", file_id,
4201
protocol->store(buf, length, &my_charset_bin);
4206
Append_block_log_event::get_create_or_append()
4209
int Append_block_log_event::get_create_or_append() const
4211
return 0; /* append to the file, fail if not exists */
4215
Append_block_log_event::do_apply_event()
4218
int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
4220
char proc_info[17+FN_REFLEN+10], *fname= proc_info+17;
4224
fname= my_stpcpy(proc_info, "Making temp file ");
4225
slave_load_file_stem(fname, file_id, server_id, ".data");
4226
thd->set_proc_info(proc_info);
4227
if (get_create_or_append())
4229
my_delete(fname, MYF(0)); // old copy may exist already
4230
if ((fd= my_create(fname, CREATE_MODE,
4231
O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
4234
rli->report(ERROR_LEVEL, my_errno,
4235
_("Error in %s event: could not create file '%s'"),
4236
get_type_str(), fname);
4240
else if ((fd = my_open(fname, O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW,
4243
rli->report(ERROR_LEVEL, my_errno,
4244
_("Error in %s event: could not open file '%s'"),
4245
get_type_str(), fname);
4248
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
4250
rli->report(ERROR_LEVEL, my_errno,
4251
_("Error in %s event: write to '%s' failed"),
4252
get_type_str(), fname);
4259
my_close(fd, MYF(0));
4260
thd->set_proc_info(0);
4265
/**************************************************************************
4266
Delete_file_log_event methods
4267
**************************************************************************/
4270
Delete_file_log_event ctor
4273
Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg,
4275
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
4280
Delete_file_log_event ctor
4283
Delete_file_log_event::Delete_file_log_event(const char* buf, uint32_t len,
4284
const Format_description_log_event* description_event)
4285
:Log_event(buf, description_event),file_id(0)
4287
uint8_t common_header_len= description_event->common_header_len;
4288
uint8_t delete_file_header_len= description_event->post_header_len[DELETE_FILE_EVENT-1];
4289
if (len < (uint)(common_header_len + delete_file_header_len))
4291
file_id= uint4korr(buf + common_header_len + DF_FILE_ID_OFFSET);
4296
Delete_file_log_event::write()
4299
/**
  Serialize the event to file: the common header via write_header(),
  followed by a DELETE_FILE_HEADER_LEN-byte post-header that holds
  file_id (4 bytes) at DF_FILE_ID_OFFSET.

  @param file  Destination IO_CACHE.
  @return the logical OR of the two write calls, i.e. true as soon as one
          of them returns non-zero (presumably meaning failure - confirm).
*/
bool Delete_file_log_event::write(IO_CACHE* file)
4301
unsigned char buf[DELETE_FILE_HEADER_LEN];
4302
int4store(buf + DF_FILE_ID_OFFSET, file_id);
4303
/* write_header() is told the size of the data that follows the header. */
return (write_header(file, sizeof(buf)) ||
4304
my_b_safe_write(file, buf, sizeof(buf)));
4309
Delete_file_log_event::pack_info()
4312
/**
  Describe the event for display: formats the text ";file_id=<n>" and
  passes it, with its length, to protocol->store() using my_charset_bin.

  @param protocol  Receiver of the formatted description.
*/
void Delete_file_log_event::pack_info(Protocol *protocol)
4316
length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
4317
protocol->store(buf, (int32_t) length, &my_charset_bin);
4321
Delete_file_log_event::do_apply_event()
4324
/**
  Apply a Delete_file event: delete the two temporary files that belong
  to file_id.

  The ".data" file name is built with slave_load_file_stem() and deleted;
  the pointer returned by that helper is then used to overwrite the
  extension with ".info", and that file is deleted too. The results of
  my_delete() are explicitly discarded (cast to void); MY_WME is passed to
  each call.

  @param rli  Unused.
*/
int Delete_file_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
4326
char fname[FN_REFLEN+10];
4327
char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
4328
(void) my_delete(fname, MYF(MY_WME));
4329
my_stpcpy(ext, ".info");
4330
(void) my_delete(fname, MYF(MY_WME));
4335
/**************************************************************************
4336
Execute_load_log_event methods
4337
**************************************************************************/
4340
Execute_load_log_event ctor
4343
/**
  Construct from a session: file_id is copied from thd_arg->file_id and
  db from db_arg; the Log_event base receives thd_arg and using_trans.
*/
Execute_load_log_event::Execute_load_log_event(THD *thd_arg,
4346
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
4352
Execute_load_log_event ctor
4355
/**
  Construct from a serialized event buffer.

  file_id starts at 0 and is then read as a 4-byte value located at
  common_header_len + EL_FILE_ID_OFFSET. Before reading, len is compared
  against the common header length plus the EXEC_LOAD_EVENT post-header
  length taken from description_event.
  NOTE(review): presumably a too-short event leaves file_id at 0 - confirm.

  @param buf                Start of the serialized event.
  @param len                Length of the serialized event in bytes.
  @param description_event  Supplies the header lengths used for parsing.
*/
Execute_load_log_event::Execute_load_log_event(const char* buf, uint32_t len,
4356
const Format_description_log_event* description_event)
4357
:Log_event(buf, description_event), file_id(0)
4359
uint8_t common_header_len= description_event->common_header_len;
4360
uint8_t exec_load_header_len= description_event->post_header_len[EXEC_LOAD_EVENT-1];
4361
if (len < (uint)(common_header_len+exec_load_header_len))
4363
file_id= uint4korr(buf + common_header_len + EL_FILE_ID_OFFSET);
4368
Execute_load_log_event::write()
4371
/**
  Serialize the event to file: the common header via write_header(),
  followed by an EXEC_LOAD_HEADER_LEN-byte post-header that holds file_id
  (4 bytes) at EL_FILE_ID_OFFSET.

  @param file  Destination IO_CACHE.
  @return the logical OR of the two write calls, i.e. true as soon as one
          of them returns non-zero (presumably meaning failure - confirm).
*/
bool Execute_load_log_event::write(IO_CACHE* file)
4373
unsigned char buf[EXEC_LOAD_HEADER_LEN];
4374
int4store(buf + EL_FILE_ID_OFFSET, file_id);
4375
return (write_header(file, sizeof(buf)) ||
4376
my_b_safe_write(file, buf, sizeof(buf)));
4381
Execute_load_log_event::pack_info()
4384
/**
  Describe the event for display: formats the text ";file_id=<n>" and
  passes it, with its length, to protocol->store() using my_charset_bin.

  @param protocol  Receiver of the formatted description.
*/
void Execute_load_log_event::pack_info(Protocol *protocol)
4388
length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
4389
protocol->store(buf, (int32_t) length, &my_charset_bin);
4394
Execute_load_log_event::do_apply_event()
4397
/**
  Apply an Execute_load event: run the load that was staged in the
  temporary files for file_id.

  Visible steps:
  - Build the ".info" file name, open it read-only and wrap it in a read
    IO_CACHE; failure is reported through rli->report() with my_errno.
  - Read one event from that file with Log_event::read_log_event() and
    require its type code to be NEW_LOAD_EVENT; otherwise the file is
    reported as corrupted.
  - Store log_pos in rli->future_group_master_log_pos (via const_cast)
    and execute the stored event with lev->do_apply_event(0,rli,1).
  - If that call returns non-zero, the current rli error message is
    duplicated and reported again with the file name appended.
  - The descriptor and the IO_CACHE are released and both the ".info" and
    the ".data" file are deleted.

  @param rli  Relay log info; used for error reporting and updated as
              described above.
*/
int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
4399
char fname[FN_REFLEN+10];
4404
Load_log_event *lev= 0;
4406
ext= slave_load_file_stem(fname, file_id, server_id, ".info");
4407
if ((fd = my_open(fname, O_RDONLY | O_BINARY | O_NOFOLLOW,
4408
MYF(MY_WME))) < 0 ||
4409
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
4410
MYF(MY_WME|MY_NABP)))
4412
rli->report(ERROR_LEVEL, my_errno,
4413
_("Error in Exec_load event: could not open file '%s'"),
4417
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
4418
(pthread_mutex_t*)0,
4419
rli->relay_log.description_event_for_exec)) ||
4420
lev->get_type_code() != NEW_LOAD_EVENT)
4422
rli->report(ERROR_LEVEL, 0,
4423
_("Error in Exec_load event: "
4424
"file '%s' appears corrupted"),
4431
lev->do_apply_event should use rli only for errors i.e. should
4432
not advance rli's position.
4434
lev->do_apply_event is the place where the table is loaded (it
4435
calls mysql_load()).
4438
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
4439
if (lev->do_apply_event(0,rli,1))
4442
We want to indicate the name of the file that could not be loaded
4444
But as we are here we are sure the error is in rli->last_slave_error and
4445
rli->last_slave_errno (example of error: duplicate entry for key), so we
4446
don't want to overwrite it with the filename.
4447
What we want instead is add the filename to the current error message.
4449
char *tmp= my_strdup(rli->last_error().message, MYF(MY_WME));
4452
rli->report(ERROR_LEVEL, rli->last_error().number,
4453
_("%s. Failed executing load from '%s'"),
4460
We have an open file descriptor to the .info file; we need to close it
4461
or Windows will refuse to delete the file in my_delete().
4465
my_close(fd, MYF(0));
4466
end_io_cache(&file);
4469
(void) my_delete(fname, MYF(MY_WME));
4470
memcpy(ext, ".data", 6);
4471
(void) my_delete(fname, MYF(MY_WME));
4478
my_close(fd, MYF(0));
4479
end_io_cache(&file);
4485
/**************************************************************************
4486
Begin_load_query_log_event methods
4487
**************************************************************************/
4489
/**
  Construct from a session and a data block: the arguments are forwarded
  to the Append_block_log_event base, then a new file id is obtained from
  mysql_bin_log.next_file_id() and stored both in this event and in
  thd_arg->file_id.
*/
Begin_load_query_log_event::
4490
Begin_load_query_log_event(THD* thd_arg, const char* db_arg, unsigned char* block_arg,
4491
uint32_t block_len_arg, bool using_trans)
4492
:Append_block_log_event(thd_arg, db_arg, block_arg, block_len_arg,
4495
file_id= thd_arg->file_id= mysql_bin_log.next_file_id();
4499
/**
  Construct from a serialized event buffer; all parsing is delegated to
  the Append_block_log_event(buf, len, desc_event) constructor.
*/
Begin_load_query_log_event::
4500
Begin_load_query_log_event(const char* buf, uint32_t len,
4501
const Format_description_log_event* desc_event)
4502
:Append_block_log_event(buf, len, desc_event)
4507
/**
  Always returns 1. Append_block_log_event::do_apply_event() takes its
  create branch (delete any old copy, then create) when this is non-zero.
*/
int Begin_load_query_log_event::get_create_or_append() const
4509
return 1; /* create the file */
4513
/**
  Skip decision for this event; the result comes from continue_group(rli).

  @param rli  Relay log info passed on to continue_group().
*/
Log_event::enum_skip_reason
4514
Begin_load_query_log_event::do_shall_skip(Relay_log_info *rli)
4517
If the slave skip counter is 1, then we should not start executing
4520
return continue_group(rli);
4524
/**************************************************************************
4525
Execute_load_query_log_event methods
4526
**************************************************************************/
4529
/**
  Construct from a session and a query text.

  The query arguments, using_trans, suppress_use and killed_err_arg are
  forwarded to Query_log_event. This class additionally records file_id
  (from thd_arg->file_id), the fn_pos_start / fn_pos_end offsets into the
  query, and dup_handling.
*/
Execute_load_query_log_event::
4530
Execute_load_query_log_event(THD *thd_arg, const char* query_arg,
4531
ulong query_length_arg, uint32_t fn_pos_start_arg,
4532
uint32_t fn_pos_end_arg,
4533
enum_load_dup_handling dup_handling_arg,
4534
bool using_trans, bool suppress_use,
4535
THD::killed_state killed_err_arg):
4536
Query_log_event(thd_arg, query_arg, query_length_arg, using_trans,
4537
suppress_use, killed_err_arg),
4538
file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg),
4539
fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
4544
/**
  Construct from a serialized event buffer.

  Query_log_event parses the common part first; file_id, fn_pos_start and
  fn_pos_end start at 0. After checking Query_log_event::is_valid(), buf
  is advanced past the common header and the extra fields are read:
  fn_pos_start and fn_pos_end (4 bytes each), dup_handling (1 byte) and,
  after a sanity check, file_id (4 bytes). The sanity check tests whether
  either offset exceeds q_len or dup_handling exceeds LOAD_DUP_REPLACE.
  NOTE(review): presumably a failed check leaves file_id at 0 - confirm.
*/
Execute_load_query_log_event::
4545
Execute_load_query_log_event(const char* buf, uint32_t event_len,
4546
const Format_description_log_event* desc_event):
4547
Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT),
4548
file_id(0), fn_pos_start(0), fn_pos_end(0)
4550
if (!Query_log_event::is_valid())
4553
buf+= desc_event->common_header_len;
4555
fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET);
4556
fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET);
4557
dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET));
4559
if (fn_pos_start > q_len || fn_pos_end > q_len ||
4560
dup_handling > LOAD_DUP_REPLACE)
4563
file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET);
4567
/**
  @return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN, which is also the number of
          bytes written by write_post_header_for_derived().
*/
ulong Execute_load_query_log_event::get_post_header_size_for_derived()
4569
return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN;
4574
/**
  Write the extra post-header fields of this event to file.

  Layout of the EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN-byte buffer:
    offset 0  : file_id       (4 bytes)
    offset 4  : fn_pos_start  (4 bytes)
    offset 8  : fn_pos_end    (4 bytes)
    offset 12 : dup_handling  (1 byte)

  @param file  Destination IO_CACHE.
  @return the result of my_b_safe_write().
*/
Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
4576
unsigned char buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
4577
int4store(buf, file_id);
4578
int4store(buf + 4, fn_pos_start);
4579
int4store(buf + 4 + 4, fn_pos_end);
4580
*(buf + 4 + 4 + 4)= (unsigned char) dup_handling;
4581
return my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
4585
/**
  Describe the event for display.

  A buffer of 9 + db_len + q_len + 10 + 21 bytes is allocated. The visible
  statements write a "use `<db>`; " prefix, the query text, the literal
  " ;file_id=" and the decimal file_id, then hand the result to
  protocol->store() using my_charset_bin.

  @param protocol  Receiver of the formatted description.
*/
void Execute_load_query_log_event::pack_info(Protocol *protocol)
4588
if (!(buf= (char*) my_malloc(9 + db_len + q_len + 10 + 21, MYF(MY_WME))))
4593
pos= my_stpcpy(buf, "use `");
4594
memcpy(pos, db, db_len);
4595
pos= my_stpcpy(pos+db_len, "`; ");
4599
memcpy(pos, query, q_len);
4602
pos= my_stpcpy(pos, " ;file_id=");
4603
pos= int10_to_str((long) file_id, pos, 10);
4604
protocol->store(buf, pos-buf, &my_charset_bin);
4610
/**
  Apply an Execute_load_query event: rewrite the logged query so that it
  loads from the slave-side temporary file, then execute it.

  The new query is assembled in a freshly allocated buffer:
  - the first fn_pos_start bytes of the original query,
  - the INFILE keyword followed by the quoted name of the ".data" file
    built by slave_load_file_stem(),
  - IGNORE or REPLACE according to dup_handling (nothing for an ordinary
    load), then INTO,
  - the rest of the original query starting at fn_pos_end.
  The result is executed with Query_log_event::do_apply_event(rli, buf,
  p-buf). The file name is then taken from the same buffer for my_delete();
  per the in-line note the file is kept when an error occurred.

  Allocation failure is reported through rli->report() with
  ER_SLAVE_FATAL_ERROR.

  @param rli  Relay log info, used for error reporting and passed on to
              Query_log_event::do_apply_event().
*/
Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli)
4618
buf= (char*) my_malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
4619
(FN_REFLEN + 10) + 10 + 8 + 5, MYF(MY_WME));
4621
/* Replace filename and LOCAL keyword in query before executing it */
4624
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
4625
ER(ER_SLAVE_FATAL_ERROR),
4626
_("Not enough memory"));
4631
memcpy(p, query, fn_pos_start);
4633
fname= (p= strmake(p, STRING_WITH_LEN(" INFILE \'")));
4634
p= slave_load_file_stem(p, file_id, server_id, ".data");
4635
fname_end= p= strchr(p, '\0'); // Safer than p=p+5
4637
switch (dup_handling) {
4638
case LOAD_DUP_IGNORE:
4639
p= strmake(p, STRING_WITH_LEN(" IGNORE"));
4641
case LOAD_DUP_REPLACE:
4642
p= strmake(p, STRING_WITH_LEN(" REPLACE"));
4645
/* Ordinary load data */
4648
p= strmake(p, STRING_WITH_LEN(" INTO"));
4649
p= strmake(p, query+fn_pos_end, q_len-fn_pos_end);
4651
error= Query_log_event::do_apply_event(rli, buf, p-buf);
4653
/* Forging file name for deletion in same buffer */
4657
If there was an error the slave is going to stop, leave the
4658
file so that we can re-execute this event at START SLAVE.
4661
(void) my_delete(fname, MYF(MY_WME));
4668
/**************************************************************************
4670
**************************************************************************/
4673
sql_ex_info::write_data()
4676
/**
  Serialize the LOAD DATA field/line options to file.

  Two encodings are visible here:
  - one writes field_term, enclosed, line_term, line_start and escaped
    with write_str() (each with its own length) followed by the single
    opt_flags byte;
  - the other copies the first character of each of those strings, plus
    opt_flags and empty_flags, into an old_sql_ex structure and writes
    that structure as raw bytes (hence the padding caveat in the todo).
  NOTE(review): the condition that selects between the two encodings is
  presumably the new/old format flag - confirm.

  @param file  Destination IO_CACHE.
  @return non-zero/true when a write call returns non-zero.
*/
bool sql_ex_info::write_data(IO_CACHE* file)
4680
return (write_str(file, field_term, (uint) field_term_len) ||
4681
write_str(file, enclosed, (uint) enclosed_len) ||
4682
write_str(file, line_term, (uint) line_term_len) ||
4683
write_str(file, line_start, (uint) line_start_len) ||
4684
write_str(file, escaped, (uint) escaped_len) ||
4685
my_b_safe_write(file,(unsigned char*) &opt_flags,1));
4690
@todo This is sensitive to field padding. We should write a
4691
char[7], not an old_sql_ex. /sven
4694
old_ex.field_term= *field_term;
4695
old_ex.enclosed= *enclosed;
4696
old_ex.line_term= *line_term;
4697
old_ex.line_start= *line_start;
4698
old_ex.escaped= *escaped;
4699
old_ex.opt_flags= opt_flags;
4700
old_ex.empty_flags=empty_flags;
4701
return my_b_safe_write(file, (unsigned char*) &old_ex, sizeof(old_ex)) != 0;
4710
/**
  Initialize the LOAD DATA field/line options from a serialized buffer.

  use_new_format is remembered in cached_new_format. Two decodings are
  visible:
  - field_term, enclosed, line_term, line_start and escaped are each read
    with read_str(), which also receives buf_end as the limit;
  - alternatively all five lengths are set to 1, the strings point
    directly into buf (field_term at the first byte), empty_flags is read
    from buf, and each of the *_EMPTY bits in empty_flags is tested.
  As the in-line note explains, the members keep pointing into buf, so
  buf must outlive this object.

  @param buf             Start of the serialized options.
  @param buf_end         End of the readable area.
  @param use_new_format  Selects the format to decode.
*/
const char *sql_ex_info::init(const char *buf, const char *buf_end,
4711
bool use_new_format)
4713
cached_new_format = use_new_format;
4718
The code below assumes that buf will not disappear from
4719
under our feet during the lifetime of the event. This assumption
4720
holds true in the slave thread if the log is in new format, but is not
4721
the case when we have old format because we will be reusing net buffer
4722
to read the actual file before we write out the Create_file event.
4724
if (read_str(&buf, buf_end, &field_term, &field_term_len) ||
4725
read_str(&buf, buf_end, &enclosed, &enclosed_len) ||
4726
read_str(&buf, buf_end, &line_term, &line_term_len) ||
4727
read_str(&buf, buf_end, &line_start, &line_start_len) ||
4728
read_str(&buf, buf_end, &escaped, &escaped_len))
4734
field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
4735
field_term = buf++; // Use first byte in string
4741
empty_flags= *buf++;
4742
if (empty_flags & FIELD_TERM_EMPTY)
4744
if (empty_flags & ENCLOSED_EMPTY)
4746
if (empty_flags & LINE_TERM_EMPTY)
4748
if (empty_flags & LINE_START_EMPTY)
4750
if (empty_flags & ESCAPED_EMPTY)
4757
/**************************************************************************
4758
Rows_log_event member functions
4759
**************************************************************************/
4761
/**
  Construct a rows event for a table.

  m_width is the number of fields of tbl_arg (1 when tbl_arg is null) and
  the row buffer pointers start out null. The NO_FOREIGN_KEY_CHECKS_F and
  RELAXED_UNIQUE_CHECKS_F event flags are set from the corresponding bits
  in thd_arg->options. The column bitmap m_cols is initialized over the
  inline m_bitbuf when m_width fits in it (otherwise NULL is passed to
  bitmap_init) and, when cols is not null, filled with a copy of cols.

  @param thd_arg           Session creating the event.
  @param tbl_arg           Table the rows belong to; null for a dummy event.
  @param tid               Table id; UINT32_MAX for a dummy event.
  @param cols              Columns present in the rows; null for a dummy
                           event.
  @param is_transactional  Forwarded to the Log_event base.
*/
Rows_log_event::Rows_log_event(THD *thd_arg, Table *tbl_arg, ulong tid,
4762
MY_BITMAP const *cols, bool is_transactional)
4763
: Log_event(thd_arg, 0, is_transactional),
4767
m_width(tbl_arg ? tbl_arg->s->fields : 1),
4768
m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
4769
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4772
We allow a special form of dummy event when the table, and cols
4773
are null and the table id is UINT32_MAX. This is a temporary
4774
solution, to be able to terminate a started statement in the
4775
binary log: the extraneous events will be removed in the future.
4777
assert((tbl_arg && tbl_arg->s && tid != UINT32_MAX) || (!tbl_arg && !cols && tid == UINT32_MAX));
4779
if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
4780
set_flags(NO_FOREIGN_KEY_CHECKS_F);
4781
if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
4782
set_flags(RELAXED_UNIQUE_CHECKS_F);
4783
/* if bitmap_init fails, caught in is_valid() */
4784
if (likely(!bitmap_init(&m_cols,
4785
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4789
/* Cols can be zero if this is a dummy binrows event */
4790
if (likely(cols != NULL))
4792
memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
4793
create_last_word_mask(&m_cols);
4798
// Needed because bitmap_init() does not set it to null on failure
4804
/**
  Construct a rows event from a serialized buffer.

  Parsing order, as visible below:
  - table id from the post-header: 4 bytes when the post-header length is
    6, otherwise 6 bytes; then the 2-byte flags;
  - the column count m_width, stored as a net_field_length() value right
    after the post-header;
  - the column bitmap m_cols, (m_width + 7) / 8 bytes;
  - for UPDATE_ROWS_EVENT only, a second bitmap m_cols_ai of the same
    size; for other types m_cols_ai.bitmap aliases m_cols.bitmap;
  - all remaining bytes of the event, copied into a newly allocated
    m_rows_buf.

  @param buf                Start of the serialized event.
  @param event_len          Total length of the serialized event.
  @param event_type         Concrete rows event type being parsed.
  @param description_event  Supplies the header lengths used for parsing.
*/
Rows_log_event::Rows_log_event(const char *buf, uint32_t event_len,
4805
Log_event_type event_type,
4806
const Format_description_log_event
4808
: Log_event(buf, description_event),
4811
m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
4812
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4814
uint8_t const common_header_len= description_event->common_header_len;
4815
uint8_t const post_header_len= description_event->post_header_len[event_type-1];
4817
const char *post_start= buf + common_header_len;
4818
post_start+= RW_MAPID_OFFSET;
4819
if (post_header_len == 6)
4821
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
4822
m_table_id= uint4korr(post_start);
4827
m_table_id= (ulong) uint6korr(post_start);
4828
post_start+= RW_FLAGS_OFFSET;
4831
m_flags= uint2korr(post_start);
4833
/* Variable-length part starts right after the post-header. */
unsigned char const *const var_start=
4834
(const unsigned char *)buf + common_header_len + post_header_len;
4835
unsigned char const *const ptr_width= var_start;
4836
unsigned char *ptr_after_width= (unsigned char*) ptr_width;
4837
/* net_field_length() takes the pointer by address and advances it. */
m_width = net_field_length(&ptr_after_width);
4838
/* if bitmap_init fails, caught in is_valid() */
4839
if (likely(!bitmap_init(&m_cols,
4840
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4844
memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
4845
create_last_word_mask(&m_cols);
4846
ptr_after_width+= (m_width + 7) / 8;
4850
// Needed because bitmap_init() does not set it to null on failure
4851
m_cols.bitmap= NULL;
4855
m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
4857
if (event_type == UPDATE_ROWS_EVENT)
4859
/* if bitmap_init fails, caught in is_valid() */
4860
if (likely(!bitmap_init(&m_cols_ai,
4861
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
4865
memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
4866
create_last_word_mask(&m_cols_ai);
4867
ptr_after_width+= (m_width + 7) / 8;
4871
// Needed because bitmap_init() does not set it to null on failure
4872
m_cols_ai.bitmap= 0;
4877
const unsigned char* const ptr_rows_data= (const unsigned char*) ptr_after_width;
4879
/* Everything after the bitmaps, up to event_len, is row data. */
size_t const data_size= event_len - (ptr_rows_data - (const unsigned char *) buf);
4881
m_rows_buf= (unsigned char*) my_malloc(data_size, MYF(MY_WME));
4882
if (likely((bool)m_rows_buf))
4884
m_curr_row= m_rows_buf;
4885
m_rows_end= m_rows_buf + data_size;
4886
/* The buffer is exactly full: the write cursor sits at its end. */
m_rows_cur= m_rows_end;
4887
memcpy(m_rows_buf, ptr_rows_data, data_size);
4890
m_cols.bitmap= 0; // to not free it
4895
/**
  Release the column bitmap and the row buffer.

  When m_cols uses the inline m_bitbuf storage, its bitmap pointer is
  nulled first so that bitmap_free() does not try to free memory that was
  never allocated. m_rows_buf is released with free().
*/
Rows_log_event::~Rows_log_event()
4897
if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
4898
m_cols.bitmap= 0; // so no free in bitmap_free
4899
bitmap_free(&m_cols); // To pair with bitmap_init().
4900
free((unsigned char*)m_rows_buf);
4903
/**
  Compute the size of the event data.

  data_size accumulates ROWS_HEADER_LEN, the bytes of the m_cols bitmap,
  the length of a net_store_length() encoding, the bytes of the m_cols_ai
  bitmap for UPDATE_ROWS_EVENT, and the bytes of row data currently held
  (m_rows_cur - m_rows_buf).

  NOTE(review): the length prefix is sized here by encoding
  (m_width + 7) / 8, while write_data_body() encodes m_width itself.
  Confirm the two encodings always have the same length.
*/
int Rows_log_event::get_data_size()
4905
int const type_code= get_type_code();
4907
unsigned char buf[sizeof(m_width)+1];
4908
unsigned char *end= net_store_length(buf, (m_width + 7) / 8);
4910
int data_size= ROWS_HEADER_LEN;
4911
data_size+= no_bytes_in_map(&m_cols);
4912
data_size+= end - buf;
4914
if (type_code == UPDATE_ROWS_EVENT)
4915
data_size+= no_bytes_in_map(&m_cols_ai);
4917
data_size+= (m_rows_cur - m_rows_buf);
4922
/**
  Append one packed row to the row buffer.

  When the free space (m_rows_end - m_rows_cur) is not larger than length,
  the buffer is reallocated to the current size plus length, rounded up to
  a multiple of 1024 bytes; m_rows_buf, m_rows_cur and m_rows_end are then
  updated to the new allocation. The row is copied at m_rows_cur and the
  cursor advanced by length.

  NOTE(review): new_alloc is narrowed with a (uint) cast when passed to
  my_realloc() - confirm this cannot truncate for very large events.

  @param row_data  Bytes of the packed row.
  @param length    Number of bytes in row_data.
  @return HA_ERR_OUT_OF_MEM when the reallocation fails.
*/
int Rows_log_event::do_add_row_data(unsigned char *row_data, size_t length)
4925
When the table has a primary key, we would probably want, by default, to
4926
log only the primary key value instead of the entire "before image". This
4927
would save binlog space. TODO
4931
If length is zero, there is nothing to write, so we just
4932
return. Note that this is not an optimization, since calling
4933
realloc() with size 0 means free().
4941
assert(m_rows_buf <= m_rows_cur);
4942
assert(!m_rows_buf || (m_rows_end && m_rows_buf <= m_rows_end));
4943
assert(m_rows_cur <= m_rows_end);
4945
/* The cast will always work since m_rows_cur <= m_rows_end */
4946
if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
4948
size_t const block_size= 1024;
4949
my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
4950
my_ptrdiff_t const new_alloc=
4951
block_size * ((cur_size + length + block_size - 1) / block_size);
4953
unsigned char* const new_buf= (unsigned char*)my_realloc((unsigned char*)m_rows_buf, (uint) new_alloc,
4954
MYF(MY_ALLOW_ZERO_PTR|MY_WME));
4955
if (unlikely(!new_buf))
4956
return(HA_ERR_OUT_OF_MEM);
4958
/* If the memory moved, we need to move the pointers */
4959
if (new_buf != m_rows_buf)
4961
m_rows_buf= new_buf;
4962
m_rows_cur= m_rows_buf + cur_size;
4966
The end pointer should always be changed to point to the end of
4967
the allocated memory.
4969
m_rows_end= m_rows_buf + new_alloc;
4972
assert(m_rows_cur + length <= m_rows_end);
4973
memcpy(m_rows_cur, row_data, length);
4974
m_rows_cur+= length;
4979
/**
  Apply a rows event.

  Visible steps, in order:
  - A dummy event (m_table_id == UINT32_MAX) only clears
    rli->tables_to_lock and closes the thread tables.
  - The NO_FOREIGN_KEY_CHECKS / RELAXED_UNIQUE_CHECKS session options are
    set or cleared from the event flags, and the tables in
    rli->tables_to_lock are locked in a loop around lock_tables(). Inside
    that loop errors are reported through rli->report(), or the tables
    are closed with close_tables_for_reopen() and opened again with
    open_tables().
  - Every locked table is checked with m_tabledef.compatible_with(); a
    non-zero result unlocks the tables and returns ERR_BAD_TABLE_DEF.
  - The tables are registered in rli->m_table_map and the table for this
    event is looked up by m_table_id.
  - Session time and options (including OPTION_ALLOW_BATCH from
    slave_allow_batching) are set, the IN_STMT flag is raised on rli, and
    the read/write sets of the table are prepared from m_cols.
  - Rows between m_curr_row and m_rows_end are applied one at a time with
    do_exec_row(), bracketed by do_before_row_operations() and
    do_after_row_operations(). For the listed handler errors, when
    SLAVE_EXEC_MODE_IDEMPOTENT is set and log_warnings is on, the error
    is reported at WARNING_LEVEL.
  - On error slave_rows_error_report() is called at ERROR_LEVEL and
    rli->cleanup_context() is invoked.
  - rli->last_event_start_time is refreshed for tables without a primary
    key when the event is not the last of its statement.

  @param rli  Relay log info; const in the signature but modified here
              through const_cast.
*/
int Rows_log_event::do_apply_event(Relay_log_info const *rli)
4983
If m_table_id == UINT32_MAX, then we have a dummy event that does not
4984
contain any data. In that case, we just remove all tables in the
4985
tables_to_lock list, close the thread tables, and return with
4988
if (m_table_id == UINT32_MAX)
4991
This one is supposed to be set: just an extra check so that
4992
nothing strange has happened.
4994
assert(get_flags(STMT_END_F));
4996
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4997
close_thread_tables(thd);
5003
'thd' has been set by exec_relay_log_event(), just before calling
5004
do_apply_event(). We still check here to prevent future coding
5007
assert(rli->sql_thd == thd);
5010
If there is no locks taken, this is the first binrow event seen
5011
after the table map events. We should then lock all the tables
5012
used in the transaction and proceed with execution of the actual
5017
bool need_reopen= 1; /* To execute the first lap of the loop below */
5020
lock_tables() reads the contents of thd->lex, so they must be
5021
initialized. Contrary to in
5022
Table_map_log_event::do_apply_event() we don't call
5023
mysql_init_query() as that may reset the binlog format.
5028
There are a few flags that are replicated with each row event.
5029
Make sure to set/clear them before executing the main body of
5032
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
5033
thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
5035
thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
5037
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
5038
thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
5040
thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
5041
/* A small test to verify that objects have consistent types */
5042
assert(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
5045
while ((error= lock_tables(thd, rli->tables_to_lock,
5046
rli->tables_to_lock_count, &need_reopen)))
5050
if (thd->is_slave_error || thd->is_fatal_error)
5053
Error reporting borrowed from Query_log_event with many excessive
5054
simplifications (we don't honour --slave-skip-errors)
5056
uint32_t actual_error= thd->main_da.sql_errno();
5057
rli->report(ERROR_LEVEL, actual_error,
5058
_("Error '%s' in %s event: when locking tables"),
5060
? thd->main_da.message()
5061
: _("unexpected success or fatal error")),
5063
thd->is_fatal_error= 1;
5067
rli->report(ERROR_LEVEL, error,
5068
_("Error in %s event: when locking tables"),
5071
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5076
So we need to reopen the tables.
5078
We need to flush the pending RBR event, since it keeps a
5079
pointer to an open table.
5081
ALTERNATIVE SOLUTION (not implemented): Extract a pointer to
5082
the pending RBR event and reset the table pointer after the
5083
tables has been reopened.
5085
NOTE: For this new scheme there should be no pending event:
5086
need to add code to assert that is the case.
5088
thd->binlog_flush_pending_rows_event(false);
5089
TableList *tables= rli->tables_to_lock;
5090
close_tables_for_reopen(thd, &tables);
5092
uint32_t tables_count= rli->tables_to_lock_count;
5093
if ((error= open_tables(thd, &tables, &tables_count, 0)))
5095
if (thd->is_slave_error || thd->is_fatal_error)
5098
Error reporting borrowed from Query_log_event with many excessive
5099
simplifications (we don't honour --slave-skip-errors)
5101
uint32_t actual_error= thd->main_da.sql_errno();
5102
rli->report(ERROR_LEVEL, actual_error,
5103
_("Error '%s' on reopening tables"),
5105
? thd->main_da.message()
5106
: _("unexpected success or fatal error")));
5107
thd->is_slave_error= 1;
5109
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5115
When the open and locking succeeded, we check all tables to
5116
ensure that they still have the correct type.
5118
We can use a down cast here since we know that every table added
5119
to the tables_to_lock is a RPL_TableList.
5123
RPL_TableList *ptr= rli->tables_to_lock;
5124
for ( ; ptr ; ptr= static_cast<RPL_TableList*>(ptr->next_global))
5126
if (ptr->m_tabledef.compatible_with(rli, ptr->table))
5128
mysql_unlock_tables(thd, thd->lock);
5130
thd->is_slave_error= 1;
5131
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5132
return(ERR_BAD_TABLE_DEF);
5138
... and then we add all the tables to the table map and remove
5139
them from tables to lock.
5141
We also invalidate the query cache for all the tables, since
5142
they will now be changed.
5144
TODO [/Matz]: Maybe the query cache should not be invalidated
5145
here? It might be that a table is not changed, even though it
5146
was locked for the statement. We do know that each
5147
Rows_log_event contain at least one row, so after processing one
5148
Rows_log_event, we can invalidate the query cache for the
5151
for (TableList *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
5153
const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
5159
m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
5164
table == NULL means that this table should not be replicated
5165
(this was set up by Table_map_log_event::do_apply_event()
5166
which tested replicate-* rules).
5170
It's not needed to set_time() but
5171
1) it continues the property that "Time" in SHOW PROCESSLIST shows how
5172
much slave is behind
5173
2) it will be needed when we allow replication from a table with no
5174
TIMESTAMP column to a table with one.
5175
So we call set_time(), like in SBR. Presently it changes nothing.
5177
thd->set_time((time_t)when);
5179
There are a few flags that are replicated with each row event.
5180
Make sure to set/clear them before executing the main body of
5183
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
5184
thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
5186
thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
5188
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
5189
thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
5191
thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
5193
if (slave_allow_batching)
5194
thd->options|= OPTION_ALLOW_BATCH;
5196
thd->options&= ~OPTION_ALLOW_BATCH;
5198
/* A small test to verify that objects have consistent types */
5199
assert(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
5202
Now we are in a statement and will stay in a statement until we
5205
We set this flag here, before actually applying any rows, in
5206
case the SQL thread is stopped and we need to detect that we're
5207
inside a statement and halting abruptly might cause problems
5210
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
5212
if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
5213
set_flags(COMPLETE_ROWS_F);
5216
Set tables write and read sets.
5218
Read_set contains all slave columns (in case we are going to fetch
5219
a complete record from slave)
5221
Write_set equals the m_cols bitmap sent from master but it can be
5222
longer if slave has extra columns.
5225
bitmap_set_all(table->read_set);
5226
bitmap_set_all(table->write_set);
5227
if (!get_flags(COMPLETE_ROWS_F))
5228
bitmap_intersect(table->write_set,&m_cols);
5230
this->slave_exec_mode= slave_exec_mode_options; // fix the mode
5232
// Do event specific preparations
5233
error= do_before_row_operations(rli);
5235
// row processing loop
5237
while (error == 0 && m_curr_row < m_rows_end)
5239
/* in_use can have been set to NULL in close_tables_for_reopen */
5240
THD* old_thd= table->in_use;
5244
error= do_exec_row(rli);
5246
table->in_use = old_thd;
5252
The following list of "idempotent" errors
5253
means that an error from the list might happen
5254
because of idempotent (more than once)
5255
applying of a binlog file.
5256
Notice, that binlog has a ddl operation its
5257
second applying may cause
5259
case HA_ERR_TABLE_DEF_CHANGED:
5260
case HA_ERR_CANNOT_ADD_FOREIGN:
5262
which are not included into to the list.
5264
case HA_ERR_RECORD_CHANGED:
5265
case HA_ERR_RECORD_DELETED:
5266
case HA_ERR_KEY_NOT_FOUND:
5267
case HA_ERR_END_OF_FILE:
5268
case HA_ERR_FOUND_DUPP_KEY:
5269
case HA_ERR_FOUND_DUPP_UNIQUE:
5270
case HA_ERR_FOREIGN_DUPLICATE_KEY:
5271
case HA_ERR_NO_REFERENCED_ROW:
5272
case HA_ERR_ROW_IS_REFERENCED:
5273
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5275
if (global_system_variables.log_warnings)
5276
slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
5278
RPL_LOG_NAME, (ulong) log_pos);
5284
thd->is_slave_error= 1;
5289
If m_curr_row_end was not set during event execution (e.g., because
5290
of errors) we can't proceed to the next row. If the error is transient
5291
(i.e., error==0 at this point) we must call unpack_current_row() to set
5294
if (!m_curr_row_end && !error)
5295
unpack_current_row(rli, &m_cols);
5297
// at this moment m_curr_row_end should be set
5298
assert(error || m_curr_row_end != NULL);
5299
assert(error || m_curr_row < m_curr_row_end);
5300
assert(error || m_curr_row_end <= m_rows_end);
5302
m_curr_row= m_curr_row_end;
5304
} // row processing loop
5306
error= do_after_row_operations(rli, error);
5309
thd->options|= OPTION_KEEP_LOG;
5314
We need to delay this clear until here bacause unpack_current_row() uses
5315
master-side table definitions stored in rli.
5317
if (rli->tables_to_lock && get_flags(STMT_END_F))
5318
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5319
/* reset OPTION_ALLOW_BATCH as not affect later events */
5320
thd->options&= ~OPTION_ALLOW_BATCH;
5323
{ /* error has occured during the transaction */
5324
slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
5325
get_type_str(), RPL_LOG_NAME, (ulong) log_pos);
5330
If one day we honour --skip-slave-errors in row-based replication, and
5331
the error should be skipped, then we would clear mappings, rollback,
5332
close tables, but the slave SQL thread would not stop and then may
5333
assume the mapping is still available, the tables are still open...
5334
So then we should clear mappings/rollback/close here only if this is a
5336
For now we code, knowing that error is not skippable and so slave SQL
5337
thread is certainly going to stop.
5338
rollback at the caller along with sbr.
5340
thd->reset_current_stmt_binlog_row_based();
5341
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
5342
thd->is_slave_error= 1;
5347
This code would ideally be placed in do_update_pos() instead, but
5348
since we have no access to table there, we do the setting of
5349
last_event_start_time here instead.
5351
if (table && (table->s->primary_key == MAX_KEY) &&
5352
!cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS)
5355
------------ Temporary fix until WL#2975 is implemented ---------
5357
This event is not the last one (no STMT_END_F). If we stop now
5358
(in case of terminate_slave_thread()), how will we restart? We
5359
have to restart from Table_map_log_event, but as this table is
5360
not transactional, the rows already inserted will still be
5361
present, and idempotency is not guaranteed (no PK) so we risk
5362
that repeating leads to double insert. So we desperately try to
5363
continue, hope we'll eventually leave this buggy situation (by
5364
executing the final Rows_log_event). If we are in a hopeless
5365
wait (reached end of last relay log and nothing gets appended
5366
there), we timeout after one minute, and notify DBA about the
5367
problem. When WL#2975 is implemented, just remove the member
5368
Relay_log_info::last_event_start_time and all its occurrences.
5370
const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
5376
/**
  Skip decision for a rows event.

  @param rli  Relay log info holding slave_skip_counter.
  @return Log_event::EVENT_SKIP_IGNORE when slave_skip_counter is 1 and
          this event does not carry STMT_END_F; otherwise whatever
          Log_event::do_shall_skip() returns.
*/
Log_event::enum_skip_reason
5377
Rows_log_event::do_shall_skip(Relay_log_info *rli)
5380
If the slave skip counter is 1 and this event does not end a
5381
statement, then we should not start executing on the next event.
5382
Otherwise, we defer the decision to the normal skipping logic.
5384
if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
5385
return Log_event::EVENT_SKIP_IGNORE;
5387
return Log_event::do_shall_skip(rli);
5391
/**
  Advance the replication position after this event has been applied.

  For an event carrying STMT_END_F the visible statements flush the
  pending rows event (binlog_flush_pending_rows_event(true)), call
  ha_autocommit_or_rollback(thd, 0), reset the row-based binlog state of
  the session, clean up the rli context and call
  rli->stmt_done(log_pos, when); a failure is reported through
  rli->report() with the database and table name of m_table.
  rli->inc_event_relay_log_pos() is also called here; presumably this is
  the path for events without STMT_END_F - confirm.

  @param rli  Relay log info whose positions are updated.
*/
Rows_log_event::do_update_pos(Relay_log_info *rli)
5395
if (get_flags(STMT_END_F))
5398
This is the end of a statement or transaction, so close (and
5399
unlock) the tables we opened when processing the
5400
Table_map_log_event starting the statement.
5402
OBSERVER. This will clear *all* mappings, not only those that
5403
are open for the table. There is not good handle for on-close
5406
NOTE. Even if we have no table ('table' == 0) we still need to be
5407
here, so that we increase the group relay log position. If we didn't, we
5408
could have a group relay log position which lags behind "forever"
5409
(assume the last master's transaction is ignored by the slave because of
5410
replicate-ignore rules).
5412
thd->binlog_flush_pending_rows_event(true);
5415
If this event is not in a transaction, the call below will, if some
5416
transactional storage engines are involved, commit the statement into
5417
them and flush the pending event to binlog.
5418
If this event is in a transaction, the call will do nothing, but a
5419
Xid_log_event will come next which will, if some transactional engines
5420
are involved, commit the transaction and flush the pending event to the
5423
error= ha_autocommit_or_rollback(thd, 0);
5426
Now what if this is not a transactional engine? we still need to
5427
flush the pending event to the binlog; we did it with
5428
thd->binlog_flush_pending_rows_event(). Note that we imitate
5429
what is done for real queries: a call to
5430
ha_autocommit_or_rollback() (sometimes only if involves a
5431
transactional engine), and a call to be sure to have the pending
5435
thd->reset_current_stmt_binlog_row_based();
5437
rli->cleanup_context(thd, 0);
5441
Indicate that a statement is finished.
5442
Step the group log position if we are not in a transaction,
5443
otherwise increase the event log position.
5445
rli->stmt_done(log_pos, when);
5448
Clear any errors pushed in thd->net.last_err* if for example "no key
5449
found" (as this is allowed). This is a safety measure; apparently
5450
those errors (e.g. when executing a Delete_rows_log_event of a
5451
non-existing row, like in rpl_row_mystery22.test,
5452
thd->net.last_error = "Can't find record in 't1'" and last_errno=1032)
5453
do not become visible. We still prefer to wipe them out.
5458
rli->report(ERROR_LEVEL, error,
5459
_("Error in %s event: commit of row events failed, "
5461
get_type_str(), m_table->s->db.str,
5462
m_table->s->table_name.str);
5466
rli->inc_event_relay_log_pos();
5472
/**
  Write the fixed-size rows post-header to file.

  The ROWS_HEADER_LEN-byte buffer holds the table id as a 6-byte value at
  RW_MAPID_OFFSET and the 2-byte flags at RW_FLAGS_OFFSET. Dummy events
  (m_table_id == UINT32_MAX) must not be written, as asserted below.

  @param file  Destination IO_CACHE.
  @return the result of my_b_safe_write().
*/
bool Rows_log_event::write_data_header(IO_CACHE *file)
5474
unsigned char buf[ROWS_HEADER_LEN]; // No need to init the buffer
5475
assert(m_table_id != UINT32_MAX);
5476
int6store(buf + RW_MAPID_OFFSET, (uint64_t)m_table_id);
5477
int2store(buf + RW_FLAGS_OFFSET, m_flags);
5478
return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
5481
// Writes the body of a rows event to `file`, in this order:
//   1. m_width, packed with net_store_length();
//   2. the m_cols bitmap (no_bytes_in_map(&m_cols) bytes);
//   3. for UPDATE_ROWS_EVENT only, the m_cols_ai bitmap;
//   4. the row data between m_rows_buf and m_rows_cur.
// Each write is chained as `res= res || write(...)`, so once `res` becomes
// true the remaining writes are skipped by short-circuit evaluation.
// NOTE(review): the declaration of `res` and the final return statement are
// not visible in this excerpt — presumably `res` starts as false and is
// returned; confirm against the full file.
bool Rows_log_event::write_data_body(IO_CACHE*file)
5484
Note that this should be the number of *bits*, not the number of
5487
unsigned char sbuf[sizeof(m_width)];
5488
// Number of bytes of row data accumulated in the rows buffer.
my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
5490
// sbuf_end points one past the last byte net_store_length() produced.
unsigned char *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
5491
// Guard against the packed length overrunning the local buffer.
assert(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
5493
res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
5495
res= res || my_b_safe_write(file, (unsigned char*) m_cols.bitmap,
5496
no_bytes_in_map(&m_cols));
5498
TODO[refactor write]: Remove the "down cast" here (and elsewhere).
5500
// Only update events carry the second (m_cols_ai) column bitmap.
if (get_type_code() == UPDATE_ROWS_EVENT)
5502
res= res || my_b_safe_write(file, (unsigned char*) m_cols_ai.bitmap,
5503
no_bytes_in_map(&m_cols_ai));
5505
res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
5512
// Formats a short description of this rows event ("table_id: <id>", plus
// " flags: STMT_END_F" when that flag is set) and hands it to `protocol`
// via Protocol::store() using my_charset_bin.
// NOTE(review): the declaration of `buf` is not visible in this excerpt.
// snprintf() returns the length the full string would have had, so `bytes`
// is only a valid length for `buf` while the text fits — confirm buf's size.
void Rows_log_event::pack_info(Protocol *protocol)
5515
char const *const flagstr=
5516
get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
5517
size_t bytes= snprintf(buf, sizeof(buf),
5518
"table_id: %lu%s", m_table_id, flagstr);
5519
protocol->store(buf, bytes, &my_charset_bin);
5523
/**************************************************************************
5524
Table_map_log_event member functions and support functions
5525
**************************************************************************/
5528
@page How replication of field metadata works.
5530
When a table map is created, the master first calls
5531
Table_map_log_event::save_field_metadata() which calculates how many
5532
values will be in the field metadata. Only those fields that require the
5533
extra data are added. The method also loops through all of the fields in
5534
the table calling the method Field::save_field_metadata() which returns the
5535
values for the field that will be saved in the metadata and replicated to
5536
the slave. Once all fields have been processed, the table map is written to
5537
the binlog adding the size of the field metadata and the field metadata to
5538
the end of the body of the table map.
5540
When a table map is read on the slave, the field metadata is read from the
5541
table map and passed to the table_def class constructor which saves the
5542
field metadata from the table map into an array based on the type of the
5543
field. Field metadata values not present (those fields that do not use extra
5544
data) in the table map are initialized as zero (0). The array size is the
5545
same as the columns for the table on the slave.
5547
Additionally, values saved for field metadata on the master are saved as a
5548
string of bytes (unsigned char) in the binlog. A field may require 1 or more bytes
5549
to store the information. In cases where values require multiple bytes
5550
(e.g. values > 255), the endian-safe methods are used to properly encode
5551
the values on the master and decode them on the slave. When the field
5552
metadata values are captured on the slave, they are stored in an array of
5553
type uint16_t. This allows the least number of casts to prevent casting bugs
5554
when the field metadata is used in comparisons of field attributes. When
5555
the field metadata is used for calculating addresses in pointer math, the
5556
type used is uint32_t.
5560
Save the field metadata based on the real_type of the field.
5561
The metadata saved depends on the type of the field. Some fields
5562
store a single byte for pack_length() while others store two bytes
5563
for field_length (max length).
5568
We may want to consider changing the encoding of the information.
5569
Currently, the code attempts to minimize the number of bytes written to
5570
the tablemap. There are at least two other alternatives; 1) using
5571
net_store_length() to store the data allowing it to choose the number of
5572
bytes that are appropriate thereby making the code much easier to
5573
maintain (only 1 place to change the encoding), or 2) use a fixed number
5574
of bytes for each field. The problem with option 1 is that net_store_length()
5575
will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
5576
for fields like CHAR which can be no larger than 255 characters, the method
5577
will use 3 bytes when the value is > 250. Further, every value that is
5578
encoded using 2 parts (e.g., pack_length, field_length) will be numerically
5579
> 250 and therefore will use 3 bytes for each value. Option 2
5580
is less wasteful for space but does waste 1 byte for every field that does
5583
// Walks every field of m_table->s and asks it to write its metadata into
// m_field_metadata starting at the current `index`; `index` is advanced by
// the value each Field::save_field_metadata() call returns.
// NOTE(review): the declaration of `index` and the return statement are not
// visible in this excerpt — presumably `index` starts at 0 and the final
// value (total metadata bytes) is returned; confirm against the full file.
int Table_map_log_event::save_field_metadata()
5586
for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
5587
index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
5592
Constructor used to build an event for writing to the binary log.
5593
Mats says tbl->s lives longer than this event so it's ok to copy pointers
5594
(tbl->s->db etc) and not pointer content.
5596
Table_map_log_event::Table_map_log_event(THD *thd, Table *tbl, ulong tid,
5597
bool is_transactional __attribute__((unused)),
5599
: Log_event(thd, 0, true),
5601
m_dbnam(tbl->s->db.str),
5602
m_dblen(m_dbnam ? tbl->s->db.length : 0),
5603
m_tblnam(tbl->s->table_name.str),
5604
m_tbllen(tbl->s->table_name.length),
5605
m_colcnt(tbl->s->fields),
5610
m_field_metadata(0),
5611
m_field_metadata_size(0),
5615
assert(m_table_id != UINT32_MAX);
5617
In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
5618
table.cc / alloc_table_share():
5619
Use the fact the key is db/0/table_name/0
5620
As we rely on this let's assert it.
5622
assert((tbl->s->db.str == 0) ||
5623
(tbl->s->db.str[tbl->s->db.length] == 0));
5624
assert(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
5627
m_data_size= TABLE_MAP_HEADER_LEN;
5628
m_data_size+= m_dblen + 2; // Include length and terminating \0
5629
m_data_size+= m_tbllen + 2; // Include length and terminating \0
5630
m_data_size+= 1 + m_colcnt; // COLCNT and column types
5632
/* If malloc fails, caught in is_valid() */
5633
if ((m_memory= (unsigned char*) my_malloc(m_colcnt, MYF(MY_WME))))
5635
m_coltype= reinterpret_cast<unsigned char*>(m_memory);
5636
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
5637
m_coltype[i]= m_table->field[i]->type();
5641
Calculate a bitmap for the results of maybe_null() for all columns.
5642
The bitmap is used to determine when there is a column from the master
5643
that is not on the slave and is null and thus not in the row data during
5646
uint32_t num_null_bytes= (m_table->s->fields + 7) / 8;
5647
m_data_size+= num_null_bytes;
5648
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
5649
&m_null_bits, num_null_bytes,
5650
&m_field_metadata, (m_colcnt * 2),
5653
memset(m_field_metadata, 0, (m_colcnt * 2));
5656
Create an array for the field metadata and store it.
5658
m_field_metadata_size= save_field_metadata();
5659
assert(m_field_metadata_size <= (m_colcnt * 2));
5662
Now set the size of the data to the size of the field metadata array
5663
plus one or two bytes for number of elements in the field metadata array.
5665
if (m_field_metadata_size > 255)
5666
m_data_size+= m_field_metadata_size + 2;
5668
m_data_size+= m_field_metadata_size + 1;
5670
memset(m_null_bits, 0, num_null_bytes);
5671
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
5672
if (m_table->field[i]->maybe_null())
5673
m_null_bits[(i / 8)]+= 1 << (i % 8);
5679
Constructor used by slave to read the event from the binary log.
5681
Table_map_log_event::Table_map_log_event(const char *buf, uint32_t event_len,
5682
const Format_description_log_event
5685
: Log_event(buf, description_event),
5687
m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
5688
m_colcnt(0), m_coltype(0),
5689
m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
5690
m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
5691
m_null_bits(0), m_meta_memory(NULL)
5693
unsigned int bytes_read= 0;
5695
uint8_t common_header_len= description_event->common_header_len;
5696
uint8_t post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
5698
/* Read the post-header */
5699
const char *post_start= buf + common_header_len;
5701
post_start+= TM_MAPID_OFFSET;
5702
if (post_header_len == 6)
5704
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
5705
m_table_id= uint4korr(post_start);
5710
assert(post_header_len == TABLE_MAP_HEADER_LEN);
5711
m_table_id= (ulong) uint6korr(post_start);
5712
post_start+= TM_FLAGS_OFFSET;
5715
assert(m_table_id != UINT32_MAX);
5717
m_flags= uint2korr(post_start);
5719
/* Read the variable part of the event */
5720
const char *const vpart= buf + common_header_len + post_header_len;
5722
/* Extract the length of the various parts from the buffer */
5723
unsigned char const *const ptr_dblen= (unsigned char const*)vpart + 0;
5724
m_dblen= *(unsigned char*) ptr_dblen;
5726
/* Length of database name + counter + terminating null */
5727
unsigned char const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
5728
m_tbllen= *(unsigned char*) ptr_tbllen;
5730
/* Length of table name + counter + terminating null */
5731
unsigned char const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
5732
unsigned char *ptr_after_colcnt= (unsigned char*) ptr_colcnt;
5733
m_colcnt= net_field_length(&ptr_after_colcnt);
5735
/* Allocate mem for all fields in one go. If fails, caught in is_valid() */
5736
m_memory= (unsigned char*) my_multi_malloc(MYF(MY_WME),
5737
&m_dbnam, (uint) m_dblen + 1,
5738
&m_tblnam, (uint) m_tbllen + 1,
5739
&m_coltype, (uint) m_colcnt,
5744
/* Copy the different parts into their memory */
5745
strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
5746
strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
5747
memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
5749
ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
5750
bytes_read= ptr_after_colcnt - (unsigned char *)buf;
5751
if (bytes_read < event_len)
5753
m_field_metadata_size= net_field_length(&ptr_after_colcnt);
5754
assert(m_field_metadata_size <= (m_colcnt * 2));
5755
uint32_t num_null_bytes= (m_colcnt + 7) / 8;
5756
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
5757
&m_null_bits, num_null_bytes,
5758
&m_field_metadata, m_field_metadata_size,
5760
memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
5761
ptr_after_colcnt= (unsigned char*)ptr_after_colcnt + m_field_metadata_size;
5762
memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
5769
Table_map_log_event::~Table_map_log_event()
5771
free(m_meta_memory);
5776
Return value is an error code, one of:
5778
-1 Failure to open table [from open_tables()]
5780
1 No room for more tables [from set_table()]
5781
2 Out of memory [from set_table()]
5782
3 Wrong table definition
5783
4 Daisy-chaining RBR with SBR not possible
5786
int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
5788
RPL_TableList *table_list;
5789
char *db_mem, *tname_mem;
5792
assert(rli->sql_thd == thd);
5794
/* Step the query id to mark which columns are actually used. */
5795
pthread_mutex_lock(&LOCK_thread_count);
5796
thd->query_id= next_query_id();
5797
pthread_mutex_unlock(&LOCK_thread_count);
5799
if (!(memory= my_multi_malloc(MYF(MY_WME),
5800
&table_list, (uint) sizeof(RPL_TableList),
5801
&db_mem, (uint) NAME_LEN + 1,
5802
&tname_mem, (uint) NAME_LEN + 1,
5804
return(HA_ERR_OUT_OF_MEM);
5806
memset(table_list, 0, sizeof(*table_list));
5807
table_list->db = db_mem;
5808
table_list->alias= table_list->table_name = tname_mem;
5809
table_list->lock_type= TL_WRITE;
5810
table_list->next_global= table_list->next_local= 0;
5811
table_list->table_id= m_table_id;
5812
table_list->updating= 1;
5813
my_stpcpy(table_list->db, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
5814
my_stpcpy(table_list->table_name, m_tblnam);
5818
if (!rpl_filter->db_ok(table_list->db) ||
5819
(rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list)))
5826
open_tables() reads the contents of thd->lex, so they must be
5827
initialized, so we should call lex_start(); to be even safer, we
5828
call mysql_init_query() which does a more complete set of inits.
5831
mysql_reset_thd_for_next_command(thd);
5833
Check if the slave is set to use SBR. If so, it should switch
5834
to using RBR until the end of the "statement", i.e., next
5835
STMT_END_F or next error.
5837
if (!thd->current_stmt_binlog_row_based &&
5838
mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG))
5840
thd->set_current_stmt_binlog_row_based();
5844
Open the table if it is not already open and add the table to
5845
table map. Note that for any table that should not be
5846
replicated, a filter is needed.
5848
The creation of a new TableList is used to up-cast the
5849
table_list consisting of RPL_TableList items. This will work
5850
since the only case where the argument to open_tables() is
5851
changed, is when thd->lex->query_tables == table_list, i.e.,
5852
when the statement requires prelocking. Since this is not
5853
executed when a statement is executed, this case will not occur.
5854
As a precaution, an assertion is added to ensure that the bad
5857
Either way, the memory in the list is *never* released
5858
internally in the open_tables() function, hence we take a copy
5859
of the pointer to make sure that it's not lost.
5862
assert(thd->lex->query_tables != table_list);
5863
TableList *tmp_table_list= table_list;
5864
if ((error= open_tables(thd, &tmp_table_list, &count, 0)))
5866
if (thd->is_slave_error || thd->is_fatal_error)
5869
Error reporting borrowed from Query_log_event with many excessive
5870
simplifications (we don't honour --slave-skip-errors)
5872
uint32_t actual_error= thd->main_da.sql_errno();
5873
rli->report(ERROR_LEVEL, actual_error,
5874
_("Error '%s' on opening table `%s`.`%s`"),
5876
? thd->main_da.message()
5877
: _("unexpected success or fatal error")),
5878
table_list->db, table_list->table_name);
5879
thd->is_slave_error= 1;
5884
m_table= table_list->table;
5887
This will fail later otherwise, the 'in_use' field should be
5888
set to the current thread.
5890
assert(m_table->in_use);
5893
Use placement new to construct the table_def instance in the
5894
memory allocated for it inside table_list.
5896
The memory allocated by the table_def structure (i.e., not the
5897
memory allocated *for* the table_def structure) is released
5898
inside Relay_log_info::clear_tables_to_lock() by calling the
5899
table_def destructor explicitly.
5901
new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt,
5902
m_field_metadata, m_field_metadata_size, m_null_bits);
5903
table_list->m_tabledef_valid= true;
5906
We record in the slave's information that the table should be
5907
locked by linking the table into the list of tables to lock.
5909
table_list->next_global= table_list->next_local= rli->tables_to_lock;
5910
const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
5911
const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
5912
/* 'memory' is freed in clear_tables_to_lock */
5922
Log_event::enum_skip_reason
5923
Table_map_log_event::do_shall_skip(Relay_log_info *rli)
5926
If the slave skip counter is 1, then we should not start executing
5929
return continue_group(rli);
5932
int Table_map_log_event::do_update_pos(Relay_log_info *rli)
5934
rli->inc_event_relay_log_pos();
5939
/**
  Write the fixed-size post-header of a table map event to @c file.

  The header carries the table id (stored with int6store() at
  TM_MAPID_OFFSET) and the event flags (stored with int2store() at
  TM_FLAGS_OFFSET).

  @param file  IO_CACHE the header is written to.

  @return The result of my_b_safe_write(), converted to bool.
*/
bool Table_map_log_event::write_data_header(IO_CACHE *file)
{
  assert(m_table_id != UINT32_MAX);

  unsigned char header[TABLE_MAP_HEADER_LEN];
  unsigned char *const id_pos= header + TM_MAPID_OFFSET;
  unsigned char *const flags_pos= header + TM_FLAGS_OFFSET;

  int6store(id_pos, (uint64_t)m_table_id);
  int2store(flags_pos, m_flags);

  return (my_b_safe_write(file, header, TABLE_MAP_HEADER_LEN));
}
5948
/**
  Write the variable-length body of the table map event to @c file.

  The parts are emitted in this order: database-name length byte, database
  name including its terminating NUL, table-name length byte, table name
  including its terminating NUL, packed column count, one type byte per
  column, packed size of the field metadata, the field metadata itself and
  finally the null-bits bitmap ((m_colcnt + 7) / 8 bytes).

  @param file  IO_CACHE the body is written to.

  @return false when every my_b_safe_write() call returned zero; true as
          soon as one of them returns non-zero, in which case the remaining
          writes are skipped.
*/
bool Table_map_log_event::write_data_body(IO_CACHE *file)
{
  assert(m_dbnam != NULL);
  assert(m_tblnam != NULL);
  /* We use only one byte per length for storage in event: */
  assert(m_dblen < 128);
  assert(m_tbllen < 128);

  unsigned char const dbuf[]= { (unsigned char) m_dblen };
  unsigned char const tbuf[]= { (unsigned char) m_tbllen };

  unsigned char cbuf[sizeof(m_colcnt)];
  unsigned char *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
  assert(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));

  /*
    Store the size of the field metadata.
  */
  unsigned char mbuf[sizeof(m_field_metadata_size)];
  unsigned char *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
  assert(static_cast<size_t>(mbuf_end - mbuf) <= sizeof(mbuf));

  /*
    Every write is chained with || (not the comma operator) so that a
    failure in any of them is propagated to the caller and stops further
    output for this event.
  */
  return (my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
          my_b_safe_write(file, (const unsigned char*)m_dbnam, m_dblen+1) ||
          my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
          my_b_safe_write(file, (const unsigned char*)m_tblnam, m_tbllen+1) ||
          my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
          my_b_safe_write(file, m_coltype, m_colcnt) ||
          my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
          my_b_safe_write(file, m_field_metadata, m_field_metadata_size) ||
          my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
}
5982
Print some useful information for the SHOW BINARY LOG information
5986
// Formats "table_id: <id> (<db>.<table>)" from m_table_id, m_dbnam and
// m_tblnam and hands the text to `protocol` via Protocol::store() using
// my_charset_bin.
// NOTE(review): the declaration of `buf` is not visible in this excerpt.
// snprintf() returns the untruncated length, so if the two names do not fit
// in `buf` the `bytes` value passed to store() would exceed the buffer —
// confirm buf's size against the maximum name lengths.
void Table_map_log_event::pack_info(Protocol *protocol)
5989
size_t bytes= snprintf(buf, sizeof(buf),
5990
"table_id: %lu (%s.%s)",
5991
m_table_id, m_dbnam, m_tblnam);
5992
protocol->store(buf, bytes, &my_charset_bin);
5996
/**************************************************************************
5997
Write_rows_log_event member functions
5998
**************************************************************************/
6001
Constructor used to build an event for writing to the binary log.
6003
Write_rows_log_event::Write_rows_log_event(THD *thd_arg, Table *tbl_arg,
6005
bool is_transactional)
6006
: Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
6011
Constructor used by slave to read the event from the binary log.
6013
Write_rows_log_event::Write_rows_log_event(const char *buf, uint32_t event_len,
6014
const Format_description_log_event
6016
: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
6021
Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6026
todo: to introduce a property for the event (handler?) which forces
6027
applying the event in the replace (idempotent) fashion.
6029
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
6032
We are using REPLACE semantics and not INSERT IGNORE semantics
6033
when writing rows, that is: new rows replace old rows. We need to
6034
inform the storage engine that it should use this behaviour.
6037
/* Tell the storage engine that we are using REPLACE semantics. */
6038
thd->lex->duplicates= DUP_REPLACE;
6041
Pretend we're executing a REPLACE command: this is needed for
6042
InnoDB since it is not (properly) checking the
6043
lex->duplicates flag.
6045
thd->lex->sql_command= SQLCOM_REPLACE;
6047
Do not raise the error flag in case of hitting a unique attribute
6049
m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
6052
m_table->file->ha_start_bulk_insert(0);
6054
We need TIMESTAMP_NO_AUTO_SET, otherwise ha_write_row() will not fill
6055
any TIMESTAMP column with data from the row but instead will use
6056
the event's current time.
6057
As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
6058
columns, we know that all TIMESTAMP columns on slave will receive explicit
6059
data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
6060
When we allow a table without TIMESTAMP to be replicated to a table having
6061
more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
6062
column to be replicated into a BIGINT column and the slave's table has a
6063
TIMESTAMP column, then the slave's TIMESTAMP column will take its value
6064
from set_time() which we called earlier (consistent with SBR). And then in
6065
some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
6066
analyze if explicit data is provided for slave's TIMESTAMP columns).
6068
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
6074
Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6078
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
6080
m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
6081
m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
6083
resetting the extra with
6084
table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
6086
explanation: file->reset() performs this duty
6087
ultimately. Still todo: fix
6090
if ((local_error= m_table->file->ha_end_bulk_insert()))
6092
m_table->file->print_error(local_error, MYF(0));
6094
return error? error : local_error;
6099
Check if there are more UNIQUE keys after the given key.
6102
// Scans the keys that follow `keyno` in table->key_info (up to
// table->s->keys) and tests each one for the HA_NOSAME (unique) flag.
// NOTE(review): the return-type line and the return statements are not
// visible in this excerpt — presumably the function reports whether `keyno`
// is the last unique key; confirm against the full file.
last_uniq_key(Table *table, uint32_t keyno)
6104
while (++keyno < table->s->keys)
6105
if (table->key_info[keyno].flags & HA_NOSAME)
6111
Check if an error is a duplicate key error.
6113
This function is used to check if an error code is one of the
6114
duplicate key errors, i.e., an error code for which it is sensible
6115
to do a <code>get_dup_key()</code> to retrieve the duplicate key.
6117
@param errcode The error code to check.
6119
@return <code>true</code> if the error code is such that
6120
<code>get_dup_key()</code> will return true, <code>false</code>
6124
is_duplicate_key_error(int errcode)
6128
case HA_ERR_FOUND_DUPP_KEY:
6129
case HA_ERR_FOUND_DUPP_UNIQUE:
6136
Write the current row into event's table.
6138
The row is located in the row buffer, pointed by @c m_curr_row member.
6139
Number of columns of the row is stored in @c m_width member (it can be
6140
different from the number of columns in the table to which we insert).
6141
Bitmap @c m_cols indicates which columns are present in the row. It is assumed
6142
that event's table is already open and pointed by @c m_table.
6144
If the same record already exists in the table it can be either overwritten
6145
or an error is reported depending on the value of @c overwrite flag
6146
(error reporting not yet implemented). Note that the matching record can be
6147
different from the row we insert if we use primary keys to identify records in
6150
The row to be inserted can contain values only for selected columns. The
6151
missing columns are filled with default values using @c prepare_record()
6152
function. If a matching record is found in the table and @c overwrite is
6153
true, the missing columns are taken from it.
6155
@param rli Relay log info (needed for row unpacking).
6157
Shall we overwrite if the row already exists or signal
6158
error (currently ignored).
6160
@returns Error code on failure, 0 on success.
6162
This method, if successful, sets @c m_curr_row_end pointer to point at the
6163
next row in the rows buffer. This is done when unpacking the row to be
6166
@note If a matching record is found, it is either updated using
6167
@c ha_update_row() or first deleted and then new record written.
6171
Rows_log_event::write_row(const Relay_log_info *const rli,
6172
const bool overwrite)
6174
assert(m_table != NULL && thd != NULL);
6176
Table *table= m_table; // pointer to event's table
6179
auto_afree_ptr<char> key(NULL);
6181
/* fill table->record[0] with default values */
6184
We only check if the columns have default values for non-NDB
6185
engines, for NDB we ignore the check since updates are sent as
6186
writes, causing errors when trying to prepare the record.
6188
TODO[ndb]: Eliminate this hard-coded dependency on NDB. Ideally,
6189
the engine should be able to set a flag that it want the default
6190
values filled in and one flag to handle the case that the default
6191
values should be checked. Maybe these two flags can be combined.
6193
if ((error= prepare_record(table, &m_cols, m_width, true)))
6196
/* unpack row into table->record[0] */
6197
error= unpack_current_row(rli, &m_cols);
6199
// Temporary fix to find out why it fails [/Matz]
6200
memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
6203
Try to write record. If a corresponding record already exists in the table,
6204
we try to change it using ha_update_row() if possible. Otherwise we delete
6205
it and repeat the whole process again.
6207
TODO: Add safety measures against infinite looping.
6210
while ((error= table->file->ha_write_row(table->record[0])))
6212
if (error == HA_ERR_LOCK_DEADLOCK ||
6213
error == HA_ERR_LOCK_WAIT_TIMEOUT ||
6214
(keynum= table->file->get_dup_key(error)) < 0 ||
6218
Deadlock, waiting for lock or just an error from the handler
6219
such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
6220
Retrieval of the duplicate key number may fail
6221
- either because the error was not "duplicate key" error
6222
- or because the information which key is not available
6224
table->file->print_error(error, MYF(0));
6228
We need to retrieve the old row into record[1] to be able to
6229
either update or delete the offending record. We either:
6231
- use rnd_pos() with a row-id (available as dupp_row) to the
6232
offending row, if that is possible (MyISAM and Blackhole), or else
6234
- use index_read_idx() with the key that is duplicated, to
6235
retrieve the offending row.
6237
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
6239
if (table->file->inited && (error= table->file->ha_index_end()))
6241
if ((error= table->file->ha_rnd_init(false)))
6244
error= table->file->rnd_pos(table->record[1], table->file->dup_ref);
6245
table->file->ha_rnd_end();
6248
table->file->print_error(error, MYF(0));
6254
if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
6259
if (key.get() == NULL)
6261
key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
6262
if (key.get() == NULL)
6268
key_copy((unsigned char*)key.get(), table->record[0], table->key_info + keynum,
6270
error= table->file->index_read_idx_map(table->record[1], keynum,
6271
(const unsigned char*)key.get(),
6276
table->file->print_error(error, MYF(0));
6282
Now, record[1] should contain the offending row. That
6283
will enable us to update it or, alternatively, delete it (so
6284
that we can insert the new row afterwards).
6288
If row is incomplete we will use the record found to fill
6291
if (!get_flags(COMPLETE_ROWS_F))
6293
restore_record(table,record[1]);
6294
error= unpack_current_row(rli, &m_cols);
6298
REPLACE is defined as either INSERT or DELETE + INSERT. If
6299
possible, we can replace it with an UPDATE, but that will not
6300
work on InnoDB if FOREIGN KEY checks are necessary.
6302
I (Matz) am not sure of the reason for the last_uniq_key()
6303
check, but I'm guessing that it's something along the
6306
Suppose that we got the duplicate key to be a key that is not
6307
the last unique key for the table and we perform an update:
6308
then there might be another key for which the unique check will
6309
fail, so we're better off just deleting the row and inserting
6312
if (last_uniq_key(table, keynum) &&
6313
!table->file->referenced_by_foreign_key())
6315
error=table->file->ha_update_row(table->record[1],
6319
case HA_ERR_RECORD_IS_THE_SAME:
6326
table->file->print_error(error, MYF(0));
6333
if ((error= table->file->ha_delete_row(table->record[1])))
6335
table->file->print_error(error, MYF(0));
6338
/* Will retry ha_write_row() with the offending row removed. */
6347
// Applies the current row of a write-rows event by calling write_row().
// The overwrite argument is true when SLAVE_EXEC_MODE_IDEMPOTENT is set in
// slave_exec_mode. If write_row() reports an error but the thread has no
// error recorded yet, a generic ER_UNKNOWN_ERROR is raised.
// NOTE(review): the return-type line, the declaration/assignment of `error`
// and the return statement are not visible in this excerpt.
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6349
assert(m_table != NULL);
6351
write_row(rli, /* if 1 then overwrite */
6352
bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
6354
if (error && !thd->is_error())
6357
my_error(ER_UNKNOWN_ERROR, MYF(0));
6364
/**************************************************************************
6365
Delete_rows_log_event member functions
6366
**************************************************************************/
6369
Compares table->record[0] and table->record[1]
6371
Returns TRUE if different.
6373
static bool record_compare(Table *table)
6376
Need to set the X bit and the filler bits in both records since
6377
there are engines that do not set it correctly.
6379
In addition, since MyISAM checks that one hasn't tampered with the
6380
record, it is necessary to restore the old bytes into the record
6381
after doing the comparison.
6383
TODO[record format ndb]: Remove it once NDB returns correct
6384
records. Check that the other engines also return correct records.
6387
unsigned char saved_x[2], saved_filler[2];
6389
if (table->s->null_bytes > 0)
6391
for (int i = 0 ; i < 2 ; ++i)
6393
saved_x[i]= table->record[i][0];
6394
saved_filler[i]= table->record[i][table->s->null_bytes - 1];
6395
table->record[i][0]|= 1U;
6396
table->record[i][table->s->null_bytes - 1]|=
6397
256U - (1U << table->s->last_null_bit_pos);
6401
if (table->s->blob_fields + table->s->varchar_fields == 0)
6403
result= cmp_record(table,record[1]);
6404
goto record_compare_exit;
6407
/* Compare null bits */
6408
if (memcmp(table->null_flags,
6409
table->null_flags+table->s->rec_buff_length,
6410
table->s->null_bytes))
6412
result= true; // Diff in NULL value
6413
goto record_compare_exit;
6416
/* Compare updated fields */
6417
for (Field **ptr=table->field ; *ptr ; ptr++)
6419
if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
6422
goto record_compare_exit;
6426
record_compare_exit:
6428
Restore the saved bytes.
6430
TODO[record format ndb]: Remove this code once NDB returns the
6431
correct record format.
6433
if (table->s->null_bytes > 0)
6435
for (int i = 0 ; i < 2 ; ++i)
6437
table->record[i][0]= saved_x[i];
6438
table->record[i][table->s->null_bytes - 1]= saved_filler[i];
6446
Locate the current row in event's table.
6448
The current row is pointed by @c m_curr_row. Member @c m_width tells how many
6449
columns are there in the row (this can be different from the number of columns
6450
in the table). It is assumed that event's table is already open and pointed
6453
If a corresponding record is found in the table it is stored in
6454
@c m_table->record[0]. Note that when record is located based on a primary
6455
key, it is possible that the record found differs from the row being located.
6457
If no key is specified or table does not have keys, a table scan is used to
6458
find the row. In that case the row should be complete and contain values for
6459
all columns. However, it can still be shorter than the table, i.e. the table
6460
can contain extra columns not present in the row. It is also possible that
6461
the table has fewer columns than the row being located.
6463
@returns Error code on failure, 0 on success.
6465
@post In case of success @c m_table->record[0] contains the record found.
6466
Also, the internal "cursor" of the table is positioned at the record found.
6468
@note If the engine allows random access of the records, a combination of
6469
@c position() and @c rnd_pos() will be used.
6472
int Rows_log_event::find_row(const Relay_log_info *rli)
6474
assert(m_table && m_table->in_use != NULL);
6476
Table *table= m_table;
6479
/* unpack row - missing fields get default values */
6480
prepare_record(table, &m_cols, m_width, false/* don't check errors */);
6481
error= unpack_current_row(rli, &m_cols);
6483
// Temporary fix to find out why it fails [/Matz]
6484
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
6486
if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
6487
table->s->primary_key < MAX_KEY)
6490
Use a more efficient method to fetch the record given by
6491
table->record[0] if the engine allows it. We first compute a
6492
row reference using the position() member function (it will be
6493
stored in table->file->ref) and then use rnd_pos() to position
6494
the "cursor" (i.e., record[0] in this case) at the correct row.
6496
TODO: Add a check that the correct record has been fetched by
6497
comparing with the original record. Take into account that the
6498
record on the master and slave can be of different
6499
length. Something along these lines should work:
6501
ADD>>> store_record(table,record[1]);
6502
int error= table->file->rnd_pos(table->record[0], table->file->ref);
6503
ADD>>> assert(memcmp(table->record[1], table->record[0],
6504
table->s->reclength) == 0);
6507
int error= table->file->rnd_pos_by_record(table->record[0]);
6508
table->file->ha_rnd_end();
6511
table->file->print_error(error, MYF(0));
6516
// We can't use position() - try other methods.
6519
Save copy of the record in table->record[1]. It might be needed
6520
later if linear search is used to find exact match.
6522
store_record(table,record[1]);
6524
if (table->s->keys > 0)
6526
/* We have a key: search the table using the index */
6527
if (!table->file->inited && (error= table->file->ha_index_init(0, false)))
6529
table->file->print_error(error, MYF(0));
6533
/* Fill key data for the row */
6536
key_copy(m_key, table->record[0], table->key_info, 0);
6539
We need to set the null bytes to ensure that the filler bit are
6540
all set when returning. There are storage engines that just set
6541
the necessary bits on the bytes and don't set the filler bits
6544
my_ptrdiff_t const pos=
6545
table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
6546
table->record[0][pos]= 0xFF;
6548
if ((error= table->file->index_read_map(table->record[0], m_key,
6550
HA_READ_KEY_EXACT)))
6552
table->file->print_error(error, MYF(0));
6553
table->file->ha_index_end();
6558
Below is a minor "optimization". If the key (i.e., key number
6559
0) has the HA_NOSAME flag set, we know that we have found the
6560
correct record (since there can be no duplicates); otherwise, we
6561
have to compare the record with the one found to see if it is
6564
CAVEAT! This behaviour is essential for the replication of,
6565
e.g., the mysql.proc table since the correct record *shall* be
6566
found using the primary key *only*. There shall be no
6567
comparison of non-PK columns to decide if the correct record is
6568
found. I can see no scenario where it would be incorrect to
6569
chose the row to change only using a PK or an UNNI.
6571
if (table->key_info->flags & HA_NOSAME)
6573
table->file->ha_index_end();
6578
In case key is not unique, we still have to iterate over records found
6579
and find the one which is identical to the row given. A copy of the
6580
record we are looking for is stored in record[1].
6582
while (record_compare(table))
6585
We need to set the null bytes to ensure that the filler bit
6586
are all set when returning. There are storage engines that
6587
just set the necessary bits on the bytes and don't set the
6588
filler bits correctly.
6590
TODO[record format ndb]: Remove this code once NDB returns the
6591
correct record format.
6593
if (table->s->null_bytes > 0)
6595
table->record[0][table->s->null_bytes - 1]|=
6596
256U - (1U << table->s->last_null_bit_pos);
6599
if ((error= table->file->index_next(table->record[0])))
6601
table->file->print_error(error, MYF(0));
6602
table->file->ha_index_end();
6608
Have to restart the scan to be able to fetch the next row.
6610
table->file->ha_index_end();
6614
int restart_count= 0; // Number of times scanning has restarted from top
6616
/* We don't have a key: search the table using rnd_next() */
6617
if ((error= table->file->ha_rnd_init(1)))
6619
table->file->print_error(error, MYF(0));
6623
/* Continue until we find the right record or have made a full loop */
6626
error= table->file->rnd_next(table->record[0]);
6631
case HA_ERR_RECORD_DELETED:
6634
case HA_ERR_END_OF_FILE:
6635
if (++restart_count < 2)
6636
table->file->ha_rnd_init(1);
6640
table->file->print_error(error, MYF(0));
6641
table->file->ha_rnd_end();
6645
while (restart_count < 2 && record_compare(table));
6648
Note: above record_compare will take into accout all record fields
6649
which might be incorrect in case a partial row was given in the event
6651
table->file->ha_rnd_end();
6653
assert(error == HA_ERR_END_OF_FILE || error == HA_ERR_RECORD_DELETED || error == 0);
6657
table->default_column_bitmaps();
6660
table->default_column_bitmaps();
6666
Constructor used to build an event for writing to the binary log.
6669
// Forwards to the Rows_log_event constructor, passing tbl_arg->read_set as
// the fourth argument.
// NOTE(review): 'tid' is used in the initializer but its parameter
// declaration is not visible in this listing.
Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, Table *tbl_arg,
6671
bool is_transactional)
6672
: Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
6677
Constructor used by slave to read the event from the binary log.
6679
// Delegates to Rows_log_event with the raw buffer, its length, the
// DELETE_ROWS_EVENT type code and the format description.
// NOTE(review): the 'description_event' parameter name is not visible in the
// signature lines of this listing.
Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint32_t event_len,
6680
const Format_description_log_event
6682
: Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
6688
// Prepares for row deletion. The first condition is the same test that
// find_row() uses to pick the position-based lookup; otherwise, when the table
// has at least one key, m_key is allocated with key_info->key_length bytes.
Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6690
if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
6691
m_table->s->primary_key < MAX_KEY)
6694
We don't need to allocate any memory for m_key since it is not used.
6699
if (m_table->s->keys > 0)
6701
// Allocate buffer for key searches
6702
m_key= (unsigned char*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
6704
// Presumably guarded by a null check on m_key that is not visible here.
return HA_ERR_OUT_OF_MEM;
6711
// Post-processing after the rows of a delete event have been handled.
// NOTE(review): the Update_rows counterpart visibly frees m_key; no such call
// is visible here -- confirm the key buffer is released (the line may simply
// be missing from this listing).
Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6714
/* error= TODO: find out what this should really be; this triggers close_scan in ndb -- should that error be returned? */
6715
// The handler's return value is discarded.
m_table->file->ha_index_or_rnd_end();
6722
// Applies one delete: locates the row with find_row() and, when that returns
// 0, deletes the record held in record[0] through ha_delete_row().
int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6725
assert(m_table != NULL);
6727
if (!(error= find_row(rli)))
6730
Delete the record found, located in record[0]
6732
error= m_table->file->ha_delete_row(m_table->record[0]);
6738
/**************************************************************************
6739
Update_rows_log_event member functions
6740
**************************************************************************/
6743
Constructor used to build an event for writing to the binary log.
6745
// Forwards to the Rows_log_event constructor with tbl_arg->read_set, then
// calls init() with tbl_arg->write_set to set up m_cols_ai.
// NOTE(review): 'tid' is used in the initializer but its parameter
// declaration is not visible in this listing.
Update_rows_log_event::Update_rows_log_event(THD *thd_arg, Table *tbl_arg,
6747
bool is_transactional)
6748
: Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
6750
init(tbl_arg->write_set);
6753
// Initialises the m_cols_ai bitmap. The inline buffer m_bitbuf_ai is used when
// m_width bits fit into it; otherwise NULL is passed to bitmap_init()
// (presumably letting it allocate -- TODO confirm). When 'cols' is non-NULL
// its bitmap bytes are copied into m_cols_ai.
// NOTE(review): the copy length comes from 'cols'; confirm it never exceeds
// the storage behind m_cols_ai.
void Update_rows_log_event::init(MY_BITMAP const *cols)
6755
/* if bitmap_init fails, caught in is_valid() */
6756
if (likely(!bitmap_init(&m_cols_ai,
6757
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
6761
/* Cols can be zero if this is a dummy binrows event */
6762
if (likely(cols != NULL))
6764
memcpy(m_cols_ai.bitmap, cols->bitmap, no_bytes_in_map(cols));
6765
create_last_word_mask(&m_cols_ai);
6771
// Releases m_cols_ai. When the bitmap points at the inline buffer
// m_bitbuf_ai the pointer is cleared first so that bitmap_free() does not try
// to free storage that was never heap-allocated.
Update_rows_log_event::~Update_rows_log_event()
6773
if (m_cols_ai.bitmap == m_bitbuf_ai) // no my_malloc happened
6774
m_cols_ai.bitmap= 0; // so no free in bitmap_free
6775
bitmap_free(&m_cols_ai); // To pair with bitmap_init().
6780
Constructor used by slave to read the event from the binary log.
6782
// Delegates to Rows_log_event with the raw buffer, its length, the
// UPDATE_ROWS_EVENT type code and the format description.
// NOTE(review): part of the parameter list ('description_event' and any
// qualifiers) is not visible in this listing.
Update_rows_log_event::Update_rows_log_event(const char *buf, uint32_t event_len,
6784
Format_description_log_event
6786
: Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
6792
// Prepares for row updates: allocates m_key (key_info->key_length bytes) when
// the table has at least one key, and sets the table's timestamp_field_type
// to TIMESTAMP_NO_AUTO_SET.
Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6794
if (m_table->s->keys > 0)
6796
// Allocate buffer for key searches
6797
m_key= (unsigned char*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
6799
// Presumably guarded by a null check on m_key that is not visible here.
return HA_ERR_OUT_OF_MEM;
6802
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
6808
// Post-processing after the rows of an update event have been handled: ends
// the handler access via ha_index_or_rnd_end() and frees m_key.
Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6811
/* error= TODO: find out what this should really be; this triggers close_scan in ndb -- should that error be returned? */
6812
// The handler's return value is discarded.
m_table->file->ha_index_or_rnd_end();
6813
// NOTE(review): m_key is allocated with my_malloc() in
// do_before_row_operations(); confirm plain free() is the matching release.
free(m_key); // Releases the key-search buffer
6820
// Applies one update: find_row() locates the before image, the found record
// is saved to record[1], the after image is unpacked into record[0], the
// table's read/write sets are overwritten from m_cols/m_cols_ai, and
// ha_update_row(record[1], record[0]) performs the change.
// NOTE(review): braces and the error-path return are not visible in this
// listing; confirm the branch structure against the full file.
Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6822
assert(m_table != NULL);
6824
int error= find_row(rli);
6828
We need to read the second image in the event of error to be
6829
able to skip to the next pair of updates
6831
m_curr_row= m_curr_row_end;
6832
// Return value deliberately or accidentally dropped here -- only the cursor
// advance appears to matter on this path (see the note above).
unpack_current_row(rli, &m_cols_ai);
6837
This is the situation after locating BI:
6839
===|=== before image ====|=== after image ===|===
6841
m_curr_row m_curr_row_end
6843
BI found in the table is stored in record[0]. We copy it to record[1]
6844
and unpack AI to record[0].
6847
store_record(m_table,record[1]);
6849
m_curr_row= m_curr_row_end;
6850
// NOTE(review): no check of this result is visible before 'error' is
// overwritten by ha_update_row() below.
error= unpack_current_row(rli, &m_cols_ai); // this also updates m_curr_row_end
6853
Now we have the right row to update. The old row (the one we're
6854
looking for) is in record[1] and the new row is in record[0].
6857
// Temporary fix to find out why it fails [/Matz]
6858
// NOTE(review): both copies size the transfer from the table's bitmap, not
// from the event's; confirm the event bitmaps are at least that large.
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
6859
memcpy(m_table->write_set->bitmap, m_cols_ai.bitmap, (m_table->write_set->n_bits + 7) / 8);
6861
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
6862
// The action taken for HA_ERR_RECORD_IS_THE_SAME is not visible in this listing.
if (error == HA_ERR_RECORD_IS_THE_SAME)
6869
// Builds an incident event from a raw event buffer.
// @param buf          start of the event data
// @param event_len    total length of the event in buf
// @param descr_event  supplies the common and post header lengths
// The incident code is the 2-byte value right after the common header; the
// message is read from the position after the post header via read_str().
Incident_log_event::Incident_log_event(const char *buf, uint32_t event_len,
6870
const Format_description_log_event *descr_event)
6871
: Log_event(buf, descr_event)
6873
uint8_t const common_header_len=
6874
descr_event->common_header_len;
6875
uint8_t const post_header_len=
6876
descr_event->post_header_len[INCIDENT_EVENT-1];
6878
// NOTE(review): the decoded value is cast to Incident without a range check
// visible here, and event_len is not compared against the header lengths.
m_incident= static_cast<Incident>(uint2korr(buf + common_header_len));
6879
char const *ptr= buf + common_header_len + post_header_len;
6880
char const *const str_end= buf + event_len;
6881
uint8_t len= 0; // Assignment to keep compiler happy
6882
const char *str= NULL; // Assignment to keep compiler happy
6883
// The return value of read_str() is not captured; on failure str/len
// presumably keep the defaults assigned above -- TODO confirm.
read_str(&ptr, str_end, &str, &len);
6884
// presumably str points into buf rather than at a copy, so m_message would
// share buf's lifetime -- verify against read_str().
m_message.str= const_cast<char*>(str);
6885
m_message.length= len;
6890
// Destructor; its body is not visible in this listing.
Incident_log_event::~Incident_log_event()
6896
// Returns the human-readable name of the incident stored in m_incident by
// indexing a static table of strings.
// @return pointer to a string literal with static storage duration.
Incident_log_event::description() const
6898
static const char *const description[]= {
6899
"NOTHING", // Not used
6903
assert(0 <= m_incident);
6904
// The index must be strictly below the element count: a value equal to the
// count would read one past the end of the table.
assert((size_t) m_incident < sizeof(description)/sizeof(*description));
6906
return description[m_incident];
6910
// Formats a one-line summary of the incident ("#<code> (<description>)",
// optionally followed by ": <message>") into a local buffer and hands it to
// protocol->store().
// NOTE(review): the declarations of buf/bytes and any 'else' between the two
// snprintf calls are not visible in this listing.
void Incident_log_event::pack_info(Protocol *protocol)
6914
// NOTE(review): this test looks inverted -- the format on this branch omits
// the message when one is present, while the other format prints
// m_message.str. do_apply_event() uses the same test to select the message.
if (m_message.length > 0)
6915
bytes= snprintf(buf, sizeof(buf), "#%d (%s)",
6916
m_incident, description());
6918
bytes= snprintf(buf, sizeof(buf), "#%d (%s): %s",
6919
m_incident, description(), m_message.str);
6920
// NOTE(review): snprintf returns the length that would have been written,
// which can exceed sizeof(buf) on truncation; bytes is passed on unclamped.
protocol->store(buf, bytes, &my_charset_bin);
6925
// Reports the incident through rli->report() at ERROR_LEVEL with the
// ER_SLAVE_INCIDENT code and message template. The last argument is the
// event's message text, or the literal "<none>" when the message is empty.
// NOTE(review): one argument line between the template and the message is
// not visible in this listing.
Incident_log_event::do_apply_event(Relay_log_info const *rli)
6927
rli->report(ERROR_LEVEL, ER_SLAVE_INCIDENT,
6928
ER(ER_SLAVE_INCIDENT),
6930
m_message.length > 0 ? m_message.str : "<none>");
6936
// Writes the event's post header: m_incident narrowed to int16_t, stored into
// a 2-byte buffer with int2store() and written to 'file'.
// @return the result of my_b_safe_write().
Incident_log_event::write_data_header(IO_CACHE *file)
6938
unsigned char buf[sizeof(int16_t)];
6939
int2store(buf, (int16_t) m_incident);
6940
return(my_b_safe_write(file, buf, sizeof(buf)));
6944
// Writes the event body: the incident message (pointer and length) passed to
// write_str().
// @return the result of write_str().
Incident_log_event::write_data_body(IO_CACHE *file)
6946
return(write_str(file, m_message.str, m_message.length));
6949
Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint32_t event_len,
6950
const Format_description_log_event* description_event)
6951
:Log_event(buf, description_event)
6953
uint8_t header_size= description_event->common_header_len;
6954
ident_len = event_len - header_size;
6955
set_if_smaller(ident_len,FN_REFLEN-1);
6956
log_ident= buf + header_size;