1
/* Copyright (C) 2000-2004 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
#include <drizzled/server_includes.h>
20
#include "rpl_filter.h"
21
#include "rpl_utility.h"
22
#include "rpl_record.h"
23
#include <mysys/my_dir.h>
24
#include <drizzled/drizzled_error_messages.h>
28
#include <mysys/base64.h>
29
#include <mysys/my_bitmap.h>
31
#include <libdrizzle/gettext.h>
32
#include <libdrizzle/libdrizzle.h>
34
#define log_cs &my_charset_utf8_general_ci
36
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
40
Size of buffer for printing a double in format %.<PREC>g
42
optional '-' + optional zero + '.' + PREC digits + 'e' + sign +
43
exponent digits + '\0'
45
#define FMT_G_BUFSIZE(PREC) (3 + (PREC) + 5 + 1)
48
static const char *HA_ERR(int i)
51
case HA_ERR_KEY_NOT_FOUND: return "HA_ERR_KEY_NOT_FOUND";
52
case HA_ERR_FOUND_DUPP_KEY: return "HA_ERR_FOUND_DUPP_KEY";
53
case HA_ERR_RECORD_CHANGED: return "HA_ERR_RECORD_CHANGED";
54
case HA_ERR_WRONG_INDEX: return "HA_ERR_WRONG_INDEX";
55
case HA_ERR_CRASHED: return "HA_ERR_CRASHED";
56
case HA_ERR_WRONG_IN_RECORD: return "HA_ERR_WRONG_IN_RECORD";
57
case HA_ERR_OUT_OF_MEM: return "HA_ERR_OUT_OF_MEM";
58
case HA_ERR_NOT_A_TABLE: return "HA_ERR_NOT_A_TABLE";
59
case HA_ERR_WRONG_COMMAND: return "HA_ERR_WRONG_COMMAND";
60
case HA_ERR_OLD_FILE: return "HA_ERR_OLD_FILE";
61
case HA_ERR_NO_ACTIVE_RECORD: return "HA_ERR_NO_ACTIVE_RECORD";
62
case HA_ERR_RECORD_DELETED: return "HA_ERR_RECORD_DELETED";
63
case HA_ERR_RECORD_FILE_FULL: return "HA_ERR_RECORD_FILE_FULL";
64
case HA_ERR_INDEX_FILE_FULL: return "HA_ERR_INDEX_FILE_FULL";
65
case HA_ERR_END_OF_FILE: return "HA_ERR_END_OF_FILE";
66
case HA_ERR_UNSUPPORTED: return "HA_ERR_UNSUPPORTED";
67
case HA_ERR_TO_BIG_ROW: return "HA_ERR_TO_BIG_ROW";
68
case HA_WRONG_CREATE_OPTION: return "HA_WRONG_CREATE_OPTION";
69
case HA_ERR_FOUND_DUPP_UNIQUE: return "HA_ERR_FOUND_DUPP_UNIQUE";
70
case HA_ERR_UNKNOWN_CHARSET: return "HA_ERR_UNKNOWN_CHARSET";
71
case HA_ERR_WRONG_MRG_TABLE_DEF: return "HA_ERR_WRONG_MRG_TABLE_DEF";
72
case HA_ERR_CRASHED_ON_REPAIR: return "HA_ERR_CRASHED_ON_REPAIR";
73
case HA_ERR_CRASHED_ON_USAGE: return "HA_ERR_CRASHED_ON_USAGE";
74
case HA_ERR_LOCK_WAIT_TIMEOUT: return "HA_ERR_LOCK_WAIT_TIMEOUT";
75
case HA_ERR_LOCK_TABLE_FULL: return "HA_ERR_LOCK_TABLE_FULL";
76
case HA_ERR_READ_ONLY_TRANSACTION: return "HA_ERR_READ_ONLY_TRANSACTION";
77
case HA_ERR_LOCK_DEADLOCK: return "HA_ERR_LOCK_DEADLOCK";
78
case HA_ERR_CANNOT_ADD_FOREIGN: return "HA_ERR_CANNOT_ADD_FOREIGN";
79
case HA_ERR_NO_REFERENCED_ROW: return "HA_ERR_NO_REFERENCED_ROW";
80
case HA_ERR_ROW_IS_REFERENCED: return "HA_ERR_ROW_IS_REFERENCED";
81
case HA_ERR_NO_SAVEPOINT: return "HA_ERR_NO_SAVEPOINT";
82
case HA_ERR_NON_UNIQUE_BLOCK_SIZE: return "HA_ERR_NON_UNIQUE_BLOCK_SIZE";
83
case HA_ERR_NO_SUCH_TABLE: return "HA_ERR_NO_SUCH_TABLE";
84
case HA_ERR_TABLE_EXIST: return "HA_ERR_TABLE_EXIST";
85
case HA_ERR_NO_CONNECTION: return "HA_ERR_NO_CONNECTION";
86
case HA_ERR_NULL_IN_SPATIAL: return "HA_ERR_NULL_IN_SPATIAL";
87
case HA_ERR_TABLE_DEF_CHANGED: return "HA_ERR_TABLE_DEF_CHANGED";
88
case HA_ERR_NO_PARTITION_FOUND: return "HA_ERR_NO_PARTITION_FOUND";
89
case HA_ERR_RBR_LOGGING_FAILED: return "HA_ERR_RBR_LOGGING_FAILED";
90
case HA_ERR_DROP_INDEX_FK: return "HA_ERR_DROP_INDEX_FK";
91
case HA_ERR_FOREIGN_DUPLICATE_KEY: return "HA_ERR_FOREIGN_DUPLICATE_KEY";
92
case HA_ERR_TABLE_NEEDS_UPGRADE: return "HA_ERR_TABLE_NEEDS_UPGRADE";
93
case HA_ERR_TABLE_READONLY: return "HA_ERR_TABLE_READONLY";
94
case HA_ERR_AUTOINC_READ_FAILED: return "HA_ERR_AUTOINC_READ_FAILED";
95
case HA_ERR_AUTOINC_ERANGE: return "HA_ERR_AUTOINC_ERANGE";
96
case HA_ERR_GENERIC: return "HA_ERR_GENERIC";
97
case HA_ERR_RECORD_IS_THE_SAME: return "HA_ERR_RECORD_IS_THE_SAME";
98
case HA_ERR_LOGGING_IMPOSSIBLE: return "HA_ERR_LOGGING_IMPOSSIBLE";
99
case HA_ERR_CORRUPT_EVENT: return "HA_ERR_CORRUPT_EVENT";
100
case HA_ERR_ROWS_EVENT_APPLY : return "HA_ERR_ROWS_EVENT_APPLY";
106
Error reporting facility for Rows_log_event::do_apply_event
108
@param level error, warning or info
109
@param ha_error HA_ERR_ code
110
@param rli pointer to the active Relay_log_info instance
111
@param thd pointer to the slave thread's thd
112
@param table pointer to the event's table object
113
@param type the type of the event
114
@param log_name the master binlog file name
115
@param pos the master binlog file pos (the next after the event)
118
static void inline slave_rows_error_report(enum loglevel level, int ha_error,
119
Relay_log_info const *rli, THD *thd,
120
Table *table, const char * type,
121
const char *log_name, ulong pos)
123
const char *handler_error= HA_ERR(ha_error);
124
char buff[MAX_SLAVE_ERRMSG], *slider;
125
const char *buff_end= buff + sizeof(buff);
127
List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
131
for (err= it++, slider= buff; err && slider < buff_end - 1;
132
slider += len, err= it++)
134
len= snprintf(slider, buff_end - slider,
135
_(" %s, Error_code: %d;"), err->msg, err->code);
138
rli->report(level, thd->is_error()? thd->main_da.sql_errno() : 0,
139
_("Could not execute %s event on table %s.%s;"
140
"%s handler error %s; "
141
"the event's master log %s, end_log_pos %lu"),
142
type, table->s->db.str,
143
table->s->table_name.str,
145
handler_error == NULL? _("<unknown>") : handler_error,
151
Cache that will automatically be written to a dedicated file on
157
class Write_on_release_cache
165
typedef unsigned short flag_set;
171
Write_on_release_cache
172
cache Pointer to cache to use
173
file File to write cache to upon destruction
174
flags Flags for the cache
178
Class used to guarantee copy of cache to file before exiting the
179
current block. On successful copy of the cache, the cache will
180
be reinited as a WRITE_CACHE.
182
Currently, a pointer to the cache is provided in the
183
constructor, but it would be possible to create a subclass
184
holding the IO_CACHE itself.
186
Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0)
187
: m_cache(cache), m_file(file), m_flags(flags)
189
reinit_io_cache(m_cache, WRITE_CACHE, 0L, false, true);
192
~Write_on_release_cache()
194
copy_event_cache_to_file_and_reinit(m_cache, m_file);
195
if (m_flags | FLUSH_F)
200
Return a pointer to the internal IO_CACHE.
207
Function to return a pointer to the internal cache, so that the
208
object can be treated as a IO_CACHE and used with the my_b_*
212
A pointer to the internal IO_CACHE.
214
IO_CACHE *operator&()
220
// Hidden, to prevent usage.
221
Write_on_release_cache(Write_on_release_cache const&);
228
uint32_t debug_not_change_ts_if_art_event= 1; // bug#29309 simulation
234
static void clear_all_errors(THD *thd, Relay_log_info *rli)
236
thd->is_slave_error = 0;
243
Ignore error code specified on command line.
246
inline int ignored_error_code(int err_code)
248
return ((err_code == ER_SLAVE_IGNORED_TABLE) ||
249
(use_slave_mask && bitmap_is_set(&slave_error_mask, err_code)));
257
static char *pretty_print_str(char *packet, const char *str, int len)
259
const char *end= str + len;
265
switch ((c=*str++)) {
266
case '\n': *pos++= '\\'; *pos++= 'n'; break;
267
case '\r': *pos++= '\\'; *pos++= 'r'; break;
268
case '\\': *pos++= '\\'; *pos++= '\\'; break;
269
case '\b': *pos++= '\\'; *pos++= 'b'; break;
270
case '\t': *pos++= '\\'; *pos++= 't'; break;
271
case '\'': *pos++= '\\'; *pos++= '\''; break;
272
case 0 : *pos++= '\\'; *pos++= '0'; break;
284
Creates a temporary name for load data infile:.
286
@param buf Store new filename here
287
@param file_id File_id (part of file name)
288
@param event_server_id Event_id (part of file name)
289
@param ext Extension for file name
292
Pointer to start of extension
295
static char *slave_load_file_stem(char *buf, uint32_t file_id,
296
int event_server_id, const char *ext)
299
fn_format(buf,"SQL_LOAD-",slave_load_tmpdir, "", MY_UNPACK_FILENAME);
302
buf= strchr(buf, '\0');
303
buf= int10_to_str(::server_id, buf, 10);
305
buf= int10_to_str(event_server_id, buf, 10);
307
res= int10_to_str(file_id, buf, 10);
308
my_stpcpy(res, ext); // Add extension last
309
return res; // Pointer to extension
314
Delete all temporary files used for SQL_LOAD.
317
static void cleanup_load_tmpdir()
322
char fname[FN_REFLEN], prefbuf[31], *p;
324
if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
328
When we are deleting temporary files, we should only remove
329
the files associated with the server id of our server.
330
We don't use event_server_id here because since we've disabled
331
direct binlogging of Create_file/Append_file/Exec_load events
332
we cannot meet Start_log event in the middle of events from one
335
p= strmake(prefbuf, STRING_WITH_LEN("SQL_LOAD-"));
336
p= int10_to_str(::server_id, p, 10);
340
for (i=0 ; i < (uint)dirp->number_off_files; i++)
342
file=dirp->dir_entry+i;
343
if (is_prefix(file->name, prefbuf))
345
fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME);
346
my_delete(fname, MYF(0));
358
static bool write_str(IO_CACHE *file, const char *str, uint32_t length)
360
unsigned char tmp[1];
361
tmp[0]= (unsigned char) length;
362
return (my_b_safe_write(file, tmp, sizeof(tmp)) ||
363
my_b_safe_write(file, (unsigned char*) str, length));
371
static inline int read_str(const char **buf, const char *buf_end,
372
const char **str, uint8_t *len)
374
if (*buf + ((uint) (unsigned char) **buf) >= buf_end)
376
*len= (uint8_t) **buf;
378
(*buf)+= (uint) *len+1;
384
Transforms a string into "" or its expression in 0x... form.
387
char *str_to_hex(char *to, const char *from, uint32_t len)
393
to= octet2hex(to, from, len);
396
to= my_stpcpy(to, "\"\"");
397
return to; // pointer to end 0 of 'to'
402
Append a version of the 'from' string suitable for use in a query to
403
the 'to' string. To generate a correct escaping, the character set
404
information in 'csinfo' is used.
408
append_query_string(const CHARSET_INFO * const csinfo,
409
String const *from, String *to)
412
uint32_t const orig_len= to->length();
413
if (to->reserve(orig_len + from->length()*2+3))
416
beg= to->c_ptr_quick() + to->length();
418
if (csinfo->escape_with_backslash_is_dangerous)
419
ptr= str_to_hex(ptr, from->ptr(), from->length());
423
ptr+= drizzle_escape_string(ptr, from->ptr(), from->length());
426
to->length(orig_len + ptr - beg);
431
/**************************************************************************
432
Log_event methods (= the parent class of all events)
433
**************************************************************************/
437
returns the human readable name of the event's type
440
const char* Log_event::get_type_str(Log_event_type type)
443
case START_EVENT_V3: return "Start_v3";
444
case STOP_EVENT: return "Stop";
445
case QUERY_EVENT: return "Query";
446
case ROTATE_EVENT: return "Rotate";
447
case INTVAR_EVENT: return "Intvar";
448
case LOAD_EVENT: return "Load";
449
case NEW_LOAD_EVENT: return "New_load";
450
case SLAVE_EVENT: return "Slave";
451
case CREATE_FILE_EVENT: return "Create_file";
452
case APPEND_BLOCK_EVENT: return "Append_block";
453
case DELETE_FILE_EVENT: return "Delete_file";
454
case EXEC_LOAD_EVENT: return "Exec_load";
455
case RAND_EVENT: return "RAND";
456
case XID_EVENT: return "Xid";
457
case USER_VAR_EVENT: return "User var";
458
case FORMAT_DESCRIPTION_EVENT: return "Format_desc";
459
case TABLE_MAP_EVENT: return "Table_map";
460
case PRE_GA_WRITE_ROWS_EVENT: return "Write_rows_event_old";
461
case PRE_GA_UPDATE_ROWS_EVENT: return "Update_rows_event_old";
462
case PRE_GA_DELETE_ROWS_EVENT: return "Delete_rows_event_old";
463
case WRITE_ROWS_EVENT: return "Write_rows";
464
case UPDATE_ROWS_EVENT: return "Update_rows";
465
case DELETE_ROWS_EVENT: return "Delete_rows";
466
case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
467
case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
468
case INCIDENT_EVENT: return "Incident";
469
default: return "Unknown"; /* impossible */
473
const char* Log_event::get_type_str()
475
return get_type_str(get_type_code());
480
Log_event::Log_event()
483
Log_event::Log_event(THD* thd_arg, uint16_t flags_arg, bool using_trans)
484
:log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), thd(thd_arg)
486
server_id= thd->server_id;
487
when= thd->start_time;
488
cache_stmt= using_trans;
493
This minimal constructor is for when you are not even sure that there
494
is a valid THD. For example in the server when we are shutting down or
495
flushing logs after receiving a SIGHUP (then we must write a Rotate to
496
the binlog but we have no THD, so we need this minimal constructor).
499
Log_event::Log_event()
500
:temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
503
server_id= ::server_id;
505
We can't call my_time() here as this would cause a call before
514
Log_event::Log_event()
517
Log_event::Log_event(const char* buf,
518
const Format_description_log_event* description_event)
519
:temp_buf(0), cache_stmt(0)
522
when= uint4korr(buf);
523
server_id= uint4korr(buf + SERVER_ID_OFFSET);
524
data_written= uint4korr(buf + EVENT_LEN_OFFSET);
525
if (description_event->binlog_version==1)
532
log_pos= uint4korr(buf + LOG_POS_OFFSET);
534
If the log is 4.0 (so here it can only be a 4.0 relay log read by
535
the SQL thread or a 4.0 master binlog read by the I/O thread),
536
log_pos is the beginning of the event: we transform it into the end
537
of the event, which is more useful.
538
But how do you know that the log is 4.0: you know it if
539
description_event is version 3 *and* you are not reading a
540
Format_desc (remember that mysqlbinlog starts by assuming that 5.0
541
logs are in 4.0 format, until it finds a Format_desc).
543
if (description_event->binlog_version==3 &&
544
buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
547
If log_pos=0, don't change it. log_pos==0 is a marker to mean
548
"don't change rli->group_master_log_pos" (see
549
inc_group_relay_log_pos()). As it is unreal log_pos, adding the
550
event len's is nonsense. For example, a fake Rotate event should
551
not have its log_pos (which is 0) changed or it will modify
552
Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense
553
value of (a non-zero offset which does not exist in the master's
554
binlog, so which will cause problems if the user uses this value
557
log_pos+= data_written; /* purecov: inspected */
560
flags= uint2korr(buf + FLAGS_OFFSET);
561
if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
562
(buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
565
These events always have a header which stops here (i.e. their
569
Initialization to zero of all other Log_event members as they're
570
not specified. Currently there are no such members; in the future
571
there will be an event UID (but Format_description and Rotate
572
don't need this UID, as they are not propagated through
573
--log-slave-updates (remember the UID is used to not play a query
574
twice when you have two masters which are slaves of a 3rd master).
579
/* otherwise, go on with reading the header from buf (nothing now) */
583
int Log_event::do_update_pos(Relay_log_info *rli)
586
rli is null when (as far as I (Guilhem) know) the caller is
587
Load_log_event::do_apply_event *and* that one is called from
588
Execute_load_log_event::do_apply_event. In this case, we don't
589
do anything here ; Execute_load_log_event::do_apply_event will
590
call Log_event::do_apply_event again later with the proper rli.
591
Strictly speaking, if we were sure that rli is null only in the
592
case discussed above, 'if (rli)' is useless here. But as we are
593
not 100% sure, keep it for now.
595
Matz: I don't think we will need this check with this refactoring.
600
bug#29309 simulation: resetting the flag to force
601
wrong behaviour of artificial event to update
602
rli->last_master_timestamp for only one time -
603
the first FLUSH LOGS in the test.
605
if (debug_not_change_ts_if_art_event == 1
606
&& is_artificial_event())
607
debug_not_change_ts_if_art_event= 0;
608
rli->stmt_done(log_pos,
609
is_artificial_event() &&
610
debug_not_change_ts_if_art_event > 0 ? 0 : when);
611
if (debug_not_change_ts_if_art_event == 0)
612
debug_not_change_ts_if_art_event= 2;
614
return 0; // Cannot fail currently
618
Log_event::enum_skip_reason
619
Log_event::do_shall_skip(Relay_log_info *rli)
621
if ((server_id == ::server_id && !rli->replicate_same_server_id) || (rli->slave_skip_counter == 1 && rli->is_in_group()))
622
return EVENT_SKIP_IGNORE;
623
else if (rli->slave_skip_counter > 0)
624
return EVENT_SKIP_COUNT;
626
return EVENT_SKIP_NOT;
631
Log_event::pack_info()
634
void Log_event::pack_info(Protocol *protocol)
636
protocol->store("", &my_charset_bin);
641
Only called by SHOW BINLOG EVENTS
643
int Log_event::net_send(Protocol *protocol, const char* log_name, my_off_t pos)
645
const char *p= strrchr(log_name, FN_LIBCHAR);
646
const char *event_type;
650
protocol->prepare_for_resend();
651
protocol->store(log_name, &my_charset_bin);
652
protocol->store((uint64_t) pos);
653
event_type = get_type_str();
654
protocol->store(event_type, strlen(event_type), &my_charset_bin);
655
protocol->store((uint32_t) server_id);
656
protocol->store((uint64_t) log_pos);
658
return protocol->write();
663
init_show_field_list() prepares the column names and types for the
664
output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
668
void Log_event::init_show_field_list(List<Item>* field_list)
670
field_list->push_back(new Item_empty_string("Log_name", 20));
671
field_list->push_back(new Item_return_int("Pos", MY_INT32_NUM_DECIMAL_DIGITS,
672
DRIZZLE_TYPE_LONGLONG));
673
field_list->push_back(new Item_empty_string("Event_type", 20));
674
field_list->push_back(new Item_return_int("Server_id", 10,
676
field_list->push_back(new Item_return_int("End_log_pos",
677
MY_INT32_NUM_DECIMAL_DIGITS,
678
DRIZZLE_TYPE_LONGLONG));
679
field_list->push_back(new Item_empty_string("Info", 20));
686
bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
688
unsigned char header[LOG_EVENT_HEADER_LEN];
691
/* Store number of bytes that will be written by this event */
692
data_written= event_data_length + sizeof(header);
695
log_pos != 0 if this is relay-log event. In this case we should not
699
if (is_artificial_event())
702
We should not do any cleanup on slave when reading this. We
703
mark this by setting log_pos to 0. Start_log_event_v3() will
704
detect this on reading and set artificial_event=1 for the event.
711
Calculate position of end of event
713
Note that with a SEQ_READ_APPEND cache, my_b_tell() does not
714
work well. So this will give slightly wrong positions for the
715
Format_desc/Rotate/Stop events which the slave writes to its
716
relay log. For example, the initial Format_desc will have
717
end_log_pos=91 instead of 95. Because after writing the first 4
718
bytes of the relay log, my_b_tell() still reports 0. Because
719
my_b_append() does not update the counter which my_b_tell()
720
later uses (one should probably use my_b_append_tell() to work
721
around this). To get right positions even when writing to the
722
relay log, we use the (new) my_b_safe_tell().
724
Note that this raises a question on the correctness of all these
725
assert(my_b_tell()=rli->event_relay_log_pos).
727
If in a transaction, the log_pos which we calculate below is not
728
very good (because then my_b_safe_tell() returns start position
729
of the BEGIN, so it's like the statement was at the BEGIN's
730
place), but it's not a very serious problem (as the slave, when
731
it is in a transaction, does not take those end_log_pos into
732
account (as it calls inc_event_relay_log_pos()). To be fixed
733
later, so that it looks less strange. But not bug.
736
log_pos= my_b_safe_tell(file)+data_written;
739
now= (ulong) get_time(); // Query start time
742
Header will be of size LOG_EVENT_HEADER_LEN for all events, except for
743
FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT, where it will be
744
LOG_EVENT_MINIMAL_HEADER_LEN (remember these 2 have a frozen header,
745
because we read them before knowing the format).
748
int4store(header, now); // timestamp
749
header[EVENT_TYPE_OFFSET]= get_type_code();
750
int4store(header+ SERVER_ID_OFFSET, server_id);
751
int4store(header+ EVENT_LEN_OFFSET, data_written);
752
int4store(header+ LOG_POS_OFFSET, log_pos);
753
int2store(header+ FLAGS_OFFSET, flags);
755
return(my_b_safe_write(file, header, sizeof(header)) != 0);
760
This needn't be format-tolerant, because we only read
761
LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length).
764
int Log_event::read_log_event(IO_CACHE* file, String* packet,
765
pthread_mutex_t* log_lock)
769
char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
772
pthread_mutex_lock(log_lock);
773
if (my_b_read(file, (unsigned char*) buf, sizeof(buf)))
776
If the read hits eof, we must report it as eof so the caller
777
will know it can go into cond_wait to be woken up on the next
781
result= LOG_READ_EOF;
783
result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
786
data_len= uint4korr(buf + EVENT_LEN_OFFSET);
787
if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
788
data_len > current_thd->variables.max_allowed_packet)
790
result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
795
/* Append the log event header to packet */
796
if (packet->append(buf, sizeof(buf)))
798
/* Failed to allocate packet */
799
result= LOG_READ_MEM;
802
data_len-= LOG_EVENT_MINIMAL_HEADER_LEN;
805
/* Append rest of event, read directly from file into packet */
806
if (packet->append(file, data_len))
809
Fatal error occured when appending rest of the event
810
to packet, possible failures:
811
1. EOF occured when reading from file, it's really an error
812
as data_len is >=0 there's supposed to be more bytes available.
813
file->error will have been set to number of bytes left to read
814
2. Read was interrupted, file->error would normally be set to -1
815
3. Failed to allocate memory for packet, my_errno
816
will be ENOMEM(file->error shuold be 0, but since the
817
memory allocation occurs before the call to read it might
820
result= (my_errno == ENOMEM ? LOG_READ_MEM :
821
(file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
822
/* Implicit goto end; */
828
pthread_mutex_unlock(log_lock);
832
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
833
#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
837
Allocates memory; The caller is responsible for clean-up.
839
Log_event* Log_event::read_log_event(IO_CACHE* file,
840
pthread_mutex_t* log_lock,
841
const Format_description_log_event
844
assert(description_event != 0);
845
char head[LOG_EVENT_MINIMAL_HEADER_LEN];
847
First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
848
check the event for sanity and to know its length; no need to really parse
849
it. We say "at most" because this could be a 3.23 master, which has header
850
of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's
851
"minimal" over the set {MySQL >=4.0}).
853
uint32_t header_size= cmin(description_event->common_header_len,
854
LOG_EVENT_MINIMAL_HEADER_LEN);
857
if (my_b_read(file, (unsigned char *) head, header_size))
861
No error here; it could be that we are at the file's end. However
862
if the next my_b_read() fails (below), it will be an error as we
863
were able to read the first bytes.
867
uint32_t data_len = uint4korr(head + EVENT_LEN_OFFSET);
869
const char *error= 0;
871
#ifndef max_allowed_packet
872
THD *thd=current_thd;
873
uint32_t max_allowed_packet= thd ? thd->variables.max_allowed_packet : ~(ulong)0;
876
if (data_len > max_allowed_packet)
878
error = "Event too big";
882
if (data_len < header_size)
884
error = "Event too small";
888
// some events use the extra byte to null-terminate strings
889
if (!(buf = (char*) my_malloc(data_len+1, MYF(MY_WME))))
891
error = "Out of memory";
895
memcpy(buf, head, header_size);
896
if (my_b_read(file, (unsigned char*) buf + header_size, data_len - header_size))
898
error = "read error";
901
if ((res= read_log_event(buf, data_len, &error, description_event)))
902
res->register_temp_buf(buf);
909
sql_print_error(_("Error in Log_event::read_log_event(): "
910
"'%s', data_len: %d, event_type: %d"),
911
error,data_len,head[EVENT_TYPE_OFFSET]);
914
The SQL slave thread will check if file->error<0 to know
915
if there was an I/O error. Even if there is no "low-level" I/O errors
916
with 'file', any of the high-level above errors is worrying
917
enough to stop the SQL thread now ; as we are skipping the current event,
918
going on with reading and successfully executing other events can
919
only corrupt the slave's databases. So stop.
928
Binlog format tolerance is in (buf, event_len, description_event)
932
Log_event* Log_event::read_log_event(const char* buf, uint32_t event_len,
934
const Format_description_log_event *description_event)
937
assert(description_event != 0);
939
/* Check the integrity */
940
if (event_len < EVENT_LEN_OFFSET ||
941
buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
942
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
944
*error="Sanity check failed"; // Needed to free buffer
945
return(NULL); // general sanity check - will fail on a partial read
948
uint32_t event_type= buf[EVENT_TYPE_OFFSET];
949
if (event_type > description_event->number_of_event_types &&
950
event_type != FORMAT_DESCRIPTION_EVENT)
953
It is unsafe to use the description_event if its post_header_len
954
array does not include the event type.
961
In some previuos versions (see comment in
962
Format_description_log_event::Format_description_log_event(char*,...)),
963
event types were assigned different id numbers than in the
964
present version. In order to replicate from such versions to the
965
present version, we must map those event type id's to our event
966
type id's. The mapping is done with the event_type_permutation
967
array, which was set up when the Format_description_log_event
970
if (description_event->event_type_permutation)
971
event_type= description_event->event_type_permutation[event_type];
975
ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
978
ev = new Load_log_event(buf, event_len, description_event);
981
ev = new Load_log_event(buf, event_len, description_event);
984
ev = new Rotate_log_event(buf, event_len, description_event);
986
case SLAVE_EVENT: /* can never happen (unused event) */
987
ev = new Slave_log_event(buf, event_len);
989
case CREATE_FILE_EVENT:
990
ev = new Create_file_log_event(buf, event_len, description_event);
992
case APPEND_BLOCK_EVENT:
993
ev = new Append_block_log_event(buf, event_len, description_event);
995
case DELETE_FILE_EVENT:
996
ev = new Delete_file_log_event(buf, event_len, description_event);
998
case EXEC_LOAD_EVENT:
999
ev = new Execute_load_log_event(buf, event_len, description_event);
1001
case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
1002
ev = new Start_log_event_v3(buf, description_event);
1005
ev = new Stop_log_event(buf, description_event);
1008
ev = new Intvar_log_event(buf, description_event);
1011
ev = new Xid_log_event(buf, description_event);
1014
ev = new Rand_log_event(buf, description_event);
1016
case USER_VAR_EVENT:
1017
ev = new User_var_log_event(buf, description_event);
1019
case FORMAT_DESCRIPTION_EVENT:
1020
ev = new Format_description_log_event(buf, event_len, description_event);
1022
case WRITE_ROWS_EVENT:
1023
ev = new Write_rows_log_event(buf, event_len, description_event);
1025
case UPDATE_ROWS_EVENT:
1026
ev = new Update_rows_log_event(buf, event_len, description_event);
1028
case DELETE_ROWS_EVENT:
1029
ev = new Delete_rows_log_event(buf, event_len, description_event);
1031
case TABLE_MAP_EVENT:
1032
ev = new Table_map_log_event(buf, event_len, description_event);
1034
case BEGIN_LOAD_QUERY_EVENT:
1035
ev = new Begin_load_query_log_event(buf, event_len, description_event);
1037
case EXECUTE_LOAD_QUERY_EVENT:
1038
ev= new Execute_load_query_log_event(buf, event_len, description_event);
1040
case INCIDENT_EVENT:
1041
ev = new Incident_log_event(buf, event_len, description_event);
1050
is_valid() are small event-specific sanity tests which are
1051
important; for example there are some my_malloc() in constructors
1052
(e.g. Query_log_event::Query_log_event(char*...)); when these
1053
my_malloc() fail we can't return an error out of the constructor
1054
(because constructor is "void") ; so instead we leave the pointer we
1055
wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
1056
Same for Format_description_log_event, member 'post_header_len'.
1058
if (!ev || !ev->is_valid())
1061
*error= "Found invalid event in binary log";
1067
inline Log_event::enum_skip_reason
1068
Log_event::continue_group(Relay_log_info *rli)
1070
if (rli->slave_skip_counter == 1)
1071
return Log_event::EVENT_SKIP_IGNORE;
1072
return Log_event::do_shall_skip(rli);
1075
/**************************************************************************
1076
Query_log_event methods
1077
**************************************************************************/
1080
This (which is used only for SHOW BINLOG EVENTS) could be updated to
1081
print SET @@session_var=. But this is not urgent, as SHOW BINLOG EVENTS is
1082
only an information, it does not produce suitable queries to replay (for
1083
example it does not print LOAD DATA INFILE).
1088
void Query_log_event::pack_info(Protocol *protocol)
1090
// TODO: show the catalog ??
1092
if (!(buf= (char*) my_malloc(9 + db_len + q_len, MYF(MY_WME))))
1095
if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
1098
pos= my_stpcpy(buf, "use `");
1099
memcpy(pos, db, db_len);
1100
pos= my_stpcpy(pos+db_len, "`; ");
1104
memcpy(pos, query, q_len);
1107
protocol->store(buf, pos-buf, &my_charset_bin);
1113
Utility function for the next method (Query_log_event::write()) .
1115
static void write_str_with_code_and_len(char **dst, const char *src,
1116
int len, uint32_t code)
1120
*((*dst)++)= (unsigned char) len;
1121
memcpy(*dst, src, len);
1127
Query_log_event::write().
1130
In this event we have to modify the header to have the correct
1131
EVENT_LEN_OFFSET as we don't yet know how many status variables we
1135
bool Query_log_event::write(IO_CACHE* file)
1138
@todo if catalog can be of length FN_REFLEN==512, then we are not
1139
replicating it correctly, since the length is stored in a byte
1142
unsigned char buf[QUERY_HEADER_LEN+
1143
1+4+ // code of flags2 and flags2
1144
1+8+ // code of sql_mode and sql_mode
1145
1+1+FN_REFLEN+ // code of catalog and catalog length and catalog
1146
1+4+ // code of autoinc and the 2 autoinc variables
1147
1+6+ // code of charset and charset
1148
1+1+MAX_TIME_ZONE_NAME_LENGTH+ // code of tz and tz length and tz name
1149
1+2+ // code of lc_time_names and lc_time_names_number
1150
1+2 // code of charset_database and charset_database_number
1151
], *start, *start_of_status;
1155
return 1; // Something wrong with event
1158
We want to store the thread id:
1159
(- as an information for the user when he reads the binlog)
1160
- if the query uses temporary table: for the slave SQL thread to know to
1161
which master connection the temp table belongs.
1162
Now imagine we (write()) are called by the slave SQL thread (we are
1163
logging a query executed by this thread; the slave runs with
1164
--log-slave-updates). Then this query will be logged with
1165
thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of
1166
the same name were created simultaneously on the master (in the master
1168
CREATE TEMPORARY TABLE t; (thread 1)
1169
CREATE TEMPORARY TABLE t; (thread 2)
1171
then in the slave's binlog there will be
1172
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1173
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1174
which is bad (same thread id!).
1176
To avoid this, we log the thread's thread id EXCEPT for the SQL
1177
slave thread for which we log the original (master's) thread id.
1178
Now this moves the bug: what happens if the thread id on the
1179
master was 10 and when the slave replicates the query, a
1180
connection number 10 is opened by a normal client on the slave,
1181
and updates a temp table of the same name? We get a problem
1182
again. To avoid this, in the handling of temp tables (sql_base.cc)
1183
we use thread_id AND server_id. TODO when this is merged into
1184
4.1: in 4.1, slave_proxy_id has been renamed to pseudo_thread_id
1185
and is a session variable: that's to make mysqlbinlog work with
1186
temp tables. We probably need to introduce
1188
SET PSEUDO_SERVER_ID
1189
for mysqlbinlog in 4.1. mysqlbinlog would print:
1190
SET PSEUDO_SERVER_ID=
1191
SET PSEUDO_THREAD_ID=
1192
for each query using temp tables.
1194
int4store(buf + Q_THREAD_ID_OFFSET, slave_proxy_id);
1195
int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
1196
buf[Q_DB_LEN_OFFSET] = (char) db_len;
1197
int2store(buf + Q_ERR_CODE_OFFSET, error_code);
1200
You MUST always write status vars in increasing order of code. This
1201
guarantees that a slightly older slave will be able to parse those he
1204
start_of_status= start= buf+QUERY_HEADER_LEN;
1207
*start++= Q_FLAGS2_CODE;
1208
int4store(start, flags2);
1211
if (sql_mode_inited)
1213
*start++= Q_SQL_MODE_CODE;
1214
int8store(start, (uint64_t)sql_mode);
1217
if (catalog_len) // i.e. this var is inited (false for 4.0 events)
1219
write_str_with_code_and_len((char **)(&start),
1220
catalog, catalog_len, Q_CATALOG_NZ_CODE);
1222
In 5.0.x where x<4 masters we used to store the end zero here. This was
1223
a waste of one byte so we don't do it in x>=4 masters. We change code to
1224
Q_CATALOG_NZ_CODE, because re-using the old code would make x<4 slaves
1225
of this x>=4 master segfault (expecting a zero when there is
1226
none). Remaining compatibility problems are: the older slave will not
1227
find the catalog; but it is will not crash, and it's not an issue
1228
that it does not find the catalog as catalogs were not used in these
1229
older MySQL versions (we store it in binlog and read it from relay log
1230
but do nothing useful with it). What is an issue is that the older slave
1231
will stop processing the Q_* blocks (and jumps to the db/query) as soon
1232
as it sees unknown Q_CATALOG_NZ_CODE; so it will not be able to read
1233
Q_AUTO_INCREMENT*, Q_CHARSET and so replication will fail silently in
1234
various ways. Documented that you should not mix alpha/beta versions if
1235
they are not exactly the same version, with example of 5.0.3->5.0.2 and
1236
5.0.4->5.0.3. If replication is from older to new, the new will
1237
recognize Q_CATALOG_CODE and have no problem.
1240
if (auto_increment_increment != 1 || auto_increment_offset != 1)
1242
*start++= Q_AUTO_INCREMENT;
1243
int2store(start, auto_increment_increment);
1244
int2store(start+2, auto_increment_offset);
1249
*start++= Q_CHARSET_CODE;
1250
memcpy(start, charset, 6);
1255
/* In the TZ sys table, column Name is of length 64 so this should be ok */
1256
assert(time_zone_len <= MAX_TIME_ZONE_NAME_LENGTH);
1257
*start++= Q_TIME_ZONE_CODE;
1258
*start++= time_zone_len;
1259
memcpy(start, time_zone_str, time_zone_len);
1260
start+= time_zone_len;
1262
if (lc_time_names_number)
1264
assert(lc_time_names_number <= 0xFFFF);
1265
*start++= Q_LC_TIME_NAMES_CODE;
1266
int2store(start, lc_time_names_number);
1269
if (charset_database_number)
1271
assert(charset_database_number <= 0xFFFF);
1272
*start++= Q_CHARSET_DATABASE_CODE;
1273
int2store(start, charset_database_number);
1277
Here there could be code like
1278
if (command-line-option-which-says-"log_this_variable" && inited)
1280
*start++= Q_THIS_VARIABLE_CODE;
1281
int4store(start, this_variable);
1286
/* Store length of status variables */
1287
status_vars_len= (uint) (start-start_of_status);
1288
assert(status_vars_len <= MAX_SIZE_LOG_EVENT_STATUS);
1289
int2store(buf + Q_STATUS_VARS_LEN_OFFSET, status_vars_len);
1292
Calculate length of whole event
1293
The "1" below is the \0 in the db's length
1295
event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
1297
return (write_header(file, event_length) ||
1298
my_b_safe_write(file, (unsigned char*) buf, QUERY_HEADER_LEN) ||
1299
write_post_header_for_derived(file) ||
1300
my_b_safe_write(file, (unsigned char*) start_of_status,
1301
(uint) (start-start_of_status)) ||
1302
my_b_safe_write(file, (db) ? (unsigned char*) db : (unsigned char*)"", db_len + 1) ||
1303
my_b_safe_write(file, (unsigned char*) query, q_len)) ? 1 : 0;
1307
The simplest constructor that could possibly work. This is used for
1308
creating static objects that have a special meaning and are invisible
1311
Query_log_event::Query_log_event()
1312
:Log_event(), data_buf(0)
1319
Query_log_event::Query_log_event()
1320
thd_arg - thread handle
1321
query_arg - array of char representing the query
1322
query_length - size of the `query_arg' array
1323
using_trans - there is a modified transactional table
1324
suppress_use - suppress the generation of 'USE' statements
1325
killed_status_arg - an optional with default to THD::KILLED_NO_VALUE
1326
if the value is different from the default, the arg
1327
is set to the current thd->killed value.
1328
A caller might need to masquerade thd->killed with
1331
Creates an event for binlogging
1332
The value for local `killed_status' can be supplied by caller.
1334
Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
1335
ulong query_length, bool using_trans,
1337
THD::killed_state killed_status_arg)
1339
(thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
1341
(suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1343
data_buf(0), query(query_arg), catalog(thd_arg->catalog),
1344
db(thd_arg->db), q_len((uint32_t) query_length),
1345
thread_id(thd_arg->thread_id),
1346
/* save the original thread id; we already know the server id */
1347
slave_proxy_id(thd_arg->variables.pseudo_thread_id),
1348
flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1350
auto_increment_increment(thd_arg->variables.auto_increment_increment),
1351
auto_increment_offset(thd_arg->variables.auto_increment_offset),
1352
lc_time_names_number(thd_arg->variables.lc_time_names->number),
1353
charset_database_number(0)
1357
if (killed_status_arg == THD::KILLED_NO_VALUE)
1358
killed_status_arg= thd_arg->killed;
1361
(killed_status_arg == THD::NOT_KILLED) ?
1362
(thd_arg->is_error() ? thd_arg->main_da.sql_errno() : 0) :
1363
(thd_arg->killed_errno());
1366
exec_time = (ulong) (end_time - thd_arg->start_time);
1368
@todo this means that if we have no catalog, then it is replicated
1369
as an existing catalog of length zero. is that safe? /sven
1371
catalog_len = (catalog) ? (uint32_t) strlen(catalog) : 0;
1372
/* status_vars_len is set just before writing the event */
1373
db_len = (db) ? (uint32_t) strlen(db) : 0;
1374
if (thd_arg->variables.collation_database != thd_arg->db_charset)
1375
charset_database_number= thd_arg->variables.collation_database->number;
1378
If we don't use flags2 for anything else than options contained in
1379
thd_arg->options, it would be more efficient to flags2=thd_arg->options
1380
(OPTIONS_WRITTEN_TO_BIN_LOG would be used only at reading time).
1381
But it's likely that we don't want to use 32 bits for 3 bits; in the future
1382
we will probably want to reclaim the 29 bits. So we need the &.
1384
flags2= (uint32_t) (thd_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
1385
assert(thd_arg->variables.character_set_client->number < 256*256);
1386
assert(thd_arg->variables.collation_connection->number < 256*256);
1387
assert(thd_arg->variables.collation_server->number < 256*256);
1388
assert(thd_arg->variables.character_set_client->mbminlen == 1);
1389
int2store(charset, thd_arg->variables.character_set_client->number);
1390
int2store(charset+2, thd_arg->variables.collation_connection->number);
1391
int2store(charset+4, thd_arg->variables.collation_server->number);
1392
if (thd_arg->time_zone_used)
1395
Note that our event becomes dependent on the Time_zone object
1396
representing the time zone. Fortunately such objects are never deleted
1397
or changed during mysqld's lifetime.
1399
time_zone_len= thd_arg->variables.time_zone->get_name()->length();
1400
time_zone_str= thd_arg->variables.time_zone->get_name()->ptr();
1407
/* 2 utility functions for the next method */
1410
Read a string with length from memory.
1412
This function reads the string-with-length stored at
1413
<code>src</code> and extract the length into <code>*len</code> and
1414
a pointer to the start of the string into <code>*dst</code>. The
1415
string can then be copied using <code>memcpy()</code> with the
1416
number of bytes given in <code>*len</code>.
1418
@param src Pointer to variable holding a pointer to the memory to
1419
read the string from.
1420
@param dst Pointer to variable holding a pointer where the actual
1421
string starts. Starting from this position, the string
1422
can be copied using @c memcpy().
1423
@param len Pointer to variable where the length will be stored.
1424
@param end One-past-the-end of the memory where the string is
1427
@return Zero if the entire string can be copied successfully,
1428
@c UINT_MAX if the length could not be read from memory
1429
(that is, if <code>*src >= end</code>), otherwise the
1430
number of bytes that are missing to read the full
1431
string, which happends <code>*dst + *len >= end</code>.
1434
get_str_len_and_pointer(const Log_event::Byte **src,
1437
const Log_event::Byte *end)
1440
return -1; // Will be UINT_MAX in two-complement arithmetics
1441
uint32_t length= **src;
1444
if (*src + length >= end)
1445
return *src + length - end + 1; // Number of bytes missing
1446
*dst= (char *)*src + 1; // Will be copied later
1453
static void copy_str_and_move(const char **src,
1454
Log_event::Byte **dst,
1457
memcpy(*dst, *src, len);
1458
*src= (const char *)*dst;
1465
Macro to check that there is enough space to read from memory.
1467
@param PTR Pointer to memory
1468
@param END End of memory
1469
@param CNT Number of bytes that should be read.
1471
#define CHECK_SPACE(PTR,END,CNT) \
1473
assert((PTR) + (CNT) <= (END)); \
1474
if ((PTR) + (CNT) > (END)) { \
1482
This is used by the SQL slave thread to prepare the event before execution.
1484
Query_log_event::Query_log_event(const char* buf, uint32_t event_len,
1485
const Format_description_log_event
1487
Log_event_type event_type)
1488
:Log_event(buf, description_event), data_buf(0), query(NULL),
1489
db(NULL), catalog_len(0), status_vars_len(0),
1490
flags2_inited(0), sql_mode_inited(0), charset_inited(0),
1491
auto_increment_increment(1), auto_increment_offset(1),
1492
time_zone_len(0), lc_time_names_number(0), charset_database_number(0)
1496
uint8_t common_header_len, post_header_len;
1497
Log_event::Byte *start;
1498
const Log_event::Byte *end;
1501
common_header_len= description_event->common_header_len;
1502
post_header_len= description_event->post_header_len[event_type-1];
1505
We test if the event's length is sensible, and if so we compute data_len.
1506
We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
1507
We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
1509
if (event_len < (uint)(common_header_len + post_header_len))
1511
data_len = event_len - (common_header_len + post_header_len);
1512
buf+= common_header_len;
1514
slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
1515
exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
1516
db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
1517
error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
1520
5.0 format starts here.
1521
Depending on the format, we may or not have affected/warnings etc
1522
The remnent post-header to be parsed has length:
1524
tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
1527
status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
1529
Check if status variable length is corrupt and will lead to very
1530
wrong data. We could be even more strict and require data_len to
1531
be even bigger, but this will suffice to catch most corruption
1532
errors that can lead to a crash.
1534
if (status_vars_len > cmin(data_len, (uint32_t)MAX_SIZE_LOG_EVENT_STATUS))
1539
data_len-= status_vars_len;
1543
We have parsed everything we know in the post header for QUERY_EVENT,
1544
the rest of post header is either comes from older version MySQL or
1545
dedicated to derived events (e.g. Execute_load_query...)
1548
/* variable-part: the status vars; only in MySQL 5.0 */
1550
start= (Log_event::Byte*) (buf+post_header_len);
1551
end= (const Log_event::Byte*) (start+status_vars_len);
1552
for (const Log_event::Byte* pos= start; pos < end;)
1556
CHECK_SPACE(pos, end, 4);
1558
flags2= uint4korr(pos);
1561
case Q_SQL_MODE_CODE:
1563
CHECK_SPACE(pos, end, 8);
1565
sql_mode= (ulong) uint8korr(pos); // QQ: Fix when sql_mode is uint64_t
1569
case Q_CATALOG_NZ_CODE:
1570
if (get_str_len_and_pointer(&pos, &catalog, &catalog_len, end))
1576
case Q_AUTO_INCREMENT:
1577
CHECK_SPACE(pos, end, 4);
1578
auto_increment_increment= uint2korr(pos);
1579
auto_increment_offset= uint2korr(pos+2);
1582
case Q_CHARSET_CODE:
1584
CHECK_SPACE(pos, end, 6);
1586
memcpy(charset, pos, 6);
1590
case Q_TIME_ZONE_CODE:
1592
if (get_str_len_and_pointer(&pos, &time_zone_str, &time_zone_len, end))
1599
case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
1600
CHECK_SPACE(pos, end, 1);
1601
if ((catalog_len= *pos))
1602
catalog= (char*) pos+1; // Will be copied later
1603
CHECK_SPACE(pos, end, catalog_len + 2);
1604
pos+= catalog_len+2; // leap over end 0
1605
catalog_nz= 0; // catalog has end 0 in event
1607
case Q_LC_TIME_NAMES_CODE:
1608
CHECK_SPACE(pos, end, 2);
1609
lc_time_names_number= uint2korr(pos);
1612
case Q_CHARSET_DATABASE_CODE:
1613
CHECK_SPACE(pos, end, 2);
1614
charset_database_number= uint2korr(pos);
1618
/* That's why you must write status vars in growing order of code */
1619
pos= (const unsigned char*) end; // Break loop
1623
if (!(start= data_buf = (Log_event::Byte*) my_malloc(catalog_len + 1 +
1628
if (catalog_len) // If catalog is given
1631
@todo we should clean up and do only copy_str_and_move; it
1632
works for both cases. Then we can remove the catalog_nz
1635
if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
1636
copy_str_and_move(&catalog, &start, catalog_len);
1639
memcpy(start, catalog, catalog_len+1); // copy end 0
1640
catalog= (const char *)start;
1641
start+= catalog_len+1;
1645
copy_str_and_move(&time_zone_str, &start, time_zone_len);
1648
if time_zone_len or catalog_len are 0, then time_zone and catalog
1649
are uninitialized at this point. shouldn't they point to the
1650
zero-length null-terminated strings we allocated space for in the
1651
my_alloc call above? /sven
1654
/* A 2nd variable part; this is common to all versions */
1655
memcpy(start, end, data_len); // Copy db and query
1656
start[data_len]= '\0'; // End query with \0 (For safetly)
1658
query= (char *)(start + db_len + 1);
1659
q_len= data_len - db_len -1;
1665
Query_log_event::do_apply_event()
1667
int Query_log_event::do_apply_event(Relay_log_info const *rli)
1669
return do_apply_event(rli, query, q_len);
1675
Compare the values of "affected rows" around here. Something
1678
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1680
sql_print_error("Slave: did not get the expected number of affected \
1681
rows running query from master - expected %d, got %d (this numbers \
1682
should have matched modulo 4294967296).", 0, ...);
1683
thd->query_error = 1;
1686
We may also want an option to tell the slave to ignore "affected"
1687
mismatch. This mismatch could be implemented with a new ER_ code, and
1688
to ignore it you would use --slave-skip-errors...
1690
int Query_log_event::do_apply_event(Relay_log_info const *rli,
1691
const char *query_arg, uint32_t q_len_arg)
1694
int expected_error,actual_error= 0;
1696
Colleagues: please never free(thd->catalog) in MySQL. This would
1697
lead to bugs as here thd->catalog is a part of an alloced block,
1698
not an entire alloced block (see
1699
Query_log_event::do_apply_event()). Same for thd->db. Thank
1702
thd->catalog= catalog_len ? (char *) catalog : (char *)"";
1703
new_db.length= db_len;
1704
new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
1705
thd->set_db(new_db.str, new_db.length); /* allocates a copy of 'db' */
1706
thd->variables.auto_increment_increment= auto_increment_increment;
1707
thd->variables.auto_increment_offset= auto_increment_offset;
1710
InnoDB internally stores the master log position it has executed so far,
1711
i.e. the position just after the COMMIT event.
1712
When InnoDB will want to store, the positions in rli won't have
1713
been updated yet, so group_master_log_* will point to old BEGIN
1714
and event_master_log* will point to the beginning of current COMMIT.
1715
But log_pos of the COMMIT Query event is what we want, i.e. the pos of the
1716
END of the current log event (COMMIT). We save it in rli so that InnoDB can
1719
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
1721
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
1722
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
1725
Note: We do not need to execute reset_one_shot_variables() if this
1727
Reason: The db stored in binlog events is the same for SET and for
1728
its companion query. If the SET is ignored because of
1729
db_ok(), the companion query will also be ignored, and if
1730
the companion query is ignored in the db_ok() test of
1731
::do_apply_event(), then the companion SET also have so
1732
we don't need to reset_one_shot_variables().
1734
if (rpl_filter->db_ok(thd->db))
1736
thd->set_time((time_t)when);
1737
thd->query_length= q_len_arg;
1738
thd->query= (char*)query_arg;
1739
pthread_mutex_lock(&LOCK_thread_count);
1740
thd->query_id = next_query_id();
1741
pthread_mutex_unlock(&LOCK_thread_count);
1742
thd->variables.pseudo_thread_id= thread_id; // for temp tables
1744
if (ignored_error_code((expected_error= error_code)) ||
1745
!check_expected_error(thd,rli,expected_error))
1749
all bits of thd->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
1750
must take their value from flags2.
1752
thd->options= flags2|(thd->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
1754
else, we are in a 3.23/4.0 binlog; we previously received a
1755
Rotate_log_event which reset thd->options and sql_mode etc, so
1760
if (rli->cached_charset_compare(charset))
1762
/* Verify that we support the charsets found in the event. */
1763
if (!(thd->variables.character_set_client=
1764
get_charset(uint2korr(charset), MYF(MY_WME))) ||
1765
!(thd->variables.collation_connection=
1766
get_charset(uint2korr(charset+2), MYF(MY_WME))) ||
1767
!(thd->variables.collation_server=
1768
get_charset(uint2korr(charset+4), MYF(MY_WME))))
1771
We updated the thd->variables with nonsensical values (0). Let's
1772
set them to something safe (i.e. which avoids crash), and we'll
1773
stop with EE_UNKNOWN_CHARSET in compare_errors (unless set to
1776
set_slave_thread_default_charset(thd, rli);
1777
goto compare_errors;
1779
thd->update_charset(); // for the charset change to take effect
1784
String tmp(time_zone_str, time_zone_len, &my_charset_bin);
1785
if (!(thd->variables.time_zone= my_tz_find(thd, &tmp)))
1787
my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr());
1788
thd->variables.time_zone= global_system_variables.time_zone;
1789
goto compare_errors;
1792
if (lc_time_names_number)
1794
if (!(thd->variables.lc_time_names=
1795
my_locale_by_number(lc_time_names_number)))
1797
my_printf_error(ER_UNKNOWN_ERROR,
1798
"Unknown locale: '%d'", MYF(0), lc_time_names_number);
1799
thd->variables.lc_time_names= &my_locale_en_US;
1800
goto compare_errors;
1804
thd->variables.lc_time_names= &my_locale_en_US;
1805
if (charset_database_number)
1807
const CHARSET_INFO *cs;
1808
if (!(cs= get_charset(charset_database_number, MYF(0))))
1811
int10_to_str((int) charset_database_number, buf, -10);
1812
my_error(ER_UNKNOWN_COLLATION, MYF(0), buf);
1813
goto compare_errors;
1815
thd->variables.collation_database= cs;
1818
thd->variables.collation_database= thd->db_charset;
1820
/* Execute the query (note that we bypass dispatch_command()) */
1821
const char* found_semicolon= NULL;
1822
mysql_parse(thd, thd->query, thd->query_length, &found_semicolon);
1823
log_slow_statement(thd);
1828
The query got a really bad error on the master (thread killed etc),
1829
which could be inconsistent. Parse it to test the table names: if the
1830
replicate-*-do|ignore-table rules say "this query must be ignored" then
1831
we exit gracefully; otherwise we warn about the bad error and tell DBA
1834
if (mysql_test_parse_for_slave(thd, thd->query, thd->query_length))
1835
clear_all_errors(thd, const_cast<Relay_log_info*>(rli)); /* Can ignore query */
1838
rli->report(ERROR_LEVEL, expected_error,
1839
_("Query partially completed on the master "
1840
"(error on master: %d) and was aborted. There is a "
1841
"chance that your master is inconsistent at this "
1842
"point. If you are sure that your master is ok, run "
1843
"this query manually on the slave and then restart the "
1844
"slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; "
1845
"START SLAVE; . Query: '%s'"),
1846
expected_error, thd->query);
1847
thd->is_slave_error= 1;
1855
If we expected a non-zero error code, and we don't get the same error
1856
code, and none of them should be ignored.
1858
actual_error= thd->is_error() ? thd->main_da.sql_errno() : 0;
1859
if ((expected_error != actual_error) &&
1861
!ignored_error_code(actual_error) &&
1862
!ignored_error_code(expected_error))
1864
rli->report(ERROR_LEVEL, 0,
1865
_("Query caused differenxt errors on master and slave.\n"
1866
"Error on master: '%s' (%d), Error on slave: '%s' (%d).\n"
1867
"Default database: '%s'. Query: '%s'"),
1868
ER_SAFE(expected_error),
1870
actual_error ? thd->main_da.message() : _("no error"),
1872
print_slave_db_safe(db), query_arg);
1873
thd->is_slave_error= 1;
1876
If we get the same error code as expected, or they should be ignored.
1878
else if (expected_error == actual_error ||
1879
ignored_error_code(actual_error))
1881
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
1882
thd->killed= THD::NOT_KILLED;
1885
Other cases: mostly we expected no error and get one.
1887
else if (thd->is_slave_error || thd->is_fatal_error)
1889
rli->report(ERROR_LEVEL, actual_error,
1890
_("Error '%s' on query. Default database: '%s'. Query: '%s'"),
1891
(actual_error ? thd->main_da.message() :
1892
_("unexpected success or fatal error")),
1893
print_slave_db_safe(thd->db), query_arg);
1894
thd->is_slave_error= 1;
1898
TODO: compare the values of "affected rows" around here. Something
1900
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1902
sql_print_error("Slave: did not get the expected number of affected \
1903
rows running query from master - expected %d, got %d (this numbers \
1904
should have matched modulo 4294967296).", 0, ...);
1905
thd->is_slave_error = 1;
1907
We may also want an option to tell the slave to ignore "affected"
1908
mismatch. This mismatch could be implemented with a new ER_ code, and
1909
to ignore it you would use --slave-skip-errors...
1911
To do the comparison we need to know the value of "affected" which the
1912
above mysql_parse() computed. And we need to know the value of
1913
"affected" in the master's binlog. Both will be implemented later. The
1914
important thing is that we now have the format ready to log the values
1915
of "affected" in the binlog. So we can release 5.0.0 before effectively
1916
logging "affected" and effectively comparing it.
1918
} /* End of if (db_ok(... */
1921
pthread_mutex_lock(&LOCK_thread_count);
1923
Probably we have set thd->query, thd->db, thd->catalog to point to places
1924
in the data_buf of this event. Now the event is going to be deleted
1925
probably, so data_buf will be freed, so the thd->... listed above will be
1926
pointers to freed memory.
1927
So we must set them to 0, so that those bad pointers values are not later
1928
used. Note that "cleanup" queries like automatic DROP TEMPORARY Table
1929
don't suffer from these assignments to 0 as DROP TEMPORARY
1930
Table uses the db.table syntax.
1933
thd->set_db(NULL, 0); /* will free the current database */
1934
thd->query= 0; // just to be sure
1935
thd->query_length= 0;
1936
pthread_mutex_unlock(&LOCK_thread_count);
1937
close_thread_tables(thd);
1939
As a disk space optimization, future masters will not log an event for
1940
LAST_INSERT_ID() if that function returned 0 (and thus they will be able
1941
to replace the THD::stmt_depends_on_first_successful_insert_id_in_prev_stmt
1942
variable by (THD->first_successful_insert_id_in_prev_stmt > 0) ; with the
1943
resetting below we are ready to support that.
1945
thd->first_successful_insert_id_in_prev_stmt_for_binlog= 0;
1946
thd->first_successful_insert_id_in_prev_stmt= 0;
1947
thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
1948
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
1949
return thd->is_slave_error;
1952
int Query_log_event::do_update_pos(Relay_log_info *rli)
1955
Note that we will not increment group* positions if we are just
1956
after a SET ONE_SHOT, because SET ONE_SHOT should not be separated
1957
from its following updating query.
1959
if (thd->one_shot_set)
1961
rli->inc_event_relay_log_pos();
1965
return Log_event::do_update_pos(rli);
1969
Log_event::enum_skip_reason
1970
Query_log_event::do_shall_skip(Relay_log_info *rli)
1972
assert(query && q_len > 0);
1974
if (rli->slave_skip_counter > 0)
1976
if (strcmp("BEGIN", query) == 0)
1978
thd->options|= OPTION_BEGIN;
1979
return(Log_event::continue_group(rli));
1982
if (strcmp("COMMIT", query) == 0 || strcmp("ROLLBACK", query) == 0)
1984
thd->options&= ~OPTION_BEGIN;
1985
return(Log_event::EVENT_SKIP_COUNT);
1988
return(Log_event::do_shall_skip(rli));
1992
/**************************************************************************
1993
Start_log_event_v3 methods
1994
**************************************************************************/
1996
Start_log_event_v3::Start_log_event_v3()
1997
:Log_event(), created(0), binlog_version(BINLOG_VERSION),
1998
artificial_event(0), dont_set_created(0)
2000
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
2004
Start_log_event_v3::pack_info()
2007
void Start_log_event_v3::pack_info(Protocol *protocol)
2009
char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos;
2010
pos= my_stpcpy(buf, "Server ver: ");
2011
pos= my_stpcpy(pos, server_version);
2012
pos= my_stpcpy(pos, ", Binlog ver: ");
2013
pos= int10_to_str(binlog_version, pos, 10);
2014
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
2019
Start_log_event_v3::Start_log_event_v3()
2022
Start_log_event_v3::Start_log_event_v3(const char* buf,
2023
const Format_description_log_event
2025
:Log_event(buf, description_event)
2027
buf+= description_event->common_header_len;
2028
binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET);
2029
memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
2031
// prevent overrun if log is corrupted on disk
2032
server_version[ST_SERVER_VER_LEN-1]= 0;
2033
created= uint4korr(buf+ST_CREATED_OFFSET);
2034
/* We use log_pos to mark if this was an artificial event or not */
2035
artificial_event= (log_pos == 0);
2036
dont_set_created= 1;
2041
Start_log_event_v3::write()
2044
bool Start_log_event_v3::write(IO_CACHE* file)
2046
char buff[START_V3_HEADER_LEN];
2047
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
2048
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
2049
if (!dont_set_created)
2050
created= when= get_time();
2051
int4store(buff + ST_CREATED_OFFSET,created);
2052
return (write_header(file, sizeof(buff)) ||
2053
my_b_safe_write(file, (unsigned char*) buff, sizeof(buff)));
2058
Start_log_event_v3::do_apply_event() .
2062
- To handle the case where the master died without having time to write
2063
DROP TEMPORARY Table, DO RELEASE_LOCK (prepared statements' deletion is
2064
TODO), we clean up all temporary tables that we got, if we are sure we
2068
- Remove all active user locks.
2069
Guilhem 2003-06: this is true but not urgent: the worst it can cause is
2070
the use of a bit of memory for a user lock which will not be used
2071
anymore. If the user lock is later used, the old one will be released. In
2072
other words, no deadlock problem.
2075
int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
2077
switch (binlog_version)
2082
This can either be 4.x (then a Start_log_event_v3 is only at master
2083
startup so we are sure the master has restarted and cleared his temp
2084
tables; the event always has 'created'>0) or 5.0 (then we have to test
2089
close_temporary_tables(thd);
2090
cleanup_load_tmpdir();
2095
Now the older formats; in that case load_tmpdir is cleaned up by the I/O
2099
if (strncmp(rli->relay_log.description_event_for_exec->server_version,
2100
"3.23.57",7) >= 0 && created)
2103
Can distinguish, based on the value of 'created': this event was
2104
generated at master startup.
2106
close_temporary_tables(thd);
2109
Otherwise, can't distinguish a Start_log_event generated at
2110
master startup and one generated by master FLUSH LOGS, so cannot
2111
be sure temp tables have to be dropped. So do nothing.
2115
/* this case is impossible */
2121
/***************************************************************************
2122
Format_description_log_event methods
2123
****************************************************************************/
2126
Format_description_log_event 1st ctor.
2128
Ctor. Can be used to create the event to write to the binary log (when the
2129
server starts or when FLUSH LOGS), or to create artificial events to parse
2130
binlogs from MySQL 3.23 or 4.x.
2131
When in a client, only the 2nd use is possible.
2133
@param binlog_version the binlog version for which we want to build
2134
an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x
2135
x>=2 and 4.1) or 4 (MySQL 5.0). Note that the
2136
old 4.0 (binlog version 2) is not supported;
2137
it should not be used for replication with
2141
Format_description_log_event::
2142
Format_description_log_event(uint8_t binlog_ver, const char* server_ver)
2143
:Start_log_event_v3(), event_type_permutation(0)
2145
binlog_version= binlog_ver;
2146
switch (binlog_ver) {
2147
case 4: /* MySQL 5.0 */
2148
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
2149
common_header_len= LOG_EVENT_HEADER_LEN;
2150
number_of_event_types= LOG_EVENT_TYPES;
2151
/* we'll catch my_malloc() error in is_valid() */
2152
post_header_len=(uint8_t*) my_malloc(number_of_event_types*sizeof(uint8_t),
2155
This long list of assignments is not beautiful, but I see no way to
2156
make it nicer, as the right members are #defines, not array members, so
2157
it's impossible to write a loop.
2159
if (post_header_len)
2161
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2162
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN;
2163
post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
2164
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2165
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2166
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2167
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2168
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2169
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
2170
post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
2171
post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN;
2172
post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2173
post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2174
post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2175
post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1];
2176
post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
2177
post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
2178
post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
2183
case 3: /* 4.0.x x>=2 */
2185
We build an artificial (i.e. not sent by the master) event, which
2186
describes what those old master versions send.
2189
my_stpcpy(server_version, server_ver ? server_ver : "3.23");
2191
my_stpcpy(server_version, server_ver ? server_ver : "4.0");
2192
common_header_len= binlog_ver==1 ? OLD_HEADER_LEN :
2193
LOG_EVENT_MINIMAL_HEADER_LEN;
2195
The first new event in binlog version 4 is Format_desc. So any event type
2196
after that does not exist in older versions. We use the events known by
2197
version 3, even if version 1 had only a subset of them (this is not a
2198
problem: it uses a few bytes for nothing but unifies code; it does not
2199
make the slave detect less corruptions).
2201
number_of_event_types= FORMAT_DESCRIPTION_EVENT - 1;
2202
post_header_len=(uint8_t*) my_malloc(number_of_event_types*sizeof(uint8_t),
2204
if (post_header_len)
2206
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2207
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_MINIMAL_LEN;
2208
post_header_len[STOP_EVENT-1]= 0;
2209
post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN;
2210
post_header_len[INTVAR_EVENT-1]= 0;
2211
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2212
post_header_len[SLAVE_EVENT-1]= 0;
2213
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2214
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2215
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2216
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2217
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
2218
post_header_len[RAND_EVENT-1]= 0;
2219
post_header_len[USER_VAR_EVENT-1]= 0;
2222
default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
2223
post_header_len= 0; /* will make is_valid() fail */
2226
calc_server_version_split();
2231
The problem with this constructor is that the fixed header may have a
2232
length different from this version, but we don't know this length as we
2233
have not read the Format_description_log_event which says it, yet. This
2234
length is in the post-header of the event, but we don't know where the
2237
So this type of event HAS to:
2238
- either have the header's length at the beginning (in the header, at a
2239
fixed position which will never be changed), not in the post-header. That
2240
would make the header be "shifted" compared to other events.
2241
- or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future
2242
versions, so that we know for sure.
2244
I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because
2245
it is sent before Format_description_log_event).
2248
Format_description_log_event::
2249
Format_description_log_event(const char* buf,
2252
Format_description_log_event*
2254
:Start_log_event_v3(buf, description_event), event_type_permutation(0)
2256
buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
2257
if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
2258
return; /* sanity check */
2259
number_of_event_types=
2260
event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1);
2261
/* If alloc fails, we'll detect it in is_valid() */
2262
post_header_len= (uint8_t*) my_memdup((unsigned char*)buf+ST_COMMON_HEADER_LEN_OFFSET+1,
2263
number_of_event_types*
2264
sizeof(*post_header_len), MYF(0));
2265
calc_server_version_split();
2268
In some previous versions, the events were given other event type
2269
id numbers than in the present version. When replicating from such
2270
a version, we therefore set up an array that maps those id numbers
2271
to the id numbers of the present server.
2273
If post_header_len is null, it means malloc failed, and is_valid
2274
will fail, so there is no need to do anything.
2276
The trees in which events have wrong id's are:
2278
mysql-5.1-wl1012.old mysql-5.1-wl2325-5.0-drop6p13-alpha
2279
mysql-5.1-wl2325-5.0-drop6 mysql-5.1-wl2325-5.0
2280
mysql-5.1-wl2325-no-dd
2282
(this was found by grepping for two lines in sequence where the
2283
first matches "FORMAT_DESCRIPTION_EVENT," and the second matches
2284
"TABLE_MAP_EVENT," in log_event.h in all trees)
2286
In these trees, the following server_versions existed since
2287
TABLE_MAP_EVENT was introduced:
2289
5.1.1-a_drop5p3 5.1.1-a_drop5p4 5.1.1-alpha
2290
5.1.2-a_drop5p10 5.1.2-a_drop5p11 5.1.2-a_drop5p12
2291
5.1.2-a_drop5p13 5.1.2-a_drop5p14 5.1.2-a_drop5p15
2292
5.1.2-a_drop5p16 5.1.2-a_drop5p16b 5.1.2-a_drop5p16c
2293
5.1.2-a_drop5p17 5.1.2-a_drop5p4 5.1.2-a_drop5p5
2294
5.1.2-a_drop5p6 5.1.2-a_drop5p7 5.1.2-a_drop5p8
2295
5.1.2-a_drop5p9 5.1.3-a_drop5p17 5.1.3-a_drop5p17b
2296
5.1.3-a_drop5p17c 5.1.4-a_drop5p18 5.1.4-a_drop5p19
2297
5.1.4-a_drop5p20 5.1.4-a_drop6p0 5.1.4-a_drop6p1
2298
5.1.4-a_drop6p2 5.1.5-a_drop5p20 5.2.0-a_drop6p3
2299
5.2.0-a_drop6p4 5.2.0-a_drop6p5 5.2.0-a_drop6p6
2300
5.2.1-a_drop6p10 5.2.1-a_drop6p11 5.2.1-a_drop6p12
2301
5.2.1-a_drop6p6 5.2.1-a_drop6p7 5.2.1-a_drop6p8
2302
5.2.2-a_drop6p13 5.2.2-a_drop6p13-alpha 5.2.2-a_drop6p13b
2305
(this was found by grepping for "mysql," in all historical
2306
versions of configure.in in the trees listed above).
2308
There are 5.1.1-alpha versions that use the new event id's, so we
2309
do not test that version string. So replication from 5.1.1-alpha
2310
with the other event id's to a new version does not work.
2311
Moreover, we can safely ignore the part after drop[56]. This
2312
allows us to simplify the big list above to the following regexes:
2314
5\.1\.[1-5]-a_drop5.*
2316
5\.2\.[0-2]-a_drop6.*
2318
This is what we test for in the 'if' below.
2320
if (post_header_len &&
2321
server_version[0] == '5' && server_version[1] == '.' &&
2322
server_version[3] == '.' &&
2323
strncmp(server_version + 5, "-a_drop", 7) == 0 &&
2324
((server_version[2] == '1' &&
2325
server_version[4] >= '1' && server_version[4] <= '5' &&
2326
server_version[12] == '5') ||
2327
(server_version[2] == '1' &&
2328
server_version[4] == '4' &&
2329
server_version[12] == '6') ||
2330
(server_version[2] == '2' &&
2331
server_version[4] >= '0' && server_version[4] <= '2' &&
2332
server_version[12] == '6')))
2334
if (number_of_event_types != 22)
2336
/* this makes is_valid() return false. */
2337
free(post_header_len);
2338
post_header_len= NULL;
2341
static const uint8_t perm[23]=
2343
UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
2344
INTVAR_EVENT, LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
2345
APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
2347
RAND_EVENT, USER_VAR_EVENT,
2348
FORMAT_DESCRIPTION_EVENT,
2350
PRE_GA_WRITE_ROWS_EVENT,
2351
PRE_GA_UPDATE_ROWS_EVENT,
2352
PRE_GA_DELETE_ROWS_EVENT,
2354
BEGIN_LOAD_QUERY_EVENT,
2355
EXECUTE_LOAD_QUERY_EVENT,
2357
event_type_permutation= perm;
2359
Since we use (permuted) event id's to index the post_header_len
2360
array, we need to permute the post_header_len array too.
2362
uint8_t post_header_len_temp[23];
2363
for (int i= 1; i < 23; i++)
2364
post_header_len_temp[perm[i] - 1]= post_header_len[i - 1];
2365
for (int i= 0; i < 22; i++)
2366
post_header_len[i] = post_header_len_temp[i];
2371
bool Format_description_log_event::write(IO_CACHE* file)
2374
We don't call Start_log_event_v3::write() because this would make 2
2377
unsigned char buff[FORMAT_DESCRIPTION_HEADER_LEN];
2378
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
2379
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
2380
if (!dont_set_created)
2381
created= when= get_time();
2382
int4store(buff + ST_CREATED_OFFSET,created);
2383
buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
2384
memcpy(buff+ST_COMMON_HEADER_LEN_OFFSET+1, post_header_len,
2386
return (write_header(file, sizeof(buff)) ||
2387
my_b_safe_write(file, buff, sizeof(buff)));
2391
int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
2394
As a transaction NEVER spans on 2 or more binlogs:
2395
if we have an active transaction at this point, the master died
2396
while writing the transaction to the binary log, i.e. while
2397
flushing the binlog cache to the binlog. XA guarantees that master has
2398
rolled back. So we roll back.
2399
Note: this event could be sent by the master to inform us of the
2400
format of its binlog; in other words maybe it is not at its
2401
original place when it comes to us; we'll know this by checking
2402
log_pos ("artificial" events have log_pos == 0).
2404
if (!artificial_event && created && thd->transaction.all.ha_list)
2406
/* This is not an error (XA is safe), just an information */
2407
rli->report(INFORMATION_LEVEL, 0,
2408
_("Rolling back unfinished transaction (no COMMIT "
2409
"or ROLLBACK in relay log). A probable cause is that "
2410
"the master died while writing the transaction to "
2411
"its binary log, thus rolled back too."));
2412
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 1);
2415
If this event comes from ourselves, there is no cleaning task to
2416
perform, we don't call Start_log_event_v3::do_apply_event()
2417
(this was just to update the log's description event).
2419
if (server_id != (uint32_t) ::server_id)
2422
If the event was not requested by the slave i.e. the master sent
2423
it while the slave asked for a position >4, the event will make
2424
rli->group_master_log_pos advance. Say that the slave asked for
2425
position 1000, and the Format_desc event's end is 96. Then in
2426
the beginning of replication rli->group_master_log_pos will be
2427
0, then 96, then jump to first really asked event (which is
2428
>96). So this is ok.
2430
return(Start_log_event_v3::do_apply_event(rli));
2435
int Format_description_log_event::do_update_pos(Relay_log_info *rli)
2437
/* save the information describing this binlog */
2438
delete rli->relay_log.description_event_for_exec;
2439
rli->relay_log.description_event_for_exec= this;
2441
if (server_id == (uint32_t) ::server_id)
2444
We only increase the relay log position if we are skipping
2445
events and do not touch any group_* variables, nor flush the
2446
relay log info. If there is a crash, we will have to re-skip
2447
the events again, but that is a minor issue.
2449
If we do not skip stepping the group log position (and the
2450
server id was changed when restarting the server), it might well
2451
be that we start executing at a position that is invalid, e.g.,
2452
at a Rows_log_event or a Query_log_event preceeded by a
2453
Intvar_log_event instead of starting at a Table_map_log_event or
2454
the Intvar_log_event respectively.
2456
rli->inc_event_relay_log_pos();
2461
return Log_event::do_update_pos(rli);
2465
Log_event::enum_skip_reason
2466
Format_description_log_event::do_shall_skip(Relay_log_info *rli __attribute__((unused)))
2468
return Log_event::EVENT_SKIP_NOT;
2473
Splits the event's 'server_version' string into three numeric pieces stored
2474
into 'server_version_split':
2475
X.Y.Zabc (X,Y,Z numbers, a not a digit) -> {X,Y,Z}
2478
'server_version_split' is then used for lookups to find if the server which
2479
created this event has some known bug.
2481
void Format_description_log_event::calc_server_version_split()
2483
char *p= server_version, *r;
2485
for (uint32_t i= 0; i<=2; i++)
2487
number= strtoul(p, &r, 10);
2488
server_version_split[i]= (unsigned char)number;
2489
assert(number < 256); // fit in unsigned char
2491
assert(!((i == 0) && (*r != '.'))); // should be true in practice
2493
p++; // skip the dot
2498
/**************************************************************************
2499
Load_log_event methods
2500
General note about Load_log_event: the binlogging of LOAD DATA INFILE is
2501
going to be changed in 5.0 (or maybe in 5.1; not decided yet).
2502
However, the 5.0 slave could still have to read such events (from a 4.x
2503
master), convert them (which just means maybe expand the header, when 5.0
2504
servers have a UID in events) (remember that whatever is after the header
2505
will be like in 4.x, as this event's format is not modified in 5.0 as we
2506
will use new types of events to log the new LOAD DATA INFILE features).
2507
To be able to read/convert, we just need to not assume that the common
2508
header is of length LOG_EVENT_HEADER_LEN (we must use the description
2510
Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE
2511
between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the
2512
positions displayed in SHOW SLAVE STATUS then are fine too).
2513
**************************************************************************/
2516
Load_log_event::pack_info()
2519
uint32_t Load_log_event::get_query_buffer_length()
2522
5 + db_len + 3 + // "use DB; "
2523
18 + fname_len + 2 + // "LOAD DATA INFILE 'file''"
2525
9 + // " REPLACE or IGNORE "
2526
13 + table_name_len*2 + // "INTO Table `table`"
2527
21 + sql_ex.field_term_len*4 + 2 + // " FIELDS TERMINATED BY 'str'"
2528
23 + sql_ex.enclosed_len*4 + 2 + // " OPTIONALLY ENCLOSED BY 'str'"
2529
12 + sql_ex.escaped_len*4 + 2 + // " ESCAPED BY 'str'"
2530
21 + sql_ex.line_term_len*4 + 2 + // " LINES TERMINATED BY 'str'"
2531
19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'"
2532
15 + 22 + // " IGNORE xxx LINES"
2533
3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)"
2537
void Load_log_event::print_query(bool need_db, char *buf,
2538
char **end, char **fn_start, char **fn_end)
2542
if (need_db && db && db_len)
2544
pos= my_stpcpy(pos, "use `");
2545
memcpy(pos, db, db_len);
2546
pos= my_stpcpy(pos+db_len, "`; ");
2549
pos= my_stpcpy(pos, "LOAD DATA ");
2554
if (check_fname_outside_temp_buf())
2555
pos= my_stpcpy(pos, "LOCAL ");
2556
pos= my_stpcpy(pos, "INFILE '");
2557
memcpy(pos, fname, fname_len);
2558
pos= my_stpcpy(pos+fname_len, "' ");
2560
if (sql_ex.opt_flags & REPLACE_FLAG)
2561
pos= my_stpcpy(pos, " REPLACE ");
2562
else if (sql_ex.opt_flags & IGNORE_FLAG)
2563
pos= my_stpcpy(pos, " IGNORE ");
2565
pos= my_stpcpy(pos ,"INTO");
2570
pos= my_stpcpy(pos ," Table `");
2571
memcpy(pos, table_name, table_name_len);
2572
pos+= table_name_len;
2574
/* We have to create all optinal fields as the default is not empty */
2575
pos= my_stpcpy(pos, "` FIELDS TERMINATED BY ");
2576
pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len);
2577
if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
2578
pos= my_stpcpy(pos, " OPTIONALLY ");
2579
pos= my_stpcpy(pos, " ENCLOSED BY ");
2580
pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len);
2582
pos= my_stpcpy(pos, " ESCAPED BY ");
2583
pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
2585
pos= my_stpcpy(pos, " LINES TERMINATED BY ");
2586
pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
2587
if (sql_ex.line_start_len)
2589
pos= my_stpcpy(pos, " STARTING BY ");
2590
pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
2593
if ((long) skip_lines > 0)
2595
pos= my_stpcpy(pos, " IGNORE ");
2596
pos= int64_t10_to_str((int64_t) skip_lines, pos, 10);
2597
pos= my_stpcpy(pos," LINES ");
2603
const char *field= fields;
2604
pos= my_stpcpy(pos, " (");
2605
for (i = 0; i < num_fields; i++)
2612
memcpy(pos, field, field_lens[i]);
2613
pos+= field_lens[i];
2614
field+= field_lens[i] + 1;
2623
void Load_log_event::pack_info(Protocol *protocol)
2627
if (!(buf= (char*) my_malloc(get_query_buffer_length(), MYF(MY_WME))))
2629
print_query(true, buf, &end, 0, 0);
2630
protocol->store(buf, end-buf, &my_charset_bin);
2636
Load_log_event::write_data_header()
2639
bool Load_log_event::write_data_header(IO_CACHE* file)
2641
char buf[LOAD_HEADER_LEN];
2642
int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
2643
int4store(buf + L_EXEC_TIME_OFFSET, exec_time);
2644
int4store(buf + L_SKIP_LINES_OFFSET, skip_lines);
2645
buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
2646
buf[L_DB_LEN_OFFSET] = (char)db_len;
2647
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
2648
return my_b_safe_write(file, (unsigned char*)buf, LOAD_HEADER_LEN) != 0;
2653
Load_log_event::write_data_body()
2656
bool Load_log_event::write_data_body(IO_CACHE* file)
2658
if (sql_ex.write_data(file))
2660
if (num_fields && fields && field_lens)
2662
if (my_b_safe_write(file, (unsigned char*)field_lens, num_fields) ||
2663
my_b_safe_write(file, (unsigned char*)fields, field_block_len))
2666
return (my_b_safe_write(file, (unsigned char*)table_name, table_name_len + 1) ||
2667
my_b_safe_write(file, (unsigned char*)db, db_len + 1) ||
2668
my_b_safe_write(file, (unsigned char*)fname, fname_len));
2673
Load_log_event::Load_log_event()
2676
Load_log_event::Load_log_event(THD *thd_arg, sql_exchange *ex,
2677
const char *db_arg, const char *table_name_arg,
2678
List<Item> &fields_arg,
2679
enum enum_duplicates handle_dup,
2680
bool ignore, bool using_trans)
2682
thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
2684
thread_id(thd_arg->thread_id),
2685
slave_proxy_id(thd_arg->variables.pseudo_thread_id),
2686
num_fields(0),fields(0),
2687
field_lens(0),field_block_len(0),
2688
table_name(table_name_arg ? table_name_arg : ""),
2689
db(db_arg), fname(ex->file_name), local_fname(false)
2693
exec_time = (ulong) (end_time - thd_arg->start_time);
2694
/* db can never be a zero pointer in 4.0 */
2695
db_len = (uint32_t) strlen(db);
2696
table_name_len = (uint32_t) strlen(table_name);
2697
fname_len = (fname) ? (uint) strlen(fname) : 0;
2698
sql_ex.field_term = (char*) ex->field_term->ptr();
2699
sql_ex.field_term_len = (uint8_t) ex->field_term->length();
2700
sql_ex.enclosed = (char*) ex->enclosed->ptr();
2701
sql_ex.enclosed_len = (uint8_t) ex->enclosed->length();
2702
sql_ex.line_term = (char*) ex->line_term->ptr();
2703
sql_ex.line_term_len = (uint8_t) ex->line_term->length();
2704
sql_ex.line_start = (char*) ex->line_start->ptr();
2705
sql_ex.line_start_len = (uint8_t) ex->line_start->length();
2706
sql_ex.escaped = (char*) ex->escaped->ptr();
2707
sql_ex.escaped_len = (uint8_t) ex->escaped->length();
2708
sql_ex.opt_flags = 0;
2709
sql_ex.cached_new_format = -1;
2712
sql_ex.opt_flags|= DUMPFILE_FLAG;
2713
if (ex->opt_enclosed)
2714
sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
2716
sql_ex.empty_flags= 0;
2718
switch (handle_dup) {
2720
sql_ex.opt_flags|= REPLACE_FLAG;
2722
case DUP_UPDATE: // Impossible here
2727
sql_ex.opt_flags|= IGNORE_FLAG;
2729
if (!ex->field_term->length())
2730
sql_ex.empty_flags |= FIELD_TERM_EMPTY;
2731
if (!ex->enclosed->length())
2732
sql_ex.empty_flags |= ENCLOSED_EMPTY;
2733
if (!ex->line_term->length())
2734
sql_ex.empty_flags |= LINE_TERM_EMPTY;
2735
if (!ex->line_start->length())
2736
sql_ex.empty_flags |= LINE_START_EMPTY;
2737
if (!ex->escaped->length())
2738
sql_ex.empty_flags |= ESCAPED_EMPTY;
2740
skip_lines = ex->skip_lines;
2742
List_iterator<Item> li(fields_arg);
2743
field_lens_buf.length(0);
2744
fields_buf.length(0);
2746
while ((item = li++))
2749
unsigned char len = (unsigned char) strlen(item->name);
2750
field_block_len += len + 1;
2751
fields_buf.append(item->name, len + 1);
2752
field_lens_buf.append((char*)&len, 1);
2755
field_lens = (const unsigned char*)field_lens_buf.ptr();
2756
fields = fields_buf.ptr();
2762
The caller must do buf[event_len] = 0 before he starts using the
2765
Load_log_event::Load_log_event(const char *buf, uint32_t event_len,
2766
const Format_description_log_event *description_event)
2767
:Log_event(buf, description_event), num_fields(0), fields(0),
2768
field_lens(0),field_block_len(0),
2769
table_name(0), db(0), fname(0), local_fname(false)
2772
I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0,
2773
4.0->5.0 and 5.0->5.0 and it works.
2776
copy_log_event(buf, event_len,
2777
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
2779
description_event->common_header_len :
2780
LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
2782
/* otherwise it's a derived class, will call copy_log_event() itself */
2788
Load_log_event::copy_log_event()
2791
int Load_log_event::copy_log_event(const char *buf, ulong event_len,
2793
const Format_description_log_event *description_event)
2796
char* buf_end = (char*)buf + event_len;
2797
/* this is the beginning of the post-header */
2798
const char* data_head = buf + description_event->common_header_len;
2799
slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
2800
exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
2801
skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
2802
table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
2803
db_len = (uint)data_head[L_DB_LEN_OFFSET];
2804
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
2806
if ((int) event_len < body_offset)
2809
Sql_ex.init() on success returns the pointer to the first byte after
2810
the sql_ex structure, which is the start of field lengths array.
2812
if (!(field_lens= (unsigned char*)sql_ex.init((char*)buf + body_offset,
2814
buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
2817
data_len = event_len - body_offset;
2818
if (num_fields > data_len) // simple sanity check against corruption
2820
for (uint32_t i = 0; i < num_fields; i++)
2821
field_block_len += (uint)field_lens[i] + 1;
2823
fields = (char*)field_lens + num_fields;
2824
table_name = fields + field_block_len;
2825
db = table_name + table_name_len + 1;
2826
fname = db + db_len + 1;
2827
fname_len = strlen(fname);
2828
// null termination is accomplished by the caller doing buf[event_len]=0
2835
Load_log_event::set_fields()
2838
This function can not use the member variable
2839
for the database, since LOAD DATA INFILE on the slave
2840
can be for a different database than the current one.
2841
This is the reason for the affected_db argument to this method.
2844
void Load_log_event::set_fields(const char* affected_db,
2845
List<Item> &field_list,
2846
Name_resolution_context *context)
2849
const char* field = fields;
2850
for (i= 0; i < num_fields; i++)
2852
field_list.push_back(new Item_field(context,
2853
affected_db, table_name, field));
2854
field+= field_lens[i] + 1;
2860
Does the data loading job when executing a LOAD DATA on the slave.
2864
@param use_rli_only_for_errors If set to 1, rli is provided to
2865
Load_log_event::exec_event only for this
2866
function to have RPL_LOG_NAME and
2867
rli->last_slave_error, both being used by
2868
error reports. rli's position advancing
2869
is skipped (done by the caller which is
2870
Execute_load_log_event::exec_event).
2871
If set to 0, rli is provided for full use,
2872
i.e. for error reports and position
2876
fix this; this can be done by testing rules in
2877
Create_file_log_event::exec_event() and then discarding Append_block and
2880
this is a bug - this needs to be moved to the I/O thread
2888
int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
2889
bool use_rli_only_for_errors)
2892
new_db.length= db_len;
2893
new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
2894
thd->set_db(new_db.str, new_db.length);
2895
assert(thd->query == 0);
2896
thd->query_length= 0; // Should not be needed
2897
thd->is_slave_error= 0;
2898
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
2900
/* see Query_log_event::do_apply_event() and BUG#13360 */
2901
assert(!rli->m_table_map.count());
2903
Usually lex_start() is called by mysql_parse(), but we need it here
2904
as the present method does not call mysql_parse().
2907
mysql_reset_thd_for_next_command(thd);
2909
if (!use_rli_only_for_errors)
2912
Saved for InnoDB, see comment in
2913
Query_log_event::do_apply_event()
2915
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
2919
We test replicate_*_db rules. Note that we have already prepared
2920
the file to load, even if we are going to ignore and delete it
2921
now. So it is possible that we did a lot of disk writes for
2922
nothing. In other words, a big LOAD DATA INFILE on the master will
2923
still consume a lot of space on the slave (space in the relay log
2924
+ space of temp files: twice the space of the file to load...)
2925
even if it will finally be ignored. TODO: fix this; this can be
2926
done by testing rules in Create_file_log_event::do_apply_event()
2927
and then discarding Append_block and al. Another way is do the
2928
filtering in the I/O thread (more efficient: no disk writes at
2932
Note: We do not need to execute reset_one_shot_variables() if this
2934
Reason: The db stored in binlog events is the same for SET and for
2935
its companion query. If the SET is ignored because of
2936
db_ok(), the companion query will also be ignored, and if
2937
the companion query is ignored in the db_ok() test of
2938
::do_apply_event(), then the companion SET also have so
2939
we don't need to reset_one_shot_variables().
2941
if (rpl_filter->db_ok(thd->db))
2943
thd->set_time((time_t)when);
2944
pthread_mutex_lock(&LOCK_thread_count);
2945
thd->query_id = next_query_id();
2946
pthread_mutex_unlock(&LOCK_thread_count);
2948
Initing thd->row_count is not necessary in theory as this variable has no
2949
influence in the case of the slave SQL thread (it is used to generate a
2950
"data truncated" warning but which is absorbed and never gets to the
2951
error log); still we init it to avoid a Valgrind message.
2953
drizzle_reset_errors(thd, 0);
2956
memset(&tables, 0, sizeof(tables));
2957
tables.db= thd->strmake(thd->db, thd->db_length);
2958
tables.alias = tables.table_name = (char*) table_name;
2959
tables.lock_type = TL_WRITE;
2962
// the table will be opened in mysql_load
2963
if (rpl_filter->is_on() && !rpl_filter->tables_ok(thd->db, &tables))
2965
// TODO: this is a bug - this needs to be moved to the I/O thread
2967
skip_load_data_infile(net);
2973
enum enum_duplicates handle_dup;
2975
char *load_data_query;
2978
Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST
2979
and written to slave's binlog if binlogging is on.
2981
if (!(load_data_query= (char *)thd->alloc(get_query_buffer_length() + 1)))
2984
This will set thd->fatal_error in case of OOM. So we surely will notice
2985
that something is wrong.
2990
print_query(false, load_data_query, &end, (char **)&thd->lex->fname_start,
2991
(char **)&thd->lex->fname_end);
2993
thd->query_length= end - load_data_query;
2994
thd->query= load_data_query;
2996
if (sql_ex.opt_flags & REPLACE_FLAG)
2998
handle_dup= DUP_REPLACE;
3000
else if (sql_ex.opt_flags & IGNORE_FLAG)
3003
handle_dup= DUP_ERROR;
3008
When replication is running fine, if it was DUP_ERROR on the
3009
master then we could choose IGNORE here, because if DUP_ERROR
3010
suceeded on master, and data is identical on the master and slave,
3011
then there should be no uniqueness errors on slave, so IGNORE is
3012
the same as DUP_ERROR. But in the unlikely case of uniqueness errors
3013
(because the data on the master and slave happen to be different
3014
(user error or bug), we want LOAD DATA to print an error message on
3015
the slave to discover the problem.
3017
If reading from net (a 3.23 master), mysql_load() will change this
3020
handle_dup= DUP_ERROR;
3023
We need to set thd->lex->sql_command and thd->lex->duplicates
3024
since InnoDB tests these variables to decide if this is a LOAD
3025
DATA ... REPLACE INTO ... statement even though mysql_parse()
3026
is not called. This is not needed in 5.0 since there the LOAD
3027
DATA ... statement is replicated using mysql_parse(), which
3028
sets the thd->lex fields correctly.
3030
thd->lex->sql_command= SQLCOM_LOAD;
3031
thd->lex->duplicates= handle_dup;
3033
sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
3034
String field_term(sql_ex.field_term,sql_ex.field_term_len,log_cs);
3035
String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,log_cs);
3036
String line_term(sql_ex.line_term,sql_ex.line_term_len,log_cs);
3037
String line_start(sql_ex.line_start,sql_ex.line_start_len,log_cs);
3038
String escaped(sql_ex.escaped,sql_ex.escaped_len, log_cs);
3039
ex.field_term= &field_term;
3040
ex.enclosed= &enclosed;
3041
ex.line_term= &line_term;
3042
ex.line_start= &line_start;
3043
ex.escaped= &escaped;
3045
ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
3046
if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
3047
ex.field_term->length(0);
3049
ex.skip_lines = skip_lines;
3050
List<Item> field_list;
3051
thd->lex->select_lex.context.resolve_in_table_list_only(&tables);
3052
set_fields(tables.db, field_list, &thd->lex->select_lex.context);
3053
thd->variables.pseudo_thread_id= thread_id;
3056
// mysql_load will use thd->net to read the file
3057
thd->net.vio = net->vio;
3059
Make sure the client does not get confused about the packet sequence
3061
thd->net.pkt_nr = net->pkt_nr;
3064
It is safe to use tmp_list twice because we are not going to
3065
update it inside mysql_load().
3067
List<Item> tmp_list;
3068
if (mysql_load(thd, &ex, &tables, field_list, tmp_list, tmp_list,
3069
handle_dup, ignore, net != 0))
3070
thd->is_slave_error= 1;
3071
if (thd->cuted_fields)
3073
/* log_pos is the position of the LOAD event in the master log */
3074
sql_print_warning(_("Slave: load data infile on table '%s' at "
3075
"log position %s in log '%s' produced %ld "
3076
"warning(s). Default database: '%s'"),
3078
llstr(log_pos,llbuff), RPL_LOG_NAME,
3079
(ulong) thd->cuted_fields,
3080
print_slave_db_safe(thd->db));
3083
net->pkt_nr= thd->net.pkt_nr;
3089
We will just ask the master to send us /dev/null if we do not
3090
want to load the data.
3091
TODO: this a bug - needs to be done in I/O thread
3094
skip_load_data_infile(net);
3099
const char *remember_db= thd->db;
3100
pthread_mutex_lock(&LOCK_thread_count);
3102
thd->set_db(NULL, 0); /* will free the current database */
3104
thd->query_length= 0;
3105
pthread_mutex_unlock(&LOCK_thread_count);
3106
close_thread_tables(thd);
3108
if (thd->is_slave_error)
3110
/* this err/sql_errno code is copy-paste from net_send_error() */
3113
if (thd->is_error())
3115
err= thd->main_da.message();
3116
sql_errno= thd->main_da.sql_errno();
3120
sql_errno=ER_UNKNOWN_ERROR;
3123
rli->report(ERROR_LEVEL, sql_errno,
3124
_("Error '%s' running LOAD DATA INFILE on table '%s'. "
3125
"Default database: '%s'"),
3126
err, (char*)table_name, print_slave_db_safe(remember_db));
3127
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
3130
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
3132
if (thd->is_fatal_error)
3135
snprintf(buf, sizeof(buf),
3136
_("Running LOAD DATA INFILE on table '%-.64s'."
3137
" Default database: '%-.64s'"),
3139
print_slave_db_safe(remember_db));
3141
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
3142
ER(ER_SLAVE_FATAL_ERROR), buf);
3146
return ( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) );
3150
/**************************************************************************
3151
Rotate_log_event methods
3152
**************************************************************************/
3155
Rotate_log_event::pack_info()
3158
void Rotate_log_event::pack_info(Protocol *protocol)
3160
char buf1[256], buf[22];
3161
String tmp(buf1, sizeof(buf1), log_cs);
3163
tmp.append(new_log_ident, ident_len);
3164
tmp.append(STRING_WITH_LEN(";pos="));
3165
tmp.append(llstr(pos,buf));
3166
protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
3171
Rotate_log_event::Rotate_log_event() (2 constructors)
3175
Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
3176
uint32_t ident_len_arg, uint64_t pos_arg,
3178
:Log_event(), new_log_ident(new_log_ident_arg),
3179
pos(pos_arg),ident_len(ident_len_arg ? ident_len_arg :
3180
(uint) strlen(new_log_ident_arg)), flags(flags_arg)
3182
if (flags & DUP_NAME)
3183
new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME));
3188
Rotate_log_event::Rotate_log_event(const char* buf, uint32_t event_len,
3189
const Format_description_log_event* description_event)
3190
:Log_event(buf, description_event) ,new_log_ident(0), flags(DUP_NAME)
3192
// The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
3193
uint8_t header_size= description_event->common_header_len;
3194
uint8_t post_header_len= description_event->post_header_len[ROTATE_EVENT-1];
3195
uint32_t ident_offset;
3196
if (event_len < header_size)
3199
pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
3200
ident_len = (uint)(event_len -
3201
(header_size+post_header_len));
3202
ident_offset = post_header_len;
3203
set_if_smaller(ident_len,FN_REFLEN-1);
3204
new_log_ident= my_strndup(buf + ident_offset, (uint) ident_len, MYF(MY_WME));
3210
Rotate_log_event::write()
3213
bool Rotate_log_event::write(IO_CACHE* file)
3215
char buf[ROTATE_HEADER_LEN];
3216
int8store(buf + R_POS_OFFSET, pos);
3217
return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
3218
my_b_safe_write(file, (unsigned char*)buf, ROTATE_HEADER_LEN) ||
3219
my_b_safe_write(file, (unsigned char*)new_log_ident, (uint) ident_len));
3224
Got a rotate log event from the master.
3226
This is mainly used so that we can later figure out the logname and
3227
position for the master.
3229
We can't rotate the slave's BINlog as this will cause infinitive rotations
3230
in a A -> B -> A setup.
3231
The NOTES below is a wrong comment which will disappear when 4.1 is merged.
3236
int Rotate_log_event::do_update_pos(Relay_log_info *rli)
3238
pthread_mutex_lock(&rli->data_lock);
3239
rli->event_relay_log_pos= my_b_tell(rli->cur_log);
3241
If we are in a transaction or in a group: the only normal case is
3242
when the I/O thread was copying a big transaction, then it was
3243
stopped and restarted: we have this in the relay log:
3251
In that case, we don't want to touch the coordinates which
3252
correspond to the beginning of the transaction. Starting from
3253
5.0.0, there also are some rotates from the slave itself, in the
3254
relay log, which shall not change the group positions.
3256
if ((server_id != ::server_id || rli->replicate_same_server_id) &&
3257
!rli->is_in_group())
3259
memcpy(rli->group_master_log_name, new_log_ident, ident_len+1);
3260
rli->notify_group_master_log_name_update();
3261
rli->group_master_log_pos= pos;
3262
strmake(rli->group_relay_log_name, rli->event_relay_log_name,
3263
sizeof(rli->group_relay_log_name) - 1);
3264
rli->notify_group_relay_log_name_update();
3265
rli->group_relay_log_pos= rli->event_relay_log_pos;
3267
Reset thd->options and sql_mode etc, because this could be the signal of
3268
a master's downgrade from 5.0 to 4.0.
3269
However, no need to reset description_event_for_exec: indeed, if the next
3270
master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
3271
master is 4.0 then the events are in the slave's format (conversion).
3273
set_slave_thread_options(thd);
3274
set_slave_thread_default_charset(thd, rli);
3275
thd->variables.auto_increment_increment=
3276
thd->variables.auto_increment_offset= 1;
3278
pthread_mutex_unlock(&rli->data_lock);
3279
pthread_cond_broadcast(&rli->data_cond);
3280
flush_relay_log_info(rli);
3286
Log_event::enum_skip_reason
3287
Rotate_log_event::do_shall_skip(Relay_log_info *rli)
3289
enum_skip_reason reason= Log_event::do_shall_skip(rli);
3292
case Log_event::EVENT_SKIP_NOT:
3293
case Log_event::EVENT_SKIP_COUNT:
3294
return Log_event::EVENT_SKIP_NOT;
3296
case Log_event::EVENT_SKIP_IGNORE:
3297
return Log_event::EVENT_SKIP_IGNORE;
3300
return Log_event::EVENT_SKIP_NOT; // To keep compiler happy
3304
/**************************************************************************
3305
Intvar_log_event methods
3306
**************************************************************************/
3309
Intvar_log_event::pack_info()
3312
void Intvar_log_event::pack_info(Protocol *protocol)
3314
char buf[256], *pos;
3315
pos= strmake(buf, get_var_type_name(), sizeof(buf)-23);
3317
pos= int64_t10_to_str(val, pos, -10);
3318
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3323
Intvar_log_event::Intvar_log_event()
3326
Intvar_log_event::Intvar_log_event(const char* buf,
3327
const Format_description_log_event* description_event)
3328
:Log_event(buf, description_event)
3330
buf+= description_event->common_header_len;
3331
type= buf[I_TYPE_OFFSET];
3332
val= uint8korr(buf+I_VAL_OFFSET);
3337
Intvar_log_event::get_var_type_name()
3340
const char* Intvar_log_event::get_var_type_name()
3343
case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
3344
case INSERT_ID_EVENT: return "INSERT_ID";
3345
default: /* impossible */ return "UNKNOWN";
3351
Intvar_log_event::write()
3354
bool Intvar_log_event::write(IO_CACHE* file)
3356
unsigned char buf[9];
3357
buf[I_TYPE_OFFSET]= (unsigned char) type;
3358
int8store(buf + I_VAL_OFFSET, val);
3359
return (write_header(file, sizeof(buf)) ||
3360
my_b_safe_write(file, buf, sizeof(buf)));
3365
Intvar_log_event::print()
3369
Intvar_log_event::do_apply_event()
3372
int Intvar_log_event::do_apply_event(Relay_log_info const *rli)
3375
We are now in a statement until the associated query log event has
3378
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3381
case LAST_INSERT_ID_EVENT:
3382
thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1;
3383
thd->first_successful_insert_id_in_prev_stmt= val;
3385
case INSERT_ID_EVENT:
3386
thd->force_one_auto_inc_interval(val);
3392
int Intvar_log_event::do_update_pos(Relay_log_info *rli)
3394
rli->inc_event_relay_log_pos();
3399
Log_event::enum_skip_reason
3400
Intvar_log_event::do_shall_skip(Relay_log_info *rli)
3403
It is a common error to set the slave skip counter to 1 instead of
3404
2 when recovering from an insert which used a auto increment,
3405
rand, or user var. Therefore, if the slave skip counter is 1, we
3406
just say that this event should be skipped by ignoring it, meaning
3407
that we do not change the value of the slave skip counter since it
3408
will be decreased by the following insert event.
3410
return continue_group(rli);
3414
/**************************************************************************
3415
Rand_log_event methods
3416
**************************************************************************/
3418
void Rand_log_event::pack_info(Protocol *protocol)
3420
char buf1[256], *pos;
3421
pos= my_stpcpy(buf1,"rand_seed1=");
3422
pos= int10_to_str((long) seed1, pos, 10);
3423
pos= my_stpcpy(pos, ",rand_seed2=");
3424
pos= int10_to_str((long) seed2, pos, 10);
3425
protocol->store(buf1, (uint) (pos-buf1), &my_charset_bin);
3429
Rand_log_event::Rand_log_event(const char* buf,
3430
const Format_description_log_event* description_event)
3431
:Log_event(buf, description_event)
3433
buf+= description_event->common_header_len;
3434
seed1= uint8korr(buf+RAND_SEED1_OFFSET);
3435
seed2= uint8korr(buf+RAND_SEED2_OFFSET);
3439
bool Rand_log_event::write(IO_CACHE* file)
3441
unsigned char buf[16];
3442
int8store(buf + RAND_SEED1_OFFSET, seed1);
3443
int8store(buf + RAND_SEED2_OFFSET, seed2);
3444
return (write_header(file, sizeof(buf)) ||
3445
my_b_safe_write(file, buf, sizeof(buf)));
3449
int Rand_log_event::do_apply_event(Relay_log_info const *rli)
3452
We are now in a statement until the associated query log event has
3455
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3457
thd->rand.seed1= (ulong) seed1;
3458
thd->rand.seed2= (ulong) seed2;
3462
int Rand_log_event::do_update_pos(Relay_log_info *rli)
3464
rli->inc_event_relay_log_pos();
3469
Log_event::enum_skip_reason
3470
Rand_log_event::do_shall_skip(Relay_log_info *rli)
3473
It is a common error to set the slave skip counter to 1 instead of
3474
2 when recovering from an insert which used a auto increment,
3475
rand, or user var. Therefore, if the slave skip counter is 1, we
3476
just say that this event should be skipped by ignoring it, meaning
3477
that we do not change the value of the slave skip counter since it
3478
will be decreased by the following insert event.
3480
return continue_group(rli);
3484
/**************************************************************************
3485
Xid_log_event methods
3486
**************************************************************************/
3488
void Xid_log_event::pack_info(Protocol *protocol)
3490
char buf[128], *pos;
3491
pos= my_stpcpy(buf, "COMMIT /* xid=");
3492
pos= int64_t10_to_str(xid, pos, 10);
3493
pos= my_stpcpy(pos, " */");
3494
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3499
It's ok not to use int8store here,
3500
as long as xid_t::set(uint64_t) and
3501
xid_t::get_my_xid doesn't do it either.
3502
We don't care about actual values of xids as long as
3503
identical numbers compare identically
3507
Xid_log_event(const char* buf,
3508
const Format_description_log_event *description_event)
3509
:Log_event(buf, description_event)
3511
buf+= description_event->common_header_len;
3512
memcpy(&xid, buf, sizeof(xid));
3516
bool Xid_log_event::write(IO_CACHE* file)
3518
return write_header(file, sizeof(xid)) ||
3519
my_b_safe_write(file, (unsigned char*) &xid, sizeof(xid));
3523
int Xid_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
3525
return end_trans(thd, COMMIT);
3528
Log_event::enum_skip_reason
3529
Xid_log_event::do_shall_skip(Relay_log_info *rli)
3531
if (rli->slave_skip_counter > 0) {
3532
thd->options&= ~OPTION_BEGIN;
3533
return(Log_event::EVENT_SKIP_COUNT);
3535
return(Log_event::do_shall_skip(rli));
3539
/**************************************************************************
3540
User_var_log_event methods
3541
**************************************************************************/
3543
void User_var_log_event::pack_info(Protocol* protocol)
3546
uint32_t val_offset= 4 + name_len;
3547
uint32_t event_len= val_offset;
3551
if (!(buf= (char*) my_malloc(val_offset + 5, MYF(MY_WME))))
3553
my_stpcpy(buf + val_offset, "NULL");
3554
event_len= val_offset + 4;
3561
float8get(real_val, val);
3562
if (!(buf= (char*) my_malloc(val_offset + MY_GCVT_MAX_FIELD_WIDTH + 1,
3565
event_len+= my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, MY_GCVT_MAX_FIELD_WIDTH,
3566
buf + val_offset, NULL);
3569
if (!(buf= (char*) my_malloc(val_offset + 22, MYF(MY_WME))))
3571
event_len= int64_t10_to_str(uint8korr(val), buf + val_offset,-10)-buf;
3573
case DECIMAL_RESULT:
3575
if (!(buf= (char*) my_malloc(val_offset + DECIMAL_MAX_STR_LENGTH,
3578
String str(buf+val_offset, DECIMAL_MAX_STR_LENGTH, &my_charset_bin);
3580
binary2my_decimal(E_DEC_FATAL_ERROR, (unsigned char*) (val+2), &dec, val[0],
3582
my_decimal2string(E_DEC_FATAL_ERROR, &dec, 0, 0, 0, &str);
3583
event_len= str.length() + val_offset;
3587
/* 15 is for 'COLLATE' and other chars */
3588
buf= (char*) my_malloc(event_len+val_len*2+1+2*MY_CS_NAME_SIZE+15,
3590
const CHARSET_INFO *cs;
3593
if (!(cs= get_charset(charset_number, MYF(0))))
3595
my_stpcpy(buf+val_offset, "???");
3600
char *p= strxmov(buf + val_offset, "_", cs->csname, " ", NULL);
3601
p= str_to_hex(p, val, val_len);
3602
p= strxmov(p, " COLLATE ", cs->name, NULL);
3614
memcpy(buf+2, name, name_len);
3615
buf[2+name_len]= '`';
3616
buf[3+name_len]= '=';
3617
protocol->store(buf, event_len, &my_charset_bin);
3622
User_var_log_event::
3623
User_var_log_event(const char* buf,
3624
const Format_description_log_event* description_event)
3625
:Log_event(buf, description_event)
3627
buf+= description_event->common_header_len;
3628
name_len= uint4korr(buf);
3629
name= (char *) buf + UV_NAME_LEN_SIZE;
3630
buf+= UV_NAME_LEN_SIZE + name_len;
3631
is_null= (bool) *buf;
3634
type= STRING_RESULT;
3635
charset_number= my_charset_bin.number;
3641
type= (Item_result) buf[UV_VAL_IS_NULL];
3642
charset_number= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE);
3643
val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3644
UV_CHARSET_NUMBER_SIZE);
3645
val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3646
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE);
3651
bool User_var_log_event::write(IO_CACHE* file)
3653
char buf[UV_NAME_LEN_SIZE];
3654
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3655
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
3656
unsigned char buf2[(8 > DECIMAL_MAX_FIELD_SIZE + 2) ? 8 : DECIMAL_MAX_FIELD_SIZE +2], *pos= buf2;
3657
uint32_t buf1_length;
3660
int4store(buf, name_len);
3662
if ((buf1[0]= is_null))
3665
val_len= 0; // Length of 'pos'
3670
int4store(buf1 + 2, charset_number);
3674
float8store(buf2, *(double*) val);
3677
int8store(buf2, *(int64_t*) val);
3679
case DECIMAL_RESULT:
3681
my_decimal *dec= (my_decimal *)val;
3682
dec->fix_buffer_pointer();
3683
buf2[0]= (char)(dec->intg + dec->frac);
3684
buf2[1]= (char)dec->frac;
3685
decimal2bin((decimal_t*)val, buf2+2, buf2[0], buf2[1]);
3686
val_len= decimal_bin_size(buf2[0], buf2[1]) + 2;
3690
pos= (unsigned char*) val;
3697
int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
3701
/* Length of the whole event */
3702
event_length= sizeof(buf)+ name_len + buf1_length + val_len;
3704
return (write_header(file, event_length) ||
3705
my_b_safe_write(file, (unsigned char*) buf, sizeof(buf)) ||
3706
my_b_safe_write(file, (unsigned char*) name, name_len) ||
3707
my_b_safe_write(file, (unsigned char*) buf1, buf1_length) ||
3708
my_b_safe_write(file, pos, val_len));
3714
User_var_log_event::do_apply_event()
3717
int User_var_log_event::do_apply_event(Relay_log_info const *rli)
3720
const CHARSET_INFO *charset;
3721
if (!(charset= get_charset(charset_number, MYF(MY_WME))))
3723
LEX_STRING user_var_name;
3724
user_var_name.str= name;
3725
user_var_name.length= name_len;
3730
We are now in a statement until the associated query log event has
3733
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3737
it= new Item_null();
3743
float8get(real_val, val);
3744
it= new Item_float(real_val, 0);
3745
val= (char*) &real_val; // Pointer to value in native format
3749
int_val= (int64_t) uint8korr(val);
3750
it= new Item_int(int_val);
3751
val= (char*) &int_val; // Pointer to value in native format
3754
case DECIMAL_RESULT:
3756
Item_decimal *dec= new Item_decimal((unsigned char*) val+2, val[0], val[1]);
3758
val= (char *)dec->val_decimal(NULL);
3759
val_len= sizeof(my_decimal);
3763
it= new Item_string(val, val_len, charset);
3771
Item_func_set_user_var e(user_var_name, it);
3773
Item_func_set_user_var can't substitute something else on its place =>
3774
0 can be passed as last argument (reference on item)
3776
e.fix_fields(thd, 0);
3778
A variable can just be considered as a table with
3779
a single record and with a single column. Thus, like
3780
a column value, it could always have IMPLICIT derivation.
3782
e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0);
3783
free_root(thd->mem_root,0);
3788
int User_var_log_event::do_update_pos(Relay_log_info *rli)
3790
rli->inc_event_relay_log_pos();
3794
Log_event::enum_skip_reason
3795
User_var_log_event::do_shall_skip(Relay_log_info *rli)
3798
It is a common error to set the slave skip counter to 1 instead
3799
of 2 when recovering from an insert which used a auto increment,
3800
rand, or user var. Therefore, if the slave skip counter is 1, we
3801
just say that this event should be skipped by ignoring it, meaning
3802
that we do not change the value of the slave skip counter since it
3803
will be decreased by the following insert event.
3805
return continue_group(rli);
3809
/**************************************************************************
3810
Slave_log_event methods
3811
**************************************************************************/
3813
void Slave_log_event::pack_info(Protocol *protocol)
3815
char buf[256+HOSTNAME_LENGTH], *pos;
3816
pos= my_stpcpy(buf, "host=");
3817
pos= my_stpncpy(pos, master_host, HOSTNAME_LENGTH);
3818
pos= my_stpcpy(pos, ",port=");
3819
pos= int10_to_str((long) master_port, pos, 10);
3820
pos= my_stpcpy(pos, ",log=");
3821
pos= my_stpcpy(pos, master_log);
3822
pos= my_stpcpy(pos, ",pos=");
3823
pos= int64_t10_to_str(master_pos, pos, 10);
3824
protocol->store(buf, pos-buf, &my_charset_bin);
3830
re-write this better without holding both locks at the same time
3832
Slave_log_event::Slave_log_event(THD* thd_arg,
3833
Relay_log_info* rli)
3834
:Log_event(thd_arg, 0, 0) , mem_pool(0), master_host(0)
3836
if (!rli->inited) // QQ When can this happen ?
3839
Master_info* mi = rli->mi;
3840
// TODO: re-write this better without holding both locks at the same time
3841
pthread_mutex_lock(&mi->data_lock);
3842
pthread_mutex_lock(&rli->data_lock);
3843
master_host_len = strlen(mi->host);
3844
master_log_len = strlen(rli->group_master_log_name);
3845
// on OOM, just do not initialize the structure and print the error
3846
if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
3849
master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
3850
memcpy(master_host, mi->host, master_host_len + 1);
3851
master_log = master_host + master_host_len + 1;
3852
memcpy(master_log, rli->group_master_log_name, master_log_len + 1);
3853
master_port = mi->port;
3854
master_pos = rli->group_master_log_pos;
3857
sql_print_error(_("Out of memory while recording slave event"));
3858
pthread_mutex_unlock(&rli->data_lock);
3859
pthread_mutex_unlock(&mi->data_lock);
3864
Slave_log_event::~Slave_log_event()
3870
int Slave_log_event::get_data_size()
3872
return master_host_len + master_log_len + 1 + SL_MASTER_HOST_OFFSET;
3876
bool Slave_log_event::write(IO_CACHE* file)
3878
ulong event_length= get_data_size();
3879
int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
3880
int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
3881
// log and host are already there
3883
return (write_header(file, event_length) ||
3884
my_b_safe_write(file, (unsigned char*) mem_pool, event_length));
3888
void Slave_log_event::init_from_mem_pool(int data_size)
3890
master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
3891
master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
3892
master_host = mem_pool + SL_MASTER_HOST_OFFSET;
3893
master_host_len = strlen(master_host);
3895
master_log = master_host + master_host_len + 1;
3896
if (master_log > mem_pool + data_size)
3901
master_log_len = strlen(master_log);
3905
/** This code is not used, so has not been updated to be format-tolerant. */
3906
Slave_log_event::Slave_log_event(const char* buf, uint32_t event_len)
3907
:Log_event(buf,0) /*unused event*/ ,mem_pool(0),master_host(0)
3909
if (event_len < LOG_EVENT_HEADER_LEN)
3911
event_len -= LOG_EVENT_HEADER_LEN;
3912
if (!(mem_pool = (char*) my_malloc(event_len + 1, MYF(MY_WME))))
3914
memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len);
3915
mem_pool[event_len] = 0;
3916
init_from_mem_pool(event_len);
3920
int Slave_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
3922
if (mysql_bin_log.is_open())
3923
mysql_bin_log.write(this);
3928
/**************************************************************************
3929
Stop_log_event methods
3930
**************************************************************************/
3933
The master stopped. We used to clean up all temporary tables but
3934
this is useless as, as the master has shut down properly, it has
3935
written all DROP TEMPORARY Table (prepared statements' deletion is
3936
TODO only when we binlog prep stmts). We used to clean up
3937
slave_load_tmpdir, but this is useless as it has been cleared at the
3938
end of LOAD DATA INFILE. So we have nothing to do here. The place
3939
were we must do this cleaning is in
3940
Start_log_event_v3::do_apply_event(), not here. Because if we come
3941
here, the master was sane.
3943
int Stop_log_event::do_update_pos(Relay_log_info *rli)
3946
We do not want to update master_log pos because we get a rotate event
3947
before stop, so by now group_master_log_name is set to the next log.
3948
If we updated it, we will have incorrect master coordinates and this
3949
could give false triggers in MASTER_POS_WAIT() that we have reached
3950
the target position when in fact we have not.
3952
if (thd->options & OPTION_BEGIN)
3953
rli->inc_event_relay_log_pos();
3956
rli->inc_group_relay_log_pos(0);
3957
flush_relay_log_info(rli);
3963
/**************************************************************************
3964
Create_file_log_event methods
3965
**************************************************************************/
3968
Create_file_log_event ctor
3971
Create_file_log_event::
3972
Create_file_log_event(THD* thd_arg, sql_exchange* ex,
3973
const char* db_arg, const char* table_name_arg,
3974
List<Item>& fields_arg, enum enum_duplicates handle_dup,
3976
unsigned char* block_arg, uint32_t block_len_arg, bool using_trans)
3977
:Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
3979
fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg),
3980
file_id(thd_arg->file_id = mysql_bin_log.next_file_id())
3982
sql_ex.force_new_format();
3988
Create_file_log_event::write_data_body()
3991
bool Create_file_log_event::write_data_body(IO_CACHE* file)
3994
if ((res= Load_log_event::write_data_body(file)) || fake_base)
3996
return (my_b_safe_write(file, (unsigned char*) "", 1) ||
3997
my_b_safe_write(file, (unsigned char*) block, block_len));
4002
Create_file_log_event::write_data_header()
4005
bool Create_file_log_event::write_data_header(IO_CACHE* file)
4008
unsigned char buf[CREATE_FILE_HEADER_LEN];
4009
if ((res= Load_log_event::write_data_header(file)) || fake_base)
4011
int4store(buf + CF_FILE_ID_OFFSET, file_id);
4012
return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0;
4017
Create_file_log_event::write_base()
4020
bool Create_file_log_event::write_base(IO_CACHE* file)
4023
fake_base= 1; // pretend we are Load event
4030
Create_file_log_event ctor
4033
Create_file_log_event::Create_file_log_event(const char* buf, uint32_t len,
4034
const Format_description_log_event* description_event)
4035
:Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0)
4037
uint32_t block_offset;
4038
uint32_t header_len= description_event->common_header_len;
4039
uint8_t load_header_len= description_event->post_header_len[LOAD_EVENT-1];
4040
uint8_t create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
4041
if (!(event_buf= (char*) my_memdup(buf, len, MYF(MY_WME))) ||
4042
copy_log_event(event_buf,len,
4043
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
4044
load_header_len + header_len :
4045
(fake_base ? (header_len+load_header_len) :
4046
(header_len+load_header_len) +
4047
create_file_header_len)),
4050
if (description_event->binlog_version!=1)
4052
file_id= uint4korr(buf +
4054
load_header_len + CF_FILE_ID_OFFSET);
4056
Note that it's ok to use get_data_size() below, because it is computed
4057
with values we have already read from this event (because we called
4058
copy_log_event()); we are not using slave's format info to decode
4059
master's format, we are really using master's format info.
4060
Anyway, both formats should be identical (except the common_header_len)
4061
as these Load events are not changed between 4.0 and 5.0 (as logging of
4062
LOAD DATA INFILE does not use Load_log_event in 5.0).
4064
The + 1 is for \0 terminating fname
4066
block_offset= (description_event->common_header_len +
4067
Load_log_event::get_data_size() +
4068
create_file_header_len + 1);
4069
if (len < block_offset)
4071
block = (unsigned char*)buf + block_offset;
4072
block_len = len - block_offset;
4076
sql_ex.force_new_format();
4077
inited_from_old = 1;
4084
Create_file_log_event::pack_info()
4087
void Create_file_log_event::pack_info(Protocol *protocol)
4089
char buf[NAME_LEN*2 + 30 + 21*2], *pos;
4090
pos= my_stpcpy(buf, "db=");
4091
memcpy(pos, db, db_len);
4092
pos= my_stpcpy(pos + db_len, ";table=");
4093
memcpy(pos, table_name, table_name_len);
4094
pos= my_stpcpy(pos + table_name_len, ";file_id=");
4095
pos= int10_to_str((long) file_id, pos, 10);
4096
pos= my_stpcpy(pos, ";block_len=");
4097
pos= int10_to_str((long) block_len, pos, 10);
4098
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
4103
Create_file_log_event::do_apply_event()
4106
int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
4108
char proc_info[17+FN_REFLEN+10], *fname_buf;
4114
memset(&file, 0, sizeof(file));
4115
fname_buf= my_stpcpy(proc_info, "Making temp file ");
4116
ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
4117
thd_proc_info(thd, proc_info);
4118
my_delete(fname_buf, MYF(0)); // old copy may exist already
4119
if ((fd= my_create(fname_buf, CREATE_MODE,
4120
O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
4121
MYF(MY_WME))) < 0 ||
4122
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
4123
MYF(MY_WME|MY_NABP)))
4125
rli->report(ERROR_LEVEL, my_errno,
4126
_("Error in Create_file event: could not open file '%s'"),
4131
// a trick to avoid allocating another buffer
4133
fname_len= (uint) (my_stpcpy(ext, ".data") - fname);
4134
if (write_base(&file))
4136
my_stpcpy(ext, ".info"); // to have it right in the error message
4137
rli->report(ERROR_LEVEL, my_errno,
4138
_("Error in Create_file event: could not write to file '%s'"),
4142
end_io_cache(&file);
4143
my_close(fd, MYF(0));
4145
// fname_buf now already has .data, not .info, because we did our trick
4146
my_delete(fname_buf, MYF(0)); // old copy may exist already
4147
if ((fd= my_create(fname_buf, CREATE_MODE,
4148
O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
4151
rli->report(ERROR_LEVEL, my_errno,
4152
_("Error in Create_file event: could not open file '%s'"),
4156
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
4158
rli->report(ERROR_LEVEL, my_errno,
4159
_("Error in Create_file event: write to '%s' failed"),
4163
error=0; // Everything is ok
4167
end_io_cache(&file);
4169
my_close(fd, MYF(0));
4170
thd_proc_info(thd, 0);
4175
/**************************************************************************
4176
Append_block_log_event methods
4177
**************************************************************************/
4180
Append_block_log_event ctor
4183
Append_block_log_event::Append_block_log_event(THD *thd_arg,
4185
unsigned char *block_arg,
4186
uint32_t block_len_arg,
4188
:Log_event(thd_arg,0, using_trans), block(block_arg),
4189
block_len(block_len_arg), file_id(thd_arg->file_id), db(db_arg)
4195
Append_block_log_event ctor
4198
Append_block_log_event::Append_block_log_event(const char* buf, uint32_t len,
4199
const Format_description_log_event* description_event)
4200
:Log_event(buf, description_event),block(0)
4202
uint8_t common_header_len= description_event->common_header_len;
4203
uint8_t append_block_header_len=
4204
description_event->post_header_len[APPEND_BLOCK_EVENT-1];
4205
uint32_t total_header_len= common_header_len+append_block_header_len;
4206
if (len < total_header_len)
4208
file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET);
4209
block= (unsigned char*)buf + total_header_len;
4210
block_len= len - total_header_len;
4216
Append_block_log_event::write()
4219
bool Append_block_log_event::write(IO_CACHE* file)
4221
unsigned char buf[APPEND_BLOCK_HEADER_LEN];
4222
int4store(buf + AB_FILE_ID_OFFSET, file_id);
4223
return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) ||
4224
my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
4225
my_b_safe_write(file, (unsigned char*) block, block_len));
4230
Append_block_log_event::pack_info()
4233
void Append_block_log_event::pack_info(Protocol *protocol)
4237
length= (uint) sprintf(buf, ";file_id=%u;block_len=%u", file_id,
4239
protocol->store(buf, length, &my_charset_bin);
4244
Append_block_log_event::get_create_or_append()
4247
int Append_block_log_event::get_create_or_append() const
4249
return 0; /* append to the file, fail if not exists */
4253
Append_block_log_event::do_apply_event()
4256
int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
4258
char proc_info[17+FN_REFLEN+10], *fname= proc_info+17;
4262
fname= my_stpcpy(proc_info, "Making temp file ");
4263
slave_load_file_stem(fname, file_id, server_id, ".data");
4264
thd_proc_info(thd, proc_info);
4265
if (get_create_or_append())
4267
my_delete(fname, MYF(0)); // old copy may exist already
4268
if ((fd= my_create(fname, CREATE_MODE,
4269
O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
4272
rli->report(ERROR_LEVEL, my_errno,
4273
_("Error in %s event: could not create file '%s'"),
4274
get_type_str(), fname);
4278
else if ((fd = my_open(fname, O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW,
4281
rli->report(ERROR_LEVEL, my_errno,
4282
_("Error in %s event: could not open file '%s'"),
4283
get_type_str(), fname);
4286
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
4288
rli->report(ERROR_LEVEL, my_errno,
4289
_("Error in %s event: write to '%s' failed"),
4290
get_type_str(), fname);
4297
my_close(fd, MYF(0));
4298
thd_proc_info(thd, 0);
4303
/**************************************************************************
4304
Delete_file_log_event methods
4305
**************************************************************************/
4308
Delete_file_log_event ctor
4311
Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg,
4313
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
4318
Delete_file_log_event ctor
4321
Delete_file_log_event::Delete_file_log_event(const char* buf, uint32_t len,
4322
const Format_description_log_event* description_event)
4323
:Log_event(buf, description_event),file_id(0)
4325
uint8_t common_header_len= description_event->common_header_len;
4326
uint8_t delete_file_header_len= description_event->post_header_len[DELETE_FILE_EVENT-1];
4327
if (len < (uint)(common_header_len + delete_file_header_len))
4329
file_id= uint4korr(buf + common_header_len + DF_FILE_ID_OFFSET);
4334
Delete_file_log_event::write()
4337
bool Delete_file_log_event::write(IO_CACHE* file)
4339
unsigned char buf[DELETE_FILE_HEADER_LEN];
4340
int4store(buf + DF_FILE_ID_OFFSET, file_id);
4341
return (write_header(file, sizeof(buf)) ||
4342
my_b_safe_write(file, buf, sizeof(buf)));
4347
Delete_file_log_event::pack_info()
4350
void Delete_file_log_event::pack_info(Protocol *protocol)
4354
length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
4355
protocol->store(buf, (int32_t) length, &my_charset_bin);
4359
Delete_file_log_event::do_apply_event()
4362
int Delete_file_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
4364
char fname[FN_REFLEN+10];
4365
char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
4366
(void) my_delete(fname, MYF(MY_WME));
4367
my_stpcpy(ext, ".info");
4368
(void) my_delete(fname, MYF(MY_WME));
4373
/**************************************************************************
4374
Execute_load_log_event methods
4375
**************************************************************************/
4378
Execute_load_log_event ctor
4381
Execute_load_log_event::Execute_load_log_event(THD *thd_arg,
4384
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
4390
Execute_load_log_event ctor
4393
Execute_load_log_event::Execute_load_log_event(const char* buf, uint32_t len,
4394
const Format_description_log_event* description_event)
4395
:Log_event(buf, description_event), file_id(0)
4397
uint8_t common_header_len= description_event->common_header_len;
4398
uint8_t exec_load_header_len= description_event->post_header_len[EXEC_LOAD_EVENT-1];
4399
if (len < (uint)(common_header_len+exec_load_header_len))
4401
file_id= uint4korr(buf + common_header_len + EL_FILE_ID_OFFSET);
4406
Execute_load_log_event::write()
4409
bool Execute_load_log_event::write(IO_CACHE* file)
4411
unsigned char buf[EXEC_LOAD_HEADER_LEN];
4412
int4store(buf + EL_FILE_ID_OFFSET, file_id);
4413
return (write_header(file, sizeof(buf)) ||
4414
my_b_safe_write(file, buf, sizeof(buf)));
4419
Execute_load_log_event::pack_info()
4422
void Execute_load_log_event::pack_info(Protocol *protocol)
4426
length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
4427
protocol->store(buf, (int32_t) length, &my_charset_bin);
4432
Execute_load_log_event::do_apply_event()
4435
int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
4437
char fname[FN_REFLEN+10];
4442
Load_log_event *lev= 0;
4444
ext= slave_load_file_stem(fname, file_id, server_id, ".info");
4445
if ((fd = my_open(fname, O_RDONLY | O_BINARY | O_NOFOLLOW,
4446
MYF(MY_WME))) < 0 ||
4447
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
4448
MYF(MY_WME|MY_NABP)))
4450
rli->report(ERROR_LEVEL, my_errno,
4451
_("Error in Exec_load event: could not open file '%s'"),
4455
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
4456
(pthread_mutex_t*)0,
4457
rli->relay_log.description_event_for_exec)) ||
4458
lev->get_type_code() != NEW_LOAD_EVENT)
4460
rli->report(ERROR_LEVEL, 0,
4461
_("Error in Exec_load event: "
4462
"file '%s' appears corrupted"),
4469
lev->do_apply_event should use rli only for errors i.e. should
4470
not advance rli's position.
4472
lev->do_apply_event is the place where the table is loaded (it
4473
calls mysql_load()).
4476
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
4477
if (lev->do_apply_event(0,rli,1))
4480
We want to indicate the name of the file that could not be loaded
4482
But as we are here we are sure the error is in rli->last_slave_error and
4483
rli->last_slave_errno (example of error: duplicate entry for key), so we
4484
don't want to overwrite it with the filename.
4485
What we want instead is add the filename to the current error message.
4487
char *tmp= my_strdup(rli->last_error().message, MYF(MY_WME));
4490
rli->report(ERROR_LEVEL, rli->last_error().number,
4491
_("%s. Failed executing load from '%s'"),
4498
We have an open file descriptor to the .info file; we need to close it
4499
or Windows will refuse to delete the file in my_delete().
4503
my_close(fd, MYF(0));
4504
end_io_cache(&file);
4507
(void) my_delete(fname, MYF(MY_WME));
4508
memcpy(ext, ".data", 6);
4509
(void) my_delete(fname, MYF(MY_WME));
4516
my_close(fd, MYF(0));
4517
end_io_cache(&file);
4523
/**************************************************************************
4524
Begin_load_query_log_event methods
4525
**************************************************************************/
4527
Begin_load_query_log_event::
4528
Begin_load_query_log_event(THD* thd_arg, const char* db_arg, unsigned char* block_arg,
4529
uint32_t block_len_arg, bool using_trans)
4530
:Append_block_log_event(thd_arg, db_arg, block_arg, block_len_arg,
4533
file_id= thd_arg->file_id= mysql_bin_log.next_file_id();
4537
Begin_load_query_log_event::
4538
Begin_load_query_log_event(const char* buf, uint32_t len,
4539
const Format_description_log_event* desc_event)
4540
:Append_block_log_event(buf, len, desc_event)
4545
int Begin_load_query_log_event::get_create_or_append() const
4547
return 1; /* create the file */
4551
Log_event::enum_skip_reason
4552
Begin_load_query_log_event::do_shall_skip(Relay_log_info *rli)
4555
If the slave skip counter is 1, then we should not start executing
4558
return continue_group(rli);
4562
/**************************************************************************
4563
Execute_load_query_log_event methods
4564
**************************************************************************/
4567
Execute_load_query_log_event::
4568
Execute_load_query_log_event(THD *thd_arg, const char* query_arg,
4569
ulong query_length_arg, uint32_t fn_pos_start_arg,
4570
uint32_t fn_pos_end_arg,
4571
enum_load_dup_handling dup_handling_arg,
4572
bool using_trans, bool suppress_use,
4573
THD::killed_state killed_err_arg):
4574
Query_log_event(thd_arg, query_arg, query_length_arg, using_trans,
4575
suppress_use, killed_err_arg),
4576
file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg),
4577
fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
4582
Execute_load_query_log_event::
4583
Execute_load_query_log_event(const char* buf, uint32_t event_len,
4584
const Format_description_log_event* desc_event):
4585
Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT),
4586
file_id(0), fn_pos_start(0), fn_pos_end(0)
4588
if (!Query_log_event::is_valid())
4591
buf+= desc_event->common_header_len;
4593
fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET);
4594
fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET);
4595
dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET));
4597
if (fn_pos_start > q_len || fn_pos_end > q_len ||
4598
dup_handling > LOAD_DUP_REPLACE)
4601
file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET);
4605
ulong Execute_load_query_log_event::get_post_header_size_for_derived()
4607
return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN;
4612
Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
4614
unsigned char buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
4615
int4store(buf, file_id);
4616
int4store(buf + 4, fn_pos_start);
4617
int4store(buf + 4 + 4, fn_pos_end);
4618
*(buf + 4 + 4 + 4)= (unsigned char) dup_handling;
4619
return my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
4623
void Execute_load_query_log_event::pack_info(Protocol *protocol)
4626
if (!(buf= (char*) my_malloc(9 + db_len + q_len + 10 + 21, MYF(MY_WME))))
4631
pos= my_stpcpy(buf, "use `");
4632
memcpy(pos, db, db_len);
4633
pos= my_stpcpy(pos+db_len, "`; ");
4637
memcpy(pos, query, q_len);
4640
pos= my_stpcpy(pos, " ;file_id=");
4641
pos= int10_to_str((long) file_id, pos, 10);
4642
protocol->store(buf, pos-buf, &my_charset_bin);
4648
Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli)
4656
buf= (char*) my_malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
4657
(FN_REFLEN + 10) + 10 + 8 + 5, MYF(MY_WME));
4659
/* Replace filename and LOCAL keyword in query before executing it */
4662
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
4663
ER(ER_SLAVE_FATAL_ERROR),
4664
_("Not enough memory"));
4669
memcpy(p, query, fn_pos_start);
4671
fname= (p= strmake(p, STRING_WITH_LEN(" INFILE \'")));
4672
p= slave_load_file_stem(p, file_id, server_id, ".data");
4673
fname_end= p= strchr(p, '\0'); // Safer than p=p+5
4675
switch (dup_handling) {
4676
case LOAD_DUP_IGNORE:
4677
p= strmake(p, STRING_WITH_LEN(" IGNORE"));
4679
case LOAD_DUP_REPLACE:
4680
p= strmake(p, STRING_WITH_LEN(" REPLACE"));
4683
/* Ordinary load data */
4686
p= strmake(p, STRING_WITH_LEN(" INTO"));
4687
p= strmake(p, query+fn_pos_end, q_len-fn_pos_end);
4689
error= Query_log_event::do_apply_event(rli, buf, p-buf);
4691
/* Forging file name for deletion in same buffer */
4695
If there was an error the slave is going to stop, leave the
4696
file so that we can re-execute this event at START SLAVE.
4699
(void) my_delete(fname, MYF(MY_WME));
4706
/**************************************************************************
4708
**************************************************************************/
4711
sql_ex_info::write_data()
4714
bool sql_ex_info::write_data(IO_CACHE* file)
4718
return (write_str(file, field_term, (uint) field_term_len) ||
4719
write_str(file, enclosed, (uint) enclosed_len) ||
4720
write_str(file, line_term, (uint) line_term_len) ||
4721
write_str(file, line_start, (uint) line_start_len) ||
4722
write_str(file, escaped, (uint) escaped_len) ||
4723
my_b_safe_write(file,(unsigned char*) &opt_flags,1));
4728
@todo This is sensitive to field padding. We should write a
4729
char[7], not an old_sql_ex. /sven
4732
old_ex.field_term= *field_term;
4733
old_ex.enclosed= *enclosed;
4734
old_ex.line_term= *line_term;
4735
old_ex.line_start= *line_start;
4736
old_ex.escaped= *escaped;
4737
old_ex.opt_flags= opt_flags;
4738
old_ex.empty_flags=empty_flags;
4739
return my_b_safe_write(file, (unsigned char*) &old_ex, sizeof(old_ex)) != 0;
4748
const char *sql_ex_info::init(const char *buf, const char *buf_end,
4749
bool use_new_format)
4751
cached_new_format = use_new_format;
4756
The code below assumes that buf will not disappear from
4757
under our feet during the lifetime of the event. This assumption
4758
holds true in the slave thread if the log is in new format, but is not
4759
the case when we have old format because we will be reusing net buffer
4760
to read the actual file before we write out the Create_file event.
4762
if (read_str(&buf, buf_end, &field_term, &field_term_len) ||
4763
read_str(&buf, buf_end, &enclosed, &enclosed_len) ||
4764
read_str(&buf, buf_end, &line_term, &line_term_len) ||
4765
read_str(&buf, buf_end, &line_start, &line_start_len) ||
4766
read_str(&buf, buf_end, &escaped, &escaped_len))
4772
field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
4773
field_term = buf++; // Use first byte in string
4779
empty_flags= *buf++;
4780
if (empty_flags & FIELD_TERM_EMPTY)
4782
if (empty_flags & ENCLOSED_EMPTY)
4784
if (empty_flags & LINE_TERM_EMPTY)
4786
if (empty_flags & LINE_START_EMPTY)
4788
if (empty_flags & ESCAPED_EMPTY)
4795
/**************************************************************************
4796
Rows_log_event member functions
4797
**************************************************************************/
4799
Rows_log_event::Rows_log_event(THD *thd_arg, Table *tbl_arg, ulong tid,
4800
MY_BITMAP const *cols, bool is_transactional)
4801
: Log_event(thd_arg, 0, is_transactional),
4805
m_width(tbl_arg ? tbl_arg->s->fields : 1),
4806
m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
4807
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4810
We allow a special form of dummy event when the table, and cols
4811
are null and the table id is UINT32_MAX. This is a temporary
4812
solution, to be able to terminate a started statement in the
4813
binary log: the extraneous events will be removed in the future.
4815
assert((tbl_arg && tbl_arg->s && tid != UINT32_MAX) || (!tbl_arg && !cols && tid == UINT32_MAX));
4817
if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
4818
set_flags(NO_FOREIGN_KEY_CHECKS_F);
4819
if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
4820
set_flags(RELAXED_UNIQUE_CHECKS_F);
4821
/* if bitmap_init fails, caught in is_valid() */
4822
if (likely(!bitmap_init(&m_cols,
4823
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4827
/* Cols can be zero if this is a dummy binrows event */
4828
if (likely(cols != NULL))
4830
memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
4831
create_last_word_mask(&m_cols);
4836
// Needed because bitmap_init() does not set it to null on failure
4842
Rows_log_event::Rows_log_event(const char *buf, uint32_t event_len,
4843
Log_event_type event_type,
4844
const Format_description_log_event
4846
: Log_event(buf, description_event),
4849
m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
4850
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4852
uint8_t const common_header_len= description_event->common_header_len;
4853
uint8_t const post_header_len= description_event->post_header_len[event_type-1];
4855
const char *post_start= buf + common_header_len;
4856
post_start+= RW_MAPID_OFFSET;
4857
if (post_header_len == 6)
4859
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
4860
m_table_id= uint4korr(post_start);
4865
m_table_id= (ulong) uint6korr(post_start);
4866
post_start+= RW_FLAGS_OFFSET;
4869
m_flags= uint2korr(post_start);
4871
unsigned char const *const var_start=
4872
(const unsigned char *)buf + common_header_len + post_header_len;
4873
unsigned char const *const ptr_width= var_start;
4874
unsigned char *ptr_after_width= (unsigned char*) ptr_width;
4875
m_width = net_field_length(&ptr_after_width);
4876
/* if bitmap_init fails, catched in is_valid() */
4877
if (likely(!bitmap_init(&m_cols,
4878
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4882
memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
4883
create_last_word_mask(&m_cols);
4884
ptr_after_width+= (m_width + 7) / 8;
4888
// Needed because bitmap_init() does not set it to null on failure
4889
m_cols.bitmap= NULL;
4893
m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
4895
if (event_type == UPDATE_ROWS_EVENT)
4897
/* if bitmap_init fails, caught in is_valid() */
4898
if (likely(!bitmap_init(&m_cols_ai,
4899
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
4903
memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
4904
create_last_word_mask(&m_cols_ai);
4905
ptr_after_width+= (m_width + 7) / 8;
4909
// Needed because bitmap_init() does not set it to null on failure
4910
m_cols_ai.bitmap= 0;
4915
const unsigned char* const ptr_rows_data= (const unsigned char*) ptr_after_width;
4917
size_t const data_size= event_len - (ptr_rows_data - (const unsigned char *) buf);
4919
m_rows_buf= (unsigned char*) my_malloc(data_size, MYF(MY_WME));
4920
if (likely((bool)m_rows_buf))
4922
m_curr_row= m_rows_buf;
4923
m_rows_end= m_rows_buf + data_size;
4924
m_rows_cur= m_rows_end;
4925
memcpy(m_rows_buf, ptr_rows_data, data_size);
4928
m_cols.bitmap= 0; // to not free it
4933
Rows_log_event::~Rows_log_event()
4935
if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
4936
m_cols.bitmap= 0; // so no free in bitmap_free
4937
bitmap_free(&m_cols); // To pair with bitmap_init().
4938
free((unsigned char*)m_rows_buf);
4941
int Rows_log_event::get_data_size()
4943
int const type_code= get_type_code();
4945
unsigned char buf[sizeof(m_width)+1];
4946
unsigned char *end= net_store_length(buf, (m_width + 7) / 8);
4948
int data_size= ROWS_HEADER_LEN;
4949
data_size+= no_bytes_in_map(&m_cols);
4950
data_size+= end - buf;
4952
if (type_code == UPDATE_ROWS_EVENT)
4953
data_size+= no_bytes_in_map(&m_cols_ai);
4955
data_size+= (m_rows_cur - m_rows_buf);
4960
int Rows_log_event::do_add_row_data(unsigned char *row_data, size_t length)
4963
When the table has a primary key, we would probably want, by default, to
4964
log only the primary key value instead of the entire "before image". This
4965
would save binlog space. TODO
4969
If length is zero, there is nothing to write, so we just
4970
return. Note that this is not an optimization, since calling
4971
realloc() with size 0 means free().
4979
assert(m_rows_buf <= m_rows_cur);
4980
assert(!m_rows_buf || (m_rows_end && m_rows_buf <= m_rows_end));
4981
assert(m_rows_cur <= m_rows_end);
4983
/* The cast will always work since m_rows_cur <= m_rows_end */
4984
if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
4986
size_t const block_size= 1024;
4987
my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
4988
my_ptrdiff_t const new_alloc=
4989
block_size * ((cur_size + length + block_size - 1) / block_size);
4991
unsigned char* const new_buf= (unsigned char*)my_realloc((unsigned char*)m_rows_buf, (uint) new_alloc,
4992
MYF(MY_ALLOW_ZERO_PTR|MY_WME));
4993
if (unlikely(!new_buf))
4994
return(HA_ERR_OUT_OF_MEM);
4996
/* If the memory moved, we need to move the pointers */
4997
if (new_buf != m_rows_buf)
4999
m_rows_buf= new_buf;
5000
m_rows_cur= m_rows_buf + cur_size;
5004
The end pointer should always be changed to point to the end of
5005
the allocated memory.
5007
m_rows_end= m_rows_buf + new_alloc;
5010
assert(m_rows_cur + length <= m_rows_end);
5011
memcpy(m_rows_cur, row_data, length);
5012
m_rows_cur+= length;
5017
int Rows_log_event::do_apply_event(Relay_log_info const *rli)
5021
If m_table_id == UINT32_MAX, then we have a dummy event that does not
5022
contain any data. In that case, we just remove all tables in the
5023
tables_to_lock list, close the thread tables, and return with
5026
if (m_table_id == UINT32_MAX)
5029
This one is supposed to be set: just an extra check so that
5030
nothing strange has happened.
5032
assert(get_flags(STMT_END_F));
5034
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5035
close_thread_tables(thd);
5041
'thd' has been set by exec_relay_log_event(), just before calling
5042
do_apply_event(). We still check here to prevent future coding
5045
assert(rli->sql_thd == thd);
5048
If there is no locks taken, this is the first binrow event seen
5049
after the table map events. We should then lock all the tables
5050
used in the transaction and proceed with execution of the actual
5055
bool need_reopen= 1; /* To execute the first lap of the loop below */
5058
lock_tables() reads the contents of thd->lex, so they must be
5059
initialized. Contrary to in
5060
Table_map_log_event::do_apply_event() we don't call
5061
mysql_init_query() as that may reset the binlog format.
5066
There are a few flags that are replicated with each row event.
5067
Make sure to set/clear them before executing the main body of
5070
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
5071
thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
5073
thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
5075
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
5076
thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
5078
thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
5079
/* A small test to verify that objects have consistent types */
5080
assert(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
5083
while ((error= lock_tables(thd, rli->tables_to_lock,
5084
rli->tables_to_lock_count, &need_reopen)))
5088
if (thd->is_slave_error || thd->is_fatal_error)
5091
Error reporting borrowed from Query_log_event with many excessive
5092
simplifications (we don't honour --slave-skip-errors)
5094
uint32_t actual_error= thd->main_da.sql_errno();
5095
rli->report(ERROR_LEVEL, actual_error,
5096
_("Error '%s' in %s event: when locking tables"),
5098
? thd->main_da.message()
5099
: _("unexpected success or fatal error")),
5101
thd->is_fatal_error= 1;
5105
rli->report(ERROR_LEVEL, error,
5106
_("Error in %s event: when locking tables"),
5109
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5114
So we need to reopen the tables.
5116
We need to flush the pending RBR event, since it keeps a
5117
pointer to an open table.
5119
ALTERNATIVE SOLUTION (not implemented): Extract a pointer to
5120
the pending RBR event and reset the table pointer after the
5121
tables has been reopened.
5123
NOTE: For this new scheme there should be no pending event:
5124
need to add code to assert that is the case.
5126
thd->binlog_flush_pending_rows_event(false);
5127
TableList *tables= rli->tables_to_lock;
5128
close_tables_for_reopen(thd, &tables);
5130
uint32_t tables_count= rli->tables_to_lock_count;
5131
if ((error= open_tables(thd, &tables, &tables_count, 0)))
5133
if (thd->is_slave_error || thd->is_fatal_error)
5136
Error reporting borrowed from Query_log_event with many excessive
5137
simplifications (we don't honour --slave-skip-errors)
5139
uint32_t actual_error= thd->main_da.sql_errno();
5140
rli->report(ERROR_LEVEL, actual_error,
5141
_("Error '%s' on reopening tables"),
5143
? thd->main_da.message()
5144
: _("unexpected success or fatal error")));
5145
thd->is_slave_error= 1;
5147
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5153
When the open and locking succeeded, we check all tables to
5154
ensure that they still have the correct type.
5156
We can use a down cast here since we know that every table added
5157
to the tables_to_lock is a RPL_TableList.
5161
RPL_TableList *ptr= rli->tables_to_lock;
5162
for ( ; ptr ; ptr= static_cast<RPL_TableList*>(ptr->next_global))
5164
if (ptr->m_tabledef.compatible_with(rli, ptr->table))
5166
mysql_unlock_tables(thd, thd->lock);
5168
thd->is_slave_error= 1;
5169
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5170
return(ERR_BAD_TABLE_DEF);
5176
... and then we add all the tables to the table map and remove
5177
them from tables to lock.
5179
We also invalidate the query cache for all the tables, since
5180
they will now be changed.
5182
TODO [/Matz]: Maybe the query cache should not be invalidated
5183
here? It might be that a table is not changed, even though it
5184
was locked for the statement. We do know that each
5185
Rows_log_event contain at least one row, so after processing one
5186
Rows_log_event, we can invalidate the query cache for the
5189
for (TableList *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
5191
const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
5197
m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
5202
table == NULL means that this table should not be replicated
5203
(this was set up by Table_map_log_event::do_apply_event()
5204
which tested replicate-* rules).
5208
It's not needed to set_time() but
5209
1) it continues the property that "Time" in SHOW PROCESSLIST shows how
5210
much slave is behind
5211
2) it will be needed when we allow replication from a table with no
5212
TIMESTAMP column to a table with one.
5213
So we call set_time(), like in SBR. Presently it changes nothing.
5215
thd->set_time((time_t)when);
5217
There are a few flags that are replicated with each row event.
5218
Make sure to set/clear them before executing the main body of
5221
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
5222
thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
5224
thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
5226
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
5227
thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
5229
thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
5231
if (slave_allow_batching)
5232
thd->options|= OPTION_ALLOW_BATCH;
5234
thd->options&= ~OPTION_ALLOW_BATCH;
5236
/* A small test to verify that objects have consistent types */
5237
assert(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
5240
Now we are in a statement and will stay in a statement until we
5243
We set this flag here, before actually applying any rows, in
5244
case the SQL thread is stopped and we need to detect that we're
5245
inside a statement and halting abruptly might cause problems
5248
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
5250
if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
5251
set_flags(COMPLETE_ROWS_F);
5254
Set tables write and read sets.
5256
Read_set contains all slave columns (in case we are going to fetch
5257
a complete record from slave)
5259
Write_set equals the m_cols bitmap sent from master but it can be
5260
longer if slave has extra columns.
5263
bitmap_set_all(table->read_set);
5264
bitmap_set_all(table->write_set);
5265
if (!get_flags(COMPLETE_ROWS_F))
5266
bitmap_intersect(table->write_set,&m_cols);
5268
this->slave_exec_mode= slave_exec_mode_options; // fix the mode
5270
// Do event specific preparations
5271
error= do_before_row_operations(rli);
5273
// row processing loop
5275
while (error == 0 && m_curr_row < m_rows_end)
5277
/* in_use can have been set to NULL in close_tables_for_reopen */
5278
THD* old_thd= table->in_use;
5282
error= do_exec_row(rli);
5284
table->in_use = old_thd;
5290
The following list of "idempotent" errors
5291
means that an error from the list might happen
5292
because of idempotent (more than once)
5293
applying of a binlog file.
5294
Notice, that binlog has a ddl operation its
5295
second applying may cause
5297
case HA_ERR_TABLE_DEF_CHANGED:
5298
case HA_ERR_CANNOT_ADD_FOREIGN:
5300
which are not included into to the list.
5302
case HA_ERR_RECORD_CHANGED:
5303
case HA_ERR_RECORD_DELETED:
5304
case HA_ERR_KEY_NOT_FOUND:
5305
case HA_ERR_END_OF_FILE:
5306
case HA_ERR_FOUND_DUPP_KEY:
5307
case HA_ERR_FOUND_DUPP_UNIQUE:
5308
case HA_ERR_FOREIGN_DUPLICATE_KEY:
5309
case HA_ERR_NO_REFERENCED_ROW:
5310
case HA_ERR_ROW_IS_REFERENCED:
5311
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5313
if (global_system_variables.log_warnings)
5314
slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
5316
RPL_LOG_NAME, (ulong) log_pos);
5322
thd->is_slave_error= 1;
5327
If m_curr_row_end was not set during event execution (e.g., because
5328
of errors) we can't proceed to the next row. If the error is transient
5329
(i.e., error==0 at this point) we must call unpack_current_row() to set
5332
if (!m_curr_row_end && !error)
5333
unpack_current_row(rli, &m_cols);
5335
// at this moment m_curr_row_end should be set
5336
assert(error || m_curr_row_end != NULL);
5337
assert(error || m_curr_row < m_curr_row_end);
5338
assert(error || m_curr_row_end <= m_rows_end);
5340
m_curr_row= m_curr_row_end;
5342
} // row processing loop
5344
error= do_after_row_operations(rli, error);
5347
thd->options|= OPTION_KEEP_LOG;
5352
We need to delay this clear until here bacause unpack_current_row() uses
5353
master-side table definitions stored in rli.
5355
if (rli->tables_to_lock && get_flags(STMT_END_F))
5356
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5357
/* reset OPTION_ALLOW_BATCH as not affect later events */
5358
thd->options&= ~OPTION_ALLOW_BATCH;
5361
{ /* error has occured during the transaction */
5362
slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
5363
get_type_str(), RPL_LOG_NAME, (ulong) log_pos);
5368
If one day we honour --skip-slave-errors in row-based replication, and
5369
the error should be skipped, then we would clear mappings, rollback,
5370
close tables, but the slave SQL thread would not stop and then may
5371
assume the mapping is still available, the tables are still open...
5372
So then we should clear mappings/rollback/close here only if this is a
5374
For now we code, knowing that error is not skippable and so slave SQL
5375
thread is certainly going to stop.
5376
rollback at the caller along with sbr.
5378
thd->reset_current_stmt_binlog_row_based();
5379
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
5380
thd->is_slave_error= 1;
5385
This code would ideally be placed in do_update_pos() instead, but
5386
since we have no access to table there, we do the setting of
5387
last_event_start_time here instead.
5389
if (table && (table->s->primary_key == MAX_KEY) &&
5390
!cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS)
5393
------------ Temporary fix until WL#2975 is implemented ---------
5395
This event is not the last one (no STMT_END_F). If we stop now
5396
(in case of terminate_slave_thread()), how will we restart? We
5397
have to restart from Table_map_log_event, but as this table is
5398
not transactional, the rows already inserted will still be
5399
present, and idempotency is not guaranteed (no PK) so we risk
5400
that repeating leads to double insert. So we desperately try to
5401
continue, hope we'll eventually leave this buggy situation (by
5402
executing the final Rows_log_event). If we are in a hopeless
5403
wait (reached end of last relay log and nothing gets appended
5404
there), we timeout after one minute, and notify DBA about the
5405
problem. When WL#2975 is implemented, just remove the member
5406
Relay_log_info::last_event_start_time and all its occurrences.
5408
const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
5414
Log_event::enum_skip_reason
5415
Rows_log_event::do_shall_skip(Relay_log_info *rli)
5418
If the slave skip counter is 1 and this event does not end a
5419
statement, then we should not start executing on the next event.
5420
Otherwise, we defer the decision to the normal skipping logic.
5422
if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
5423
return Log_event::EVENT_SKIP_IGNORE;
5425
return Log_event::do_shall_skip(rli);
5429
Rows_log_event::do_update_pos(Relay_log_info *rli)
5433
if (get_flags(STMT_END_F))
5436
This is the end of a statement or transaction, so close (and
5437
unlock) the tables we opened when processing the
5438
Table_map_log_event starting the statement.
5440
OBSERVER. This will clear *all* mappings, not only those that
5441
are open for the table. There is not good handle for on-close
5444
NOTE. Even if we have no table ('table' == 0) we still need to be
5445
here, so that we increase the group relay log position. If we didn't, we
5446
could have a group relay log position which lags behind "forever"
5447
(assume the last master's transaction is ignored by the slave because of
5448
replicate-ignore rules).
5450
thd->binlog_flush_pending_rows_event(true);
5453
If this event is not in a transaction, the call below will, if some
5454
transactional storage engines are involved, commit the statement into
5455
them and flush the pending event to binlog.
5456
If this event is in a transaction, the call will do nothing, but a
5457
Xid_log_event will come next which will, if some transactional engines
5458
are involved, commit the transaction and flush the pending event to the
5461
error= ha_autocommit_or_rollback(thd, 0);
5464
Now what if this is not a transactional engine? we still need to
5465
flush the pending event to the binlog; we did it with
5466
thd->binlog_flush_pending_rows_event(). Note that we imitate
5467
what is done for real queries: a call to
5468
ha_autocommit_or_rollback() (sometimes only if involves a
5469
transactional engine), and a call to be sure to have the pending
5473
thd->reset_current_stmt_binlog_row_based();
5475
rli->cleanup_context(thd, 0);
5479
Indicate that a statement is finished.
5480
Step the group log position if we are not in a transaction,
5481
otherwise increase the event log position.
5483
rli->stmt_done(log_pos, when);
5486
Clear any errors pushed in thd->net.last_err* if for example "no key
5487
found" (as this is allowed). This is a safety measure; apparently
5488
those errors (e.g. when executing a Delete_rows_log_event of a
5489
non-existing row, like in rpl_row_mystery22.test,
5490
thd->net.last_error = "Can't find record in 't1'" and last_errno=1032)
5491
do not become visible. We still prefer to wipe them out.
5496
rli->report(ERROR_LEVEL, error,
5497
_("Error in %s event: commit of row events failed, "
5499
get_type_str(), m_table->s->db.str,
5500
m_table->s->table_name.str);
5504
rli->inc_event_relay_log_pos();
5510
bool Rows_log_event::write_data_header(IO_CACHE *file)
5512
unsigned char buf[ROWS_HEADER_LEN]; // No need to init the buffer
5513
assert(m_table_id != UINT32_MAX);
5514
int6store(buf + RW_MAPID_OFFSET, (uint64_t)m_table_id);
5515
int2store(buf + RW_FLAGS_OFFSET, m_flags);
5516
return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
5519
bool Rows_log_event::write_data_body(IO_CACHE*file)
5522
Note that this should be the number of *bits*, not the number of
5525
unsigned char sbuf[sizeof(m_width)];
5526
my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
5528
unsigned char *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
5529
assert(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
5531
res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
5533
res= res || my_b_safe_write(file, (unsigned char*) m_cols.bitmap,
5534
no_bytes_in_map(&m_cols));
5536
TODO[refactor write]: Remove the "down cast" here (and elsewhere).
5538
if (get_type_code() == UPDATE_ROWS_EVENT)
5540
res= res || my_b_safe_write(file, (unsigned char*) m_cols_ai.bitmap,
5541
no_bytes_in_map(&m_cols_ai));
5543
res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
5550
void Rows_log_event::pack_info(Protocol *protocol)
5553
char const *const flagstr=
5554
get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
5555
size_t bytes= snprintf(buf, sizeof(buf),
5556
"table_id: %lu%s", m_table_id, flagstr);
5557
protocol->store(buf, bytes, &my_charset_bin);
5561
/**************************************************************************
5562
Table_map_log_event member functions and support functions
5563
**************************************************************************/
5566
@page How replication of field metadata works.
5568
When a table map is created, the master first calls
5569
Table_map_log_event::save_field_metadata() which calculates how many
5570
values will be in the field metadata. Only those fields that require the
5571
extra data are added. The method also loops through all of the fields in
5572
the table calling the method Field::save_field_metadata() which returns the
5573
values for the field that will be saved in the metadata and replicated to
5574
the slave. Once all fields have been processed, the table map is written to
5575
the binlog adding the size of the field metadata and the field metadata to
5576
the end of the body of the table map.
5578
When a table map is read on the slave, the field metadata is read from the
5579
table map and passed to the table_def class constructor which saves the
5580
field metadata from the table map into an array based on the type of the
5581
field. Field metadata values not present (those fields that do not use extra
5582
data) in the table map are initialized as zero (0). The array size is the
5583
same as the columns for the table on the slave.
5585
Additionally, values saved for field metadata on the master are saved as a
5586
string of bytes (unsigned char) in the binlog. A field may require 1 or more bytes
5587
to store the information. In cases where values require multiple bytes
5588
(e.g. values > 255), the endian-safe methods are used to properly encode
5589
the values on the master and decode them on the slave. When the field
5590
metadata values are captured on the slave, they are stored in an array of
5591
type uint16_t. This allows the least number of casts to prevent casting bugs
5592
when the field metadata is used in comparisons of field attributes. When
5593
the field metadata is used for calculating addresses in pointer math, the
5594
type used is uint32_t.
5598
Save the field metadata based on the real_type of the field.
5599
The metadata saved depends on the type of the field. Some fields
5600
store a single byte for pack_length() while others store two bytes
5601
for field_length (max length).
5606
We may want to consider changing the encoding of the information.
5607
Currently, the code attempts to minimize the number of bytes written to
5608
the tablemap. There are at least two other alternatives; 1) using
5609
net_store_length() to store the data allowing it to choose the number of
5610
bytes that are appropriate thereby making the code much easier to
5611
maintain (only 1 place to change the encoding), or 2) use a fixed number
5612
of bytes for each field. The problem with option 1 is that net_store_length()
5613
will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
5614
for fields like CHAR which can be no larger than 255 characters, the method
5615
will use 3 bytes when the value is > 250. Further, every value that is
5616
encoded using 2 parts (e.g., pack_length, field_length) will be numerically
5617
> 250 therefore will use 3 bytes for eah value. The problem with option 2
5618
is less wasteful for space but does waste 1 byte for every field that does
5621
int Table_map_log_event::save_field_metadata()
5624
for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
5625
index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
5630
Constructor used to build an event for writing to the binary log.
5631
Mats says tbl->s lives longer than this event so it's ok to copy pointers
5632
(tbl->s->db etc) and not pointer content.
5634
Table_map_log_event::Table_map_log_event(THD *thd, Table *tbl, ulong tid,
5635
bool is_transactional __attribute__((unused)),
5637
: Log_event(thd, 0, true),
5639
m_dbnam(tbl->s->db.str),
5640
m_dblen(m_dbnam ? tbl->s->db.length : 0),
5641
m_tblnam(tbl->s->table_name.str),
5642
m_tbllen(tbl->s->table_name.length),
5643
m_colcnt(tbl->s->fields),
5648
m_field_metadata(0),
5649
m_field_metadata_size(0),
5653
assert(m_table_id != UINT32_MAX);
5655
In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
5656
table.cc / alloc_table_share():
5657
Use the fact the key is db/0/table_name/0
5658
As we rely on this let's assert it.
5660
assert((tbl->s->db.str == 0) ||
5661
(tbl->s->db.str[tbl->s->db.length] == 0));
5662
assert(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
5665
m_data_size= TABLE_MAP_HEADER_LEN;
5666
m_data_size+= m_dblen + 2; // Include length and terminating \0
5667
m_data_size+= m_tbllen + 2; // Include length and terminating \0
5668
m_data_size+= 1 + m_colcnt; // COLCNT and column types
5670
/* If malloc fails, caught in is_valid() */
5671
if ((m_memory= (unsigned char*) my_malloc(m_colcnt, MYF(MY_WME))))
5673
m_coltype= reinterpret_cast<unsigned char*>(m_memory);
5674
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
5675
m_coltype[i]= m_table->field[i]->type();
5679
Calculate a bitmap for the results of maybe_null() for all columns.
5680
The bitmap is used to determine when there is a column from the master
5681
that is not on the slave and is null and thus not in the row data during
5684
uint32_t num_null_bytes= (m_table->s->fields + 7) / 8;
5685
m_data_size+= num_null_bytes;
5686
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
5687
&m_null_bits, num_null_bytes,
5688
&m_field_metadata, (m_colcnt * 2),
5691
memset(m_field_metadata, 0, (m_colcnt * 2));
5694
Create an array for the field metadata and store it.
5696
m_field_metadata_size= save_field_metadata();
5697
assert(m_field_metadata_size <= (m_colcnt * 2));
5700
Now set the size of the data to the size of the field metadata array
5701
plus one or two bytes for number of elements in the field metadata array.
5703
if (m_field_metadata_size > 255)
5704
m_data_size+= m_field_metadata_size + 2;
5706
m_data_size+= m_field_metadata_size + 1;
5708
memset(m_null_bits, 0, num_null_bytes);
5709
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
5710
if (m_table->field[i]->maybe_null())
5711
m_null_bits[(i / 8)]+= 1 << (i % 8);
5717
Constructor used by slave to read the event from the binary log.
5719
Table_map_log_event::Table_map_log_event(const char *buf, uint32_t event_len,
5720
const Format_description_log_event
5723
: Log_event(buf, description_event),
5725
m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
5726
m_colcnt(0), m_coltype(0),
5727
m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
5728
m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
5729
m_null_bits(0), m_meta_memory(NULL)
5731
unsigned int bytes_read= 0;
5733
uint8_t common_header_len= description_event->common_header_len;
5734
uint8_t post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
5736
/* Read the post-header */
5737
const char *post_start= buf + common_header_len;
5739
post_start+= TM_MAPID_OFFSET;
5740
if (post_header_len == 6)
5742
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
5743
m_table_id= uint4korr(post_start);
5748
assert(post_header_len == TABLE_MAP_HEADER_LEN);
5749
m_table_id= (ulong) uint6korr(post_start);
5750
post_start+= TM_FLAGS_OFFSET;
5753
assert(m_table_id != UINT32_MAX);
5755
m_flags= uint2korr(post_start);
5757
/* Read the variable part of the event */
5758
const char *const vpart= buf + common_header_len + post_header_len;
5760
/* Extract the length of the various parts from the buffer */
5761
unsigned char const *const ptr_dblen= (unsigned char const*)vpart + 0;
5762
m_dblen= *(unsigned char*) ptr_dblen;
5764
/* Length of database name + counter + terminating null */
5765
unsigned char const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
5766
m_tbllen= *(unsigned char*) ptr_tbllen;
5768
/* Length of table name + counter + terminating null */
5769
unsigned char const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
5770
unsigned char *ptr_after_colcnt= (unsigned char*) ptr_colcnt;
5771
m_colcnt= net_field_length(&ptr_after_colcnt);
5773
/* Allocate mem for all fields in one go. If fails, caught in is_valid() */
5774
m_memory= (unsigned char*) my_multi_malloc(MYF(MY_WME),
5775
&m_dbnam, (uint) m_dblen + 1,
5776
&m_tblnam, (uint) m_tbllen + 1,
5777
&m_coltype, (uint) m_colcnt,
5782
/* Copy the different parts into their memory */
5783
strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
5784
strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
5785
memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
5787
ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
5788
bytes_read= ptr_after_colcnt - (unsigned char *)buf;
5789
if (bytes_read < event_len)
5791
m_field_metadata_size= net_field_length(&ptr_after_colcnt);
5792
assert(m_field_metadata_size <= (m_colcnt * 2));
5793
uint32_t num_null_bytes= (m_colcnt + 7) / 8;
5794
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
5795
&m_null_bits, num_null_bytes,
5796
&m_field_metadata, m_field_metadata_size,
5798
memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
5799
ptr_after_colcnt= (unsigned char*)ptr_after_colcnt + m_field_metadata_size;
5800
memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
5807
Table_map_log_event::~Table_map_log_event()
5809
free(m_meta_memory);
5814
Return value is an error code, one of:
5816
-1 Failure to open table [from open_tables()]
5818
1 No room for more tables [from set_table()]
5819
2 Out of memory [from set_table()]
5820
3 Wrong table definition
5821
4 Daisy-chaining RBR with SBR not possible
5824
int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
5826
RPL_TableList *table_list;
5827
char *db_mem, *tname_mem;
5830
assert(rli->sql_thd == thd);
5832
/* Step the query id to mark what columns that are actually used. */
5833
pthread_mutex_lock(&LOCK_thread_count);
5834
thd->query_id= next_query_id();
5835
pthread_mutex_unlock(&LOCK_thread_count);
5837
if (!(memory= my_multi_malloc(MYF(MY_WME),
5838
&table_list, (uint) sizeof(RPL_TableList),
5839
&db_mem, (uint) NAME_LEN + 1,
5840
&tname_mem, (uint) NAME_LEN + 1,
5842
return(HA_ERR_OUT_OF_MEM);
5844
memset(table_list, 0, sizeof(*table_list));
5845
table_list->db = db_mem;
5846
table_list->alias= table_list->table_name = tname_mem;
5847
table_list->lock_type= TL_WRITE;
5848
table_list->next_global= table_list->next_local= 0;
5849
table_list->table_id= m_table_id;
5850
table_list->updating= 1;
5851
my_stpcpy(table_list->db, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
5852
my_stpcpy(table_list->table_name, m_tblnam);
5856
if (!rpl_filter->db_ok(table_list->db) ||
5857
(rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list)))
5864
open_tables() reads the contents of thd->lex, so they must be
5865
initialized, so we should call lex_start(); to be even safer, we
5866
call mysql_init_query() which does a more complete set of inits.
5869
mysql_reset_thd_for_next_command(thd);
5871
Check if the slave is set to use SBR. If so, it should switch
5872
to using RBR until the end of the "statement", i.e., next
5873
STMT_END_F or next error.
5875
if (!thd->current_stmt_binlog_row_based &&
5876
mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG))
5878
thd->set_current_stmt_binlog_row_based();
5882
Open the table if it is not already open and add the table to
5883
table map. Note that for any table that should not be
5884
replicated, a filter is needed.
5886
The creation of a new TableList is used to up-cast the
5887
table_list consisting of RPL_TableList items. This will work
5888
since the only case where the argument to open_tables() is
5889
changed, is when thd->lex->query_tables == table_list, i.e.,
5890
when the statement requires prelocking. Since this is not
5891
executed when a statement is executed, this case will not occur.
5892
As a precaution, an assertion is added to ensure that the bad
5895
Either way, the memory in the list is *never* released
5896
internally in the open_tables() function, hence we take a copy
5897
of the pointer to make sure that it's not lost.
5900
assert(thd->lex->query_tables != table_list);
5901
TableList *tmp_table_list= table_list;
5902
if ((error= open_tables(thd, &tmp_table_list, &count, 0)))
5904
if (thd->is_slave_error || thd->is_fatal_error)
5907
Error reporting borrowed from Query_log_event with many excessive
5908
simplifications (we don't honour --slave-skip-errors)
5910
uint32_t actual_error= thd->main_da.sql_errno();
5911
rli->report(ERROR_LEVEL, actual_error,
5912
_("Error '%s' on opening table `%s`.`%s`"),
5914
? thd->main_da.message()
5915
: _("unexpected success or fatal error")),
5916
table_list->db, table_list->table_name);
5917
thd->is_slave_error= 1;
5922
m_table= table_list->table;
5925
This will fail later otherwise, the 'in_use' field should be
5926
set to the current thread.
5928
assert(m_table->in_use);
5931
Use placement new to construct the table_def instance in the
5932
memory allocated for it inside table_list.
5934
The memory allocated by the table_def structure (i.e., not the
5935
memory allocated *for* the table_def structure) is released
5936
inside Relay_log_info::clear_tables_to_lock() by calling the
5937
table_def destructor explicitly.
5939
new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt,
5940
m_field_metadata, m_field_metadata_size, m_null_bits);
5941
table_list->m_tabledef_valid= true;
5944
We record in the slave's information that the table should be
5945
locked by linking the table into the list of tables to lock.
5947
table_list->next_global= table_list->next_local= rli->tables_to_lock;
5948
const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
5949
const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
5950
/* 'memory' is freed in clear_tables_to_lock */
5960
Log_event::enum_skip_reason
5961
Table_map_log_event::do_shall_skip(Relay_log_info *rli)
5964
If the slave skip counter is 1, then we should not start executing
5967
return continue_group(rli);
5970
int Table_map_log_event::do_update_pos(Relay_log_info *rli)
5972
rli->inc_event_relay_log_pos();
5977
bool Table_map_log_event::write_data_header(IO_CACHE *file)
5979
assert(m_table_id != UINT32_MAX);
5980
unsigned char buf[TABLE_MAP_HEADER_LEN];
5981
int6store(buf + TM_MAPID_OFFSET, (uint64_t)m_table_id);
5982
int2store(buf + TM_FLAGS_OFFSET, m_flags);
5983
return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
5986
bool Table_map_log_event::write_data_body(IO_CACHE *file)
5988
assert(m_dbnam != NULL);
5989
assert(m_tblnam != NULL);
5990
/* We use only one byte per length for storage in event: */
5991
assert(m_dblen < 128);
5992
assert(m_tbllen < 128);
5994
unsigned char const dbuf[]= { (unsigned char) m_dblen };
5995
unsigned char const tbuf[]= { (unsigned char) m_tbllen };
5997
unsigned char cbuf[sizeof(m_colcnt)];
5998
unsigned char *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
5999
assert(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
6002
Store the size of the field metadata.
6004
unsigned char mbuf[sizeof(m_field_metadata_size)];
6005
unsigned char *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
6007
return (my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
6008
my_b_safe_write(file, (const unsigned char*)m_dbnam, m_dblen+1) ||
6009
my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
6010
my_b_safe_write(file, (const unsigned char*)m_tblnam, m_tbllen+1) ||
6011
my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
6012
my_b_safe_write(file, m_coltype, m_colcnt) ||
6013
my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
6014
my_b_safe_write(file, m_field_metadata, m_field_metadata_size),
6015
my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
6020
Print some useful information for the SHOW BINARY LOG information
6024
void Table_map_log_event::pack_info(Protocol *protocol)
6027
size_t bytes= snprintf(buf, sizeof(buf),
6028
"table_id: %lu (%s.%s)",
6029
m_table_id, m_dbnam, m_tblnam);
6030
protocol->store(buf, bytes, &my_charset_bin);
6034
/**************************************************************************
6035
Write_rows_log_event member functions
6036
**************************************************************************/
6039
Constructor used to build an event for writing to the binary log.
6041
Write_rows_log_event::Write_rows_log_event(THD *thd_arg, Table *tbl_arg,
6043
bool is_transactional)
6044
: Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
6049
Constructor used by slave to read the event from the binary log.
6051
Write_rows_log_event::Write_rows_log_event(const char *buf, uint32_t event_len,
6052
const Format_description_log_event
6054
: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
6059
Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6064
todo: to introduce a property for the event (handler?) which forces
6065
applying the event in the replace (idempotent) fashion.
6067
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
6070
We are using REPLACE semantics and not INSERT IGNORE semantics
6071
when writing rows, that is: new rows replace old rows. We need to
6072
inform the storage engine that it should use this behaviour.
6075
/* Tell the storage engine that we are using REPLACE semantics. */
6076
thd->lex->duplicates= DUP_REPLACE;
6079
Pretend we're executing a REPLACE command: this is needed for
6080
InnoDB since it is not (properly) checking the
6081
lex->duplicates flag.
6083
thd->lex->sql_command= SQLCOM_REPLACE;
6085
Do not raise the error flag in case of hitting to an unique attribute
6087
m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
6090
m_table->file->ha_start_bulk_insert(0);
6092
We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
6093
any TIMESTAMP column with data from the row but instead will use
6094
the event's current time.
6095
As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
6096
columns, we know that all TIMESTAMP columns on slave will receive explicit
6097
data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
6098
When we allow a table without TIMESTAMP to be replicated to a table having
6099
more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
6100
column to be replicated into a BIGINT column and the slave's table has a
6101
TIMESTAMP column, then the slave's TIMESTAMP column will take its value
6102
from set_time() which we called earlier (consistent with SBR). And then in
6103
some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
6104
analyze if explicit data is provided for slave's TIMESTAMP columns).
6106
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
6112
Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6116
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
6118
m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
6119
m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
6121
resetting the extra with
6122
table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
6124
explanation: file->reset() performs this duty
6125
ultimately. Still todo: fix
6128
if ((local_error= m_table->file->ha_end_bulk_insert()))
6130
m_table->file->print_error(local_error, MYF(0));
6132
return error? error : local_error;
6137
Check if there are more UNIQUE keys after the given key.
6140
last_uniq_key(Table *table, uint32_t keyno)
6142
while (++keyno < table->s->keys)
6143
if (table->key_info[keyno].flags & HA_NOSAME)
6149
Check if an error is a duplicate key error.
6151
This function is used to check if an error code is one of the
6152
duplicate key error, i.e., and error code for which it is sensible
6153
to do a <code>get_dup_key()</code> to retrieve the duplicate key.
6155
@param errcode The error code to check.
6157
@return <code>true</code> if the error code is such that
6158
<code>get_dup_key()</code> will return true, <code>false</code>
6162
is_duplicate_key_error(int errcode)
6166
case HA_ERR_FOUND_DUPP_KEY:
6167
case HA_ERR_FOUND_DUPP_UNIQUE:
6174
Write the current row into event's table.
6176
The row is located in the row buffer, pointed by @c m_curr_row member.
6177
Number of columns of the row is stored in @c m_width member (it can be
6178
different from the number of columns in the table to which we insert).
6179
Bitmap @c m_cols indicates which columns are present in the row. It is assumed
6180
that event's table is already open and pointed by @c m_table.
6182
If the same record already exists in the table it can be either overwritten
6183
or an error is reported depending on the value of @c overwrite flag
6184
(error reporting not yet implemented). Note that the matching record can be
6185
different from the row we insert if we use primary keys to identify records in
6188
The row to be inserted can contain values only for selected columns. The
6189
missing columns are filled with default values using @c prepare_record()
6190
function. If a matching record is found in the table and @c overwritte is
6191
true, the missing columns are taken from it.
6193
@param rli Relay log info (needed for row unpacking).
6195
Shall we overwrite if the row already exists or signal
6196
error (currently ignored).
6198
@returns Error code on failure, 0 on success.
6200
This method, if successful, sets @c m_curr_row_end pointer to point at the
6201
next row in the rows buffer. This is done when unpacking the row to be
6204
@note If a matching record is found, it is either updated using
6205
@c ha_update_row() or first deleted and then new record written.
6209
Rows_log_event::write_row(const Relay_log_info *const rli,
6210
const bool overwrite)
6212
assert(m_table != NULL && thd != NULL);
6214
Table *table= m_table; // pointer to event's table
6217
auto_afree_ptr<char> key(NULL);
6219
/* fill table->record[0] with default values */
6222
We only check if the columns have default values for non-NDB
6223
engines, for NDB we ignore the check since updates are sent as
6224
writes, causing errors when trying to prepare the record.
6226
TODO[ndb]: Elimiate this hard-coded dependency on NDB. Ideally,
6227
the engine should be able to set a flag that it want the default
6228
values filled in and one flag to handle the case that the default
6229
values should be checked. Maybe these two flags can be combined.
6231
if ((error= prepare_record(table, &m_cols, m_width, true)))
6234
/* unpack row into table->record[0] */
6235
error= unpack_current_row(rli, &m_cols);
6237
// Temporary fix to find out why it fails [/Matz]
6238
memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
6241
Try to write record. If a corresponding record already exists in the table,
6242
we try to change it using ha_update_row() if possible. Otherwise we delete
6243
it and repeat the whole process again.
6245
TODO: Add safety measures against infinite looping.
6248
while ((error= table->file->ha_write_row(table->record[0])))
6250
if (error == HA_ERR_LOCK_DEADLOCK ||
6251
error == HA_ERR_LOCK_WAIT_TIMEOUT ||
6252
(keynum= table->file->get_dup_key(error)) < 0 ||
6256
Deadlock, waiting for lock or just an error from the handler
6257
such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
6258
Retrieval of the duplicate key number may fail
6259
- either because the error was not "duplicate key" error
6260
- or because the information which key is not available
6262
table->file->print_error(error, MYF(0));
6266
We need to retrieve the old row into record[1] to be able to
6267
either update or delete the offending record. We either:
6269
- use rnd_pos() with a row-id (available as dupp_row) to the
6270
offending row, if that is possible (MyISAM and Blackhole), or else
6272
- use index_read_idx() with the key that is duplicated, to
6273
retrieve the offending row.
6275
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
6277
if (table->file->inited && (error= table->file->ha_index_end()))
6279
if ((error= table->file->ha_rnd_init(false)))
6282
error= table->file->rnd_pos(table->record[1], table->file->dup_ref);
6283
table->file->ha_rnd_end();
6286
table->file->print_error(error, MYF(0));
6292
if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
6297
if (key.get() == NULL)
6299
key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
6300
if (key.get() == NULL)
6306
key_copy((unsigned char*)key.get(), table->record[0], table->key_info + keynum,
6308
error= table->file->index_read_idx_map(table->record[1], keynum,
6309
(const unsigned char*)key.get(),
6314
table->file->print_error(error, MYF(0));
6320
Now, record[1] should contain the offending row. That
6321
will enable us to update it or, alternatively, delete it (so
6322
that we can insert the new row afterwards).
6326
If row is incomplete we will use the record found to fill
6329
if (!get_flags(COMPLETE_ROWS_F))
6331
restore_record(table,record[1]);
6332
error= unpack_current_row(rli, &m_cols);
6336
REPLACE is defined as either INSERT or DELETE + INSERT. If
6337
possible, we can replace it with an UPDATE, but that will not
6338
work on InnoDB if FOREIGN KEY checks are necessary.
6340
I (Matz) am not sure of the reason for the last_uniq_key()
6341
check as, but I'm guessing that it's something along the
6344
Suppose that we got the duplicate key to be a key that is not
6345
the last unique key for the table and we perform an update:
6346
then there might be another key for which the unique check will
6347
fail, so we're better off just deleting the row and inserting
6350
if (last_uniq_key(table, keynum) &&
6351
!table->file->referenced_by_foreign_key())
6353
error=table->file->ha_update_row(table->record[1],
6357
case HA_ERR_RECORD_IS_THE_SAME:
6364
table->file->print_error(error, MYF(0));
6371
if ((error= table->file->ha_delete_row(table->record[1])))
6373
table->file->print_error(error, MYF(0));
6376
/* Will retry ha_write_row() with the offending row removed. */
6385
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6387
assert(m_table != NULL);
6389
write_row(rli, /* if 1 then overwrite */
6390
bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
6392
if (error && !thd->is_error())
6395
my_error(ER_UNKNOWN_ERROR, MYF(0));
6402
/**************************************************************************
6403
Delete_rows_log_event member functions
6404
**************************************************************************/
6407
Compares table->record[0] and table->record[1]
6409
Returns TRUE if different.
6411
static bool record_compare(Table *table)
6414
Need to set the X bit and the filler bits in both records since
6415
there are engines that do not set it correctly.
6417
In addition, since MyISAM checks that one hasn't tampered with the
6418
record, it is necessary to restore the old bytes into the record
6419
after doing the comparison.
6421
TODO[record format ndb]: Remove it once NDB returns correct
6422
records. Check that the other engines also return correct records.
6425
unsigned char saved_x[2], saved_filler[2];
6427
if (table->s->null_bytes > 0)
6429
for (int i = 0 ; i < 2 ; ++i)
6431
saved_x[i]= table->record[i][0];
6432
saved_filler[i]= table->record[i][table->s->null_bytes - 1];
6433
table->record[i][0]|= 1U;
6434
table->record[i][table->s->null_bytes - 1]|=
6435
256U - (1U << table->s->last_null_bit_pos);
6439
if (table->s->blob_fields + table->s->varchar_fields == 0)
6441
result= cmp_record(table,record[1]);
6442
goto record_compare_exit;
6445
/* Compare null bits */
6446
if (memcmp(table->null_flags,
6447
table->null_flags+table->s->rec_buff_length,
6448
table->s->null_bytes))
6450
result= true; // Diff in NULL value
6451
goto record_compare_exit;
6454
/* Compare updated fields */
6455
for (Field **ptr=table->field ; *ptr ; ptr++)
6457
if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
6460
goto record_compare_exit;
6464
record_compare_exit:
6466
Restore the saved bytes.
6468
TODO[record format ndb]: Remove this code once NDB returns the
6469
correct record format.
6471
if (table->s->null_bytes > 0)
6473
for (int i = 0 ; i < 2 ; ++i)
6475
table->record[i][0]= saved_x[i];
6476
table->record[i][table->s->null_bytes - 1]= saved_filler[i];
6484
Locate the current row in event's table.
6486
The current row is pointed by @c m_curr_row. Member @c m_width tells how many
6487
columns are there in the row (this can be differnet from the number of columns
6488
in the table). It is assumed that event's table is already open and pointed
6491
If a corresponding record is found in the table it is stored in
6492
@c m_table->record[0]. Note that when record is located based on a primary
6493
key, it is possible that the record found differs from the row being located.
6495
If no key is specified or table does not have keys, a table scan is used to
6496
find the row. In that case the row should be complete and contain values for
6497
all columns. However, it can still be shorter than the table, i.e. the table
6498
can contain extra columns not present in the row. It is also possible that
6499
the table has fewer columns than the row being located.
6501
@returns Error code on failure, 0 on success.
6503
@post In case of success @c m_table->record[0] contains the record found.
6504
Also, the internal "cursor" of the table is positioned at the record found.
6506
@note If the engine allows random access of the records, a combination of
6507
@c position() and @c rnd_pos() will be used.
6510
int Rows_log_event::find_row(const Relay_log_info *rli)
6512
assert(m_table && m_table->in_use != NULL);
6514
Table *table= m_table;
6517
/* unpack row - missing fields get default values */
6518
prepare_record(table, &m_cols, m_width, false/* don't check errors */);
6519
error= unpack_current_row(rli, &m_cols);
6521
// Temporary fix to find out why it fails [/Matz]
6522
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
6524
if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
6525
table->s->primary_key < MAX_KEY)
6528
Use a more efficient method to fetch the record given by
6529
table->record[0] if the engine allows it. We first compute a
6530
row reference using the position() member function (it will be
6531
stored in table->file->ref) and the use rnd_pos() to position
6532
the "cursor" (i.e., record[0] in this case) at the correct row.
6534
TODO: Add a check that the correct record has been fetched by
6535
comparing with the original record. Take into account that the
6536
record on the master and slave can be of different
6537
length. Something along these lines should work:
6539
ADD>>> store_record(table,record[1]);
6540
int error= table->file->rnd_pos(table->record[0], table->file->ref);
6541
ADD>>> assert(memcmp(table->record[1], table->record[0],
6542
table->s->reclength) == 0);
6545
int error= table->file->rnd_pos_by_record(table->record[0]);
6546
table->file->ha_rnd_end();
6549
table->file->print_error(error, MYF(0));
6554
// We can't use position() - try other methods.
6557
Save copy of the record in table->record[1]. It might be needed
6558
later if linear search is used to find exact match.
6560
store_record(table,record[1]);
6562
if (table->s->keys > 0)
6564
/* We have a key: search the table using the index */
6565
if (!table->file->inited && (error= table->file->ha_index_init(0, false)))
6567
table->file->print_error(error, MYF(0));
6571
/* Fill key data for the row */
6574
key_copy(m_key, table->record[0], table->key_info, 0);
6577
We need to set the null bytes to ensure that the filler bit are
6578
all set when returning. There are storage engines that just set
6579
the necessary bits on the bytes and don't set the filler bits
6582
my_ptrdiff_t const pos=
6583
table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
6584
table->record[0][pos]= 0xFF;
6586
if ((error= table->file->index_read_map(table->record[0], m_key,
6588
HA_READ_KEY_EXACT)))
6590
table->file->print_error(error, MYF(0));
6591
table->file->ha_index_end();
6596
Below is a minor "optimization". If the key (i.e., key number
6597
0) has the HA_NOSAME flag set, we know that we have found the
6598
correct record (since there can be no duplicates); otherwise, we
6599
have to compare the record with the one found to see if it is
6602
CAVEAT! This behaviour is essential for the replication of,
6603
e.g., the mysql.proc table since the correct record *shall* be
6604
found using the primary key *only*. There shall be no
6605
comparison of non-PK columns to decide if the correct record is
6606
found. I can see no scenario where it would be incorrect to
6607
chose the row to change only using a PK or an UNNI.
6609
if (table->key_info->flags & HA_NOSAME)
6611
table->file->ha_index_end();
6616
In case key is not unique, we still have to iterate over records found
6617
and find the one which is identical to the row given. A copy of the
6618
record we are looking for is stored in record[1].
6620
while (record_compare(table))
6623
We need to set the null bytes to ensure that the filler bit
6624
are all set when returning. There are storage engines that
6625
just set the necessary bits on the bytes and don't set the
6626
filler bits correctly.
6628
TODO[record format ndb]: Remove this code once NDB returns the
6629
correct record format.
6631
if (table->s->null_bytes > 0)
6633
table->record[0][table->s->null_bytes - 1]|=
6634
256U - (1U << table->s->last_null_bit_pos);
6637
if ((error= table->file->index_next(table->record[0])))
6639
table->file->print_error(error, MYF(0));
6640
table->file->ha_index_end();
6646
Have to restart the scan to be able to fetch the next row.
6648
table->file->ha_index_end();
6652
int restart_count= 0; // Number of times scanning has restarted from top
6654
/* We don't have a key: search the table using rnd_next() */
6655
if ((error= table->file->ha_rnd_init(1)))
6657
table->file->print_error(error, MYF(0));
6661
/* Continue until we find the right record or have made a full loop */
6664
error= table->file->rnd_next(table->record[0]);
6669
case HA_ERR_RECORD_DELETED:
6672
case HA_ERR_END_OF_FILE:
6673
if (++restart_count < 2)
6674
table->file->ha_rnd_init(1);
6678
table->file->print_error(error, MYF(0));
6679
table->file->ha_rnd_end();
6683
while (restart_count < 2 && record_compare(table));
6686
Note: above record_compare will take into accout all record fields
6687
which might be incorrect in case a partial row was given in the event
6689
table->file->ha_rnd_end();
6691
assert(error == HA_ERR_END_OF_FILE || error == HA_ERR_RECORD_DELETED || error == 0);
6695
table->default_column_bitmaps();
6698
table->default_column_bitmaps();
6704
Constructor used to build an event for writing to the binary log.
6707
Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, Table *tbl_arg,
6709
bool is_transactional)
6710
: Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
6715
Constructor used by slave to read the event from the binary log.
6717
Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint32_t event_len,
6718
const Format_description_log_event
6720
: Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
6726
Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6728
if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
6729
m_table->s->primary_key < MAX_KEY)
6732
We don't need to allocate any memory for m_key since it is not used.
6737
if (m_table->s->keys > 0)
6739
// Allocate buffer for key searches
6740
m_key= (unsigned char*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
6742
return HA_ERR_OUT_OF_MEM;
6749
Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6752
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
6753
m_table->file->ha_index_or_rnd_end();
6760
int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6763
assert(m_table != NULL);
6765
if (!(error= find_row(rli)))
6768
Delete the record found, located in record[0]
6770
error= m_table->file->ha_delete_row(m_table->record[0]);
6776
/**************************************************************************
6777
Update_rows_log_event member functions
6778
**************************************************************************/
6781
Constructor used to build an event for writing to the binary log.
6783
Update_rows_log_event::Update_rows_log_event(THD *thd_arg, Table *tbl_arg,
6785
bool is_transactional)
6786
: Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
6788
init(tbl_arg->write_set);
6791
void Update_rows_log_event::init(MY_BITMAP const *cols)
6793
/* if bitmap_init fails, caught in is_valid() */
6794
if (likely(!bitmap_init(&m_cols_ai,
6795
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
6799
/* Cols can be zero if this is a dummy binrows event */
6800
if (likely(cols != NULL))
6802
memcpy(m_cols_ai.bitmap, cols->bitmap, no_bytes_in_map(cols));
6803
create_last_word_mask(&m_cols_ai);
6809
Update_rows_log_event::~Update_rows_log_event()
6811
if (m_cols_ai.bitmap == m_bitbuf_ai) // no my_malloc happened
6812
m_cols_ai.bitmap= 0; // so no free in bitmap_free
6813
bitmap_free(&m_cols_ai); // To pair with bitmap_init().
6818
Constructor used by slave to read the event from the binary log.
6820
Update_rows_log_event::Update_rows_log_event(const char *buf, uint32_t event_len,
6822
Format_description_log_event
6824
: Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
6830
Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6832
if (m_table->s->keys > 0)
6834
// Allocate buffer for key searches
6835
m_key= (unsigned char*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
6837
return HA_ERR_OUT_OF_MEM;
6840
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
6846
Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6849
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
6850
m_table->file->ha_index_or_rnd_end();
6851
free(m_key); // Free for multi_malloc
6858
Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6860
assert(m_table != NULL);
6862
int error= find_row(rli);
6866
We need to read the second image in the event of error to be
6867
able to skip to the next pair of updates
6869
m_curr_row= m_curr_row_end;
6870
unpack_current_row(rli, &m_cols_ai);
6875
This is the situation after locating BI:
6877
===|=== before image ====|=== after image ===|===
6879
m_curr_row m_curr_row_end
6881
BI found in the table is stored in record[0]. We copy it to record[1]
6882
and unpack AI to record[0].
6885
store_record(m_table,record[1]);
6887
m_curr_row= m_curr_row_end;
6888
error= unpack_current_row(rli, &m_cols_ai); // this also updates m_curr_row_end
6891
Now we have the right row to update. The old row (the one we're
6892
looking for) is in record[1] and the new row is in record[0].
6895
// Temporary fix to find out why it fails [/Matz]
6896
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
6897
memcpy(m_table->write_set->bitmap, m_cols_ai.bitmap, (m_table->write_set->n_bits + 7) / 8);
6899
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
6900
if (error == HA_ERR_RECORD_IS_THE_SAME)
6907
Incident_log_event::Incident_log_event(const char *buf, uint32_t event_len,
6908
const Format_description_log_event *descr_event)
6909
: Log_event(buf, descr_event)
6911
uint8_t const common_header_len=
6912
descr_event->common_header_len;
6913
uint8_t const post_header_len=
6914
descr_event->post_header_len[INCIDENT_EVENT-1];
6916
m_incident= static_cast<Incident>(uint2korr(buf + common_header_len));
6917
char const *ptr= buf + common_header_len + post_header_len;
6918
char const *const str_end= buf + event_len;
6919
uint8_t len= 0; // Assignment to keep compiler happy
6920
const char *str= NULL; // Assignment to keep compiler happy
6921
read_str(&ptr, str_end, &str, &len);
6922
m_message.str= const_cast<char*>(str);
6923
m_message.length= len;
6928
Incident_log_event::~Incident_log_event()
6934
Incident_log_event::description() const
6936
static const char *const description[]= {
6937
"NOTHING", // Not used
6941
assert(0 <= m_incident);
6942
assert((size_t) m_incident <= sizeof(description)/sizeof(*description));
6944
return description[m_incident];
6948
void Incident_log_event::pack_info(Protocol *protocol)
6952
if (m_message.length > 0)
6953
bytes= snprintf(buf, sizeof(buf), "#%d (%s)",
6954
m_incident, description());
6956
bytes= snprintf(buf, sizeof(buf), "#%d (%s): %s",
6957
m_incident, description(), m_message.str);
6958
protocol->store(buf, bytes, &my_charset_bin);
6963
Incident_log_event::do_apply_event(Relay_log_info const *rli)
6965
rli->report(ERROR_LEVEL, ER_SLAVE_INCIDENT,
6966
ER(ER_SLAVE_INCIDENT),
6968
m_message.length > 0 ? m_message.str : "<none>");
6974
Incident_log_event::write_data_header(IO_CACHE *file)
6976
unsigned char buf[sizeof(int16_t)];
6977
int2store(buf, (int16_t) m_incident);
6978
return(my_b_safe_write(file, buf, sizeof(buf)));
6982
Incident_log_event::write_data_body(IO_CACHE *file)
6984
return(write_str(file, m_message.str, m_message.length));
6987
Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint32_t event_len,
6988
const Format_description_log_event* description_event)
6989
:Log_event(buf, description_event)
6991
uint8_t header_size= description_event->common_header_len;
6992
ident_len = event_len - header_size;
6993
set_if_smaller(ident_len,FN_REFLEN-1);
6994
log_ident= buf + header_size;