1
/* Copyright (C) 2000-2004 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
19
#include "mysql_priv.h"
23
#ifdef USE_PRAGMA_IMPLEMENTATION
24
#pragma implementation // gcc: Class implementation
27
#include "mysql_priv.h"
31
#include "rpl_filter.h"
32
#include "rpl_utility.h"
33
#include "rpl_record.h"
36
#endif /* MYSQL_CLIENT */
39
#include <my_bitmap.h>
41
#define log_cs &my_charset_latin1
43
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
47
Size of buffer for printing a double in format %.<PREC>g
49
optional '-' + optional zero + '.' + PREC digits + 'e' + sign +
50
exponent digits + '\0'
52
#define FMT_G_BUFSIZE(PREC) (3 + (PREC) + 5 + 1)
55
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
56
static const char *HA_ERR(int i)
59
case HA_ERR_KEY_NOT_FOUND: return "HA_ERR_KEY_NOT_FOUND";
60
case HA_ERR_FOUND_DUPP_KEY: return "HA_ERR_FOUND_DUPP_KEY";
61
case HA_ERR_RECORD_CHANGED: return "HA_ERR_RECORD_CHANGED";
62
case HA_ERR_WRONG_INDEX: return "HA_ERR_WRONG_INDEX";
63
case HA_ERR_CRASHED: return "HA_ERR_CRASHED";
64
case HA_ERR_WRONG_IN_RECORD: return "HA_ERR_WRONG_IN_RECORD";
65
case HA_ERR_OUT_OF_MEM: return "HA_ERR_OUT_OF_MEM";
66
case HA_ERR_NOT_A_TABLE: return "HA_ERR_NOT_A_TABLE";
67
case HA_ERR_WRONG_COMMAND: return "HA_ERR_WRONG_COMMAND";
68
case HA_ERR_OLD_FILE: return "HA_ERR_OLD_FILE";
69
case HA_ERR_NO_ACTIVE_RECORD: return "HA_ERR_NO_ACTIVE_RECORD";
70
case HA_ERR_RECORD_DELETED: return "HA_ERR_RECORD_DELETED";
71
case HA_ERR_RECORD_FILE_FULL: return "HA_ERR_RECORD_FILE_FULL";
72
case HA_ERR_INDEX_FILE_FULL: return "HA_ERR_INDEX_FILE_FULL";
73
case HA_ERR_END_OF_FILE: return "HA_ERR_END_OF_FILE";
74
case HA_ERR_UNSUPPORTED: return "HA_ERR_UNSUPPORTED";
75
case HA_ERR_TO_BIG_ROW: return "HA_ERR_TO_BIG_ROW";
76
case HA_WRONG_CREATE_OPTION: return "HA_WRONG_CREATE_OPTION";
77
case HA_ERR_FOUND_DUPP_UNIQUE: return "HA_ERR_FOUND_DUPP_UNIQUE";
78
case HA_ERR_UNKNOWN_CHARSET: return "HA_ERR_UNKNOWN_CHARSET";
79
case HA_ERR_WRONG_MRG_TABLE_DEF: return "HA_ERR_WRONG_MRG_TABLE_DEF";
80
case HA_ERR_CRASHED_ON_REPAIR: return "HA_ERR_CRASHED_ON_REPAIR";
81
case HA_ERR_CRASHED_ON_USAGE: return "HA_ERR_CRASHED_ON_USAGE";
82
case HA_ERR_LOCK_WAIT_TIMEOUT: return "HA_ERR_LOCK_WAIT_TIMEOUT";
83
case HA_ERR_LOCK_TABLE_FULL: return "HA_ERR_LOCK_TABLE_FULL";
84
case HA_ERR_READ_ONLY_TRANSACTION: return "HA_ERR_READ_ONLY_TRANSACTION";
85
case HA_ERR_LOCK_DEADLOCK: return "HA_ERR_LOCK_DEADLOCK";
86
case HA_ERR_CANNOT_ADD_FOREIGN: return "HA_ERR_CANNOT_ADD_FOREIGN";
87
case HA_ERR_NO_REFERENCED_ROW: return "HA_ERR_NO_REFERENCED_ROW";
88
case HA_ERR_ROW_IS_REFERENCED: return "HA_ERR_ROW_IS_REFERENCED";
89
case HA_ERR_NO_SAVEPOINT: return "HA_ERR_NO_SAVEPOINT";
90
case HA_ERR_NON_UNIQUE_BLOCK_SIZE: return "HA_ERR_NON_UNIQUE_BLOCK_SIZE";
91
case HA_ERR_NO_SUCH_TABLE: return "HA_ERR_NO_SUCH_TABLE";
92
case HA_ERR_TABLE_EXIST: return "HA_ERR_TABLE_EXIST";
93
case HA_ERR_NO_CONNECTION: return "HA_ERR_NO_CONNECTION";
94
case HA_ERR_NULL_IN_SPATIAL: return "HA_ERR_NULL_IN_SPATIAL";
95
case HA_ERR_TABLE_DEF_CHANGED: return "HA_ERR_TABLE_DEF_CHANGED";
96
case HA_ERR_NO_PARTITION_FOUND: return "HA_ERR_NO_PARTITION_FOUND";
97
case HA_ERR_RBR_LOGGING_FAILED: return "HA_ERR_RBR_LOGGING_FAILED";
98
case HA_ERR_DROP_INDEX_FK: return "HA_ERR_DROP_INDEX_FK";
99
case HA_ERR_FOREIGN_DUPLICATE_KEY: return "HA_ERR_FOREIGN_DUPLICATE_KEY";
100
case HA_ERR_TABLE_NEEDS_UPGRADE: return "HA_ERR_TABLE_NEEDS_UPGRADE";
101
case HA_ERR_TABLE_READONLY: return "HA_ERR_TABLE_READONLY";
102
case HA_ERR_AUTOINC_READ_FAILED: return "HA_ERR_AUTOINC_READ_FAILED";
103
case HA_ERR_AUTOINC_ERANGE: return "HA_ERR_AUTOINC_ERANGE";
104
case HA_ERR_GENERIC: return "HA_ERR_GENERIC";
105
case HA_ERR_RECORD_IS_THE_SAME: return "HA_ERR_RECORD_IS_THE_SAME";
106
case HA_ERR_LOGGING_IMPOSSIBLE: return "HA_ERR_LOGGING_IMPOSSIBLE";
107
case HA_ERR_CORRUPT_EVENT: return "HA_ERR_CORRUPT_EVENT";
108
case HA_ERR_ROWS_EVENT_APPLY : return "HA_ERR_ROWS_EVENT_APPLY";
114
Error reporting facility for Rows_log_event::do_apply_event
116
@param level error, warning or info
117
@param ha_error HA_ERR_ code
118
@param rli pointer to the active Relay_log_info instance
119
@param thd pointer to the slave thread's thd
120
@param table pointer to the event's table object
121
@param type the type of the event
122
@param log_name the master binlog file name
123
@param pos the master binlog file pos (the next after the event)
126
static void inline slave_rows_error_report(enum loglevel level, int ha_error,
127
Relay_log_info const *rli, THD *thd,
128
TABLE *table, const char * type,
129
const char *log_name, ulong pos)
131
const char *handler_error= HA_ERR(ha_error);
132
char buff[MAX_SLAVE_ERRMSG], *slider;
133
const char *buff_end= buff + sizeof(buff);
135
List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
139
for (err= it++, slider= buff; err && slider < buff_end - 1;
140
slider += len, err= it++)
142
len= snprintf(slider, buff_end - slider,
143
" %s, Error_code: %d;", err->msg, err->code);
146
rli->report(level, thd->is_error()? thd->main_da.sql_errno() : 0,
147
"Could not execute %s event on table %s.%s;"
148
"%s handler error %s; "
149
"the event's master log %s, end_log_pos %lu",
150
type, table->s->db.str,
151
table->s->table_name.str,
153
handler_error == NULL? "<unknown>" : handler_error,
159
Cache that will automatically be written to a dedicated file on
165
class Write_on_release_cache
173
typedef unsigned short flag_set;
179
Write_on_release_cache
180
cache Pointer to cache to use
181
file File to write cache to upon destruction
182
flags Flags for the cache
186
Class used to guarantee copy of cache to file before exiting the
187
current block. On successful copy of the cache, the cache will
188
be reinited as a WRITE_CACHE.
190
Currently, a pointer to the cache is provided in the
191
constructor, but it would be possible to create a subclass
192
holding the IO_CACHE itself.
194
Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0)
195
: m_cache(cache), m_file(file), m_flags(flags)
197
reinit_io_cache(m_cache, WRITE_CACHE, 0L, false, true);
200
~Write_on_release_cache()
202
copy_event_cache_to_file_and_reinit(m_cache, m_file);
203
if (m_flags | FLUSH_F)
208
Return a pointer to the internal IO_CACHE.
215
Function to return a pointer to the internal cache, so that the
216
object can be treated as a IO_CACHE and used with the my_b_*
220
A pointer to the internal IO_CACHE.
222
IO_CACHE *operator&()
228
// Hidden, to prevent usage.
229
Write_on_release_cache(Write_on_release_cache const&);
236
uint debug_not_change_ts_if_art_event= 1; // bug#29309 simulation
243
static void pretty_print_str(IO_CACHE* cache, const char* str, int len)
245
const char* end = str + len;
246
my_b_printf(cache, "\'");
250
switch ((c=*str++)) {
251
case '\n': my_b_printf(cache, "\\n"); break;
252
case '\r': my_b_printf(cache, "\\r"); break;
253
case '\\': my_b_printf(cache, "\\\\"); break;
254
case '\b': my_b_printf(cache, "\\b"); break;
255
case '\t': my_b_printf(cache, "\\t"); break;
256
case '\'': my_b_printf(cache, "\\'"); break;
257
case 0 : my_b_printf(cache, "\\0"); break;
259
my_b_printf(cache, "%c", c);
263
my_b_printf(cache, "\'");
265
#endif /* MYSQL_CLIENT */
267
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
269
static void clear_all_errors(THD *thd, Relay_log_info *rli)
271
thd->is_slave_error = 0;
278
Ignore error code specified on command line.
281
inline int ignored_error_code(int err_code)
283
return ((err_code == ER_SLAVE_IGNORED_TABLE) ||
284
(use_slave_mask && bitmap_is_set(&slave_error_mask, err_code)));
293
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
294
static char *pretty_print_str(char *packet, const char *str, int len)
296
const char *end= str + len;
302
switch ((c=*str++)) {
303
case '\n': *pos++= '\\'; *pos++= 'n'; break;
304
case '\r': *pos++= '\\'; *pos++= 'r'; break;
305
case '\\': *pos++= '\\'; *pos++= '\\'; break;
306
case '\b': *pos++= '\\'; *pos++= 'b'; break;
307
case '\t': *pos++= '\\'; *pos++= 't'; break;
308
case '\'': *pos++= '\\'; *pos++= '\''; break;
309
case 0 : *pos++= '\\'; *pos++= '0'; break;
318
#endif /* !MYSQL_CLIENT */
321
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
324
Creates a temporary name for load data infile:.
326
@param buf Store new filename here
327
@param file_id File_id (part of file name)
328
@param event_server_id Event_id (part of file name)
329
@param ext Extension for file name
332
Pointer to start of extension
335
static char *slave_load_file_stem(char *buf, uint file_id,
336
int event_server_id, const char *ext)
339
fn_format(buf,"SQL_LOAD-",slave_load_tmpdir, "", MY_UNPACK_FILENAME);
343
buf = int10_to_str(::server_id, buf, 10);
345
buf = int10_to_str(event_server_id, buf, 10);
347
res= int10_to_str(file_id, buf, 10);
348
strmov(res, ext); // Add extension last
349
return res; // Pointer to extension
354
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
357
Delete all temporary files used for SQL_LOAD.
360
static void cleanup_load_tmpdir()
365
char fname[FN_REFLEN], prefbuf[31], *p;
367
if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
371
When we are deleting temporary files, we should only remove
372
the files associated with the server id of our server.
373
We don't use event_server_id here because since we've disabled
374
direct binlogging of Create_file/Append_file/Exec_load events
375
we cannot meet Start_log event in the middle of events from one
378
p= strmake(prefbuf, STRING_WITH_LEN("SQL_LOAD-"));
379
p= int10_to_str(::server_id, p, 10);
383
for (i=0 ; i < (uint)dirp->number_off_files; i++)
385
file=dirp->dir_entry+i;
386
if (is_prefix(file->name, prefbuf))
388
fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME);
389
my_delete(fname, MYF(0));
402
static bool write_str(IO_CACHE *file, const char *str, uint length)
405
tmp[0]= (uchar) length;
406
return (my_b_safe_write(file, tmp, sizeof(tmp)) ||
407
my_b_safe_write(file, (uchar*) str, length));
415
static inline int read_str(const char **buf, const char *buf_end,
416
const char **str, uint8 *len)
418
if (*buf + ((uint) (uchar) **buf) >= buf_end)
422
(*buf)+= (uint) *len+1;
428
Transforms a string into "" or its expression in 0x... form.
431
char *str_to_hex(char *to, const char *from, uint len)
437
to= octet2hex(to, from, len);
440
to= strmov(to, "\"\"");
441
return to; // pointer to end 0 of 'to'
447
Append a version of the 'from' string suitable for use in a query to
448
the 'to' string. To generate a correct escaping, the character set
449
information in 'csinfo' is used.
453
append_query_string(CHARSET_INFO *csinfo,
454
String const *from, String *to)
457
uint32 const orig_len= to->length();
458
if (to->reserve(orig_len + from->length()*2+3))
461
beg= to->c_ptr_quick() + to->length();
463
if (csinfo->escape_with_backslash_is_dangerous)
464
ptr= str_to_hex(ptr, from->ptr(), from->length());
468
ptr+= escape_string_for_mysql(csinfo, ptr, 0,
469
from->ptr(), from->length());
472
to->length(orig_len + ptr - beg);
479
Prints a "session_var=value" string. Used by mysqlbinlog to print some SET
480
commands just before it prints a query.
485
static void print_set_option(IO_CACHE* file, uint32 bits_changed,
486
uint32 option, uint32 flags, const char* name,
489
if (bits_changed & option)
492
my_b_printf(file,", ");
493
my_b_printf(file,"%s=%d", name, test(flags & option));
499
/**************************************************************************
500
Log_event methods (= the parent class of all events)
501
**************************************************************************/
505
returns the human readable name of the event's type
508
const char* Log_event::get_type_str(Log_event_type type)
511
case START_EVENT_V3: return "Start_v3";
512
case STOP_EVENT: return "Stop";
513
case QUERY_EVENT: return "Query";
514
case ROTATE_EVENT: return "Rotate";
515
case INTVAR_EVENT: return "Intvar";
516
case LOAD_EVENT: return "Load";
517
case NEW_LOAD_EVENT: return "New_load";
518
case SLAVE_EVENT: return "Slave";
519
case CREATE_FILE_EVENT: return "Create_file";
520
case APPEND_BLOCK_EVENT: return "Append_block";
521
case DELETE_FILE_EVENT: return "Delete_file";
522
case EXEC_LOAD_EVENT: return "Exec_load";
523
case RAND_EVENT: return "RAND";
524
case XID_EVENT: return "Xid";
525
case USER_VAR_EVENT: return "User var";
526
case FORMAT_DESCRIPTION_EVENT: return "Format_desc";
527
case TABLE_MAP_EVENT: return "Table_map";
528
case PRE_GA_WRITE_ROWS_EVENT: return "Write_rows_event_old";
529
case PRE_GA_UPDATE_ROWS_EVENT: return "Update_rows_event_old";
530
case PRE_GA_DELETE_ROWS_EVENT: return "Delete_rows_event_old";
531
case WRITE_ROWS_EVENT: return "Write_rows";
532
case UPDATE_ROWS_EVENT: return "Update_rows";
533
case DELETE_ROWS_EVENT: return "Delete_rows";
534
case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
535
case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
536
case INCIDENT_EVENT: return "Incident";
537
default: return "Unknown"; /* impossible */
541
const char* Log_event::get_type_str()
543
return get_type_str(get_type_code());
548
Log_event::Log_event()
552
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
553
:log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), thd(thd_arg)
555
server_id= thd->server_id;
556
when= thd->start_time;
557
cache_stmt= using_trans;
562
This minimal constructor is for when you are not even sure that there
563
is a valid THD. For example in the server when we are shutting down or
564
flushing logs after receiving a SIGHUP (then we must write a Rotate to
565
the binlog but we have no THD, so we need this minimal constructor).
568
Log_event::Log_event()
569
:temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
572
server_id= ::server_id;
574
We can't call my_time() here as this would cause a call before
580
#endif /* !MYSQL_CLIENT */
584
Log_event::Log_event()
587
Log_event::Log_event(const char* buf,
588
const Format_description_log_event* description_event)
589
:temp_buf(0), cache_stmt(0)
594
when = uint4korr(buf);
595
server_id = uint4korr(buf + SERVER_ID_OFFSET);
596
data_written= uint4korr(buf + EVENT_LEN_OFFSET);
597
if (description_event->binlog_version==1)
604
log_pos= uint4korr(buf + LOG_POS_OFFSET);
606
If the log is 4.0 (so here it can only be a 4.0 relay log read by
607
the SQL thread or a 4.0 master binlog read by the I/O thread),
608
log_pos is the beginning of the event: we transform it into the end
609
of the event, which is more useful.
610
But how do you know that the log is 4.0: you know it if
611
description_event is version 3 *and* you are not reading a
612
Format_desc (remember that mysqlbinlog starts by assuming that 5.0
613
logs are in 4.0 format, until it finds a Format_desc).
615
if (description_event->binlog_version==3 &&
616
buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
619
If log_pos=0, don't change it. log_pos==0 is a marker to mean
620
"don't change rli->group_master_log_pos" (see
621
inc_group_relay_log_pos()). As it is unreal log_pos, adding the
622
event len's is nonsense. For example, a fake Rotate event should
623
not have its log_pos (which is 0) changed or it will modify
624
Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense
625
value of (a non-zero offset which does not exist in the master's
626
binlog, so which will cause problems if the user uses this value
629
log_pos+= data_written; /* purecov: inspected */
632
flags= uint2korr(buf + FLAGS_OFFSET);
633
if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
634
(buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
637
These events always have a header which stops here (i.e. their
641
Initialization to zero of all other Log_event members as they're
642
not specified. Currently there are no such members; in the future
643
there will be an event UID (but Format_description and Rotate
644
don't need this UID, as they are not propagated through
645
--log-slave-updates (remember the UID is used to not play a query
646
twice when you have two masters which are slaves of a 3rd master).
651
/* otherwise, go on with reading the header from buf (nothing now) */
655
#ifdef HAVE_REPLICATION
657
int Log_event::do_update_pos(Relay_log_info *rli)
660
rli is null when (as far as I (Guilhem) know) the caller is
661
Load_log_event::do_apply_event *and* that one is called from
662
Execute_load_log_event::do_apply_event. In this case, we don't
663
do anything here ; Execute_load_log_event::do_apply_event will
664
call Log_event::do_apply_event again later with the proper rli.
665
Strictly speaking, if we were sure that rli is null only in the
666
case discussed above, 'if (rli)' is useless here. But as we are
667
not 100% sure, keep it for now.
669
Matz: I don't think we will need this check with this refactoring.
674
bug#29309 simulation: resetting the flag to force
675
wrong behaviour of artificial event to update
676
rli->last_master_timestamp for only one time -
677
the first FLUSH LOGS in the test.
679
if (debug_not_change_ts_if_art_event == 1
680
&& is_artificial_event())
681
debug_not_change_ts_if_art_event= 0;
682
rli->stmt_done(log_pos,
683
is_artificial_event() &&
684
debug_not_change_ts_if_art_event > 0 ? 0 : when);
685
if (debug_not_change_ts_if_art_event == 0)
686
debug_not_change_ts_if_art_event= 2;
688
return 0; // Cannot fail currently
692
Log_event::enum_skip_reason
693
Log_event::do_shall_skip(Relay_log_info *rli)
695
if ((server_id == ::server_id && !rli->replicate_same_server_id) || (rli->slave_skip_counter == 1 && rli->is_in_group()))
696
return EVENT_SKIP_IGNORE;
697
else if (rli->slave_skip_counter > 0)
698
return EVENT_SKIP_COUNT;
700
return EVENT_SKIP_NOT;
705
Log_event::pack_info()
708
void Log_event::pack_info(Protocol *protocol)
710
protocol->store("", &my_charset_bin);
715
Only called by SHOW BINLOG EVENTS
717
int Log_event::net_send(Protocol *protocol, const char* log_name, my_off_t pos)
719
const char *p= strrchr(log_name, FN_LIBCHAR);
720
const char *event_type;
724
protocol->prepare_for_resend();
725
protocol->store(log_name, &my_charset_bin);
726
protocol->store((ulonglong) pos);
727
event_type = get_type_str();
728
protocol->store(event_type, strlen(event_type), &my_charset_bin);
729
protocol->store((uint32) server_id);
730
protocol->store((ulonglong) log_pos);
732
return protocol->write();
734
#endif /* HAVE_REPLICATION */
738
init_show_field_list() prepares the column names and types for the
739
output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
743
void Log_event::init_show_field_list(List<Item>* field_list)
745
field_list->push_back(new Item_empty_string("Log_name", 20));
746
field_list->push_back(new Item_return_int("Pos", MY_INT32_NUM_DECIMAL_DIGITS,
747
MYSQL_TYPE_LONGLONG));
748
field_list->push_back(new Item_empty_string("Event_type", 20));
749
field_list->push_back(new Item_return_int("Server_id", 10,
751
field_list->push_back(new Item_return_int("End_log_pos",
752
MY_INT32_NUM_DECIMAL_DIGITS,
753
MYSQL_TYPE_LONGLONG));
754
field_list->push_back(new Item_empty_string("Info", 20));
761
bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
763
uchar header[LOG_EVENT_HEADER_LEN];
766
/* Store number of bytes that will be written by this event */
767
data_written= event_data_length + sizeof(header);
770
log_pos != 0 if this is relay-log event. In this case we should not
774
if (is_artificial_event())
777
We should not do any cleanup on slave when reading this. We
778
mark this by setting log_pos to 0. Start_log_event_v3() will
779
detect this on reading and set artificial_event=1 for the event.
786
Calculate position of end of event
788
Note that with a SEQ_READ_APPEND cache, my_b_tell() does not
789
work well. So this will give slightly wrong positions for the
790
Format_desc/Rotate/Stop events which the slave writes to its
791
relay log. For example, the initial Format_desc will have
792
end_log_pos=91 instead of 95. Because after writing the first 4
793
bytes of the relay log, my_b_tell() still reports 0. Because
794
my_b_append() does not update the counter which my_b_tell()
795
later uses (one should probably use my_b_append_tell() to work
796
around this). To get right positions even when writing to the
797
relay log, we use the (new) my_b_safe_tell().
799
Note that this raises a question on the correctness of all these
800
assert(my_b_tell()=rli->event_relay_log_pos).
802
If in a transaction, the log_pos which we calculate below is not
803
very good (because then my_b_safe_tell() returns start position
804
of the BEGIN, so it's like the statement was at the BEGIN's
805
place), but it's not a very serious problem (as the slave, when
806
it is in a transaction, does not take those end_log_pos into
807
account (as it calls inc_event_relay_log_pos()). To be fixed
808
later, so that it looks less strange. But not bug.
811
log_pos= my_b_safe_tell(file)+data_written;
814
now= (ulong) get_time(); // Query start time
817
Header will be of size LOG_EVENT_HEADER_LEN for all events, except for
818
FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT, where it will be
819
LOG_EVENT_MINIMAL_HEADER_LEN (remember these 2 have a frozen header,
820
because we read them before knowing the format).
823
int4store(header, now); // timestamp
824
header[EVENT_TYPE_OFFSET]= get_type_code();
825
int4store(header+ SERVER_ID_OFFSET, server_id);
826
int4store(header+ EVENT_LEN_OFFSET, data_written);
827
int4store(header+ LOG_POS_OFFSET, log_pos);
828
int2store(header+ FLAGS_OFFSET, flags);
830
return(my_b_safe_write(file, header, sizeof(header)) != 0);
835
This needn't be format-tolerant, because we only read
836
LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length).
839
int Log_event::read_log_event(IO_CACHE* file, String* packet,
840
pthread_mutex_t* log_lock)
844
char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
847
pthread_mutex_lock(log_lock);
848
if (my_b_read(file, (uchar*) buf, sizeof(buf)))
851
If the read hits eof, we must report it as eof so the caller
852
will know it can go into cond_wait to be woken up on the next
856
result= LOG_READ_EOF;
858
result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
861
data_len= uint4korr(buf + EVENT_LEN_OFFSET);
862
if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
863
data_len > current_thd->variables.max_allowed_packet)
865
result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
870
/* Append the log event header to packet */
871
if (packet->append(buf, sizeof(buf)))
873
/* Failed to allocate packet */
874
result= LOG_READ_MEM;
877
data_len-= LOG_EVENT_MINIMAL_HEADER_LEN;
880
/* Append rest of event, read directly from file into packet */
881
if (packet->append(file, data_len))
884
Fatal error occured when appending rest of the event
885
to packet, possible failures:
886
1. EOF occured when reading from file, it's really an error
887
as data_len is >=0 there's supposed to be more bytes available.
888
file->error will have been set to number of bytes left to read
889
2. Read was interrupted, file->error would normally be set to -1
890
3. Failed to allocate memory for packet, my_errno
891
will be ENOMEM(file->error shuold be 0, but since the
892
memory allocation occurs before the call to read it might
895
result= (my_errno == ENOMEM ? LOG_READ_MEM :
896
(file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
897
/* Implicit goto end; */
903
pthread_mutex_unlock(log_lock);
906
#endif /* !MYSQL_CLIENT */
909
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
910
#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
919
Allocates memory; The caller is responsible for clean-up.
921
Log_event* Log_event::read_log_event(IO_CACHE* file,
922
pthread_mutex_t* log_lock,
923
const Format_description_log_event
926
Log_event* Log_event::read_log_event(IO_CACHE* file,
927
const Format_description_log_event
931
assert(description_event != 0);
932
char head[LOG_EVENT_MINIMAL_HEADER_LEN];
934
First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
935
check the event for sanity and to know its length; no need to really parse
936
it. We say "at most" because this could be a 3.23 master, which has header
937
of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's
938
"minimal" over the set {MySQL >=4.0}).
940
uint header_size= min(description_event->common_header_len,
941
LOG_EVENT_MINIMAL_HEADER_LEN);
944
if (my_b_read(file, (uchar *) head, header_size))
948
No error here; it could be that we are at the file's end. However
949
if the next my_b_read() fails (below), it will be an error as we
950
were able to read the first bytes.
954
uint data_len = uint4korr(head + EVENT_LEN_OFFSET);
956
const char *error= 0;
958
#ifndef max_allowed_packet
959
THD *thd=current_thd;
960
uint max_allowed_packet= thd ? thd->variables.max_allowed_packet : ~(ulong)0;
963
if (data_len > max_allowed_packet)
965
error = "Event too big";
969
if (data_len < header_size)
971
error = "Event too small";
975
// some events use the extra byte to null-terminate strings
976
if (!(buf = (char*) my_malloc(data_len+1, MYF(MY_WME))))
978
error = "Out of memory";
982
memcpy(buf, head, header_size);
983
if (my_b_read(file, (uchar*) buf + header_size, data_len - header_size))
985
error = "read error";
988
if ((res= read_log_event(buf, data_len, &error, description_event)))
989
res->register_temp_buf(buf);
996
sql_print_error("Error in Log_event::read_log_event(): "
997
"'%s', data_len: %d, event_type: %d",
998
error,data_len,head[EVENT_TYPE_OFFSET]);
999
my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
1001
The SQL slave thread will check if file->error<0 to know
1002
if there was an I/O error. Even if there is no "low-level" I/O errors
1003
with 'file', any of the high-level above errors is worrying
1004
enough to stop the SQL thread now ; as we are skipping the current event,
1005
going on with reading and successfully executing other events can
1006
only corrupt the slave's databases. So stop.
1015
Binlog format tolerance is in (buf, event_len, description_event)
1019
Log_event* Log_event::read_log_event(const char* buf, uint event_len,
1021
const Format_description_log_event *description_event)
1024
assert(description_event != 0);
1026
/* Check the integrity */
1027
if (event_len < EVENT_LEN_OFFSET ||
1028
buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
1029
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
1031
*error="Sanity check failed"; // Needed to free buffer
1032
return(NULL); // general sanity check - will fail on a partial read
1035
uint event_type= buf[EVENT_TYPE_OFFSET];
1036
if (event_type > description_event->number_of_event_types &&
1037
event_type != FORMAT_DESCRIPTION_EVENT)
1040
It is unsafe to use the description_event if its post_header_len
1041
array does not include the event type.
1048
In some previuos versions (see comment in
1049
Format_description_log_event::Format_description_log_event(char*,...)),
1050
event types were assigned different id numbers than in the
1051
present version. In order to replicate from such versions to the
1052
present version, we must map those event type id's to our event
1053
type id's. The mapping is done with the event_type_permutation
1054
array, which was set up when the Format_description_log_event
1057
if (description_event->event_type_permutation)
1058
event_type= description_event->event_type_permutation[event_type];
1060
switch(event_type) {
1062
ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
1065
ev = new Load_log_event(buf, event_len, description_event);
1067
case NEW_LOAD_EVENT:
1068
ev = new Load_log_event(buf, event_len, description_event);
1071
ev = new Rotate_log_event(buf, event_len, description_event);
1073
case SLAVE_EVENT: /* can never happen (unused event) */
1074
ev = new Slave_log_event(buf, event_len);
1076
case CREATE_FILE_EVENT:
1077
ev = new Create_file_log_event(buf, event_len, description_event);
1079
case APPEND_BLOCK_EVENT:
1080
ev = new Append_block_log_event(buf, event_len, description_event);
1082
case DELETE_FILE_EVENT:
1083
ev = new Delete_file_log_event(buf, event_len, description_event);
1085
case EXEC_LOAD_EVENT:
1086
ev = new Execute_load_log_event(buf, event_len, description_event);
1088
case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
1089
ev = new Start_log_event_v3(buf, description_event);
1092
ev = new Stop_log_event(buf, description_event);
1095
ev = new Intvar_log_event(buf, description_event);
1098
ev = new Xid_log_event(buf, description_event);
1101
ev = new Rand_log_event(buf, description_event);
1103
case USER_VAR_EVENT:
1104
ev = new User_var_log_event(buf, description_event);
1106
case FORMAT_DESCRIPTION_EVENT:
1107
ev = new Format_description_log_event(buf, event_len, description_event);
1109
#if defined(HAVE_REPLICATION)
1110
case WRITE_ROWS_EVENT:
1111
ev = new Write_rows_log_event(buf, event_len, description_event);
1113
case UPDATE_ROWS_EVENT:
1114
ev = new Update_rows_log_event(buf, event_len, description_event);
1116
case DELETE_ROWS_EVENT:
1117
ev = new Delete_rows_log_event(buf, event_len, description_event);
1119
case TABLE_MAP_EVENT:
1120
ev = new Table_map_log_event(buf, event_len, description_event);
1123
case BEGIN_LOAD_QUERY_EVENT:
1124
ev = new Begin_load_query_log_event(buf, event_len, description_event);
1126
case EXECUTE_LOAD_QUERY_EVENT:
1127
ev= new Execute_load_query_log_event(buf, event_len, description_event);
1129
case INCIDENT_EVENT:
1130
ev = new Incident_log_event(buf, event_len, description_event);
1139
is_valid() are small event-specific sanity tests which are
1140
important; for example there are some my_malloc() in constructors
1141
(e.g. Query_log_event::Query_log_event(char*...)); when these
1142
my_malloc() fail we can't return an error out of the constructor
1143
(because constructor is "void") ; so instead we leave the pointer we
1144
wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
1145
Same for Format_description_log_event, member 'post_header_len'.
1147
if (!ev || !ev->is_valid())
1151
if (!force_opt) /* then mysqlbinlog dies */
1153
*error= "Found invalid event in binary log";
1156
ev= new Unknown_log_event(buf, description_event);
1158
*error= "Found invalid event in binary log";
1168
Log_event::print_header()
1171
void Log_event::print_header(IO_CACHE* file,
1172
PRINT_EVENT_INFO* print_event_info,
1173
bool is_more __attribute__((unused)))
1176
my_off_t hexdump_from= print_event_info->hexdump_from;
1178
my_b_printf(file, "#");
1179
print_timestamp(file);
1180
my_b_printf(file, " server id %d end_log_pos %s ", server_id,
1181
llstr(log_pos,llbuff));
1183
/* mysqlbinlog --hexdump */
1184
if (print_event_info->hexdump_from)
1186
my_b_printf(file, "\n");
1187
uchar *ptr= (uchar*)temp_buf;
1189
uint4korr(ptr + EVENT_LEN_OFFSET) - LOG_EVENT_MINIMAL_HEADER_LEN;
1192
/* Header len * 4 >= header len * (2 chars + space + extra space) */
1193
char *h, hex_string[LOG_EVENT_MINIMAL_HEADER_LEN*4]= {0};
1194
char *c, char_string[16+1]= {0};
1196
/* Pretty-print event common header if header is exactly 19 bytes */
1197
if (print_event_info->common_header_len == LOG_EVENT_MINIMAL_HEADER_LEN)
1199
char emit_buf[256]; // Enough for storing one line
1200
my_b_printf(file, "# Position Timestamp Type Master ID "
1201
"Size Master Pos Flags \n");
1202
int const bytes_written=
1203
snprintf(emit_buf, sizeof(emit_buf),
1204
"# %8.8lx %02x %02x %02x %02x %02x "
1205
"%02x %02x %02x %02x %02x %02x %02x %02x "
1206
"%02x %02x %02x %02x %02x %02x\n",
1207
(unsigned long) hexdump_from,
1208
ptr[0], ptr[1], ptr[2], ptr[3], ptr[4], ptr[5], ptr[6],
1209
ptr[7], ptr[8], ptr[9], ptr[10], ptr[11], ptr[12], ptr[13],
1210
ptr[14], ptr[15], ptr[16], ptr[17], ptr[18]);
1211
assert(bytes_written >= 0);
1212
assert(static_cast<size_t>(bytes_written) < sizeof(emit_buf));
1213
my_b_write(file, (uchar*) emit_buf, bytes_written);
1214
ptr += LOG_EVENT_MINIMAL_HEADER_LEN;
1215
hexdump_from += LOG_EVENT_MINIMAL_HEADER_LEN;
1218
/* Rest of event (without common header) */
1219
for (i= 0, c= char_string, h=hex_string;
1223
snprintf(h, 4, "%02x ", *ptr);
1226
*c++= my_isalnum(&my_charset_bin, *ptr) ? *ptr : '.';
1231
my_b_printf() does not support full printf() formats, so we
1232
have to do it this way.
1234
TODO: Rewrite my_b_printf() to support full printf() syntax.
1237
int const bytes_written=
1238
snprintf(emit_buf, sizeof(emit_buf),
1239
"# %8.8lx %-48.48s |%16s|\n",
1240
(unsigned long) (hexdump_from + (i & 0xfffffff0)),
1241
hex_string, char_string);
1242
assert(bytes_written >= 0);
1243
assert(static_cast<size_t>(bytes_written) < sizeof(emit_buf));
1244
my_b_write(file, (uchar*) emit_buf, bytes_written);
1250
else if (i % 8 == 7) *h++ = ' ';
1257
int const bytes_written=
1258
snprintf(emit_buf, sizeof(emit_buf),
1259
"# %8.8lx %-48.48s |%s|\n",
1260
(unsigned long) (hexdump_from + (i & 0xfffffff0)),
1261
hex_string, char_string);
1262
assert(bytes_written >= 0);
1263
assert(static_cast<size_t>(bytes_written) < sizeof(emit_buf));
1264
my_b_write(file, (uchar*) emit_buf, bytes_written);
1267
need a # to prefix the rest of printouts for example those of
1268
Rows_log_event::print_helper().
1270
my_b_write(file, reinterpret_cast<const uchar*>("# "), 2);
1276
void Log_event::print_base64(IO_CACHE* file,
1277
PRINT_EVENT_INFO* print_event_info,
1280
const uchar *ptr= (const uchar *)temp_buf;
1281
uint32 size= uint4korr(ptr + EVENT_LEN_OFFSET);
1283
size_t const tmp_str_sz= base64_needed_encoded_length((int) size);
1284
char *const tmp_str= (char *) my_malloc(tmp_str_sz, MYF(MY_WME));
1286
fprintf(stderr, "\nError: Out of memory. "
1287
"Could not print correct binlog event.\n");
1291
if (base64_encode(ptr, (size_t) size, tmp_str))
1296
if (my_b_tell(file) == 0)
1297
my_b_printf(file, "\nBINLOG '\n");
1299
my_b_printf(file, "%s\n", tmp_str);
1302
my_b_printf(file, "'%s\n", print_event_info->delimiter);
1304
my_free(tmp_str, MYF(0));
1310
Log_event::print_timestamp()
1313
void Log_event::print_timestamp(IO_CACHE* file, time_t* ts)
1318
#ifdef MYSQL_SERVER // This is always false
1320
localtime_r(ts,(res= &tm_tmp));
1325
my_b_printf(file,"%02d%02d%02d %2d:%02d:%02d",
1335
#endif /* MYSQL_CLIENT */
1338
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
1339
inline Log_event::enum_skip_reason
1340
Log_event::continue_group(Relay_log_info *rli)
1342
if (rli->slave_skip_counter == 1)
1343
return Log_event::EVENT_SKIP_IGNORE;
1344
return Log_event::do_shall_skip(rli);
1348
/**************************************************************************
1349
Query_log_event methods
1350
**************************************************************************/
1352
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
1355
This (which is used only for SHOW BINLOG EVENTS) could be updated to
1356
print SET @@session_var=. But this is not urgent, as SHOW BINLOG EVENTS is
1357
only an information, it does not produce suitable queries to replay (for
1358
example it does not print LOAD DATA INFILE).
1363
void Query_log_event::pack_info(Protocol *protocol)
1365
// TODO: show the catalog ??
1367
if (!(buf= (char*) my_malloc(9 + db_len + q_len, MYF(MY_WME))))
1370
if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
1373
pos= strmov(buf, "use `");
1374
memcpy(pos, db, db_len);
1375
pos= strmov(pos+db_len, "`; ");
1379
memcpy(pos, query, q_len);
1382
protocol->store(buf, pos-buf, &my_charset_bin);
1383
my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
1387
#ifndef MYSQL_CLIENT
1390
Utility function for the next method (Query_log_event::write()) .
1392
static void write_str_with_code_and_len(char **dst, const char *src,
1397
*((*dst)++)= (uchar) len;
1398
bmove(*dst, src, len);
1404
Query_log_event::write().
1407
In this event we have to modify the header to have the correct
1408
EVENT_LEN_OFFSET as we don't yet know how many status variables we
1412
bool Query_log_event::write(IO_CACHE* file)
1415
@todo if catalog can be of length FN_REFLEN==512, then we are not
1416
replicating it correctly, since the length is stored in a byte
1419
uchar buf[QUERY_HEADER_LEN+
1420
1+4+ // code of flags2 and flags2
1421
1+8+ // code of sql_mode and sql_mode
1422
1+1+FN_REFLEN+ // code of catalog and catalog length and catalog
1423
1+4+ // code of autoinc and the 2 autoinc variables
1424
1+6+ // code of charset and charset
1425
1+1+MAX_TIME_ZONE_NAME_LENGTH+ // code of tz and tz length and tz name
1426
1+2+ // code of lc_time_names and lc_time_names_number
1427
1+2 // code of charset_database and charset_database_number
1428
], *start, *start_of_status;
1432
return 1; // Something wrong with event
1435
We want to store the thread id:
1436
(- as an information for the user when he reads the binlog)
1437
- if the query uses temporary table: for the slave SQL thread to know to
1438
which master connection the temp table belongs.
1439
Now imagine we (write()) are called by the slave SQL thread (we are
1440
logging a query executed by this thread; the slave runs with
1441
--log-slave-updates). Then this query will be logged with
1442
thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of
1443
the same name were created simultaneously on the master (in the master
1445
CREATE TEMPORARY TABLE t; (thread 1)
1446
CREATE TEMPORARY TABLE t; (thread 2)
1448
then in the slave's binlog there will be
1449
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1450
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1451
which is bad (same thread id!).
1453
To avoid this, we log the thread's thread id EXCEPT for the SQL
1454
slave thread for which we log the original (master's) thread id.
1455
Now this moves the bug: what happens if the thread id on the
1456
master was 10 and when the slave replicates the query, a
1457
connection number 10 is opened by a normal client on the slave,
1458
and updates a temp table of the same name? We get a problem
1459
again. To avoid this, in the handling of temp tables (sql_base.cc)
1460
we use thread_id AND server_id. TODO when this is merged into
1461
4.1: in 4.1, slave_proxy_id has been renamed to pseudo_thread_id
1462
and is a session variable: that's to make mysqlbinlog work with
1463
temp tables. We probably need to introduce
1465
SET PSEUDO_SERVER_ID
1466
for mysqlbinlog in 4.1. mysqlbinlog would print:
1467
SET PSEUDO_SERVER_ID=
1468
SET PSEUDO_THREAD_ID=
1469
for each query using temp tables.
1471
int4store(buf + Q_THREAD_ID_OFFSET, slave_proxy_id);
1472
int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
1473
buf[Q_DB_LEN_OFFSET] = (char) db_len;
1474
int2store(buf + Q_ERR_CODE_OFFSET, error_code);
1477
You MUST always write status vars in increasing order of code. This
1478
guarantees that a slightly older slave will be able to parse those he
1481
start_of_status= start= buf+QUERY_HEADER_LEN;
1484
*start++= Q_FLAGS2_CODE;
1485
int4store(start, flags2);
1488
if (sql_mode_inited)
1490
*start++= Q_SQL_MODE_CODE;
1491
int8store(start, (ulonglong)sql_mode);
1494
if (catalog_len) // i.e. this var is inited (false for 4.0 events)
1496
write_str_with_code_and_len((char **)(&start),
1497
catalog, catalog_len, Q_CATALOG_NZ_CODE);
1499
In 5.0.x where x<4 masters we used to store the end zero here. This was
1500
a waste of one byte so we don't do it in x>=4 masters. We change code to
1501
Q_CATALOG_NZ_CODE, because re-using the old code would make x<4 slaves
1502
of this x>=4 master segfault (expecting a zero when there is
1503
none). Remaining compatibility problems are: the older slave will not
1504
find the catalog; but it is will not crash, and it's not an issue
1505
that it does not find the catalog as catalogs were not used in these
1506
older MySQL versions (we store it in binlog and read it from relay log
1507
but do nothing useful with it). What is an issue is that the older slave
1508
will stop processing the Q_* blocks (and jumps to the db/query) as soon
1509
as it sees unknown Q_CATALOG_NZ_CODE; so it will not be able to read
1510
Q_AUTO_INCREMENT*, Q_CHARSET and so replication will fail silently in
1511
various ways. Documented that you should not mix alpha/beta versions if
1512
they are not exactly the same version, with example of 5.0.3->5.0.2 and
1513
5.0.4->5.0.3. If replication is from older to new, the new will
1514
recognize Q_CATALOG_CODE and have no problem.
1517
if (auto_increment_increment != 1 || auto_increment_offset != 1)
1519
*start++= Q_AUTO_INCREMENT;
1520
int2store(start, auto_increment_increment);
1521
int2store(start+2, auto_increment_offset);
1526
*start++= Q_CHARSET_CODE;
1527
memcpy(start, charset, 6);
1532
/* In the TZ sys table, column Name is of length 64 so this should be ok */
1533
assert(time_zone_len <= MAX_TIME_ZONE_NAME_LENGTH);
1534
*start++= Q_TIME_ZONE_CODE;
1535
*start++= time_zone_len;
1536
memcpy(start, time_zone_str, time_zone_len);
1537
start+= time_zone_len;
1539
if (lc_time_names_number)
1541
assert(lc_time_names_number <= 0xFFFF);
1542
*start++= Q_LC_TIME_NAMES_CODE;
1543
int2store(start, lc_time_names_number);
1546
if (charset_database_number)
1548
assert(charset_database_number <= 0xFFFF);
1549
*start++= Q_CHARSET_DATABASE_CODE;
1550
int2store(start, charset_database_number);
1554
Here there could be code like
1555
if (command-line-option-which-says-"log_this_variable" && inited)
1557
*start++= Q_THIS_VARIABLE_CODE;
1558
int4store(start, this_variable);
1563
/* Store length of status variables */
1564
status_vars_len= (uint) (start-start_of_status);
1565
assert(status_vars_len <= MAX_SIZE_LOG_EVENT_STATUS);
1566
int2store(buf + Q_STATUS_VARS_LEN_OFFSET, status_vars_len);
1569
Calculate length of whole event
1570
The "1" below is the \0 in the db's length
1572
event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
1574
return (write_header(file, event_length) ||
1575
my_b_safe_write(file, (uchar*) buf, QUERY_HEADER_LEN) ||
1576
write_post_header_for_derived(file) ||
1577
my_b_safe_write(file, (uchar*) start_of_status,
1578
(uint) (start-start_of_status)) ||
1579
my_b_safe_write(file, (db) ? (uchar*) db : (uchar*)"", db_len + 1) ||
1580
my_b_safe_write(file, (uchar*) query, q_len)) ? 1 : 0;
1584
The simplest constructor that could possibly work. This is used for
1585
creating static objects that have a special meaning and are invisible
1588
Query_log_event::Query_log_event()
1589
:Log_event(), data_buf(0)
1596
Query_log_event::Query_log_event()
1597
thd_arg - thread handle
1598
query_arg - array of char representing the query
1599
query_length - size of the `query_arg' array
1600
using_trans - there is a modified transactional table
1601
suppress_use - suppress the generation of 'USE' statements
1602
killed_status_arg - an optional with default to THD::KILLED_NO_VALUE
1603
if the value is different from the default, the arg
1604
is set to the current thd->killed value.
1605
A caller might need to masquerade thd->killed with
1608
Creates an event for binlogging
1609
The value for local `killed_status' can be supplied by caller.
1611
Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
1612
ulong query_length, bool using_trans,
1614
THD::killed_state killed_status_arg)
1616
(thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
1618
(suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1620
data_buf(0), query(query_arg), catalog(thd_arg->catalog),
1621
db(thd_arg->db), q_len((uint32) query_length),
1622
thread_id(thd_arg->thread_id),
1623
/* save the original thread id; we already know the server id */
1624
slave_proxy_id(thd_arg->variables.pseudo_thread_id),
1625
flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1627
auto_increment_increment(thd_arg->variables.auto_increment_increment),
1628
auto_increment_offset(thd_arg->variables.auto_increment_offset),
1629
lc_time_names_number(thd_arg->variables.lc_time_names->number),
1630
charset_database_number(0)
1634
if (killed_status_arg == THD::KILLED_NO_VALUE)
1635
killed_status_arg= thd_arg->killed;
1638
(killed_status_arg == THD::NOT_KILLED) ?
1639
(thd_arg->is_error() ? thd_arg->main_da.sql_errno() : 0) :
1640
(thd_arg->killed_errno());
1643
exec_time = (ulong) (end_time - thd_arg->start_time);
1645
@todo this means that if we have no catalog, then it is replicated
1646
as an existing catalog of length zero. is that safe? /sven
1648
catalog_len = (catalog) ? (uint32) strlen(catalog) : 0;
1649
/* status_vars_len is set just before writing the event */
1650
db_len = (db) ? (uint32) strlen(db) : 0;
1651
if (thd_arg->variables.collation_database != thd_arg->db_charset)
1652
charset_database_number= thd_arg->variables.collation_database->number;
1655
If we don't use flags2 for anything else than options contained in
1656
thd_arg->options, it would be more efficient to flags2=thd_arg->options
1657
(OPTIONS_WRITTEN_TO_BIN_LOG would be used only at reading time).
1658
But it's likely that we don't want to use 32 bits for 3 bits; in the future
1659
we will probably want to reclaim the 29 bits. So we need the &.
1661
flags2= (uint32) (thd_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
1662
assert(thd_arg->variables.character_set_client->number < 256*256);
1663
assert(thd_arg->variables.collation_connection->number < 256*256);
1664
assert(thd_arg->variables.collation_server->number < 256*256);
1665
assert(thd_arg->variables.character_set_client->mbminlen == 1);
1666
int2store(charset, thd_arg->variables.character_set_client->number);
1667
int2store(charset+2, thd_arg->variables.collation_connection->number);
1668
int2store(charset+4, thd_arg->variables.collation_server->number);
1669
if (thd_arg->time_zone_used)
1672
Note that our event becomes dependent on the Time_zone object
1673
representing the time zone. Fortunately such objects are never deleted
1674
or changed during mysqld's lifetime.
1676
time_zone_len= thd_arg->variables.time_zone->get_name()->length();
1677
time_zone_str= thd_arg->variables.time_zone->get_name()->ptr();
1682
#endif /* MYSQL_CLIENT */
1685
/* 2 utility functions for the next method */
1688
Read a string with length from memory.
1690
This function reads the string-with-length stored at
1691
<code>src</code> and extract the length into <code>*len</code> and
1692
a pointer to the start of the string into <code>*dst</code>. The
1693
string can then be copied using <code>memcpy()</code> with the
1694
number of bytes given in <code>*len</code>.
1696
@param src Pointer to variable holding a pointer to the memory to
1697
read the string from.
1698
@param dst Pointer to variable holding a pointer where the actual
1699
string starts. Starting from this position, the string
1700
can be copied using @c memcpy().
1701
@param len Pointer to variable where the length will be stored.
1702
@param end One-past-the-end of the memory where the string is
1705
@return Zero if the entire string can be copied successfully,
1706
@c UINT_MAX if the length could not be read from memory
1707
(that is, if <code>*src >= end</code>), otherwise the
1708
number of bytes that are missing to read the full
1709
string, which happends <code>*dst + *len >= end</code>.
1712
get_str_len_and_pointer(const Log_event::Byte **src,
1715
const Log_event::Byte *end)
1718
return -1; // Will be UINT_MAX in two-complement arithmetics
1722
if (*src + length >= end)
1723
return *src + length - end + 1; // Number of bytes missing
1724
*dst= (char *)*src + 1; // Will be copied later
1731
static void copy_str_and_move(const char **src,
1732
Log_event::Byte **dst,
1735
memcpy(*dst, *src, len);
1736
*src= (const char *)*dst;
1743
Macro to check that there is enough space to read from memory.
1745
@param PTR Pointer to memory
1746
@param END End of memory
1747
@param CNT Number of bytes that should be read.
1749
#define CHECK_SPACE(PTR,END,CNT) \
1751
assert((PTR) + (CNT) <= (END)); \
1752
if ((PTR) + (CNT) > (END)) { \
1760
This is used by the SQL slave thread to prepare the event before execution.
1762
Query_log_event::Query_log_event(const char* buf, uint event_len,
1763
const Format_description_log_event
1765
Log_event_type event_type)
1766
:Log_event(buf, description_event), data_buf(0), query(NullS),
1767
db(NullS), catalog_len(0), status_vars_len(0),
1768
flags2_inited(0), sql_mode_inited(0), charset_inited(0),
1769
auto_increment_increment(1), auto_increment_offset(1),
1770
time_zone_len(0), lc_time_names_number(0), charset_database_number(0)
1774
uint8 common_header_len, post_header_len;
1775
Log_event::Byte *start;
1776
const Log_event::Byte *end;
1779
common_header_len= description_event->common_header_len;
1780
post_header_len= description_event->post_header_len[event_type-1];
1783
We test if the event's length is sensible, and if so we compute data_len.
1784
We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
1785
We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
1787
if (event_len < (uint)(common_header_len + post_header_len))
1789
data_len = event_len - (common_header_len + post_header_len);
1790
buf+= common_header_len;
1792
slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
1793
exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
1794
db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
1795
error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
1798
5.0 format starts here.
1799
Depending on the format, we may or not have affected/warnings etc
1800
The remnent post-header to be parsed has length:
1802
tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
1805
status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
1807
Check if status variable length is corrupt and will lead to very
1808
wrong data. We could be even more strict and require data_len to
1809
be even bigger, but this will suffice to catch most corruption
1810
errors that can lead to a crash.
1812
if (status_vars_len > min(data_len, MAX_SIZE_LOG_EVENT_STATUS))
1817
data_len-= status_vars_len;
1821
We have parsed everything we know in the post header for QUERY_EVENT,
1822
the rest of post header is either comes from older version MySQL or
1823
dedicated to derived events (e.g. Execute_load_query...)
1826
/* variable-part: the status vars; only in MySQL 5.0 */
1828
start= (Log_event::Byte*) (buf+post_header_len);
1829
end= (const Log_event::Byte*) (start+status_vars_len);
1830
for (const Log_event::Byte* pos= start; pos < end;)
1834
CHECK_SPACE(pos, end, 4);
1836
flags2= uint4korr(pos);
1839
case Q_SQL_MODE_CODE:
1841
CHECK_SPACE(pos, end, 8);
1843
sql_mode= (ulong) uint8korr(pos); // QQ: Fix when sql_mode is ulonglong
1847
case Q_CATALOG_NZ_CODE:
1848
if (get_str_len_and_pointer(&pos, &catalog, &catalog_len, end))
1854
case Q_AUTO_INCREMENT:
1855
CHECK_SPACE(pos, end, 4);
1856
auto_increment_increment= uint2korr(pos);
1857
auto_increment_offset= uint2korr(pos+2);
1860
case Q_CHARSET_CODE:
1862
CHECK_SPACE(pos, end, 6);
1864
memcpy(charset, pos, 6);
1868
case Q_TIME_ZONE_CODE:
1870
if (get_str_len_and_pointer(&pos, &time_zone_str, &time_zone_len, end))
1877
case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
1878
CHECK_SPACE(pos, end, 1);
1879
if ((catalog_len= *pos))
1880
catalog= (char*) pos+1; // Will be copied later
1881
CHECK_SPACE(pos, end, catalog_len + 2);
1882
pos+= catalog_len+2; // leap over end 0
1883
catalog_nz= 0; // catalog has end 0 in event
1885
case Q_LC_TIME_NAMES_CODE:
1886
CHECK_SPACE(pos, end, 2);
1887
lc_time_names_number= uint2korr(pos);
1890
case Q_CHARSET_DATABASE_CODE:
1891
CHECK_SPACE(pos, end, 2);
1892
charset_database_number= uint2korr(pos);
1896
/* That's why you must write status vars in growing order of code */
1897
pos= (const uchar*) end; // Break loop
1901
if (!(start= data_buf = (Log_event::Byte*) my_malloc(catalog_len + 1 +
1906
if (catalog_len) // If catalog is given
1909
@todo we should clean up and do only copy_str_and_move; it
1910
works for both cases. Then we can remove the catalog_nz
1913
if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
1914
copy_str_and_move(&catalog, &start, catalog_len);
1917
memcpy(start, catalog, catalog_len+1); // copy end 0
1918
catalog= (const char *)start;
1919
start+= catalog_len+1;
1923
copy_str_and_move(&time_zone_str, &start, time_zone_len);
1926
if time_zone_len or catalog_len are 0, then time_zone and catalog
1927
are uninitialized at this point. shouldn't they point to the
1928
zero-length null-terminated strings we allocated space for in the
1929
my_alloc call above? /sven
1932
/* A 2nd variable part; this is common to all versions */
1933
memcpy((char*) start, end, data_len); // Copy db and query
1934
start[data_len]= '\0'; // End query with \0 (For safetly)
1936
query= (char *)(start + db_len + 1);
1937
q_len= data_len - db_len -1;
1944
Query_log_event::print().
1947
print the catalog ??
1949
void Query_log_event::print_query_header(IO_CACHE* file,
1950
PRINT_EVENT_INFO* print_event_info)
1952
// TODO: print the catalog ??
1953
char buff[40],*end; // Enough for SET TIMESTAMP
1954
bool different_db= 1;
1957
if (!print_event_info->short_form)
1959
print_header(file, print_event_info, false);
1960
my_b_printf(file, "\t%s\tthread_id=%lu\texec_time=%lu\terror_code=%d\n",
1961
get_type_str(), (ulong) thread_id, (ulong) exec_time,
1965
if (!(flags & LOG_EVENT_SUPPRESS_USE_F) && db)
1967
if ((different_db= memcmp(print_event_info->db, db, db_len + 1)))
1968
memcpy(print_event_info->db, db, db_len + 1);
1969
if (db[0] && different_db)
1970
my_b_printf(file, "use %s%s\n", db, print_event_info->delimiter);
1973
end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10);
1974
end= strmov(end, print_event_info->delimiter);
1976
my_b_write(file, (uchar*) buff, (uint) (end-buff));
1977
if ((!print_event_info->thread_id_printed ||
1978
((flags & LOG_EVENT_THREAD_SPECIFIC_F) &&
1979
thread_id != print_event_info->thread_id)))
1981
// If --short-form, print deterministic value instead of pseudo_thread_id.
1982
my_b_printf(file,"SET @@session.pseudo_thread_id=%lu%s\n",
1983
short_form ? 999999999 : (ulong)thread_id,
1984
print_event_info->delimiter);
1985
print_event_info->thread_id= thread_id;
1986
print_event_info->thread_id_printed= 1;
1990
If flags2_inited==0, this is an event from 3.23 or 4.0; nothing to
1991
print (remember we don't produce mixed relay logs so there cannot be
1992
5.0 events before that one so there is nothing to reset).
1994
if (likely(flags2_inited)) /* likely as this will mainly read 5.0 logs */
1996
/* tmp is a bitmask of bits which have changed. */
1997
if (likely(print_event_info->flags2_inited))
1998
/* All bits which have changed */
1999
tmp= (print_event_info->flags2) ^ flags2;
2000
else /* that's the first Query event we read */
2002
print_event_info->flags2_inited= 1;
2003
tmp= ~((uint32)0); /* all bits have changed */
2006
if (unlikely(tmp)) /* some bits have changed */
2009
my_b_printf(file, "SET ");
2010
print_set_option(file, tmp, OPTION_NO_FOREIGN_KEY_CHECKS, ~flags2,
2011
"@@session.foreign_key_checks", &need_comma);
2012
print_set_option(file, tmp, OPTION_AUTO_IS_NULL, flags2,
2013
"@@session.sql_auto_is_null", &need_comma);
2014
print_set_option(file, tmp, OPTION_RELAXED_UNIQUE_CHECKS, ~flags2,
2015
"@@session.unique_checks", &need_comma);
2016
my_b_printf(file,"%s\n", print_event_info->delimiter);
2017
print_event_info->flags2= flags2;
2022
Now the session variables;
2023
it's more efficient to pass SQL_MODE as a number instead of a
2024
comma-separated list.
2025
FOREIGN_KEY_CHECKS, SQL_AUTO_IS_NULL, UNIQUE_CHECKS are session-only
2026
variables (they have no global version; they're not listed in
2027
sql_class.h), The tests below work for pure binlogs or pure relay
2028
logs. Won't work for mixed relay logs but we don't create mixed
2029
relay logs (that is, there is no relay log with a format change
2030
except within the 3 first events, which mysqlbinlog handles
2031
gracefully). So this code should always be good.
2034
if (print_event_info->auto_increment_increment != auto_increment_increment ||
2035
print_event_info->auto_increment_offset != auto_increment_offset)
2037
my_b_printf(file,"SET @@session.auto_increment_increment=%lu, @@session.auto_increment_offset=%lu%s\n",
2038
auto_increment_increment,auto_increment_offset,
2039
print_event_info->delimiter);
2040
print_event_info->auto_increment_increment= auto_increment_increment;
2041
print_event_info->auto_increment_offset= auto_increment_offset;
2044
/* TODO: print the catalog when we feature SET CATALOG */
2046
if (likely(charset_inited) &&
2047
(unlikely(!print_event_info->charset_inited ||
2048
bcmp((uchar*) print_event_info->charset, (uchar*) charset, 6))))
2050
CHARSET_INFO *cs_info= get_charset(uint2korr(charset), MYF(MY_WME));
2053
/* for mysql client */
2054
my_b_printf(file, "/*!\\C %s */%s\n",
2055
cs_info->csname, print_event_info->delimiter);
2057
my_b_printf(file,"SET "
2058
"@@session.character_set_client=%d,"
2059
"@@session.collation_connection=%d,"
2060
"@@session.collation_server=%d"
2063
uint2korr(charset+2),
2064
uint2korr(charset+4),
2065
print_event_info->delimiter);
2066
memcpy(print_event_info->charset, charset, 6);
2067
print_event_info->charset_inited= 1;
2071
if (bcmp((uchar*) print_event_info->time_zone_str,
2072
(uchar*) time_zone_str, time_zone_len+1))
2074
my_b_printf(file,"SET @@session.time_zone='%s'%s\n",
2075
time_zone_str, print_event_info->delimiter);
2076
memcpy(print_event_info->time_zone_str, time_zone_str, time_zone_len+1);
2079
if (lc_time_names_number != print_event_info->lc_time_names_number)
2081
my_b_printf(file, "SET @@session.lc_time_names=%d%s\n",
2082
lc_time_names_number, print_event_info->delimiter);
2083
print_event_info->lc_time_names_number= lc_time_names_number;
2085
if (charset_database_number != print_event_info->charset_database_number)
2087
if (charset_database_number)
2088
my_b_printf(file, "SET @@session.collation_database=%d%s\n",
2089
charset_database_number, print_event_info->delimiter);
2091
my_b_printf(file, "SET @@session.collation_database=DEFAULT%s\n",
2092
print_event_info->delimiter);
2093
print_event_info->charset_database_number= charset_database_number;
2098
void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
2100
Write_on_release_cache cache(&print_event_info->head_cache, file);
2102
print_query_header(&cache, print_event_info);
2103
my_b_write(&cache, (uchar*) query, q_len);
2104
my_b_printf(&cache, "\n%s\n", print_event_info->delimiter);
2106
#endif /* MYSQL_CLIENT */
2110
Query_log_event::do_apply_event()
2113
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
2115
int Query_log_event::do_apply_event(Relay_log_info const *rli)
2117
return do_apply_event(rli, query, q_len);
2123
Compare the values of "affected rows" around here. Something
2126
if ((uint32) affected_in_event != (uint32) affected_on_slave)
2128
sql_print_error("Slave: did not get the expected number of affected \
2129
rows running query from master - expected %d, got %d (this numbers \
2130
should have matched modulo 4294967296).", 0, ...);
2131
thd->query_error = 1;
2134
We may also want an option to tell the slave to ignore "affected"
2135
mismatch. This mismatch could be implemented with a new ER_ code, and
2136
to ignore it you would use --slave-skip-errors...
2138
int Query_log_event::do_apply_event(Relay_log_info const *rli,
2139
const char *query_arg, uint32 q_len_arg)
2142
int expected_error,actual_error= 0;
2144
Colleagues: please never free(thd->catalog) in MySQL. This would
2145
lead to bugs as here thd->catalog is a part of an alloced block,
2146
not an entire alloced block (see
2147
Query_log_event::do_apply_event()). Same for thd->db. Thank
2150
thd->catalog= catalog_len ? (char *) catalog : (char *)"";
2151
new_db.length= db_len;
2152
new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
2153
thd->set_db(new_db.str, new_db.length); /* allocates a copy of 'db' */
2154
thd->variables.auto_increment_increment= auto_increment_increment;
2155
thd->variables.auto_increment_offset= auto_increment_offset;
2158
InnoDB internally stores the master log position it has executed so far,
2159
i.e. the position just after the COMMIT event.
2160
When InnoDB will want to store, the positions in rli won't have
2161
been updated yet, so group_master_log_* will point to old BEGIN
2162
and event_master_log* will point to the beginning of current COMMIT.
2163
But log_pos of the COMMIT Query event is what we want, i.e. the pos of the
2164
END of the current log event (COMMIT). We save it in rli so that InnoDB can
2167
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
2169
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
2170
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
2173
Note: We do not need to execute reset_one_shot_variables() if this
2175
Reason: The db stored in binlog events is the same for SET and for
2176
its companion query. If the SET is ignored because of
2177
db_ok(), the companion query will also be ignored, and if
2178
the companion query is ignored in the db_ok() test of
2179
::do_apply_event(), then the companion SET also have so
2180
we don't need to reset_one_shot_variables().
2182
if (rpl_filter->db_ok(thd->db))
2184
thd->set_time((time_t)when);
2185
thd->query_length= q_len_arg;
2186
thd->query= (char*)query_arg;
2187
VOID(pthread_mutex_lock(&LOCK_thread_count));
2188
thd->query_id = next_query_id();
2189
VOID(pthread_mutex_unlock(&LOCK_thread_count));
2190
thd->variables.pseudo_thread_id= thread_id; // for temp tables
2192
if (ignored_error_code((expected_error= error_code)) ||
2193
!check_expected_error(thd,rli,expected_error))
2197
all bits of thd->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
2198
must take their value from flags2.
2200
thd->options= flags2|(thd->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
2202
else, we are in a 3.23/4.0 binlog; we previously received a
2203
Rotate_log_event which reset thd->options and sql_mode etc, so
2208
if (rli->cached_charset_compare(charset))
2210
/* Verify that we support the charsets found in the event. */
2211
if (!(thd->variables.character_set_client=
2212
get_charset(uint2korr(charset), MYF(MY_WME))) ||
2213
!(thd->variables.collation_connection=
2214
get_charset(uint2korr(charset+2), MYF(MY_WME))) ||
2215
!(thd->variables.collation_server=
2216
get_charset(uint2korr(charset+4), MYF(MY_WME))))
2219
We updated the thd->variables with nonsensical values (0). Let's
2220
set them to something safe (i.e. which avoids crash), and we'll
2221
stop with EE_UNKNOWN_CHARSET in compare_errors (unless set to
2224
set_slave_thread_default_charset(thd, rli);
2225
goto compare_errors;
2227
thd->update_charset(); // for the charset change to take effect
2232
String tmp(time_zone_str, time_zone_len, &my_charset_bin);
2233
if (!(thd->variables.time_zone= my_tz_find(thd, &tmp)))
2235
my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr());
2236
thd->variables.time_zone= global_system_variables.time_zone;
2237
goto compare_errors;
2240
if (lc_time_names_number)
2242
if (!(thd->variables.lc_time_names=
2243
my_locale_by_number(lc_time_names_number)))
2245
my_printf_error(ER_UNKNOWN_ERROR,
2246
"Unknown locale: '%d'", MYF(0), lc_time_names_number);
2247
thd->variables.lc_time_names= &my_locale_en_US;
2248
goto compare_errors;
2252
thd->variables.lc_time_names= &my_locale_en_US;
2253
if (charset_database_number)
2256
if (!(cs= get_charset(charset_database_number, MYF(0))))
2259
int10_to_str((int) charset_database_number, buf, -10);
2260
my_error(ER_UNKNOWN_COLLATION, MYF(0), buf);
2261
goto compare_errors;
2263
thd->variables.collation_database= cs;
2266
thd->variables.collation_database= thd->db_charset;
2268
/* Execute the query (note that we bypass dispatch_command()) */
2269
const char* found_semicolon= NULL;
2270
mysql_parse(thd, thd->query, thd->query_length, &found_semicolon);
2271
log_slow_statement(thd);
2276
The query got a really bad error on the master (thread killed etc),
2277
which could be inconsistent. Parse it to test the table names: if the
2278
replicate-*-do|ignore-table rules say "this query must be ignored" then
2279
we exit gracefully; otherwise we warn about the bad error and tell DBA
2282
if (mysql_test_parse_for_slave(thd, thd->query, thd->query_length))
2283
clear_all_errors(thd, const_cast<Relay_log_info*>(rli)); /* Can ignore query */
2286
rli->report(ERROR_LEVEL, expected_error,
2288
Query partially completed on the master (error on master: %d) \
2289
and was aborted. There is a chance that your master is inconsistent at this \
2290
point. If you are sure that your master is ok, run this query manually on the \
2291
slave and then restart the slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; \
2292
START SLAVE; . Query: '%s'", expected_error, thd->query);
2293
thd->is_slave_error= 1;
2298
/* If the query was not ignored, it is printed to the general log */
2299
if (!thd->is_error() || thd->main_da.sql_errno() != ER_SLAVE_IGNORED_TABLE)
2300
general_log_write(thd, COM_QUERY, thd->query, thd->query_length);
2305
If we expected a non-zero error code, and we don't get the same error
2306
code, and none of them should be ignored.
2308
actual_error= thd->is_error() ? thd->main_da.sql_errno() : 0;
2309
if ((expected_error != actual_error) &&
2311
!ignored_error_code(actual_error) &&
2312
!ignored_error_code(expected_error))
2314
rli->report(ERROR_LEVEL, 0,
2316
Query caused different errors on master and slave. \
2317
Error on master: '%s' (%d), Error on slave: '%s' (%d). \
2318
Default database: '%s'. Query: '%s'",
2319
ER_SAFE(expected_error),
2321
actual_error ? thd->main_da.message() : "no error",
2323
print_slave_db_safe(db), query_arg);
2324
thd->is_slave_error= 1;
2327
If we get the same error code as expected, or they should be ignored.
2329
else if (expected_error == actual_error ||
2330
ignored_error_code(actual_error))
2332
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
2333
thd->killed= THD::NOT_KILLED;
2336
Other cases: mostly we expected no error and get one.
2338
else if (thd->is_slave_error || thd->is_fatal_error)
2340
rli->report(ERROR_LEVEL, actual_error,
2341
"Error '%s' on query. Default database: '%s'. Query: '%s'",
2342
(actual_error ? thd->main_da.message() :
2343
"unexpected success or fatal error"),
2344
print_slave_db_safe(thd->db), query_arg);
2345
thd->is_slave_error= 1;
2349
TODO: compare the values of "affected rows" around here. Something
2351
if ((uint32) affected_in_event != (uint32) affected_on_slave)
2353
sql_print_error("Slave: did not get the expected number of affected \
2354
rows running query from master - expected %d, got %d (this numbers \
2355
should have matched modulo 4294967296).", 0, ...);
2356
thd->is_slave_error = 1;
2358
We may also want an option to tell the slave to ignore "affected"
2359
mismatch. This mismatch could be implemented with a new ER_ code, and
2360
to ignore it you would use --slave-skip-errors...
2362
To do the comparison we need to know the value of "affected" which the
2363
above mysql_parse() computed. And we need to know the value of
2364
"affected" in the master's binlog. Both will be implemented later. The
2365
important thing is that we now have the format ready to log the values
2366
of "affected" in the binlog. So we can release 5.0.0 before effectively
2367
logging "affected" and effectively comparing it.
2369
} /* End of if (db_ok(... */
2372
VOID(pthread_mutex_lock(&LOCK_thread_count));
2374
Probably we have set thd->query, thd->db, thd->catalog to point to places
2375
in the data_buf of this event. Now the event is going to be deleted
2376
probably, so data_buf will be freed, so the thd->... listed above will be
2377
pointers to freed memory.
2378
So we must set them to 0, so that those bad pointers values are not later
2379
used. Note that "cleanup" queries like automatic DROP TEMPORARY TABLE
2380
don't suffer from these assignments to 0 as DROP TEMPORARY
2381
TABLE uses the db.table syntax.
2384
thd->set_db(NULL, 0); /* will free the current database */
2385
thd->query= 0; // just to be sure
2386
thd->query_length= 0;
2387
VOID(pthread_mutex_unlock(&LOCK_thread_count));
2388
close_thread_tables(thd);
2390
As a disk space optimization, future masters will not log an event for
2391
LAST_INSERT_ID() if that function returned 0 (and thus they will be able
2392
to replace the THD::stmt_depends_on_first_successful_insert_id_in_prev_stmt
2393
variable by (THD->first_successful_insert_id_in_prev_stmt > 0) ; with the
2394
resetting below we are ready to support that.
2396
thd->first_successful_insert_id_in_prev_stmt_for_binlog= 0;
2397
thd->first_successful_insert_id_in_prev_stmt= 0;
2398
thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
2399
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
2400
return thd->is_slave_error;
2403
int Query_log_event::do_update_pos(Relay_log_info *rli)
2406
Note that we will not increment group* positions if we are just
2407
after a SET ONE_SHOT, because SET ONE_SHOT should not be separated
2408
from its following updating query.
2410
if (thd->one_shot_set)
2412
rli->inc_event_relay_log_pos();
2416
return Log_event::do_update_pos(rli);
2420
Log_event::enum_skip_reason
2421
Query_log_event::do_shall_skip(Relay_log_info *rli)
2423
assert(query && q_len > 0);
2425
if (rli->slave_skip_counter > 0)
2427
if (strcmp("BEGIN", query) == 0)
2429
thd->options|= OPTION_BEGIN;
2430
return(Log_event::continue_group(rli));
2433
if (strcmp("COMMIT", query) == 0 || strcmp("ROLLBACK", query) == 0)
2435
thd->options&= ~OPTION_BEGIN;
2436
return(Log_event::EVENT_SKIP_COUNT);
2439
return(Log_event::do_shall_skip(rli));
2445
/**************************************************************************
2446
Start_log_event_v3 methods
2447
**************************************************************************/
2449
#ifndef MYSQL_CLIENT
2450
Start_log_event_v3::Start_log_event_v3()
2451
:Log_event(), created(0), binlog_version(BINLOG_VERSION),
2452
artificial_event(0), dont_set_created(0)
2454
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
2459
Start_log_event_v3::pack_info()
2462
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
2463
void Start_log_event_v3::pack_info(Protocol *protocol)
2465
char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos;
2466
pos= strmov(buf, "Server ver: ");
2467
pos= strmov(pos, server_version);
2468
pos= strmov(pos, ", Binlog ver: ");
2469
pos= int10_to_str(binlog_version, pos, 10);
2470
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
2476
Start_log_event_v3::print()
2480
void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
2482
Write_on_release_cache cache(&print_event_info->head_cache, file,
2483
Write_on_release_cache::FLUSH_F);
2485
if (!print_event_info->short_form)
2487
print_header(&cache, print_event_info, false);
2488
my_b_printf(&cache, "\tStart: binlog v %d, server v %s created ",
2489
binlog_version, server_version);
2490
print_timestamp(&cache);
2492
my_b_printf(&cache," at startup");
2493
my_b_printf(&cache, "\n");
2494
if (flags & LOG_EVENT_BINLOG_IN_USE_F)
2495
my_b_printf(&cache, "# Warning: this binlog was not closed properly. "
2496
"Most probably mysqld crashed writing it.\n");
2498
if (!artificial_event && created)
2500
#ifdef WHEN_WE_HAVE_THE_RESET_CONNECTION_SQL_COMMAND
2502
This is for mysqlbinlog: like in replication, we want to delete the stale
2503
tmp files left by an unclean shutdown of mysqld (temporary tables)
2504
and rollback unfinished transaction.
2505
Probably this can be done with RESET CONNECTION (syntax to be defined).
2507
my_b_printf(&cache,"RESET CONNECTION%s\n", print_event_info->delimiter);
2509
my_b_printf(&cache,"ROLLBACK%s\n", print_event_info->delimiter);
2513
print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER &&
2514
!print_event_info->short_form)
2516
my_b_printf(&cache, "BINLOG '\n");
2517
print_base64(&cache, print_event_info, false);
2518
print_event_info->printed_fd_event= true;
2522
#endif /* MYSQL_CLIENT */
2525
Start_log_event_v3::Start_log_event_v3()
2528
Start_log_event_v3::Start_log_event_v3(const char* buf,
2529
const Format_description_log_event
2531
:Log_event(buf, description_event)
2533
buf+= description_event->common_header_len;
2534
binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET);
2535
memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
2537
// prevent overrun if log is corrupted on disk
2538
server_version[ST_SERVER_VER_LEN-1]= 0;
2539
created= uint4korr(buf+ST_CREATED_OFFSET);
2540
/* We use log_pos to mark if this was an artificial event or not */
2541
artificial_event= (log_pos == 0);
2542
dont_set_created= 1;
2547
Start_log_event_v3::write()
2550
#ifndef MYSQL_CLIENT
2551
bool Start_log_event_v3::write(IO_CACHE* file)
2553
char buff[START_V3_HEADER_LEN];
2554
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
2555
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
2556
if (!dont_set_created)
2557
created= when= get_time();
2558
int4store(buff + ST_CREATED_OFFSET,created);
2559
return (write_header(file, sizeof(buff)) ||
2560
my_b_safe_write(file, (uchar*) buff, sizeof(buff)));
2565
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
2568
Start_log_event_v3::do_apply_event() .
2572
- To handle the case where the master died without having time to write
2573
DROP TEMPORARY TABLE, DO RELEASE_LOCK (prepared statements' deletion is
2574
TODO), we clean up all temporary tables that we got, if we are sure we
2578
- Remove all active user locks.
2579
Guilhem 2003-06: this is true but not urgent: the worst it can cause is
2580
the use of a bit of memory for a user lock which will not be used
2581
anymore. If the user lock is later used, the old one will be released. In
2582
other words, no deadlock problem.
2585
int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
2587
switch (binlog_version)
2592
This can either be 4.x (then a Start_log_event_v3 is only at master
2593
startup so we are sure the master has restarted and cleared his temp
2594
tables; the event always has 'created'>0) or 5.0 (then we have to test
2599
close_temporary_tables(thd);
2600
cleanup_load_tmpdir();
2605
Now the older formats; in that case load_tmpdir is cleaned up by the I/O
2609
if (strncmp(rli->relay_log.description_event_for_exec->server_version,
2610
"3.23.57",7) >= 0 && created)
2613
Can distinguish, based on the value of 'created': this event was
2614
generated at master startup.
2616
close_temporary_tables(thd);
2619
Otherwise, can't distinguish a Start_log_event generated at
2620
master startup and one generated by master FLUSH LOGS, so cannot
2621
be sure temp tables have to be dropped. So do nothing.
2625
/* this case is impossible */
2630
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
2632
/***************************************************************************
2633
Format_description_log_event methods
2634
****************************************************************************/
2637
Format_description_log_event 1st ctor.
2639
Ctor. Can be used to create the event to write to the binary log (when the
2640
server starts or when FLUSH LOGS), or to create artificial events to parse
2641
binlogs from MySQL 3.23 or 4.x.
2642
When in a client, only the 2nd use is possible.
2644
@param binlog_version the binlog version for which we want to build
2645
an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x
2646
x>=2 and 4.1) or 4 (MySQL 5.0). Note that the
2647
old 4.0 (binlog version 2) is not supported;
2648
it should not be used for replication with
2652
Format_description_log_event::
2653
Format_description_log_event(uint8 binlog_ver, const char* server_ver)
2654
:Start_log_event_v3(), event_type_permutation(0)
2656
binlog_version= binlog_ver;
2657
switch (binlog_ver) {
2658
case 4: /* MySQL 5.0 */
2659
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
2660
common_header_len= LOG_EVENT_HEADER_LEN;
2661
number_of_event_types= LOG_EVENT_TYPES;
2662
/* we'll catch my_malloc() error in is_valid() */
2663
post_header_len=(uint8*) my_malloc(number_of_event_types*sizeof(uint8),
2666
This long list of assignments is not beautiful, but I see no way to
2667
make it nicer, as the right members are #defines, not array members, so
2668
it's impossible to write a loop.
2670
if (post_header_len)
2672
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2673
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN;
2674
post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
2675
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2676
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2677
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2678
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2679
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2680
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
2681
post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
2682
post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN;
2683
post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2684
post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2685
post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2686
post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1];
2687
post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
2688
post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
2689
post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
2694
case 3: /* 4.0.x x>=2 */
2696
We build an artificial (i.e. not sent by the master) event, which
2697
describes what those old master versions send.
2700
strmov(server_version, server_ver ? server_ver : "3.23");
2702
strmov(server_version, server_ver ? server_ver : "4.0");
2703
common_header_len= binlog_ver==1 ? OLD_HEADER_LEN :
2704
LOG_EVENT_MINIMAL_HEADER_LEN;
2706
The first new event in binlog version 4 is Format_desc. So any event type
2707
after that does not exist in older versions. We use the events known by
2708
version 3, even if version 1 had only a subset of them (this is not a
2709
problem: it uses a few bytes for nothing but unifies code; it does not
2710
make the slave detect less corruptions).
2712
number_of_event_types= FORMAT_DESCRIPTION_EVENT - 1;
2713
post_header_len=(uint8*) my_malloc(number_of_event_types*sizeof(uint8),
2715
if (post_header_len)
2717
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2718
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_MINIMAL_LEN;
2719
post_header_len[STOP_EVENT-1]= 0;
2720
post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN;
2721
post_header_len[INTVAR_EVENT-1]= 0;
2722
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2723
post_header_len[SLAVE_EVENT-1]= 0;
2724
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2725
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2726
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2727
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2728
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
2729
post_header_len[RAND_EVENT-1]= 0;
2730
post_header_len[USER_VAR_EVENT-1]= 0;
2733
default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
2734
post_header_len= 0; /* will make is_valid() fail */
2737
calc_server_version_split();
2742
The problem with this constructor is that the fixed header may have a
2743
length different from this version, but we don't know this length as we
2744
have not read the Format_description_log_event which says it, yet. This
2745
length is in the post-header of the event, but we don't know where the
2748
So this type of event HAS to:
2749
- either have the header's length at the beginning (in the header, at a
2750
fixed position which will never be changed), not in the post-header. That
2751
would make the header be "shifted" compared to other events.
2752
- or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future
2753
versions, so that we know for sure.
2755
I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because
2756
it is sent before Format_description_log_event).
2759
Format_description_log_event::
2760
Format_description_log_event(const char* buf,
2763
Format_description_log_event*
2765
:Start_log_event_v3(buf, description_event), event_type_permutation(0)
2767
buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
2768
if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
2769
return; /* sanity check */
2770
number_of_event_types=
2771
event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1);
2772
/* If alloc fails, we'll detect it in is_valid() */
2773
post_header_len= (uint8*) my_memdup((uchar*)buf+ST_COMMON_HEADER_LEN_OFFSET+1,
2774
number_of_event_types*
2775
sizeof(*post_header_len), MYF(0));
2776
calc_server_version_split();
2779
In some previous versions, the events were given other event type
2780
id numbers than in the present version. When replicating from such
2781
a version, we therefore set up an array that maps those id numbers
2782
to the id numbers of the present server.
2784
If post_header_len is null, it means malloc failed, and is_valid
2785
will fail, so there is no need to do anything.
2787
The trees in which events have wrong id's are:
2789
mysql-5.1-wl1012.old mysql-5.1-wl2325-5.0-drop6p13-alpha
2790
mysql-5.1-wl2325-5.0-drop6 mysql-5.1-wl2325-5.0
2791
mysql-5.1-wl2325-no-dd
2793
(this was found by grepping for two lines in sequence where the
2794
first matches "FORMAT_DESCRIPTION_EVENT," and the second matches
2795
"TABLE_MAP_EVENT," in log_event.h in all trees)
2797
In these trees, the following server_versions existed since
2798
TABLE_MAP_EVENT was introduced:
2800
5.1.1-a_drop5p3 5.1.1-a_drop5p4 5.1.1-alpha
2801
5.1.2-a_drop5p10 5.1.2-a_drop5p11 5.1.2-a_drop5p12
2802
5.1.2-a_drop5p13 5.1.2-a_drop5p14 5.1.2-a_drop5p15
2803
5.1.2-a_drop5p16 5.1.2-a_drop5p16b 5.1.2-a_drop5p16c
2804
5.1.2-a_drop5p17 5.1.2-a_drop5p4 5.1.2-a_drop5p5
2805
5.1.2-a_drop5p6 5.1.2-a_drop5p7 5.1.2-a_drop5p8
2806
5.1.2-a_drop5p9 5.1.3-a_drop5p17 5.1.3-a_drop5p17b
2807
5.1.3-a_drop5p17c 5.1.4-a_drop5p18 5.1.4-a_drop5p19
2808
5.1.4-a_drop5p20 5.1.4-a_drop6p0 5.1.4-a_drop6p1
2809
5.1.4-a_drop6p2 5.1.5-a_drop5p20 5.2.0-a_drop6p3
2810
5.2.0-a_drop6p4 5.2.0-a_drop6p5 5.2.0-a_drop6p6
2811
5.2.1-a_drop6p10 5.2.1-a_drop6p11 5.2.1-a_drop6p12
2812
5.2.1-a_drop6p6 5.2.1-a_drop6p7 5.2.1-a_drop6p8
2813
5.2.2-a_drop6p13 5.2.2-a_drop6p13-alpha 5.2.2-a_drop6p13b
2816
(this was found by grepping for "mysql," in all historical
2817
versions of configure.in in the trees listed above).
2819
There are 5.1.1-alpha versions that use the new event id's, so we
2820
do not test that version string. So replication from 5.1.1-alpha
2821
with the other event id's to a new version does not work.
2822
Moreover, we can safely ignore the part after drop[56]. This
2823
allows us to simplify the big list above to the following regexes:
2825
5\.1\.[1-5]-a_drop5.*
2827
5\.2\.[0-2]-a_drop6.*
2829
This is what we test for in the 'if' below.
2831
if (post_header_len &&
2832
server_version[0] == '5' && server_version[1] == '.' &&
2833
server_version[3] == '.' &&
2834
strncmp(server_version + 5, "-a_drop", 7) == 0 &&
2835
((server_version[2] == '1' &&
2836
server_version[4] >= '1' && server_version[4] <= '5' &&
2837
server_version[12] == '5') ||
2838
(server_version[2] == '1' &&
2839
server_version[4] == '4' &&
2840
server_version[12] == '6') ||
2841
(server_version[2] == '2' &&
2842
server_version[4] >= '0' && server_version[4] <= '2' &&
2843
server_version[12] == '6')))
2845
if (number_of_event_types != 22)
2847
/* this makes is_valid() return false. */
2848
my_free(post_header_len, MYF(MY_ALLOW_ZERO_PTR));
2849
post_header_len= NULL;
2852
static const uint8 perm[23]=
2854
UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
2855
INTVAR_EVENT, LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
2856
APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
2858
RAND_EVENT, USER_VAR_EVENT,
2859
FORMAT_DESCRIPTION_EVENT,
2861
PRE_GA_WRITE_ROWS_EVENT,
2862
PRE_GA_UPDATE_ROWS_EVENT,
2863
PRE_GA_DELETE_ROWS_EVENT,
2865
BEGIN_LOAD_QUERY_EVENT,
2866
EXECUTE_LOAD_QUERY_EVENT,
2868
event_type_permutation= perm;
2870
Since we use (permuted) event id's to index the post_header_len
2871
array, we need to permute the post_header_len array too.
2873
uint8 post_header_len_temp[23];
2874
for (int i= 1; i < 23; i++)
2875
post_header_len_temp[perm[i] - 1]= post_header_len[i - 1];
2876
for (int i= 0; i < 22; i++)
2877
post_header_len[i] = post_header_len_temp[i];
2882
#ifndef MYSQL_CLIENT
2883
bool Format_description_log_event::write(IO_CACHE* file)
2886
We don't call Start_log_event_v3::write() because this would make 2
2889
uchar buff[FORMAT_DESCRIPTION_HEADER_LEN];
2890
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
2891
memcpy((char*) buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
2892
if (!dont_set_created)
2893
created= when= get_time();
2894
int4store(buff + ST_CREATED_OFFSET,created);
2895
buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
2896
memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET+1, (uchar*) post_header_len,
2898
return (write_header(file, sizeof(buff)) ||
2899
my_b_safe_write(file, buff, sizeof(buff)));
2903
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
2904
int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
2907
As a transaction NEVER spans on 2 or more binlogs:
2908
if we have an active transaction at this point, the master died
2909
while writing the transaction to the binary log, i.e. while
2910
flushing the binlog cache to the binlog. XA guarantees that master has
2911
rolled back. So we roll back.
2912
Note: this event could be sent by the master to inform us of the
2913
format of its binlog; in other words maybe it is not at its
2914
original place when it comes to us; we'll know this by checking
2915
log_pos ("artificial" events have log_pos == 0).
2917
if (!artificial_event && created && thd->transaction.all.ha_list)
2919
/* This is not an error (XA is safe), just an information */
2920
rli->report(INFORMATION_LEVEL, 0,
2921
"Rolling back unfinished transaction (no COMMIT "
2922
"or ROLLBACK in relay log). A probable cause is that "
2923
"the master died while writing the transaction to "
2924
"its binary log, thus rolled back too.");
2925
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 1);
2928
If this event comes from ourselves, there is no cleaning task to
2929
perform, we don't call Start_log_event_v3::do_apply_event()
2930
(this was just to update the log's description event).
2932
if (server_id != (uint32) ::server_id)
2935
If the event was not requested by the slave i.e. the master sent
2936
it while the slave asked for a position >4, the event will make
2937
rli->group_master_log_pos advance. Say that the slave asked for
2938
position 1000, and the Format_desc event's end is 96. Then in
2939
the beginning of replication rli->group_master_log_pos will be
2940
0, then 96, then jump to first really asked event (which is
2941
>96). So this is ok.
2943
return(Start_log_event_v3::do_apply_event(rli));
2948
int Format_description_log_event::do_update_pos(Relay_log_info *rli)
2950
/* save the information describing this binlog */
2951
delete rli->relay_log.description_event_for_exec;
2952
rli->relay_log.description_event_for_exec= this;
2954
if (server_id == (uint32) ::server_id)
2957
We only increase the relay log position if we are skipping
2958
events and do not touch any group_* variables, nor flush the
2959
relay log info. If there is a crash, we will have to re-skip
2960
the events again, but that is a minor issue.
2962
If we do not skip stepping the group log position (and the
2963
server id was changed when restarting the server), it might well
2964
be that we start executing at a position that is invalid, e.g.,
2965
at a Rows_log_event or a Query_log_event preceeded by a
2966
Intvar_log_event instead of starting at a Table_map_log_event or
2967
the Intvar_log_event respectively.
2969
rli->inc_event_relay_log_pos();
2974
return Log_event::do_update_pos(rli);
2978
Log_event::enum_skip_reason
2979
Format_description_log_event::do_shall_skip(Relay_log_info *rli __attribute__((__unused__)))
2981
return Log_event::EVENT_SKIP_NOT;
2988
Splits the event's 'server_version' string into three numeric pieces stored
2989
into 'server_version_split':
2990
X.Y.Zabc (X,Y,Z numbers, a not a digit) -> {X,Y,Z}
2993
'server_version_split' is then used for lookups to find if the server which
2994
created this event has some known bug.
2996
void Format_description_log_event::calc_server_version_split()
2998
char *p= server_version, *r;
3000
for (uint i= 0; i<=2; i++)
3002
number= strtoul(p, &r, 10);
3003
server_version_split[i]= (uchar)number;
3004
assert(number < 256); // fit in uchar
3006
assert(!((i == 0) && (*r != '.'))); // should be true in practice
3008
p++; // skip the dot
3013
/**************************************************************************
3014
Load_log_event methods
3015
General note about Load_log_event: the binlogging of LOAD DATA INFILE is
3016
going to be changed in 5.0 (or maybe in 5.1; not decided yet).
3017
However, the 5.0 slave could still have to read such events (from a 4.x
3018
master), convert them (which just means maybe expand the header, when 5.0
3019
servers have a UID in events) (remember that whatever is after the header
3020
will be like in 4.x, as this event's format is not modified in 5.0 as we
3021
will use new types of events to log the new LOAD DATA INFILE features).
3022
To be able to read/convert, we just need to not assume that the common
3023
header is of length LOG_EVENT_HEADER_LEN (we must use the description
3025
Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE
3026
between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the
3027
positions displayed in SHOW SLAVE STATUS then are fine too).
3028
**************************************************************************/
3031
Load_log_event::pack_info()
3034
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
3035
uint Load_log_event::get_query_buffer_length()
3038
5 + db_len + 3 + // "use DB; "
3039
18 + fname_len + 2 + // "LOAD DATA INFILE 'file''"
3041
9 + // " REPLACE or IGNORE "
3042
13 + table_name_len*2 + // "INTO TABLE `table`"
3043
21 + sql_ex.field_term_len*4 + 2 + // " FIELDS TERMINATED BY 'str'"
3044
23 + sql_ex.enclosed_len*4 + 2 + // " OPTIONALLY ENCLOSED BY 'str'"
3045
12 + sql_ex.escaped_len*4 + 2 + // " ESCAPED BY 'str'"
3046
21 + sql_ex.line_term_len*4 + 2 + // " LINES TERMINATED BY 'str'"
3047
19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'"
3048
15 + 22 + // " IGNORE xxx LINES"
3049
3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)"
3053
void Load_log_event::print_query(bool need_db, char *buf,
3054
char **end, char **fn_start, char **fn_end)
3058
if (need_db && db && db_len)
3060
pos= strmov(pos, "use `");
3061
memcpy(pos, db, db_len);
3062
pos= strmov(pos+db_len, "`; ");
3065
pos= strmov(pos, "LOAD DATA ");
3070
if (check_fname_outside_temp_buf())
3071
pos= strmov(pos, "LOCAL ");
3072
pos= strmov(pos, "INFILE '");
3073
memcpy(pos, fname, fname_len);
3074
pos= strmov(pos+fname_len, "' ");
3076
if (sql_ex.opt_flags & REPLACE_FLAG)
3077
pos= strmov(pos, " REPLACE ");
3078
else if (sql_ex.opt_flags & IGNORE_FLAG)
3079
pos= strmov(pos, " IGNORE ");
3081
pos= strmov(pos ,"INTO");
3086
pos= strmov(pos ," TABLE `");
3087
memcpy(pos, table_name, table_name_len);
3088
pos+= table_name_len;
3090
/* We have to create all optinal fields as the default is not empty */
3091
pos= strmov(pos, "` FIELDS TERMINATED BY ");
3092
pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len);
3093
if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
3094
pos= strmov(pos, " OPTIONALLY ");
3095
pos= strmov(pos, " ENCLOSED BY ");
3096
pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len);
3098
pos= strmov(pos, " ESCAPED BY ");
3099
pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
3101
pos= strmov(pos, " LINES TERMINATED BY ");
3102
pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
3103
if (sql_ex.line_start_len)
3105
pos= strmov(pos, " STARTING BY ");
3106
pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
3109
if ((long) skip_lines > 0)
3111
pos= strmov(pos, " IGNORE ");
3112
pos= longlong10_to_str((longlong) skip_lines, pos, 10);
3113
pos= strmov(pos," LINES ");
3119
const char *field= fields;
3120
pos= strmov(pos, " (");
3121
for (i = 0; i < num_fields; i++)
3128
memcpy(pos, field, field_lens[i]);
3129
pos+= field_lens[i];
3130
field+= field_lens[i] + 1;
3139
void Load_log_event::pack_info(Protocol *protocol)
3143
if (!(buf= (char*) my_malloc(get_query_buffer_length(), MYF(MY_WME))))
3145
print_query(true, buf, &end, 0, 0);
3146
protocol->store(buf, end-buf, &my_charset_bin);
3147
my_free(buf, MYF(0));
3149
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
3152
#ifndef MYSQL_CLIENT
3155
Load_log_event::write_data_header()
3158
bool Load_log_event::write_data_header(IO_CACHE* file)
3160
char buf[LOAD_HEADER_LEN];
3161
int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
3162
int4store(buf + L_EXEC_TIME_OFFSET, exec_time);
3163
int4store(buf + L_SKIP_LINES_OFFSET, skip_lines);
3164
buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
3165
buf[L_DB_LEN_OFFSET] = (char)db_len;
3166
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
3167
return my_b_safe_write(file, (uchar*)buf, LOAD_HEADER_LEN) != 0;
3172
Load_log_event::write_data_body()
3175
bool Load_log_event::write_data_body(IO_CACHE* file)
3177
if (sql_ex.write_data(file))
3179
if (num_fields && fields && field_lens)
3181
if (my_b_safe_write(file, (uchar*)field_lens, num_fields) ||
3182
my_b_safe_write(file, (uchar*)fields, field_block_len))
3185
return (my_b_safe_write(file, (uchar*)table_name, table_name_len + 1) ||
3186
my_b_safe_write(file, (uchar*)db, db_len + 1) ||
3187
my_b_safe_write(file, (uchar*)fname, fname_len));
3192
Load_log_event::Load_log_event()
3195
Load_log_event::Load_log_event(THD *thd_arg, sql_exchange *ex,
3196
const char *db_arg, const char *table_name_arg,
3197
List<Item> &fields_arg,
3198
enum enum_duplicates handle_dup,
3199
bool ignore, bool using_trans)
3201
thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
3203
thread_id(thd_arg->thread_id),
3204
slave_proxy_id(thd_arg->variables.pseudo_thread_id),
3205
num_fields(0),fields(0),
3206
field_lens(0),field_block_len(0),
3207
table_name(table_name_arg ? table_name_arg : ""),
3208
db(db_arg), fname(ex->file_name), local_fname(false)
3212
exec_time = (ulong) (end_time - thd_arg->start_time);
3213
/* db can never be a zero pointer in 4.0 */
3214
db_len = (uint32) strlen(db);
3215
table_name_len = (uint32) strlen(table_name);
3216
fname_len = (fname) ? (uint) strlen(fname) : 0;
3217
sql_ex.field_term = (char*) ex->field_term->ptr();
3218
sql_ex.field_term_len = (uint8) ex->field_term->length();
3219
sql_ex.enclosed = (char*) ex->enclosed->ptr();
3220
sql_ex.enclosed_len = (uint8) ex->enclosed->length();
3221
sql_ex.line_term = (char*) ex->line_term->ptr();
3222
sql_ex.line_term_len = (uint8) ex->line_term->length();
3223
sql_ex.line_start = (char*) ex->line_start->ptr();
3224
sql_ex.line_start_len = (uint8) ex->line_start->length();
3225
sql_ex.escaped = (char*) ex->escaped->ptr();
3226
sql_ex.escaped_len = (uint8) ex->escaped->length();
3227
sql_ex.opt_flags = 0;
3228
sql_ex.cached_new_format = -1;
3231
sql_ex.opt_flags|= DUMPFILE_FLAG;
3232
if (ex->opt_enclosed)
3233
sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
3235
sql_ex.empty_flags= 0;
3237
switch (handle_dup) {
3239
sql_ex.opt_flags|= REPLACE_FLAG;
3241
case DUP_UPDATE: // Impossible here
3246
sql_ex.opt_flags|= IGNORE_FLAG;
3248
if (!ex->field_term->length())
3249
sql_ex.empty_flags |= FIELD_TERM_EMPTY;
3250
if (!ex->enclosed->length())
3251
sql_ex.empty_flags |= ENCLOSED_EMPTY;
3252
if (!ex->line_term->length())
3253
sql_ex.empty_flags |= LINE_TERM_EMPTY;
3254
if (!ex->line_start->length())
3255
sql_ex.empty_flags |= LINE_START_EMPTY;
3256
if (!ex->escaped->length())
3257
sql_ex.empty_flags |= ESCAPED_EMPTY;
3259
skip_lines = ex->skip_lines;
3261
List_iterator<Item> li(fields_arg);
3262
field_lens_buf.length(0);
3263
fields_buf.length(0);
3265
while ((item = li++))
3268
uchar len = (uchar) strlen(item->name);
3269
field_block_len += len + 1;
3270
fields_buf.append(item->name, len + 1);
3271
field_lens_buf.append((char*)&len, 1);
3274
field_lens = (const uchar*)field_lens_buf.ptr();
3275
fields = fields_buf.ptr();
3277
#endif /* !MYSQL_CLIENT */
3282
The caller must do buf[event_len] = 0 before he starts using the
3285
Load_log_event::Load_log_event(const char *buf, uint event_len,
3286
const Format_description_log_event *description_event)
3287
:Log_event(buf, description_event), num_fields(0), fields(0),
3288
field_lens(0),field_block_len(0),
3289
table_name(0), db(0), fname(0), local_fname(false)
3292
I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0,
3293
4.0->5.0 and 5.0->5.0 and it works.
3296
copy_log_event(buf, event_len,
3297
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
3299
description_event->common_header_len :
3300
LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
3302
/* otherwise it's a derived class, will call copy_log_event() itself */
3308
Load_log_event::copy_log_event()
3311
int Load_log_event::copy_log_event(const char *buf, ulong event_len,
3313
const Format_description_log_event *description_event)
3316
char* buf_end = (char*)buf + event_len;
3317
/* this is the beginning of the post-header */
3318
const char* data_head = buf + description_event->common_header_len;
3319
slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
3320
exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
3321
skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
3322
table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
3323
db_len = (uint)data_head[L_DB_LEN_OFFSET];
3324
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
3326
if ((int) event_len < body_offset)
3329
Sql_ex.init() on success returns the pointer to the first byte after
3330
the sql_ex structure, which is the start of field lengths array.
3332
if (!(field_lens= (uchar*)sql_ex.init((char*)buf + body_offset,
3334
buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
3337
data_len = event_len - body_offset;
3338
if (num_fields > data_len) // simple sanity check against corruption
3340
for (uint i = 0; i < num_fields; i++)
3341
field_block_len += (uint)field_lens[i] + 1;
3343
fields = (char*)field_lens + num_fields;
3344
table_name = fields + field_block_len;
3345
db = table_name + table_name_len + 1;
3346
fname = db + db_len + 1;
3347
fname_len = strlen(fname);
3348
// null termination is accomplished by the caller doing buf[event_len]=0
3355
Load_log_event::print()
3359
void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
3361
print(file, print_event_info, 0);
3365
void Load_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info,
3368
Write_on_release_cache cache(&print_event_info->head_cache, file_arg);
3370
if (!print_event_info->short_form)
3372
print_header(&cache, print_event_info, false);
3373
my_b_printf(&cache, "\tQuery\tthread_id=%ld\texec_time=%ld\n",
3374
thread_id, exec_time);
3377
bool different_db= 1;
3381
If the database is different from the one of the previous statement, we
3382
need to print the "use" command, and we update the last_db.
3383
But if commented, the "use" is going to be commented so we should not
3386
if ((different_db= memcmp(print_event_info->db, db, db_len + 1)) &&
3388
memcpy(print_event_info->db, db, db_len + 1);
3391
if (db && db[0] && different_db)
3392
my_b_printf(&cache, "%suse %s%s\n",
3393
commented ? "# " : "",
3394
db, print_event_info->delimiter);
3396
if (flags & LOG_EVENT_THREAD_SPECIFIC_F)
3397
my_b_printf(&cache,"%sSET @@session.pseudo_thread_id=%lu%s\n",
3398
commented ? "# " : "", (ulong)thread_id,
3399
print_event_info->delimiter);
3400
my_b_printf(&cache, "%sLOAD DATA ",
3401
commented ? "# " : "");
3402
if (check_fname_outside_temp_buf())
3403
my_b_printf(&cache, "LOCAL ");
3404
my_b_printf(&cache, "INFILE '%-*s' ", fname_len, fname);
3406
if (sql_ex.opt_flags & REPLACE_FLAG)
3407
my_b_printf(&cache," REPLACE ");
3408
else if (sql_ex.opt_flags & IGNORE_FLAG)
3409
my_b_printf(&cache," IGNORE ");
3411
my_b_printf(&cache, "INTO TABLE `%s`", table_name);
3412
my_b_printf(&cache, " FIELDS TERMINATED BY ");
3413
pretty_print_str(&cache, sql_ex.field_term, sql_ex.field_term_len);
3415
if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
3416
my_b_printf(&cache," OPTIONALLY ");
3417
my_b_printf(&cache, " ENCLOSED BY ");
3418
pretty_print_str(&cache, sql_ex.enclosed, sql_ex.enclosed_len);
3420
my_b_printf(&cache, " ESCAPED BY ");
3421
pretty_print_str(&cache, sql_ex.escaped, sql_ex.escaped_len);
3423
my_b_printf(&cache," LINES TERMINATED BY ");
3424
pretty_print_str(&cache, sql_ex.line_term, sql_ex.line_term_len);
3427
if (sql_ex.line_start)
3429
my_b_printf(&cache," STARTING BY ");
3430
pretty_print_str(&cache, sql_ex.line_start, sql_ex.line_start_len);
3432
if ((long) skip_lines > 0)
3433
my_b_printf(&cache, " IGNORE %ld LINES", (long) skip_lines);
3438
const char* field = fields;
3439
my_b_printf(&cache, " (");
3440
for (i = 0; i < num_fields; i++)
3443
my_b_printf(&cache, ",");
3444
my_b_printf(&cache, field);
3446
field += field_lens[i] + 1;
3448
my_b_printf(&cache, ")");
3451
my_b_printf(&cache, "%s\n", print_event_info->delimiter);
3454
#endif /* MYSQL_CLIENT */
3456
#ifndef MYSQL_CLIENT
3459
Load_log_event::set_fields()
3462
This function can not use the member variable
3463
for the database, since LOAD DATA INFILE on the slave
3464
can be for a different database than the current one.
3465
This is the reason for the affected_db argument to this method.
3468
void Load_log_event::set_fields(const char* affected_db,
3469
List<Item> &field_list,
3470
Name_resolution_context *context)
3473
const char* field = fields;
3474
for (i= 0; i < num_fields; i++)
3476
field_list.push_back(new Item_field(context,
3477
affected_db, table_name, field));
3478
field+= field_lens[i] + 1;
3481
#endif /* !MYSQL_CLIENT */
3484
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
3486
Does the data loading job when executing a LOAD DATA on the slave.
3490
@param use_rli_only_for_errors If set to 1, rli is provided to
3491
Load_log_event::exec_event only for this
3492
function to have RPL_LOG_NAME and
3493
rli->last_slave_error, both being used by
3494
error reports. rli's position advancing
3495
is skipped (done by the caller which is
3496
Execute_load_log_event::exec_event).
3497
If set to 0, rli is provided for full use,
3498
i.e. for error reports and position
3502
fix this; this can be done by testing rules in
3503
Create_file_log_event::exec_event() and then discarding Append_block and
3506
this is a bug - this needs to be moved to the I/O thread
3514
int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
3515
bool use_rli_only_for_errors)
3518
new_db.length= db_len;
3519
new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
3520
thd->set_db(new_db.str, new_db.length);
3521
assert(thd->query == 0);
3522
thd->query_length= 0; // Should not be needed
3523
thd->is_slave_error= 0;
3524
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
3526
/* see Query_log_event::do_apply_event() and BUG#13360 */
3527
assert(!rli->m_table_map.count());
3529
Usually lex_start() is called by mysql_parse(), but we need it here
3530
as the present method does not call mysql_parse().
3533
mysql_reset_thd_for_next_command(thd);
3535
if (!use_rli_only_for_errors)
3538
Saved for InnoDB, see comment in
3539
Query_log_event::do_apply_event()
3541
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
3545
We test replicate_*_db rules. Note that we have already prepared
3546
the file to load, even if we are going to ignore and delete it
3547
now. So it is possible that we did a lot of disk writes for
3548
nothing. In other words, a big LOAD DATA INFILE on the master will
3549
still consume a lot of space on the slave (space in the relay log
3550
+ space of temp files: twice the space of the file to load...)
3551
even if it will finally be ignored. TODO: fix this; this can be
3552
done by testing rules in Create_file_log_event::do_apply_event()
3553
and then discarding Append_block and al. Another way is do the
3554
filtering in the I/O thread (more efficient: no disk writes at
3558
Note: We do not need to execute reset_one_shot_variables() if this
3560
Reason: The db stored in binlog events is the same for SET and for
3561
its companion query. If the SET is ignored because of
3562
db_ok(), the companion query will also be ignored, and if
3563
the companion query is ignored in the db_ok() test of
3564
::do_apply_event(), then the companion SET also have so
3565
we don't need to reset_one_shot_variables().
3567
if (rpl_filter->db_ok(thd->db))
3569
thd->set_time((time_t)when);
3570
VOID(pthread_mutex_lock(&LOCK_thread_count));
3571
thd->query_id = next_query_id();
3572
VOID(pthread_mutex_unlock(&LOCK_thread_count));
3574
Initing thd->row_count is not necessary in theory as this variable has no
3575
influence in the case of the slave SQL thread (it is used to generate a
3576
"data truncated" warning but which is absorbed and never gets to the
3577
error log); still we init it to avoid a Valgrind message.
3579
mysql_reset_errors(thd, 0);
3582
bzero((char*) &tables,sizeof(tables));
3583
tables.db= thd->strmake(thd->db, thd->db_length);
3584
tables.alias = tables.table_name = (char*) table_name;
3585
tables.lock_type = TL_WRITE;
3588
// the table will be opened in mysql_load
3589
if (rpl_filter->is_on() && !rpl_filter->tables_ok(thd->db, &tables))
3591
// TODO: this is a bug - this needs to be moved to the I/O thread
3593
skip_load_data_infile(net);
3599
enum enum_duplicates handle_dup;
3601
char *load_data_query;
3604
Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST
3605
and written to slave's binlog if binlogging is on.
3607
if (!(load_data_query= (char *)thd->alloc(get_query_buffer_length() + 1)))
3610
This will set thd->fatal_error in case of OOM. So we surely will notice
3611
that something is wrong.
3616
print_query(false, load_data_query, &end, (char **)&thd->lex->fname_start,
3617
(char **)&thd->lex->fname_end);
3619
thd->query_length= end - load_data_query;
3620
thd->query= load_data_query;
3622
if (sql_ex.opt_flags & REPLACE_FLAG)
3624
handle_dup= DUP_REPLACE;
3626
else if (sql_ex.opt_flags & IGNORE_FLAG)
3629
handle_dup= DUP_ERROR;
3634
When replication is running fine, if it was DUP_ERROR on the
3635
master then we could choose IGNORE here, because if DUP_ERROR
3636
suceeded on master, and data is identical on the master and slave,
3637
then there should be no uniqueness errors on slave, so IGNORE is
3638
the same as DUP_ERROR. But in the unlikely case of uniqueness errors
3639
(because the data on the master and slave happen to be different
3640
(user error or bug), we want LOAD DATA to print an error message on
3641
the slave to discover the problem.
3643
If reading from net (a 3.23 master), mysql_load() will change this
3646
handle_dup= DUP_ERROR;
3649
We need to set thd->lex->sql_command and thd->lex->duplicates
3650
since InnoDB tests these variables to decide if this is a LOAD
3651
DATA ... REPLACE INTO ... statement even though mysql_parse()
3652
is not called. This is not needed in 5.0 since there the LOAD
3653
DATA ... statement is replicated using mysql_parse(), which
3654
sets the thd->lex fields correctly.
3656
thd->lex->sql_command= SQLCOM_LOAD;
3657
thd->lex->duplicates= handle_dup;
3659
sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
3660
String field_term(sql_ex.field_term,sql_ex.field_term_len,log_cs);
3661
String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,log_cs);
3662
String line_term(sql_ex.line_term,sql_ex.line_term_len,log_cs);
3663
String line_start(sql_ex.line_start,sql_ex.line_start_len,log_cs);
3664
String escaped(sql_ex.escaped,sql_ex.escaped_len, log_cs);
3665
ex.field_term= &field_term;
3666
ex.enclosed= &enclosed;
3667
ex.line_term= &line_term;
3668
ex.line_start= &line_start;
3669
ex.escaped= &escaped;
3671
ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
3672
if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
3673
ex.field_term->length(0);
3675
ex.skip_lines = skip_lines;
3676
List<Item> field_list;
3677
thd->lex->select_lex.context.resolve_in_table_list_only(&tables);
3678
set_fields(tables.db, field_list, &thd->lex->select_lex.context);
3679
thd->variables.pseudo_thread_id= thread_id;
3682
// mysql_load will use thd->net to read the file
3683
thd->net.vio = net->vio;
3685
Make sure the client does not get confused about the packet sequence
3687
thd->net.pkt_nr = net->pkt_nr;
3690
It is safe to use tmp_list twice because we are not going to
3691
update it inside mysql_load().
3693
List<Item> tmp_list;
3694
if (mysql_load(thd, &ex, &tables, field_list, tmp_list, tmp_list,
3695
handle_dup, ignore, net != 0))
3696
thd->is_slave_error= 1;
3697
if (thd->cuted_fields)
3699
/* log_pos is the position of the LOAD event in the master log */
3700
sql_print_warning("Slave: load data infile on table '%s' at "
3701
"log position %s in log '%s' produced %ld "
3702
"warning(s). Default database: '%s'",
3704
llstr(log_pos,llbuff), RPL_LOG_NAME,
3705
(ulong) thd->cuted_fields,
3706
print_slave_db_safe(thd->db));
3709
net->pkt_nr= thd->net.pkt_nr;
3715
We will just ask the master to send us /dev/null if we do not
3716
want to load the data.
3717
TODO: this a bug - needs to be done in I/O thread
3720
skip_load_data_infile(net);
3725
const char *remember_db= thd->db;
3726
VOID(pthread_mutex_lock(&LOCK_thread_count));
3728
thd->set_db(NULL, 0); /* will free the current database */
3730
thd->query_length= 0;
3731
VOID(pthread_mutex_unlock(&LOCK_thread_count));
3732
close_thread_tables(thd);
3734
if (thd->is_slave_error)
3736
/* this err/sql_errno code is copy-paste from net_send_error() */
3739
if (thd->is_error())
3741
err= thd->main_da.message();
3742
sql_errno= thd->main_da.sql_errno();
3746
sql_errno=ER_UNKNOWN_ERROR;
3749
rli->report(ERROR_LEVEL, sql_errno,"\
3750
Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'",
3751
err, (char*)table_name, print_slave_db_safe(remember_db));
3752
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
3755
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
3757
if (thd->is_fatal_error)
3760
snprintf(buf, sizeof(buf),
3761
"Running LOAD DATA INFILE on table '%-.64s'."
3762
" Default database: '%-.64s'",
3764
print_slave_db_safe(remember_db));
3766
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
3767
ER(ER_SLAVE_FATAL_ERROR), buf);
3771
return ( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) );
3776
/**************************************************************************
3777
Rotate_log_event methods
3778
**************************************************************************/
3781
Rotate_log_event::pack_info()
3784
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
3785
void Rotate_log_event::pack_info(Protocol *protocol)
3787
char buf1[256], buf[22];
3788
String tmp(buf1, sizeof(buf1), log_cs);
3790
tmp.append(new_log_ident, ident_len);
3791
tmp.append(STRING_WITH_LEN(";pos="));
3792
tmp.append(llstr(pos,buf));
3793
protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
3799
Rotate_log_event::print()
3803
void Rotate_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
3806
Write_on_release_cache cache(&print_event_info->head_cache, file,
3807
Write_on_release_cache::FLUSH_F);
3809
if (print_event_info->short_form)
3811
print_header(&cache, print_event_info, false);
3812
my_b_printf(&cache, "\tRotate to ");
3814
my_b_write(&cache, (uchar*) new_log_ident, (uint)ident_len);
3815
my_b_printf(&cache, " pos: %s\n", llstr(pos, buf));
3817
#endif /* MYSQL_CLIENT */
3822
Rotate_log_event::Rotate_log_event() (2 constructors)
3826
#ifndef MYSQL_CLIENT
3827
Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
3828
uint ident_len_arg, ulonglong pos_arg,
3830
:Log_event(), new_log_ident(new_log_ident_arg),
3831
pos(pos_arg),ident_len(ident_len_arg ? ident_len_arg :
3832
(uint) strlen(new_log_ident_arg)), flags(flags_arg)
3834
if (flags & DUP_NAME)
3835
new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME));
3841
Rotate_log_event::Rotate_log_event(const char* buf, uint event_len,
3842
const Format_description_log_event* description_event)
3843
:Log_event(buf, description_event) ,new_log_ident(0), flags(DUP_NAME)
3845
// The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
3846
uint8 header_size= description_event->common_header_len;
3847
uint8 post_header_len= description_event->post_header_len[ROTATE_EVENT-1];
3849
if (event_len < header_size)
3852
pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
3853
ident_len = (uint)(event_len -
3854
(header_size+post_header_len));
3855
ident_offset = post_header_len;
3856
set_if_smaller(ident_len,FN_REFLEN-1);
3857
new_log_ident= my_strndup(buf + ident_offset, (uint) ident_len, MYF(MY_WME));
3863
Rotate_log_event::write()
3866
#ifndef MYSQL_CLIENT
3867
bool Rotate_log_event::write(IO_CACHE* file)
3869
char buf[ROTATE_HEADER_LEN];
3870
int8store(buf + R_POS_OFFSET, pos);
3871
return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
3872
my_b_safe_write(file, (uchar*)buf, ROTATE_HEADER_LEN) ||
3873
my_b_safe_write(file, (uchar*)new_log_ident, (uint) ident_len));
3878
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
3881
Got a rotate log event from the master.
3883
This is mainly used so that we can later figure out the logname and
3884
position for the master.
3886
We can't rotate the slave's BINlog as this will cause infinitive rotations
3887
in a A -> B -> A setup.
3888
The NOTES below is a wrong comment which will disappear when 4.1 is merged.
3893
int Rotate_log_event::do_update_pos(Relay_log_info *rli)
3895
pthread_mutex_lock(&rli->data_lock);
3896
rli->event_relay_log_pos= my_b_tell(rli->cur_log);
3898
If we are in a transaction or in a group: the only normal case is
3899
when the I/O thread was copying a big transaction, then it was
3900
stopped and restarted: we have this in the relay log:
3908
In that case, we don't want to touch the coordinates which
3909
correspond to the beginning of the transaction. Starting from
3910
5.0.0, there also are some rotates from the slave itself, in the
3911
relay log, which shall not change the group positions.
3913
if ((server_id != ::server_id || rli->replicate_same_server_id) &&
3914
!rli->is_in_group())
3916
memcpy(rli->group_master_log_name, new_log_ident, ident_len+1);
3917
rli->notify_group_master_log_name_update();
3918
rli->group_master_log_pos= pos;
3919
strmake(rli->group_relay_log_name, rli->event_relay_log_name,
3920
sizeof(rli->group_relay_log_name) - 1);
3921
rli->notify_group_relay_log_name_update();
3922
rli->group_relay_log_pos= rli->event_relay_log_pos;
3924
Reset thd->options and sql_mode etc, because this could be the signal of
3925
a master's downgrade from 5.0 to 4.0.
3926
However, no need to reset description_event_for_exec: indeed, if the next
3927
master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
3928
master is 4.0 then the events are in the slave's format (conversion).
3930
set_slave_thread_options(thd);
3931
set_slave_thread_default_charset(thd, rli);
3932
thd->variables.auto_increment_increment=
3933
thd->variables.auto_increment_offset= 1;
3935
pthread_mutex_unlock(&rli->data_lock);
3936
pthread_cond_broadcast(&rli->data_cond);
3937
flush_relay_log_info(rli);
3943
Log_event::enum_skip_reason
3944
Rotate_log_event::do_shall_skip(Relay_log_info *rli)
3946
enum_skip_reason reason= Log_event::do_shall_skip(rli);
3949
case Log_event::EVENT_SKIP_NOT:
3950
case Log_event::EVENT_SKIP_COUNT:
3951
return Log_event::EVENT_SKIP_NOT;
3953
case Log_event::EVENT_SKIP_IGNORE:
3954
return Log_event::EVENT_SKIP_IGNORE;
3957
return Log_event::EVENT_SKIP_NOT; // To keep compiler happy
3963
/**************************************************************************
3964
Intvar_log_event methods
3965
**************************************************************************/
3968
Intvar_log_event::pack_info()
3971
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
3972
void Intvar_log_event::pack_info(Protocol *protocol)
3974
char buf[256], *pos;
3975
pos= strmake(buf, get_var_type_name(), sizeof(buf)-23);
3977
pos= longlong10_to_str(val, pos, -10);
3978
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3984
Intvar_log_event::Intvar_log_event()
3987
Intvar_log_event::Intvar_log_event(const char* buf,
3988
const Format_description_log_event* description_event)
3989
:Log_event(buf, description_event)
3991
buf+= description_event->common_header_len;
3992
type= buf[I_TYPE_OFFSET];
3993
val= uint8korr(buf+I_VAL_OFFSET);
3998
Intvar_log_event::get_var_type_name()
4001
const char* Intvar_log_event::get_var_type_name()
4004
case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
4005
case INSERT_ID_EVENT: return "INSERT_ID";
4006
default: /* impossible */ return "UNKNOWN";
4012
Intvar_log_event::write()
4015
#ifndef MYSQL_CLIENT
4016
bool Intvar_log_event::write(IO_CACHE* file)
4019
buf[I_TYPE_OFFSET]= (uchar) type;
4020
int8store(buf + I_VAL_OFFSET, val);
4021
return (write_header(file, sizeof(buf)) ||
4022
my_b_safe_write(file, buf, sizeof(buf)));
4028
Intvar_log_event::print()
4032
void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
4036
Write_on_release_cache cache(&print_event_info->head_cache, file,
4037
Write_on_release_cache::FLUSH_F);
4039
if (!print_event_info->short_form)
4041
print_header(&cache, print_event_info, false);
4042
my_b_printf(&cache, "\tIntvar\n");
4045
my_b_printf(&cache, "SET ");
4047
case LAST_INSERT_ID_EVENT:
4048
msg="LAST_INSERT_ID";
4050
case INSERT_ID_EVENT:
4053
case INVALID_INT_EVENT:
4054
default: // cannot happen
4058
my_b_printf(&cache, "%s=%s%s\n",
4059
msg, llstr(val,llbuff), print_event_info->delimiter);
4065
Intvar_log_event::do_apply_event()
4068
#if defined(HAVE_REPLICATION)&& !defined(MYSQL_CLIENT)
4069
int Intvar_log_event::do_apply_event(Relay_log_info const *rli)
4072
We are now in a statement until the associated query log event has
4075
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
4078
case LAST_INSERT_ID_EVENT:
4079
thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1;
4080
thd->first_successful_insert_id_in_prev_stmt= val;
4082
case INSERT_ID_EVENT:
4083
thd->force_one_auto_inc_interval(val);
4089
int Intvar_log_event::do_update_pos(Relay_log_info *rli)
4091
rli->inc_event_relay_log_pos();
4096
Log_event::enum_skip_reason
4097
Intvar_log_event::do_shall_skip(Relay_log_info *rli)
4100
It is a common error to set the slave skip counter to 1 instead of
4101
2 when recovering from an insert which used a auto increment,
4102
rand, or user var. Therefore, if the slave skip counter is 1, we
4103
just say that this event should be skipped by ignoring it, meaning
4104
that we do not change the value of the slave skip counter since it
4105
will be decreased by the following insert event.
4107
return continue_group(rli);
4113
/**************************************************************************
4114
Rand_log_event methods
4115
**************************************************************************/
4117
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
4118
void Rand_log_event::pack_info(Protocol *protocol)
4120
char buf1[256], *pos;
4121
pos= strmov(buf1,"rand_seed1=");
4122
pos= int10_to_str((long) seed1, pos, 10);
4123
pos= strmov(pos, ",rand_seed2=");
4124
pos= int10_to_str((long) seed2, pos, 10);
4125
protocol->store(buf1, (uint) (pos-buf1), &my_charset_bin);
4130
Rand_log_event::Rand_log_event(const char* buf,
4131
const Format_description_log_event* description_event)
4132
:Log_event(buf, description_event)
4134
buf+= description_event->common_header_len;
4135
seed1= uint8korr(buf+RAND_SEED1_OFFSET);
4136
seed2= uint8korr(buf+RAND_SEED2_OFFSET);
4140
#ifndef MYSQL_CLIENT
4141
bool Rand_log_event::write(IO_CACHE* file)
4144
int8store(buf + RAND_SEED1_OFFSET, seed1);
4145
int8store(buf + RAND_SEED2_OFFSET, seed2);
4146
return (write_header(file, sizeof(buf)) ||
4147
my_b_safe_write(file, buf, sizeof(buf)));
4153
void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
4155
Write_on_release_cache cache(&print_event_info->head_cache, file,
4156
Write_on_release_cache::FLUSH_F);
4158
char llbuff[22],llbuff2[22];
4159
if (!print_event_info->short_form)
4161
print_header(&cache, print_event_info, false);
4162
my_b_printf(&cache, "\tRand\n");
4164
my_b_printf(&cache, "SET @@RAND_SEED1=%s, @@RAND_SEED2=%s%s\n",
4165
llstr(seed1, llbuff),llstr(seed2, llbuff2),
4166
print_event_info->delimiter);
4168
#endif /* MYSQL_CLIENT */
4171
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
4172
int Rand_log_event::do_apply_event(Relay_log_info const *rli)
4175
We are now in a statement until the associated query log event has
4178
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
4180
thd->rand.seed1= (ulong) seed1;
4181
thd->rand.seed2= (ulong) seed2;
4185
int Rand_log_event::do_update_pos(Relay_log_info *rli)
4187
rli->inc_event_relay_log_pos();
4192
Log_event::enum_skip_reason
4193
Rand_log_event::do_shall_skip(Relay_log_info *rli)
4196
It is a common error to set the slave skip counter to 1 instead of
4197
2 when recovering from an insert which used a auto increment,
4198
rand, or user var. Therefore, if the slave skip counter is 1, we
4199
just say that this event should be skipped by ignoring it, meaning
4200
that we do not change the value of the slave skip counter since it
4201
will be decreased by the following insert event.
4203
return continue_group(rli);
4206
#endif /* !MYSQL_CLIENT */
4209
/**************************************************************************
4210
Xid_log_event methods
4211
**************************************************************************/
4213
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
4214
void Xid_log_event::pack_info(Protocol *protocol)
4216
char buf[128], *pos;
4217
pos= strmov(buf, "COMMIT /* xid=");
4218
pos= longlong10_to_str(xid, pos, 10);
4219
pos= strmov(pos, " */");
4220
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
4226
It's ok not to use int8store here,
4227
as long as xid_t::set(ulonglong) and
4228
xid_t::get_my_xid doesn't do it either.
4229
We don't care about actual values of xids as long as
4230
identical numbers compare identically
4234
Xid_log_event(const char* buf,
4235
const Format_description_log_event *description_event)
4236
:Log_event(buf, description_event)
4238
buf+= description_event->common_header_len;
4239
memcpy((char*) &xid, buf, sizeof(xid));
4243
#ifndef MYSQL_CLIENT
4244
bool Xid_log_event::write(IO_CACHE* file)
4246
return write_header(file, sizeof(xid)) ||
4247
my_b_safe_write(file, (uchar*) &xid, sizeof(xid));
4253
void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
4255
Write_on_release_cache cache(&print_event_info->head_cache, file,
4256
Write_on_release_cache::FLUSH_F);
4258
if (!print_event_info->short_form)
4261
longlong10_to_str(xid, buf, 10);
4263
print_header(&cache, print_event_info, false);
4264
my_b_printf(&cache, "\tXid = %s\n", buf);
4266
my_b_printf(&cache, "COMMIT%s\n", print_event_info->delimiter);
4268
#endif /* MYSQL_CLIENT */
4271
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
4272
int Xid_log_event::do_apply_event(Relay_log_info const *rli __attribute__((__unused__)))
4274
/* For a slave Xid_log_event is COMMIT */
4275
general_log_print(thd, COM_QUERY,
4276
"COMMIT /* implicit, from Xid_log_event */");
4277
return end_trans(thd, COMMIT);
4280
Log_event::enum_skip_reason
4281
Xid_log_event::do_shall_skip(Relay_log_info *rli)
4283
if (rli->slave_skip_counter > 0) {
4284
thd->options&= ~OPTION_BEGIN;
4285
return(Log_event::EVENT_SKIP_COUNT);
4287
return(Log_event::do_shall_skip(rli));
4289
#endif /* !MYSQL_CLIENT */
4292
/**************************************************************************
4293
User_var_log_event methods
4294
**************************************************************************/
4296
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
4297
void User_var_log_event::pack_info(Protocol* protocol)
4300
uint val_offset= 4 + name_len;
4301
uint event_len= val_offset;
4305
if (!(buf= (char*) my_malloc(val_offset + 5, MYF(MY_WME))))
4307
strmov(buf + val_offset, "NULL");
4308
event_len= val_offset + 4;
4315
float8get(real_val, val);
4316
if (!(buf= (char*) my_malloc(val_offset + MY_GCVT_MAX_FIELD_WIDTH + 1,
4319
event_len+= my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, MY_GCVT_MAX_FIELD_WIDTH,
4320
buf + val_offset, NULL);
4323
if (!(buf= (char*) my_malloc(val_offset + 22, MYF(MY_WME))))
4325
event_len= longlong10_to_str(uint8korr(val), buf + val_offset,-10)-buf;
4327
case DECIMAL_RESULT:
4329
if (!(buf= (char*) my_malloc(val_offset + DECIMAL_MAX_STR_LENGTH,
4332
String str(buf+val_offset, DECIMAL_MAX_STR_LENGTH, &my_charset_bin);
4334
binary2my_decimal(E_DEC_FATAL_ERROR, (uchar*) (val+2), &dec, val[0],
4336
my_decimal2string(E_DEC_FATAL_ERROR, &dec, 0, 0, 0, &str);
4337
event_len= str.length() + val_offset;
4341
/* 15 is for 'COLLATE' and other chars */
4342
buf= (char*) my_malloc(event_len+val_len*2+1+2*MY_CS_NAME_SIZE+15,
4347
if (!(cs= get_charset(charset_number, MYF(0))))
4349
strmov(buf+val_offset, "???");
4354
char *p= strxmov(buf + val_offset, "_", cs->csname, " ", NullS);
4355
p= str_to_hex(p, val, val_len);
4356
p= strxmov(p, " COLLATE ", cs->name, NullS);
4368
memcpy(buf+2, name, name_len);
4369
buf[2+name_len]= '`';
4370
buf[3+name_len]= '=';
4371
protocol->store(buf, event_len, &my_charset_bin);
4372
my_free(buf, MYF(0));
4374
#endif /* !MYSQL_CLIENT */
4377
User_var_log_event::
4378
User_var_log_event(const char* buf,
4379
const Format_description_log_event* description_event)
4380
:Log_event(buf, description_event)
4382
buf+= description_event->common_header_len;
4383
name_len= uint4korr(buf);
4384
name= (char *) buf + UV_NAME_LEN_SIZE;
4385
buf+= UV_NAME_LEN_SIZE + name_len;
4386
is_null= (bool) *buf;
4389
type= STRING_RESULT;
4390
charset_number= my_charset_bin.number;
4396
type= (Item_result) buf[UV_VAL_IS_NULL];
4397
charset_number= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE);
4398
val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
4399
UV_CHARSET_NUMBER_SIZE);
4400
val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
4401
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE);
4406
#ifndef MYSQL_CLIENT
4407
bool User_var_log_event::write(IO_CACHE* file)
4409
char buf[UV_NAME_LEN_SIZE];
4410
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
4411
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
4412
uchar buf2[max(8, DECIMAL_MAX_FIELD_SIZE + 2)], *pos= buf2;
4416
int4store(buf, name_len);
4418
if ((buf1[0]= is_null))
4421
val_len= 0; // Length of 'pos'
4426
int4store(buf1 + 2, charset_number);
4430
float8store(buf2, *(double*) val);
4433
int8store(buf2, *(longlong*) val);
4435
case DECIMAL_RESULT:
4437
my_decimal *dec= (my_decimal *)val;
4438
dec->fix_buffer_pointer();
4439
buf2[0]= (char)(dec->intg + dec->frac);
4440
buf2[1]= (char)dec->frac;
4441
decimal2bin((decimal_t*)val, buf2+2, buf2[0], buf2[1]);
4442
val_len= decimal_bin_size(buf2[0], buf2[1]) + 2;
4453
int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
4457
/* Length of the whole event */
4458
event_length= sizeof(buf)+ name_len + buf1_length + val_len;
4460
return (write_header(file, event_length) ||
4461
my_b_safe_write(file, (uchar*) buf, sizeof(buf)) ||
4462
my_b_safe_write(file, (uchar*) name, name_len) ||
4463
my_b_safe_write(file, (uchar*) buf1, buf1_length) ||
4464
my_b_safe_write(file, pos, val_len));
4470
User_var_log_event::print()
4474
void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
4476
Write_on_release_cache cache(&print_event_info->head_cache, file,
4477
Write_on_release_cache::FLUSH_F);
4479
if (!print_event_info->short_form)
4481
print_header(&cache, print_event_info, false);
4482
my_b_printf(&cache, "\tUser_var\n");
4485
my_b_printf(&cache, "SET @`");
4486
my_b_write(&cache, (uchar*) name, (uint) (name_len));
4487
my_b_printf(&cache, "`");
4491
my_b_printf(&cache, ":=NULL%s\n", print_event_info->delimiter);
4498
char real_buf[FMT_G_BUFSIZE(14)];
4499
float8get(real_val, val);
4500
my_sprintf(real_buf, (real_buf, "%.14g", real_val));
4501
my_b_printf(&cache, ":=%s%s\n", real_buf, print_event_info->delimiter);
4505
longlong10_to_str(uint8korr(val), int_buf, -10);
4506
my_b_printf(&cache, ":=%s%s\n", int_buf, print_event_info->delimiter);
4508
case DECIMAL_RESULT:
4511
int str_len= sizeof(str_buf) - 1;
4512
int precision= (int)val[0];
4513
int scale= (int)val[1];
4514
decimal_digit_t dec_buf[10];
4519
bin2decimal((uchar*) val+2, &dec, precision, scale);
4520
decimal2string(&dec, str_buf, &str_len, 0, 0, 0);
4521
str_buf[str_len]= 0;
4522
my_b_printf(&cache, ":=%s%s\n", str_buf, print_event_info->delimiter);
4528
Let's express the string in hex. That's the most robust way. If we
4529
print it in character form instead, we need to escape it with
4530
character_set_client which we don't know (we will know it in 5.0, but
4531
in 4.1 we don't know it easily when we are printing
4532
User_var_log_event). Explanation why we would need to bother with
4533
character_set_client (quoting Bar):
4534
> Note, the parser doesn't switch to another unescaping mode after
4535
> it has met a character set introducer.
4536
> For example, if an SJIS client says something like:
4537
> SET @a= _ucs2 \0a\0b'
4538
> the string constant is still unescaped according to SJIS, not
4539
> according to UCS2.
4544
if (!(hex_str= (char *)my_alloca(2*val_len+1+2))) // 2 hex digits / byte
4545
break; // no error, as we are 'void'
4546
str_to_hex(hex_str, val, val_len);
4548
For proper behaviour when mysqlbinlog|mysql, we need to explicitely
4549
specify the variable's collation. It will however cause problems when
4550
people want to mysqlbinlog|mysql into another server not supporting the
4551
character set. But there's not much to do about this and it's unlikely.
4553
if (!(cs= get_charset(charset_number, MYF(0))))
4555
Generate an unusable command (=> syntax error) is probably the best
4556
thing we can do here.
4558
my_b_printf(&cache, ":=???%s\n", print_event_info->delimiter);
4560
my_b_printf(&cache, ":=_%s %s COLLATE `%s`%s\n",
4561
cs->csname, hex_str, cs->name,
4562
print_event_info->delimiter);
4577
User_var_log_event::do_apply_event()
4580
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
4581
int User_var_log_event::do_apply_event(Relay_log_info const *rli)
4584
CHARSET_INFO *charset;
4585
if (!(charset= get_charset(charset_number, MYF(MY_WME))))
4587
LEX_STRING user_var_name;
4588
user_var_name.str= name;
4589
user_var_name.length= name_len;
4594
We are now in a statement until the associated query log event has
4597
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
4601
it= new Item_null();
4607
float8get(real_val, val);
4608
it= new Item_float(real_val, 0);
4609
val= (char*) &real_val; // Pointer to value in native format
4613
int_val= (longlong) uint8korr(val);
4614
it= new Item_int(int_val);
4615
val= (char*) &int_val; // Pointer to value in native format
4618
case DECIMAL_RESULT:
4620
Item_decimal *dec= new Item_decimal((uchar*) val+2, val[0], val[1]);
4622
val= (char *)dec->val_decimal(NULL);
4623
val_len= sizeof(my_decimal);
4627
it= new Item_string(val, val_len, charset);
4635
Item_func_set_user_var e(user_var_name, it);
4637
Item_func_set_user_var can't substitute something else on its place =>
4638
0 can be passed as last argument (reference on item)
4640
e.fix_fields(thd, 0);
4642
A variable can just be considered as a table with
4643
a single record and with a single column. Thus, like
4644
a column value, it could always have IMPLICIT derivation.
4646
e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0);
4647
free_root(thd->mem_root,0);
4652
int User_var_log_event::do_update_pos(Relay_log_info *rli)
4654
rli->inc_event_relay_log_pos();
4658
Log_event::enum_skip_reason
4659
User_var_log_event::do_shall_skip(Relay_log_info *rli)
4662
It is a common error to set the slave skip counter to 1 instead
4663
of 2 when recovering from an insert which used a auto increment,
4664
rand, or user var. Therefore, if the slave skip counter is 1, we
4665
just say that this event should be skipped by ignoring it, meaning
4666
that we do not change the value of the slave skip counter since it
4667
will be decreased by the following insert event.
4669
return continue_group(rli);
4671
#endif /* !MYSQL_CLIENT */
4674
/**************************************************************************
4675
Slave_log_event methods
4676
**************************************************************************/
4678
#ifdef HAVE_REPLICATION
4680
void Unknown_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info)
4682
Write_on_release_cache cache(&print_event_info->head_cache, file_arg);
4684
if (print_event_info->short_form)
4686
print_header(&cache, print_event_info, false);
4687
my_b_printf(&cache, "\n# %s", "Unknown event\n");
4691
#ifndef MYSQL_CLIENT
4692
void Slave_log_event::pack_info(Protocol *protocol)
4694
char buf[256+HOSTNAME_LENGTH], *pos;
4695
pos= strmov(buf, "host=");
4696
pos= strnmov(pos, master_host, HOSTNAME_LENGTH);
4697
pos= strmov(pos, ",port=");
4698
pos= int10_to_str((long) master_port, pos, 10);
4699
pos= strmov(pos, ",log=");
4700
pos= strmov(pos, master_log);
4701
pos= strmov(pos, ",pos=");
4702
pos= longlong10_to_str(master_pos, pos, 10);
4703
protocol->store(buf, pos-buf, &my_charset_bin);
4705
#endif /* !MYSQL_CLIENT */
4708
#ifndef MYSQL_CLIENT
4711
re-write this better without holding both locks at the same time
4713
Slave_log_event::Slave_log_event(THD* thd_arg,
4714
Relay_log_info* rli)
4715
:Log_event(thd_arg, 0, 0) , mem_pool(0), master_host(0)
4717
if (!rli->inited) // QQ When can this happen ?
4720
Master_info* mi = rli->mi;
4721
// TODO: re-write this better without holding both locks at the same time
4722
pthread_mutex_lock(&mi->data_lock);
4723
pthread_mutex_lock(&rli->data_lock);
4724
master_host_len = strlen(mi->host);
4725
master_log_len = strlen(rli->group_master_log_name);
4726
// on OOM, just do not initialize the structure and print the error
4727
if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
4730
master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
4731
memcpy(master_host, mi->host, master_host_len + 1);
4732
master_log = master_host + master_host_len + 1;
4733
memcpy(master_log, rli->group_master_log_name, master_log_len + 1);
4734
master_port = mi->port;
4735
master_pos = rli->group_master_log_pos;
4738
sql_print_error("Out of memory while recording slave event");
4739
pthread_mutex_unlock(&rli->data_lock);
4740
pthread_mutex_unlock(&mi->data_lock);
4743
#endif /* !MYSQL_CLIENT */
4746
Slave_log_event::~Slave_log_event()
4748
my_free(mem_pool, MYF(MY_ALLOW_ZERO_PTR));
4753
void Slave_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
4755
Write_on_release_cache cache(&print_event_info->head_cache, file);
4758
if (print_event_info->short_form)
4760
print_header(&cache, print_event_info, false);
4761
my_b_printf(&cache, "\n\
4762
Slave: master_host: '%s' master_port: %d master_log: '%s' master_pos: %s\n",
4763
master_host, master_port, master_log, llstr(master_pos, llbuff));
4765
#endif /* MYSQL_CLIENT */
4768
int Slave_log_event::get_data_size()
4770
return master_host_len + master_log_len + 1 + SL_MASTER_HOST_OFFSET;
4774
#ifndef MYSQL_CLIENT
4775
bool Slave_log_event::write(IO_CACHE* file)
4777
ulong event_length= get_data_size();
4778
int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
4779
int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
4780
// log and host are already there
4782
return (write_header(file, event_length) ||
4783
my_b_safe_write(file, (uchar*) mem_pool, event_length));
4788
void Slave_log_event::init_from_mem_pool(int data_size)
4790
master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
4791
master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
4792
master_host = mem_pool + SL_MASTER_HOST_OFFSET;
4793
master_host_len = strlen(master_host);
4795
master_log = master_host + master_host_len + 1;
4796
if (master_log > mem_pool + data_size)
4801
master_log_len = strlen(master_log);
4805
/** This code is not used, so has not been updated to be format-tolerant. */
4806
Slave_log_event::Slave_log_event(const char* buf, uint event_len)
4807
:Log_event(buf,0) /*unused event*/ ,mem_pool(0),master_host(0)
4809
if (event_len < LOG_EVENT_HEADER_LEN)
4811
event_len -= LOG_EVENT_HEADER_LEN;
4812
if (!(mem_pool = (char*) my_malloc(event_len + 1, MYF(MY_WME))))
4814
memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len);
4815
mem_pool[event_len] = 0;
4816
init_from_mem_pool(event_len);
4820
#ifndef MYSQL_CLIENT
4821
int Slave_log_event::do_apply_event(Relay_log_info const *rli __attribute__((__unused__)))
4823
if (mysql_bin_log.is_open())
4824
mysql_bin_log.write(this);
4827
#endif /* !MYSQL_CLIENT */
4830
/**************************************************************************
4831
Stop_log_event methods
4832
**************************************************************************/
4835
Stop_log_event::print()
4839
void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
4841
Write_on_release_cache cache(&print_event_info->head_cache, file,
4842
Write_on_release_cache::FLUSH_F);
4844
if (print_event_info->short_form)
4847
print_header(&cache, print_event_info, false);
4848
my_b_printf(&cache, "\tStop\n");
4850
#endif /* MYSQL_CLIENT */
4853
#ifndef MYSQL_CLIENT
4855
The master stopped. We used to clean up all temporary tables but
4856
this is useless as, as the master has shut down properly, it has
4857
written all DROP TEMPORARY TABLE (prepared statements' deletion is
4858
TODO only when we binlog prep stmts). We used to clean up
4859
slave_load_tmpdir, but this is useless as it has been cleared at the
4860
end of LOAD DATA INFILE. So we have nothing to do here. The place
4861
were we must do this cleaning is in
4862
Start_log_event_v3::do_apply_event(), not here. Because if we come
4863
here, the master was sane.
4865
int Stop_log_event::do_update_pos(Relay_log_info *rli)
4868
We do not want to update master_log pos because we get a rotate event
4869
before stop, so by now group_master_log_name is set to the next log.
4870
If we updated it, we will have incorrect master coordinates and this
4871
could give false triggers in MASTER_POS_WAIT() that we have reached
4872
the target position when in fact we have not.
4874
if (thd->options & OPTION_BEGIN)
4875
rli->inc_event_relay_log_pos();
4878
rli->inc_group_relay_log_pos(0);
4879
flush_relay_log_info(rli);
4884
#endif /* !MYSQL_CLIENT */
4885
#endif /* HAVE_REPLICATION */
4888
/**************************************************************************
4889
Create_file_log_event methods
4890
**************************************************************************/
4893
Create_file_log_event ctor
4896
#ifndef MYSQL_CLIENT
4897
Create_file_log_event::
4898
Create_file_log_event(THD* thd_arg, sql_exchange* ex,
4899
const char* db_arg, const char* table_name_arg,
4900
List<Item>& fields_arg, enum enum_duplicates handle_dup,
4902
uchar* block_arg, uint block_len_arg, bool using_trans)
4903
:Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
4905
fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg),
4906
file_id(thd_arg->file_id = mysql_bin_log.next_file_id())
4908
sql_ex.force_new_format();
4914
Create_file_log_event::write_data_body()
4917
bool Create_file_log_event::write_data_body(IO_CACHE* file)
4920
if ((res= Load_log_event::write_data_body(file)) || fake_base)
4922
return (my_b_safe_write(file, (uchar*) "", 1) ||
4923
my_b_safe_write(file, (uchar*) block, block_len));
4928
Create_file_log_event::write_data_header()
4931
bool Create_file_log_event::write_data_header(IO_CACHE* file)
4934
uchar buf[CREATE_FILE_HEADER_LEN];
4935
if ((res= Load_log_event::write_data_header(file)) || fake_base)
4937
int4store(buf + CF_FILE_ID_OFFSET, file_id);
4938
return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0;
4943
Create_file_log_event::write_base()
4946
bool Create_file_log_event::write_base(IO_CACHE* file)
4949
fake_base= 1; // pretend we are Load event
4955
#endif /* !MYSQL_CLIENT */
4958
Create_file_log_event ctor
4961
Create_file_log_event::Create_file_log_event(const char* buf, uint len,
4962
const Format_description_log_event* description_event)
4963
:Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0)
4966
uint header_len= description_event->common_header_len;
4967
uint8 load_header_len= description_event->post_header_len[LOAD_EVENT-1];
4968
uint8 create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
4969
if (!(event_buf= (char*) my_memdup(buf, len, MYF(MY_WME))) ||
4970
copy_log_event(event_buf,len,
4971
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
4972
load_header_len + header_len :
4973
(fake_base ? (header_len+load_header_len) :
4974
(header_len+load_header_len) +
4975
create_file_header_len)),
4978
if (description_event->binlog_version!=1)
4980
file_id= uint4korr(buf +
4982
load_header_len + CF_FILE_ID_OFFSET);
4984
Note that it's ok to use get_data_size() below, because it is computed
4985
with values we have already read from this event (because we called
4986
copy_log_event()); we are not using slave's format info to decode
4987
master's format, we are really using master's format info.
4988
Anyway, both formats should be identical (except the common_header_len)
4989
as these Load events are not changed between 4.0 and 5.0 (as logging of
4990
LOAD DATA INFILE does not use Load_log_event in 5.0).
4992
The + 1 is for \0 terminating fname
4994
block_offset= (description_event->common_header_len +
4995
Load_log_event::get_data_size() +
4996
create_file_header_len + 1);
4997
if (len < block_offset)
4999
block = (uchar*)buf + block_offset;
5000
block_len = len - block_offset;
5004
sql_ex.force_new_format();
5005
inited_from_old = 1;
5012
Create_file_log_event::print()
5016
void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info,
5019
Write_on_release_cache cache(&print_event_info->head_cache, file);
5021
if (print_event_info->short_form)
5023
if (enable_local && check_fname_outside_temp_buf())
5024
Load_log_event::print(file, print_event_info);
5030
Load_log_event::print(file, print_event_info,
5031
!check_fname_outside_temp_buf());
5033
That one is for "file_id: etc" below: in mysqlbinlog we want the #, in
5034
SHOW BINLOG EVENTS we don't.
5036
my_b_printf(&cache, "#");
5039
my_b_printf(&cache, " file_id: %d block_len: %d\n", file_id, block_len);
5043
void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
5045
print(file, print_event_info, 0);
5047
#endif /* MYSQL_CLIENT */
5051
Create_file_log_event::pack_info()
5054
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
5055
void Create_file_log_event::pack_info(Protocol *protocol)
5057
char buf[NAME_LEN*2 + 30 + 21*2], *pos;
5058
pos= strmov(buf, "db=");
5059
memcpy(pos, db, db_len);
5060
pos= strmov(pos + db_len, ";table=");
5061
memcpy(pos, table_name, table_name_len);
5062
pos= strmov(pos + table_name_len, ";file_id=");
5063
pos= int10_to_str((long) file_id, pos, 10);
5064
pos= strmov(pos, ";block_len=");
5065
pos= int10_to_str((long) block_len, pos, 10);
5066
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
5068
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
5072
Create_file_log_event::do_apply_event()
5075
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
5076
int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
5078
char proc_info[17+FN_REFLEN+10], *fname_buf;
5084
bzero((char*)&file, sizeof(file));
5085
fname_buf= strmov(proc_info, "Making temp file ");
5086
ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
5087
thd_proc_info(thd, proc_info);
5088
my_delete(fname_buf, MYF(0)); // old copy may exist already
5089
if ((fd= my_create(fname_buf, CREATE_MODE,
5090
O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
5091
MYF(MY_WME))) < 0 ||
5092
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
5093
MYF(MY_WME|MY_NABP)))
5095
rli->report(ERROR_LEVEL, my_errno,
5096
"Error in Create_file event: could not open file '%s'",
5101
// a trick to avoid allocating another buffer
5103
fname_len= (uint) (strmov(ext, ".data") - fname);
5104
if (write_base(&file))
5106
strmov(ext, ".info"); // to have it right in the error message
5107
rli->report(ERROR_LEVEL, my_errno,
5108
"Error in Create_file event: could not write to file '%s'",
5112
end_io_cache(&file);
5113
my_close(fd, MYF(0));
5115
// fname_buf now already has .data, not .info, because we did our trick
5116
my_delete(fname_buf, MYF(0)); // old copy may exist already
5117
if ((fd= my_create(fname_buf, CREATE_MODE,
5118
O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
5121
rli->report(ERROR_LEVEL, my_errno,
5122
"Error in Create_file event: could not open file '%s'",
5126
if (my_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP)))
5128
rli->report(ERROR_LEVEL, my_errno,
5129
"Error in Create_file event: write to '%s' failed",
5133
error=0; // Everything is ok
5137
end_io_cache(&file);
5139
my_close(fd, MYF(0));
5140
thd_proc_info(thd, 0);
5143
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
5146
/**************************************************************************
5147
Append_block_log_event methods
5148
**************************************************************************/
5151
Append_block_log_event ctor
5154
#ifndef MYSQL_CLIENT
5155
Append_block_log_event::Append_block_log_event(THD *thd_arg,
5160
:Log_event(thd_arg,0, using_trans), block(block_arg),
5161
block_len(block_len_arg), file_id(thd_arg->file_id), db(db_arg)
5168
Append_block_log_event ctor
5171
Append_block_log_event::Append_block_log_event(const char* buf, uint len,
5172
const Format_description_log_event* description_event)
5173
:Log_event(buf, description_event),block(0)
5175
uint8 common_header_len= description_event->common_header_len;
5176
uint8 append_block_header_len=
5177
description_event->post_header_len[APPEND_BLOCK_EVENT-1];
5178
uint total_header_len= common_header_len+append_block_header_len;
5179
if (len < total_header_len)
5181
file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET);
5182
block= (uchar*)buf + total_header_len;
5183
block_len= len - total_header_len;
5189
Append_block_log_event::write()
5192
#ifndef MYSQL_CLIENT
5193
bool Append_block_log_event::write(IO_CACHE* file)
5195
uchar buf[APPEND_BLOCK_HEADER_LEN];
5196
int4store(buf + AB_FILE_ID_OFFSET, file_id);
5197
return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) ||
5198
my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
5199
my_b_safe_write(file, (uchar*) block, block_len));
5205
Append_block_log_event::print()
5209
void Append_block_log_event::print(FILE* file,
5210
PRINT_EVENT_INFO* print_event_info)
5212
Write_on_release_cache cache(&print_event_info->head_cache, file);
5214
if (print_event_info->short_form)
5216
print_header(&cache, print_event_info, false);
5217
my_b_printf(&cache, "\n#%s: file_id: %d block_len: %d\n",
5218
get_type_str(), file_id, block_len);
5220
#endif /* MYSQL_CLIENT */
5224
Append_block_log_event::pack_info()
5227
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
5228
void Append_block_log_event::pack_info(Protocol *protocol)
5232
length= (uint) my_sprintf(buf,
5233
(buf, ";file_id=%u;block_len=%u", file_id,
5235
protocol->store(buf, length, &my_charset_bin);
5240
Append_block_log_event::get_create_or_append()
5243
int Append_block_log_event::get_create_or_append() const
5245
return 0; /* append to the file, fail if not exists */
5249
Append_block_log_event::do_apply_event()
5252
int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
5254
char proc_info[17+FN_REFLEN+10], *fname= proc_info+17;
5258
fname= strmov(proc_info, "Making temp file ");
5259
slave_load_file_stem(fname, file_id, server_id, ".data");
5260
thd_proc_info(thd, proc_info);
5261
if (get_create_or_append())
5263
my_delete(fname, MYF(0)); // old copy may exist already
5264
if ((fd= my_create(fname, CREATE_MODE,
5265
O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
5268
rli->report(ERROR_LEVEL, my_errno,
5269
"Error in %s event: could not create file '%s'",
5270
get_type_str(), fname);
5274
else if ((fd = my_open(fname, O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW,
5277
rli->report(ERROR_LEVEL, my_errno,
5278
"Error in %s event: could not open file '%s'",
5279
get_type_str(), fname);
5282
if (my_write(fd, (uchar*) block, block_len, MYF(MY_WME+MY_NABP)))
5284
rli->report(ERROR_LEVEL, my_errno,
5285
"Error in %s event: write to '%s' failed",
5286
get_type_str(), fname);
5293
my_close(fd, MYF(0));
5294
thd_proc_info(thd, 0);
5300
/**************************************************************************
5301
Delete_file_log_event methods
5302
**************************************************************************/
5305
Delete_file_log_event ctor
5308
#ifndef MYSQL_CLIENT
5309
Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg,
5311
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
5317
Delete_file_log_event ctor
5320
Delete_file_log_event::Delete_file_log_event(const char* buf, uint len,
5321
const Format_description_log_event* description_event)
5322
:Log_event(buf, description_event),file_id(0)
5324
uint8 common_header_len= description_event->common_header_len;
5325
uint8 delete_file_header_len= description_event->post_header_len[DELETE_FILE_EVENT-1];
5326
if (len < (uint)(common_header_len + delete_file_header_len))
5328
file_id= uint4korr(buf + common_header_len + DF_FILE_ID_OFFSET);
5333
Delete_file_log_event::write()
5336
#ifndef MYSQL_CLIENT
5337
bool Delete_file_log_event::write(IO_CACHE* file)
5339
uchar buf[DELETE_FILE_HEADER_LEN];
5340
int4store(buf + DF_FILE_ID_OFFSET, file_id);
5341
return (write_header(file, sizeof(buf)) ||
5342
my_b_safe_write(file, buf, sizeof(buf)));
5348
Delete_file_log_event::print()
5352
void Delete_file_log_event::print(FILE* file,
5353
PRINT_EVENT_INFO* print_event_info)
5355
Write_on_release_cache cache(&print_event_info->head_cache, file);
5357
if (print_event_info->short_form)
5359
print_header(&cache, print_event_info, false);
5360
my_b_printf(&cache, "\n#Delete_file: file_id=%u\n", file_id);
5362
#endif /* MYSQL_CLIENT */
5365
Delete_file_log_event::pack_info()
5368
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
5369
void Delete_file_log_event::pack_info(Protocol *protocol)
5373
length= (uint) my_sprintf(buf, (buf, ";file_id=%u", (uint) file_id));
5374
protocol->store(buf, (int32) length, &my_charset_bin);
5379
Delete_file_log_event::do_apply_event()
5382
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
5383
int Delete_file_log_event::do_apply_event(Relay_log_info const *rli __attribute__((__unused__)))
5385
char fname[FN_REFLEN+10];
5386
char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
5387
(void) my_delete(fname, MYF(MY_WME));
5388
strmov(ext, ".info");
5389
(void) my_delete(fname, MYF(MY_WME));
5392
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
5395
/**************************************************************************
5396
Execute_load_log_event methods
5397
**************************************************************************/
5400
Execute_load_log_event ctor
5403
#ifndef MYSQL_CLIENT
5404
Execute_load_log_event::Execute_load_log_event(THD *thd_arg,
5407
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
5414
Execute_load_log_event ctor
5417
Execute_load_log_event::Execute_load_log_event(const char* buf, uint len,
5418
const Format_description_log_event* description_event)
5419
:Log_event(buf, description_event), file_id(0)
5421
uint8 common_header_len= description_event->common_header_len;
5422
uint8 exec_load_header_len= description_event->post_header_len[EXEC_LOAD_EVENT-1];
5423
if (len < (uint)(common_header_len+exec_load_header_len))
5425
file_id= uint4korr(buf + common_header_len + EL_FILE_ID_OFFSET);
5430
Execute_load_log_event::write()
5433
#ifndef MYSQL_CLIENT
5434
bool Execute_load_log_event::write(IO_CACHE* file)
5436
uchar buf[EXEC_LOAD_HEADER_LEN];
5437
int4store(buf + EL_FILE_ID_OFFSET, file_id);
5438
return (write_header(file, sizeof(buf)) ||
5439
my_b_safe_write(file, buf, sizeof(buf)));
5445
Execute_load_log_event::print()
5449
void Execute_load_log_event::print(FILE* file,
5450
PRINT_EVENT_INFO* print_event_info)
5452
Write_on_release_cache cache(&print_event_info->head_cache, file);
5454
if (print_event_info->short_form)
5456
print_header(&cache, print_event_info, false);
5457
my_b_printf(&cache, "\n#Exec_load: file_id=%d\n",
5463
Execute_load_log_event::pack_info()
5466
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
5467
void Execute_load_log_event::pack_info(Protocol *protocol)
5471
length= (uint) my_sprintf(buf, (buf, ";file_id=%u", (uint) file_id));
5472
protocol->store(buf, (int32) length, &my_charset_bin);
5477
Execute_load_log_event::do_apply_event()
5480
int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
5482
char fname[FN_REFLEN+10];
5487
Load_log_event *lev= 0;
5489
ext= slave_load_file_stem(fname, file_id, server_id, ".info");
5490
if ((fd = my_open(fname, O_RDONLY | O_BINARY | O_NOFOLLOW,
5491
MYF(MY_WME))) < 0 ||
5492
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
5493
MYF(MY_WME|MY_NABP)))
5495
rli->report(ERROR_LEVEL, my_errno,
5496
"Error in Exec_load event: could not open file '%s'",
5500
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
5501
(pthread_mutex_t*)0,
5502
rli->relay_log.description_event_for_exec)) ||
5503
lev->get_type_code() != NEW_LOAD_EVENT)
5505
rli->report(ERROR_LEVEL, 0, "Error in Exec_load event: "
5506
"file '%s' appears corrupted", fname);
5512
lev->do_apply_event should use rli only for errors i.e. should
5513
not advance rli's position.
5515
lev->do_apply_event is the place where the table is loaded (it
5516
calls mysql_load()).
5519
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
5520
if (lev->do_apply_event(0,rli,1))
5523
We want to indicate the name of the file that could not be loaded
5525
But as we are here we are sure the error is in rli->last_slave_error and
5526
rli->last_slave_errno (example of error: duplicate entry for key), so we
5527
don't want to overwrite it with the filename.
5528
What we want instead is add the filename to the current error message.
5530
char *tmp= my_strdup(rli->last_error().message, MYF(MY_WME));
5533
rli->report(ERROR_LEVEL, rli->last_error().number,
5534
"%s. Failed executing load from '%s'", tmp, fname);
5535
my_free(tmp,MYF(0));
5540
We have an open file descriptor to the .info file; we need to close it
5541
or Windows will refuse to delete the file in my_delete().
5545
my_close(fd, MYF(0));
5546
end_io_cache(&file);
5549
(void) my_delete(fname, MYF(MY_WME));
5550
memcpy(ext, ".data", 6);
5551
(void) my_delete(fname, MYF(MY_WME));
5558
my_close(fd, MYF(0));
5559
end_io_cache(&file);
5564
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
5567
/**************************************************************************
5568
Begin_load_query_log_event methods
5569
**************************************************************************/
5571
#ifndef MYSQL_CLIENT
5572
Begin_load_query_log_event::
5573
Begin_load_query_log_event(THD* thd_arg, const char* db_arg, uchar* block_arg,
5574
uint block_len_arg, bool using_trans)
5575
:Append_block_log_event(thd_arg, db_arg, block_arg, block_len_arg,
5578
file_id= thd_arg->file_id= mysql_bin_log.next_file_id();
5583
Begin_load_query_log_event::
5584
Begin_load_query_log_event(const char* buf, uint len,
5585
const Format_description_log_event* desc_event)
5586
:Append_block_log_event(buf, len, desc_event)
5591
#if defined( HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
5592
int Begin_load_query_log_event::get_create_or_append() const
5594
return 1; /* create the file */
5596
#endif /* defined( HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
5599
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
5600
Log_event::enum_skip_reason
5601
Begin_load_query_log_event::do_shall_skip(Relay_log_info *rli)
5604
If the slave skip counter is 1, then we should not start executing
5607
return continue_group(rli);
5612
/**************************************************************************
5613
Execute_load_query_log_event methods
5614
**************************************************************************/
5617
#ifndef MYSQL_CLIENT
5618
Execute_load_query_log_event::
5619
Execute_load_query_log_event(THD *thd_arg, const char* query_arg,
5620
ulong query_length_arg, uint fn_pos_start_arg,
5621
uint fn_pos_end_arg,
5622
enum_load_dup_handling dup_handling_arg,
5623
bool using_trans, bool suppress_use,
5624
THD::killed_state killed_err_arg):
5625
Query_log_event(thd_arg, query_arg, query_length_arg, using_trans,
5626
suppress_use, killed_err_arg),
5627
file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg),
5628
fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
5631
#endif /* !MYSQL_CLIENT */
5634
Execute_load_query_log_event::
5635
Execute_load_query_log_event(const char* buf, uint event_len,
5636
const Format_description_log_event* desc_event):
5637
Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT),
5638
file_id(0), fn_pos_start(0), fn_pos_end(0)
5640
if (!Query_log_event::is_valid())
5643
buf+= desc_event->common_header_len;
5645
fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET);
5646
fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET);
5647
dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET));
5649
if (fn_pos_start > q_len || fn_pos_end > q_len ||
5650
dup_handling > LOAD_DUP_REPLACE)
5653
file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET);
5657
ulong Execute_load_query_log_event::get_post_header_size_for_derived()
5659
return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN;
5663
#ifndef MYSQL_CLIENT
5665
Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
5667
uchar buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
5668
int4store(buf, file_id);
5669
int4store(buf + 4, fn_pos_start);
5670
int4store(buf + 4 + 4, fn_pos_end);
5671
*(buf + 4 + 4 + 4)= (uchar) dup_handling;
5672
return my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
5678
void Execute_load_query_log_event::print(FILE* file,
5679
PRINT_EVENT_INFO* print_event_info)
5681
print(file, print_event_info, 0);
5685
Prints the query as LOAD DATA LOCAL and with rewritten filename.
5687
void Execute_load_query_log_event::print(FILE* file,
5688
PRINT_EVENT_INFO* print_event_info,
5689
const char *local_fname)
5691
Write_on_release_cache cache(&print_event_info->head_cache, file);
5693
print_query_header(&cache, print_event_info);
5697
my_b_write(&cache, (uchar*) query, fn_pos_start);
5698
my_b_printf(&cache, " LOCAL INFILE \'");
5699
my_b_printf(&cache, local_fname);
5700
my_b_printf(&cache, "\'");
5701
if (dup_handling == LOAD_DUP_REPLACE)
5702
my_b_printf(&cache, " REPLACE");
5703
my_b_printf(&cache, " INTO");
5704
my_b_write(&cache, (uchar*) query + fn_pos_end, q_len-fn_pos_end);
5705
my_b_printf(&cache, "\n%s\n", print_event_info->delimiter);
5709
my_b_write(&cache, (uchar*) query, q_len);
5710
my_b_printf(&cache, "\n%s\n", print_event_info->delimiter);
5713
if (!print_event_info->short_form)
5714
my_b_printf(&cache, "# file_id: %d \n", file_id);
5719
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
5720
void Execute_load_query_log_event::pack_info(Protocol *protocol)
5723
if (!(buf= (char*) my_malloc(9 + db_len + q_len + 10 + 21, MYF(MY_WME))))
5728
pos= strmov(buf, "use `");
5729
memcpy(pos, db, db_len);
5730
pos= strmov(pos+db_len, "`; ");
5734
memcpy(pos, query, q_len);
5737
pos= strmov(pos, " ;file_id=");
5738
pos= int10_to_str((long) file_id, pos, 10);
5739
protocol->store(buf, pos-buf, &my_charset_bin);
5740
my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
5745
Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli)
5753
buf= (char*) my_malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
5754
(FN_REFLEN + 10) + 10 + 8 + 5, MYF(MY_WME));
5756
/* Replace filename and LOCAL keyword in query before executing it */
5759
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
5760
ER(ER_SLAVE_FATAL_ERROR), "Not enough memory");
5765
memcpy(p, query, fn_pos_start);
5767
fname= (p= strmake(p, STRING_WITH_LEN(" INFILE \'")));
5768
p= slave_load_file_stem(p, file_id, server_id, ".data");
5769
fname_end= p= strend(p); // Safer than p=p+5
5771
switch (dup_handling) {
5772
case LOAD_DUP_IGNORE:
5773
p= strmake(p, STRING_WITH_LEN(" IGNORE"));
5775
case LOAD_DUP_REPLACE:
5776
p= strmake(p, STRING_WITH_LEN(" REPLACE"));
5779
/* Ordinary load data */
5782
p= strmake(p, STRING_WITH_LEN(" INTO"));
5783
p= strmake(p, query+fn_pos_end, q_len-fn_pos_end);
5785
error= Query_log_event::do_apply_event(rli, buf, p-buf);
5787
/* Forging file name for deletion in same buffer */
5791
If there was an error the slave is going to stop, leave the
5792
file so that we can re-execute this event at START SLAVE.
5795
(void) my_delete(fname, MYF(MY_WME));
5797
my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
5803
/**************************************************************************
5805
**************************************************************************/
5808
sql_ex_info::write_data()
5811
bool sql_ex_info::write_data(IO_CACHE* file)
5815
return (write_str(file, field_term, (uint) field_term_len) ||
5816
write_str(file, enclosed, (uint) enclosed_len) ||
5817
write_str(file, line_term, (uint) line_term_len) ||
5818
write_str(file, line_start, (uint) line_start_len) ||
5819
write_str(file, escaped, (uint) escaped_len) ||
5820
my_b_safe_write(file,(uchar*) &opt_flags,1));
5825
@todo This is sensitive to field padding. We should write a
5826
char[7], not an old_sql_ex. /sven
5829
old_ex.field_term= *field_term;
5830
old_ex.enclosed= *enclosed;
5831
old_ex.line_term= *line_term;
5832
old_ex.line_start= *line_start;
5833
old_ex.escaped= *escaped;
5834
old_ex.opt_flags= opt_flags;
5835
old_ex.empty_flags=empty_flags;
5836
return my_b_safe_write(file, (uchar*) &old_ex, sizeof(old_ex)) != 0;
5845
const char *sql_ex_info::init(const char *buf, const char *buf_end,
5846
bool use_new_format)
5848
cached_new_format = use_new_format;
5853
The code below assumes that buf will not disappear from
5854
under our feet during the lifetime of the event. This assumption
5855
holds true in the slave thread if the log is in new format, but is not
5856
the case when we have old format because we will be reusing net buffer
5857
to read the actual file before we write out the Create_file event.
5859
if (read_str(&buf, buf_end, &field_term, &field_term_len) ||
5860
read_str(&buf, buf_end, &enclosed, &enclosed_len) ||
5861
read_str(&buf, buf_end, &line_term, &line_term_len) ||
5862
read_str(&buf, buf_end, &line_start, &line_start_len) ||
5863
read_str(&buf, buf_end, &escaped, &escaped_len))
5869
field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
5870
field_term = buf++; // Use first byte in string
5876
empty_flags= *buf++;
5877
if (empty_flags & FIELD_TERM_EMPTY)
5879
if (empty_flags & ENCLOSED_EMPTY)
5881
if (empty_flags & LINE_TERM_EMPTY)
5883
if (empty_flags & LINE_START_EMPTY)
5885
if (empty_flags & ESCAPED_EMPTY)
5892
/**************************************************************************
5893
Rows_log_event member functions
5894
**************************************************************************/
5896
#ifndef MYSQL_CLIENT
5897
Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
5898
MY_BITMAP const *cols, bool is_transactional)
5899
: Log_event(thd_arg, 0, is_transactional),
5903
m_width(tbl_arg ? tbl_arg->s->fields : 1),
5904
m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
5905
#ifdef HAVE_REPLICATION
5906
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
5910
We allow a special form of dummy event when the table, and cols
5911
are null and the table id is ~0UL. This is a temporary
5912
solution, to be able to terminate a started statement in the
5913
binary log: the extraneous events will be removed in the future.
5915
assert((tbl_arg && tbl_arg->s && tid != ~0UL) || (!tbl_arg && !cols && tid == ~0UL));
5917
if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
5918
set_flags(NO_FOREIGN_KEY_CHECKS_F);
5919
if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
5920
set_flags(RELAXED_UNIQUE_CHECKS_F);
5921
/* if bitmap_init fails, caught in is_valid() */
5922
if (likely(!bitmap_init(&m_cols,
5923
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
5927
/* Cols can be zero if this is a dummy binrows event */
5928
if (likely(cols != NULL))
5930
memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
5931
create_last_word_mask(&m_cols);
5936
// Needed because bitmap_init() does not set it to null on failure
5942
Rows_log_event::Rows_log_event(const char *buf, uint event_len,
5943
Log_event_type event_type,
5944
const Format_description_log_event
5946
: Log_event(buf, description_event),
5948
#ifndef MYSQL_CLIENT
5951
m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
5952
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
5953
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
5956
uint8 const common_header_len= description_event->common_header_len;
5957
uint8 const post_header_len= description_event->post_header_len[event_type-1];
5959
const char *post_start= buf + common_header_len;
5960
post_start+= RW_MAPID_OFFSET;
5961
if (post_header_len == 6)
5963
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
5964
m_table_id= uint4korr(post_start);
5969
m_table_id= (ulong) uint6korr(post_start);
5970
post_start+= RW_FLAGS_OFFSET;
5973
m_flags= uint2korr(post_start);
5975
uchar const *const var_start=
5976
(const uchar *)buf + common_header_len + post_header_len;
5977
uchar const *const ptr_width= var_start;
5978
uchar *ptr_after_width= (uchar*) ptr_width;
5979
m_width = net_field_length(&ptr_after_width);
5980
/* if bitmap_init fails, catched in is_valid() */
5981
if (likely(!bitmap_init(&m_cols,
5982
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
5986
memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
5987
create_last_word_mask(&m_cols);
5988
ptr_after_width+= (m_width + 7) / 8;
5992
// Needed because bitmap_init() does not set it to null on failure
5993
m_cols.bitmap= NULL;
5997
m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
5999
if (event_type == UPDATE_ROWS_EVENT)
6001
/* if bitmap_init fails, caught in is_valid() */
6002
if (likely(!bitmap_init(&m_cols_ai,
6003
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
6007
memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
6008
create_last_word_mask(&m_cols_ai);
6009
ptr_after_width+= (m_width + 7) / 8;
6013
// Needed because bitmap_init() does not set it to null on failure
6014
m_cols_ai.bitmap= 0;
6019
const uchar* const ptr_rows_data= (const uchar*) ptr_after_width;
6021
size_t const data_size= event_len - (ptr_rows_data - (const uchar *) buf);
6023
m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME));
6024
if (likely((bool)m_rows_buf))
6026
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
6027
m_curr_row= m_rows_buf;
6029
m_rows_end= m_rows_buf + data_size;
6030
m_rows_cur= m_rows_end;
6031
memcpy(m_rows_buf, ptr_rows_data, data_size);
6034
m_cols.bitmap= 0; // to not free it
6039
Rows_log_event::~Rows_log_event()
6041
if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
6042
m_cols.bitmap= 0; // so no my_free in bitmap_free
6043
bitmap_free(&m_cols); // To pair with bitmap_init().
6044
my_free((uchar*)m_rows_buf, MYF(MY_ALLOW_ZERO_PTR));
6047
int Rows_log_event::get_data_size()
6049
int const type_code= get_type_code();
6051
uchar buf[sizeof(m_width)+1];
6052
uchar *end= net_store_length(buf, (m_width + 7) / 8);
6054
int data_size= ROWS_HEADER_LEN;
6055
data_size+= no_bytes_in_map(&m_cols);
6056
data_size+= end - buf;
6058
if (type_code == UPDATE_ROWS_EVENT)
6059
data_size+= no_bytes_in_map(&m_cols_ai);
6061
data_size+= (m_rows_cur - m_rows_buf);
6066
#ifndef MYSQL_CLIENT
6067
int Rows_log_event::do_add_row_data(uchar *row_data, size_t length)
6070
When the table has a primary key, we would probably want, by default, to
6071
log only the primary key value instead of the entire "before image". This
6072
would save binlog space. TODO
6076
If length is zero, there is nothing to write, so we just
6077
return. Note that this is not an optimization, since calling
6078
realloc() with size 0 means free().
6086
assert(m_rows_buf <= m_rows_cur);
6087
assert(!m_rows_buf || (m_rows_end && m_rows_buf <= m_rows_end));
6088
assert(m_rows_cur <= m_rows_end);
6090
/* The cast will always work since m_rows_cur <= m_rows_end */
6091
if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
6093
size_t const block_size= 1024;
6094
my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
6095
my_ptrdiff_t const new_alloc=
6096
block_size * ((cur_size + length + block_size - 1) / block_size);
6098
uchar* const new_buf= (uchar*)my_realloc((uchar*)m_rows_buf, (uint) new_alloc,
6099
MYF(MY_ALLOW_ZERO_PTR|MY_WME));
6100
if (unlikely(!new_buf))
6101
return(HA_ERR_OUT_OF_MEM);
6103
/* If the memory moved, we need to move the pointers */
6104
if (new_buf != m_rows_buf)
6106
m_rows_buf= new_buf;
6107
m_rows_cur= m_rows_buf + cur_size;
6111
The end pointer should always be changed to point to the end of
6112
the allocated memory.
6114
m_rows_end= m_rows_buf + new_alloc;
6117
assert(m_rows_cur + length <= m_rows_end);
6118
memcpy(m_rows_cur, row_data, length);
6119
m_rows_cur+= length;
6125
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
6126
int Rows_log_event::do_apply_event(Relay_log_info const *rli)
6130
If m_table_id == ~0UL, then we have a dummy event that does not
6131
contain any data. In that case, we just remove all tables in the
6132
tables_to_lock list, close the thread tables, and return with
6135
if (m_table_id == ~0UL)
6138
This one is supposed to be set: just an extra check so that
6139
nothing strange has happened.
6141
assert(get_flags(STMT_END_F));
6143
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
6144
close_thread_tables(thd);
6150
'thd' has been set by exec_relay_log_event(), just before calling
6151
do_apply_event(). We still check here to prevent future coding
6154
assert(rli->sql_thd == thd);
6157
If there is no locks taken, this is the first binrow event seen
6158
after the table map events. We should then lock all the tables
6159
used in the transaction and proceed with execution of the actual
6164
bool need_reopen= 1; /* To execute the first lap of the loop below */
6167
lock_tables() reads the contents of thd->lex, so they must be
6168
initialized. Contrary to in
6169
Table_map_log_event::do_apply_event() we don't call
6170
mysql_init_query() as that may reset the binlog format.
6175
There are a few flags that are replicated with each row event.
6176
Make sure to set/clear them before executing the main body of
6179
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
6180
thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
6182
thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
6184
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
6185
thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
6187
thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
6188
/* A small test to verify that objects have consistent types */
6189
assert(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
6192
while ((error= lock_tables(thd, rli->tables_to_lock,
6193
rli->tables_to_lock_count, &need_reopen)))
6197
if (thd->is_slave_error || thd->is_fatal_error)
6200
Error reporting borrowed from Query_log_event with many excessive
6201
simplifications (we don't honour --slave-skip-errors)
6203
uint actual_error= thd->main_da.sql_errno();
6204
rli->report(ERROR_LEVEL, actual_error,
6205
"Error '%s' in %s event: when locking tables",
6206
(actual_error ? thd->main_da.message():
6207
"unexpected success or fatal error"),
6209
thd->is_fatal_error= 1;
6213
rli->report(ERROR_LEVEL, error,
6214
"Error in %s event: when locking tables",
6217
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
6222
So we need to reopen the tables.
6224
We need to flush the pending RBR event, since it keeps a
6225
pointer to an open table.
6227
ALTERNATIVE SOLUTION (not implemented): Extract a pointer to
6228
the pending RBR event and reset the table pointer after the
6229
tables has been reopened.
6231
NOTE: For this new scheme there should be no pending event:
6232
need to add code to assert that is the case.
6234
thd->binlog_flush_pending_rows_event(false);
6235
TABLE_LIST *tables= rli->tables_to_lock;
6236
close_tables_for_reopen(thd, &tables);
6238
uint tables_count= rli->tables_to_lock_count;
6239
if ((error= open_tables(thd, &tables, &tables_count, 0)))
6241
if (thd->is_slave_error || thd->is_fatal_error)
6244
Error reporting borrowed from Query_log_event with many excessive
6245
simplifications (we don't honour --slave-skip-errors)
6247
uint actual_error= thd->main_da.sql_errno();
6248
rli->report(ERROR_LEVEL, actual_error,
6249
"Error '%s' on reopening tables",
6250
(actual_error ? thd->main_da.message() :
6251
"unexpected success or fatal error"));
6252
thd->is_slave_error= 1;
6254
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
6260
When the open and locking succeeded, we check all tables to
6261
ensure that they still have the correct type.
6263
We can use a down cast here since we know that every table added
6264
to the tables_to_lock is a RPL_TABLE_LIST.
6268
RPL_TABLE_LIST *ptr= rli->tables_to_lock;
6269
for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
6271
if (ptr->m_tabledef.compatible_with(rli, ptr->table))
6273
mysql_unlock_tables(thd, thd->lock);
6275
thd->is_slave_error= 1;
6276
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
6277
return(ERR_BAD_TABLE_DEF);
6283
... and then we add all the tables to the table map and remove
6284
them from tables to lock.
6286
We also invalidate the query cache for all the tables, since
6287
they will now be changed.
6289
TODO [/Matz]: Maybe the query cache should not be invalidated
6290
here? It might be that a table is not changed, even though it
6291
was locked for the statement. We do know that each
6292
Rows_log_event contain at least one row, so after processing one
6293
Rows_log_event, we can invalidate the query cache for the
6296
for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
6298
const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
6304
m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
6309
table == NULL means that this table should not be replicated
6310
(this was set up by Table_map_log_event::do_apply_event()
6311
which tested replicate-* rules).
6315
It's not needed to set_time() but
6316
1) it continues the property that "Time" in SHOW PROCESSLIST shows how
6317
much slave is behind
6318
2) it will be needed when we allow replication from a table with no
6319
TIMESTAMP column to a table with one.
6320
So we call set_time(), like in SBR. Presently it changes nothing.
6322
thd->set_time((time_t)when);
6324
There are a few flags that are replicated with each row event.
6325
Make sure to set/clear them before executing the main body of
6328
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
6329
thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
6331
thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
6333
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
6334
thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
6336
thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
6338
if (slave_allow_batching)
6339
thd->options|= OPTION_ALLOW_BATCH;
6341
thd->options&= ~OPTION_ALLOW_BATCH;
6343
/* A small test to verify that objects have consistent types */
6344
assert(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
6347
Now we are in a statement and will stay in a statement until we
6350
We set this flag here, before actually applying any rows, in
6351
case the SQL thread is stopped and we need to detect that we're
6352
inside a statement and halting abruptly might cause problems
6355
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
6357
if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
6358
set_flags(COMPLETE_ROWS_F);
6361
Set tables write and read sets.
6363
Read_set contains all slave columns (in case we are going to fetch
6364
a complete record from slave)
6366
Write_set equals the m_cols bitmap sent from master but it can be
6367
longer if slave has extra columns.
6370
bitmap_set_all(table->read_set);
6371
bitmap_set_all(table->write_set);
6372
if (!get_flags(COMPLETE_ROWS_F))
6373
bitmap_intersect(table->write_set,&m_cols);
6375
this->slave_exec_mode= slave_exec_mode_options; // fix the mode
6377
// Do event specific preparations
6378
error= do_before_row_operations(rli);
6380
// row processing loop
6382
while (error == 0 && m_curr_row < m_rows_end)
6384
/* in_use can have been set to NULL in close_tables_for_reopen */
6385
THD* old_thd= table->in_use;
6389
error= do_exec_row(rli);
6391
table->in_use = old_thd;
6397
The following list of "idempotent" errors
6398
means that an error from the list might happen
6399
because of idempotent (more than once)
6400
applying of a binlog file.
6401
Notice, that binlog has a ddl operation its
6402
second applying may cause
6404
case HA_ERR_TABLE_DEF_CHANGED:
6405
case HA_ERR_CANNOT_ADD_FOREIGN:
6407
which are not included into to the list.
6409
case HA_ERR_RECORD_CHANGED:
6410
case HA_ERR_RECORD_DELETED:
6411
case HA_ERR_KEY_NOT_FOUND:
6412
case HA_ERR_END_OF_FILE:
6413
case HA_ERR_FOUND_DUPP_KEY:
6414
case HA_ERR_FOUND_DUPP_UNIQUE:
6415
case HA_ERR_FOREIGN_DUPLICATE_KEY:
6416
case HA_ERR_NO_REFERENCED_ROW:
6417
case HA_ERR_ROW_IS_REFERENCED:
6418
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
6420
if (global_system_variables.log_warnings)
6421
slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
6423
RPL_LOG_NAME, (ulong) log_pos);
6429
thd->is_slave_error= 1;
6434
If m_curr_row_end was not set during event execution (e.g., because
6435
of errors) we can't proceed to the next row. If the error is transient
6436
(i.e., error==0 at this point) we must call unpack_current_row() to set
6439
if (!m_curr_row_end && !error)
6440
unpack_current_row(rli, &m_cols);
6442
// at this moment m_curr_row_end should be set
6443
assert(error || m_curr_row_end != NULL);
6444
assert(error || m_curr_row < m_curr_row_end);
6445
assert(error || m_curr_row_end <= m_rows_end);
6447
m_curr_row= m_curr_row_end;
6449
} // row processing loop
6451
error= do_after_row_operations(rli, error);
6454
thd->options|= OPTION_KEEP_LOG;
6459
We need to delay this clear until here bacause unpack_current_row() uses
6460
master-side table definitions stored in rli.
6462
if (rli->tables_to_lock && get_flags(STMT_END_F))
6463
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
6464
/* reset OPTION_ALLOW_BATCH as not affect later events */
6465
thd->options&= ~OPTION_ALLOW_BATCH;
6468
{ /* error has occured during the transaction */
6469
slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
6470
get_type_str(), RPL_LOG_NAME, (ulong) log_pos);
6475
If one day we honour --skip-slave-errors in row-based replication, and
6476
the error should be skipped, then we would clear mappings, rollback,
6477
close tables, but the slave SQL thread would not stop and then may
6478
assume the mapping is still available, the tables are still open...
6479
So then we should clear mappings/rollback/close here only if this is a
6481
For now we code, knowing that error is not skippable and so slave SQL
6482
thread is certainly going to stop.
6483
rollback at the caller along with sbr.
6485
thd->reset_current_stmt_binlog_row_based();
6486
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
6487
thd->is_slave_error= 1;
6492
This code would ideally be placed in do_update_pos() instead, but
6493
since we have no access to table there, we do the setting of
6494
last_event_start_time here instead.
6496
if (table && (table->s->primary_key == MAX_KEY) &&
6497
!cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS)
6500
------------ Temporary fix until WL#2975 is implemented ---------
6502
This event is not the last one (no STMT_END_F). If we stop now
6503
(in case of terminate_slave_thread()), how will we restart? We
6504
have to restart from Table_map_log_event, but as this table is
6505
not transactional, the rows already inserted will still be
6506
present, and idempotency is not guaranteed (no PK) so we risk
6507
that repeating leads to double insert. So we desperately try to
6508
continue, hope we'll eventually leave this buggy situation (by
6509
executing the final Rows_log_event). If we are in a hopeless
6510
wait (reached end of last relay log and nothing gets appended
6511
there), we timeout after one minute, and notify DBA about the
6512
problem. When WL#2975 is implemented, just remove the member
6513
Relay_log_info::last_event_start_time and all its occurrences.
6515
const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
6521
Log_event::enum_skip_reason
6522
Rows_log_event::do_shall_skip(Relay_log_info *rli)
6525
If the slave skip counter is 1 and this event does not end a
6526
statement, then we should not start executing on the next event.
6527
Otherwise, we defer the decision to the normal skipping logic.
6529
if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
6530
return Log_event::EVENT_SKIP_IGNORE;
6532
return Log_event::do_shall_skip(rli);
6536
Rows_log_event::do_update_pos(Relay_log_info *rli)
6540
if (get_flags(STMT_END_F))
6543
This is the end of a statement or transaction, so close (and
6544
unlock) the tables we opened when processing the
6545
Table_map_log_event starting the statement.
6547
OBSERVER. This will clear *all* mappings, not only those that
6548
are open for the table. There is not good handle for on-close
6551
NOTE. Even if we have no table ('table' == 0) we still need to be
6552
here, so that we increase the group relay log position. If we didn't, we
6553
could have a group relay log position which lags behind "forever"
6554
(assume the last master's transaction is ignored by the slave because of
6555
replicate-ignore rules).
6557
thd->binlog_flush_pending_rows_event(true);
6560
If this event is not in a transaction, the call below will, if some
6561
transactional storage engines are involved, commit the statement into
6562
them and flush the pending event to binlog.
6563
If this event is in a transaction, the call will do nothing, but a
6564
Xid_log_event will come next which will, if some transactional engines
6565
are involved, commit the transaction and flush the pending event to the
6568
error= ha_autocommit_or_rollback(thd, 0);
6571
Now what if this is not a transactional engine? we still need to
6572
flush the pending event to the binlog; we did it with
6573
thd->binlog_flush_pending_rows_event(). Note that we imitate
6574
what is done for real queries: a call to
6575
ha_autocommit_or_rollback() (sometimes only if involves a
6576
transactional engine), and a call to be sure to have the pending
6580
thd->reset_current_stmt_binlog_row_based();
6582
rli->cleanup_context(thd, 0);
6586
Indicate that a statement is finished.
6587
Step the group log position if we are not in a transaction,
6588
otherwise increase the event log position.
6590
rli->stmt_done(log_pos, when);
6593
Clear any errors pushed in thd->net.last_err* if for example "no key
6594
found" (as this is allowed). This is a safety measure; apparently
6595
those errors (e.g. when executing a Delete_rows_log_event of a
6596
non-existing row, like in rpl_row_mystery22.test,
6597
thd->net.last_error = "Can't find record in 't1'" and last_errno=1032)
6598
do not become visible. We still prefer to wipe them out.
6603
rli->report(ERROR_LEVEL, error,
6604
"Error in %s event: commit of row events failed, "
6606
get_type_str(), m_table->s->db.str,
6607
m_table->s->table_name.str);
6611
rli->inc_event_relay_log_pos();
6617
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
6619
#ifndef MYSQL_CLIENT
6620
bool Rows_log_event::write_data_header(IO_CACHE *file)
6622
uchar buf[ROWS_HEADER_LEN]; // No need to init the buffer
6623
assert(m_table_id != ~0UL);
6624
int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
6625
int2store(buf + RW_FLAGS_OFFSET, m_flags);
6626
return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
6629
bool Rows_log_event::write_data_body(IO_CACHE*file)
6632
Note that this should be the number of *bits*, not the number of
6635
uchar sbuf[sizeof(m_width)];
6636
my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
6638
uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
6639
assert(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
6641
res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
6643
res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
6644
no_bytes_in_map(&m_cols));
6646
TODO[refactor write]: Remove the "down cast" here (and elsewhere).
6648
if (get_type_code() == UPDATE_ROWS_EVENT)
6650
res= res || my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
6651
no_bytes_in_map(&m_cols_ai));
6653
res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
6660
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
6661
void Rows_log_event::pack_info(Protocol *protocol)
6664
char const *const flagstr=
6665
get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
6666
size_t bytes= snprintf(buf, sizeof(buf),
6667
"table_id: %lu%s", m_table_id, flagstr);
6668
protocol->store(buf, bytes, &my_charset_bin);
6673
void Rows_log_event::print_helper(FILE *file,
6674
PRINT_EVENT_INFO *print_event_info,
6675
char const *const name)
6677
IO_CACHE *const head= &print_event_info->head_cache;
6678
IO_CACHE *const body= &print_event_info->body_cache;
6679
if (!print_event_info->short_form)
6681
bool const last_stmt_event= get_flags(STMT_END_F);
6682
print_header(head, print_event_info, !last_stmt_event);
6683
my_b_printf(head, "\t%s: table id %lu%s\n",
6685
last_stmt_event ? " flags: STMT_END_F" : "");
6686
print_base64(body, print_event_info, !last_stmt_event);
6689
if (get_flags(STMT_END_F))
6691
copy_event_cache_to_file_and_reinit(head, file);
6692
copy_event_cache_to_file_and_reinit(body, file);
6697
/**************************************************************************
6698
Table_map_log_event member functions and support functions
6699
**************************************************************************/
6702
@page How replication of field metadata works.
6704
When a table map is created, the master first calls
6705
Table_map_log_event::save_field_metadata() which calculates how many
6706
values will be in the field metadata. Only those fields that require the
6707
extra data are added. The method also loops through all of the fields in
6708
the table calling the method Field::save_field_metadata() which returns the
6709
values for the field that will be saved in the metadata and replicated to
6710
the slave. Once all fields have been processed, the table map is written to
6711
the binlog adding the size of the field metadata and the field metadata to
6712
the end of the body of the table map.
6714
When a table map is read on the slave, the field metadata is read from the
6715
table map and passed to the table_def class constructor which saves the
6716
field metadata from the table map into an array based on the type of the
6717
field. Field metadata values not present (those fields that do not use extra
6718
data) in the table map are initialized as zero (0). The array size is the
6719
same as the columns for the table on the slave.
6721
Additionally, values saved for field metadata on the master are saved as a
6722
string of bytes (uchar) in the binlog. A field may require 1 or more bytes
6723
to store the information. In cases where values require multiple bytes
6724
(e.g. values > 255), the endian-safe methods are used to properly encode
6725
the values on the master and decode them on the slave. When the field
6726
metadata values are captured on the slave, they are stored in an array of
6727
type uint16. This allows the least number of casts to prevent casting bugs
6728
when the field metadata is used in comparisons of field attributes. When
6729
the field metadata is used for calculating addresses in pointer math, the
6730
type used is uint32.
6733
#if !defined(MYSQL_CLIENT)
6735
Save the field metadata based on the real_type of the field.
6736
The metadata saved depends on the type of the field. Some fields
6737
store a single byte for pack_length() while others store two bytes
6738
for field_length (max length).
6743
We may want to consider changing the encoding of the information.
6744
Currently, the code attempts to minimize the number of bytes written to
6745
the tablemap. There are at least two other alternatives; 1) using
6746
net_store_length() to store the data allowing it to choose the number of
6747
bytes that are appropriate thereby making the code much easier to
6748
maintain (only 1 place to change the encoding), or 2) use a fixed number
6749
of bytes for each field. The problem with option 1 is that net_store_length()
6750
will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
6751
for fields like CHAR which can be no larger than 255 characters, the method
6752
will use 3 bytes when the value is > 250. Further, every value that is
6753
encoded using 2 parts (e.g., pack_length, field_length) will be numerically
6754
> 250 therefore will use 3 bytes for eah value. The problem with option 2
6755
is less wasteful for space but does waste 1 byte for every field that does
6758
int Table_map_log_event::save_field_metadata()
6761
for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
6762
index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
6765
#endif /* !defined(MYSQL_CLIENT) */
6768
Constructor used to build an event for writing to the binary log.
6769
Mats says tbl->s lives longer than this event so it's ok to copy pointers
6770
(tbl->s->db etc) and not pointer content.
6772
#if !defined(MYSQL_CLIENT)
6773
Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
6774
bool is_transactional __attribute__((__unused__)),
6776
: Log_event(thd, 0, true),
6778
m_dbnam(tbl->s->db.str),
6779
m_dblen(m_dbnam ? tbl->s->db.length : 0),
6780
m_tblnam(tbl->s->table_name.str),
6781
m_tbllen(tbl->s->table_name.length),
6782
m_colcnt(tbl->s->fields),
6787
m_field_metadata(0),
6788
m_field_metadata_size(0),
6792
assert(m_table_id != ~0UL);
6794
In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
6795
table.cc / alloc_table_share():
6796
Use the fact the key is db/0/table_name/0
6797
As we rely on this let's assert it.
6799
assert((tbl->s->db.str == 0) ||
6800
(tbl->s->db.str[tbl->s->db.length] == 0));
6801
assert(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
6804
m_data_size= TABLE_MAP_HEADER_LEN;
6805
m_data_size+= m_dblen + 2; // Include length and terminating \0
6806
m_data_size+= m_tbllen + 2; // Include length and terminating \0
6807
m_data_size+= 1 + m_colcnt; // COLCNT and column types
6809
/* If malloc fails, caught in is_valid() */
6810
if ((m_memory= (uchar*) my_malloc(m_colcnt, MYF(MY_WME))))
6812
m_coltype= reinterpret_cast<uchar*>(m_memory);
6813
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
6814
m_coltype[i]= m_table->field[i]->type();
6818
Calculate a bitmap for the results of maybe_null() for all columns.
6819
The bitmap is used to determine when there is a column from the master
6820
that is not on the slave and is null and thus not in the row data during
6823
uint num_null_bytes= (m_table->s->fields + 7) / 8;
6824
m_data_size+= num_null_bytes;
6825
m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
6826
&m_null_bits, num_null_bytes,
6827
&m_field_metadata, (m_colcnt * 2),
6830
bzero(m_field_metadata, (m_colcnt * 2));
6833
Create an array for the field metadata and store it.
6835
m_field_metadata_size= save_field_metadata();
6836
assert(m_field_metadata_size <= (m_colcnt * 2));
6839
Now set the size of the data to the size of the field metadata array
6840
plus one or two bytes for number of elements in the field metadata array.
6842
if (m_field_metadata_size > 255)
6843
m_data_size+= m_field_metadata_size + 2;
6845
m_data_size+= m_field_metadata_size + 1;
6847
bzero(m_null_bits, num_null_bytes);
6848
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
6849
if (m_table->field[i]->maybe_null())
6850
m_null_bits[(i / 8)]+= 1 << (i % 8);
6853
#endif /* !defined(MYSQL_CLIENT) */
6856
Constructor used by slave to read the event from the binary log.
6858
#if defined(HAVE_REPLICATION)
6859
Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
6860
const Format_description_log_event
6863
: Log_event(buf, description_event),
6864
#ifndef MYSQL_CLIENT
6867
m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
6868
m_colcnt(0), m_coltype(0),
6869
m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
6870
m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
6871
m_null_bits(0), m_meta_memory(NULL)
6873
unsigned int bytes_read= 0;
6875
uint8 common_header_len= description_event->common_header_len;
6876
uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
6878
/* Read the post-header */
6879
const char *post_start= buf + common_header_len;
6881
post_start+= TM_MAPID_OFFSET;
6882
if (post_header_len == 6)
6884
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
6885
m_table_id= uint4korr(post_start);
6890
assert(post_header_len == TABLE_MAP_HEADER_LEN);
6891
m_table_id= (ulong) uint6korr(post_start);
6892
post_start+= TM_FLAGS_OFFSET;
6895
assert(m_table_id != ~0UL);
6897
m_flags= uint2korr(post_start);
6899
/* Read the variable part of the event */
6900
const char *const vpart= buf + common_header_len + post_header_len;
6902
/* Extract the length of the various parts from the buffer */
6903
uchar const *const ptr_dblen= (uchar const*)vpart + 0;
6904
m_dblen= *(uchar*) ptr_dblen;
6906
/* Length of database name + counter + terminating null */
6907
uchar const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
6908
m_tbllen= *(uchar*) ptr_tbllen;
6910
/* Length of table name + counter + terminating null */
6911
uchar const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
6912
uchar *ptr_after_colcnt= (uchar*) ptr_colcnt;
6913
m_colcnt= net_field_length(&ptr_after_colcnt);
6915
/* Allocate mem for all fields in one go. If fails, caught in is_valid() */
6916
m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
6917
&m_dbnam, (uint) m_dblen + 1,
6918
&m_tblnam, (uint) m_tbllen + 1,
6919
&m_coltype, (uint) m_colcnt,
6924
/* Copy the different parts into their memory */
6925
strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
6926
strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
6927
memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
6929
ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
6930
bytes_read= ptr_after_colcnt - (uchar *)buf;
6931
if (bytes_read < event_len)
6933
m_field_metadata_size= net_field_length(&ptr_after_colcnt);
6934
assert(m_field_metadata_size <= (m_colcnt * 2));
6935
uint num_null_bytes= (m_colcnt + 7) / 8;
6936
m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
6937
&m_null_bits, num_null_bytes,
6938
&m_field_metadata, m_field_metadata_size,
6940
memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
6941
ptr_after_colcnt= (uchar*)ptr_after_colcnt + m_field_metadata_size;
6942
memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
6950
Table_map_log_event::~Table_map_log_event()
6952
my_free(m_meta_memory, MYF(MY_ALLOW_ZERO_PTR));
6953
my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR));
6957
Return value is an error code, one of:
6959
-1 Failure to open table [from open_tables()]
6961
1 No room for more tables [from set_table()]
6962
2 Out of memory [from set_table()]
6963
3 Wrong table definition
6964
4 Daisy-chaining RBR with SBR not possible
6967
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
6968
int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
6970
RPL_TABLE_LIST *table_list;
6971
char *db_mem, *tname_mem;
6974
assert(rli->sql_thd == thd);
6976
/* Step the query id to mark what columns that are actually used. */
6977
pthread_mutex_lock(&LOCK_thread_count);
6978
thd->query_id= next_query_id();
6979
pthread_mutex_unlock(&LOCK_thread_count);
6981
if (!(memory= my_multi_malloc(MYF(MY_WME),
6982
&table_list, (uint) sizeof(RPL_TABLE_LIST),
6983
&db_mem, (uint) NAME_LEN + 1,
6984
&tname_mem, (uint) NAME_LEN + 1,
6986
return(HA_ERR_OUT_OF_MEM);
6988
bzero(table_list, sizeof(*table_list));
6989
table_list->db = db_mem;
6990
table_list->alias= table_list->table_name = tname_mem;
6991
table_list->lock_type= TL_WRITE;
6992
table_list->next_global= table_list->next_local= 0;
6993
table_list->table_id= m_table_id;
6994
table_list->updating= 1;
6995
strmov(table_list->db, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
6996
strmov(table_list->table_name, m_tblnam);
7000
if (!rpl_filter->db_ok(table_list->db) ||
7001
(rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list)))
7003
my_free(memory, MYF(MY_WME));
7008
open_tables() reads the contents of thd->lex, so they must be
7009
initialized, so we should call lex_start(); to be even safer, we
7010
call mysql_init_query() which does a more complete set of inits.
7013
mysql_reset_thd_for_next_command(thd);
7015
Check if the slave is set to use SBR. If so, it should switch
7016
to using RBR until the end of the "statement", i.e., next
7017
STMT_END_F or next error.
7019
if (!thd->current_stmt_binlog_row_based &&
7020
mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG))
7022
thd->set_current_stmt_binlog_row_based();
7026
Open the table if it is not already open and add the table to
7027
table map. Note that for any table that should not be
7028
replicated, a filter is needed.
7030
The creation of a new TABLE_LIST is used to up-cast the
7031
table_list consisting of RPL_TABLE_LIST items. This will work
7032
since the only case where the argument to open_tables() is
7033
changed, is when thd->lex->query_tables == table_list, i.e.,
7034
when the statement requires prelocking. Since this is not
7035
executed when a statement is executed, this case will not occur.
7036
As a precaution, an assertion is added to ensure that the bad
7039
Either way, the memory in the list is *never* released
7040
internally in the open_tables() function, hence we take a copy
7041
of the pointer to make sure that it's not lost.
7044
assert(thd->lex->query_tables != table_list);
7045
TABLE_LIST *tmp_table_list= table_list;
7046
if ((error= open_tables(thd, &tmp_table_list, &count, 0)))
7048
if (thd->is_slave_error || thd->is_fatal_error)
7051
Error reporting borrowed from Query_log_event with many excessive
7052
simplifications (we don't honour --slave-skip-errors)
7054
uint actual_error= thd->main_da.sql_errno();
7055
rli->report(ERROR_LEVEL, actual_error,
7056
"Error '%s' on opening table `%s`.`%s`",
7057
(actual_error ? thd->main_da.message() :
7058
"unexpected success or fatal error"),
7059
table_list->db, table_list->table_name);
7060
thd->is_slave_error= 1;
7065
m_table= table_list->table;
7068
This will fail later otherwise, the 'in_use' field should be
7069
set to the current thread.
7071
assert(m_table->in_use);
7074
Use placement new to construct the table_def instance in the
7075
memory allocated for it inside table_list.
7077
The memory allocated by the table_def structure (i.e., not the
7078
memory allocated *for* the table_def structure) is released
7079
inside Relay_log_info::clear_tables_to_lock() by calling the
7080
table_def destructor explicitly.
7082
new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt,
7083
m_field_metadata, m_field_metadata_size, m_null_bits);
7084
table_list->m_tabledef_valid= true;
7087
We record in the slave's information that the table should be
7088
locked by linking the table into the list of tables to lock.
7090
table_list->next_global= table_list->next_local= rli->tables_to_lock;
7091
const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
7092
const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
7093
/* 'memory' is freed in clear_tables_to_lock */
7099
my_free(memory, MYF(MY_WME));
7103
Log_event::enum_skip_reason
7104
Table_map_log_event::do_shall_skip(Relay_log_info *rli)
7107
If the slave skip counter is 1, then we should not start executing
7110
return continue_group(rli);
7113
int Table_map_log_event::do_update_pos(Relay_log_info *rli)
7115
rli->inc_event_relay_log_pos();
7119
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
7121
#ifndef MYSQL_CLIENT
7122
bool Table_map_log_event::write_data_header(IO_CACHE *file)
7124
assert(m_table_id != ~0UL);
7125
uchar buf[TABLE_MAP_HEADER_LEN];
7126
int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
7127
int2store(buf + TM_FLAGS_OFFSET, m_flags);
7128
return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
7131
bool Table_map_log_event::write_data_body(IO_CACHE *file)
7133
assert(m_dbnam != NULL);
7134
assert(m_tblnam != NULL);
7135
/* We use only one byte per length for storage in event: */
7136
assert(m_dblen < 128);
7137
assert(m_tbllen < 128);
7139
uchar const dbuf[]= { (uchar) m_dblen };
7140
uchar const tbuf[]= { (uchar) m_tbllen };
7142
uchar cbuf[sizeof(m_colcnt)];
7143
uchar *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
7144
assert(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
7147
Store the size of the field metadata.
7149
uchar mbuf[sizeof(m_field_metadata_size)];
7150
uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
7152
return (my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
7153
my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) ||
7154
my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
7155
my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) ||
7156
my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
7157
my_b_safe_write(file, m_coltype, m_colcnt) ||
7158
my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
7159
my_b_safe_write(file, m_field_metadata, m_field_metadata_size),
7160
my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
7164
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
7167
Print some useful information for the SHOW BINARY LOG information
7171
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
7172
void Table_map_log_event::pack_info(Protocol *protocol)
7175
size_t bytes= snprintf(buf, sizeof(buf),
7176
"table_id: %lu (%s.%s)",
7177
m_table_id, m_dbnam, m_tblnam);
7178
protocol->store(buf, bytes, &my_charset_bin);
7187
void Table_map_log_event::print(FILE * /* unused */,
7188
PRINT_EVENT_INFO *print_event_info)
7190
if (!print_event_info->short_form)
7192
print_header(&print_event_info->head_cache, print_event_info, true);
7193
my_b_printf(&print_event_info->head_cache,
7194
"\tTable_map: `%s`.`%s` mapped to number %lu\n",
7195
m_dbnam, m_tblnam, m_table_id);
7196
print_base64(&print_event_info->body_cache, print_event_info, true);
7201
/**************************************************************************
7202
Write_rows_log_event member functions
7203
**************************************************************************/
7206
Constructor used to build an event for writing to the binary log.
7208
#if !defined(MYSQL_CLIENT)
7209
Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
7211
bool is_transactional)
7212
: Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
7218
Constructor used by slave to read the event from the binary log.
7220
#ifdef HAVE_REPLICATION
7221
Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len,
7222
const Format_description_log_event
7224
: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
7229
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
7231
Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
7236
todo: to introduce a property for the event (handler?) which forces
7237
applying the event in the replace (idempotent) fashion.
7239
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
7242
We are using REPLACE semantics and not INSERT IGNORE semantics
7243
when writing rows, that is: new rows replace old rows. We need to
7244
inform the storage engine that it should use this behaviour.
7247
/* Tell the storage engine that we are using REPLACE semantics. */
7248
thd->lex->duplicates= DUP_REPLACE;
7251
Pretend we're executing a REPLACE command: this is needed for
7252
InnoDB since it is not (properly) checking the
7253
lex->duplicates flag.
7255
thd->lex->sql_command= SQLCOM_REPLACE;
7257
Do not raise the error flag in case of hitting to an unique attribute
7259
m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
7262
m_table->file->ha_start_bulk_insert(0);
7264
We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
7265
any TIMESTAMP column with data from the row but instead will use
7266
the event's current time.
7267
As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
7268
columns, we know that all TIMESTAMP columns on slave will receive explicit
7269
data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
7270
When we allow a table without TIMESTAMP to be replicated to a table having
7271
more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
7272
column to be replicated into a BIGINT column and the slave's table has a
7273
TIMESTAMP column, then the slave's TIMESTAMP column will take its value
7274
from set_time() which we called earlier (consistent with SBR). And then in
7275
some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
7276
analyze if explicit data is provided for slave's TIMESTAMP columns).
7278
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
7284
Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
7288
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
7290
m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
7291
m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
7293
resetting the extra with
7294
table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
7296
explanation: file->reset() performs this duty
7297
ultimately. Still todo: fix
7300
if ((local_error= m_table->file->ha_end_bulk_insert()))
7302
m_table->file->print_error(local_error, MYF(0));
7304
return error? error : local_error;
7307
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
7310
Check if there are more UNIQUE keys after the given key.
7313
last_uniq_key(TABLE *table, uint keyno)
7315
while (++keyno < table->s->keys)
7316
if (table->key_info[keyno].flags & HA_NOSAME)
7322
Check if an error is a duplicate key error.
7324
This function is used to check if an error code is one of the
7325
duplicate key error, i.e., and error code for which it is sensible
7326
to do a <code>get_dup_key()</code> to retrieve the duplicate key.
7328
@param errcode The error code to check.
7330
@return <code>true</code> if the error code is such that
7331
<code>get_dup_key()</code> will return true, <code>false</code>
7335
is_duplicate_key_error(int errcode)
7339
case HA_ERR_FOUND_DUPP_KEY:
7340
case HA_ERR_FOUND_DUPP_UNIQUE:
7347
Write the current row into event's table.
7349
The row is located in the row buffer, pointed by @c m_curr_row member.
7350
Number of columns of the row is stored in @c m_width member (it can be
7351
different from the number of columns in the table to which we insert).
7352
Bitmap @c m_cols indicates which columns are present in the row. It is assumed
7353
that event's table is already open and pointed by @c m_table.
7355
If the same record already exists in the table it can be either overwritten
7356
or an error is reported depending on the value of @c overwrite flag
7357
(error reporting not yet implemented). Note that the matching record can be
7358
different from the row we insert if we use primary keys to identify records in
7361
The row to be inserted can contain values only for selected columns. The
7362
missing columns are filled with default values using @c prepare_record()
7363
function. If a matching record is found in the table and @c overwritte is
7364
true, the missing columns are taken from it.
7366
@param rli Relay log info (needed for row unpacking).
7368
Shall we overwrite if the row already exists or signal
7369
error (currently ignored).
7371
@returns Error code on failure, 0 on success.
7373
This method, if successful, sets @c m_curr_row_end pointer to point at the
7374
next row in the rows buffer. This is done when unpacking the row to be
7377
@note If a matching record is found, it is either updated using
7378
@c ha_update_row() or first deleted and then new record written.
7382
Rows_log_event::write_row(const Relay_log_info *const rli,
7383
const bool overwrite)
7385
assert(m_table != NULL && thd != NULL);
7387
TABLE *table= m_table; // pointer to event's table
7390
auto_afree_ptr<char> key(NULL);
7392
/* fill table->record[0] with default values */
7395
We only check if the columns have default values for non-NDB
7396
engines, for NDB we ignore the check since updates are sent as
7397
writes, causing errors when trying to prepare the record.
7399
TODO[ndb]: Elimiate this hard-coded dependency on NDB. Ideally,
7400
the engine should be able to set a flag that it want the default
7401
values filled in and one flag to handle the case that the default
7402
values should be checked. Maybe these two flags can be combined.
7404
if ((error= prepare_record(table, &m_cols, m_width,
7405
table->file->ht->db_type != DB_TYPE_NDBCLUSTER)))
7408
/* unpack row into table->record[0] */
7409
error= unpack_current_row(rli, &m_cols);
7411
// Temporary fix to find out why it fails [/Matz]
7412
memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
7415
Try to write record. If a corresponding record already exists in the table,
7416
we try to change it using ha_update_row() if possible. Otherwise we delete
7417
it and repeat the whole process again.
7419
TODO: Add safety measures against infinite looping.
7422
while ((error= table->file->ha_write_row(table->record[0])))
7424
if (error == HA_ERR_LOCK_DEADLOCK ||
7425
error == HA_ERR_LOCK_WAIT_TIMEOUT ||
7426
(keynum= table->file->get_dup_key(error)) < 0 ||
7430
Deadlock, waiting for lock or just an error from the handler
7431
such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
7432
Retrieval of the duplicate key number may fail
7433
- either because the error was not "duplicate key" error
7434
- or because the information which key is not available
7436
table->file->print_error(error, MYF(0));
7440
We need to retrieve the old row into record[1] to be able to
7441
either update or delete the offending record. We either:
7443
- use rnd_pos() with a row-id (available as dupp_row) to the
7444
offending row, if that is possible (MyISAM and Blackhole), or else
7446
- use index_read_idx() with the key that is duplicated, to
7447
retrieve the offending row.
7449
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
7451
if (table->file->inited && (error= table->file->ha_index_end()))
7453
if ((error= table->file->ha_rnd_init(false)))
7456
error= table->file->rnd_pos(table->record[1], table->file->dup_ref);
7457
table->file->ha_rnd_end();
7460
table->file->print_error(error, MYF(0));
7466
if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
7471
if (key.get() == NULL)
7473
key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
7474
if (key.get() == NULL)
7480
key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
7482
error= table->file->index_read_idx_map(table->record[1], keynum,
7483
(const uchar*)key.get(),
7488
table->file->print_error(error, MYF(0));
7494
Now, record[1] should contain the offending row. That
7495
will enable us to update it or, alternatively, delete it (so
7496
that we can insert the new row afterwards).
7500
If row is incomplete we will use the record found to fill
7503
if (!get_flags(COMPLETE_ROWS_F))
7505
restore_record(table,record[1]);
7506
error= unpack_current_row(rli, &m_cols);
7510
REPLACE is defined as either INSERT or DELETE + INSERT. If
7511
possible, we can replace it with an UPDATE, but that will not
7512
work on InnoDB if FOREIGN KEY checks are necessary.
7514
I (Matz) am not sure of the reason for the last_uniq_key()
7515
check as, but I'm guessing that it's something along the
7518
Suppose that we got the duplicate key to be a key that is not
7519
the last unique key for the table and we perform an update:
7520
then there might be another key for which the unique check will
7521
fail, so we're better off just deleting the row and inserting
7524
if (last_uniq_key(table, keynum) &&
7525
!table->file->referenced_by_foreign_key())
7527
error=table->file->ha_update_row(table->record[1],
7531
case HA_ERR_RECORD_IS_THE_SAME:
7538
table->file->print_error(error, MYF(0));
7545
if ((error= table->file->ha_delete_row(table->record[1])))
7547
table->file->print_error(error, MYF(0));
7550
/* Will retry ha_write_row() with the offending row removed. */
7560
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
7562
assert(m_table != NULL);
7564
write_row(rli, /* if 1 then overwrite */
7565
bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
7567
if (error && !thd->is_error())
7570
my_error(ER_UNKNOWN_ERROR, MYF(0));
7576
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
7579
void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
7581
Rows_log_event::print_helper(file, print_event_info, "Write_rows");
7585
/**************************************************************************
7586
Delete_rows_log_event member functions
7587
**************************************************************************/
7589
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
7591
Compares table->record[0] and table->record[1]
7593
Returns TRUE if different.
7595
static bool record_compare(TABLE *table)
7598
Need to set the X bit and the filler bits in both records since
7599
there are engines that do not set it correctly.
7601
In addition, since MyISAM checks that one hasn't tampered with the
7602
record, it is necessary to restore the old bytes into the record
7603
after doing the comparison.
7605
TODO[record format ndb]: Remove it once NDB returns correct
7606
records. Check that the other engines also return correct records.
7609
uchar saved_x[2], saved_filler[2];
7611
if (table->s->null_bytes > 0)
7613
for (int i = 0 ; i < 2 ; ++i)
7615
saved_x[i]= table->record[i][0];
7616
saved_filler[i]= table->record[i][table->s->null_bytes - 1];
7617
table->record[i][0]|= 1U;
7618
table->record[i][table->s->null_bytes - 1]|=
7619
256U - (1U << table->s->last_null_bit_pos);
7623
if (table->s->blob_fields + table->s->varchar_fields == 0)
7625
result= cmp_record(table,record[1]);
7626
goto record_compare_exit;
7629
/* Compare null bits */
7630
if (memcmp(table->null_flags,
7631
table->null_flags+table->s->rec_buff_length,
7632
table->s->null_bytes))
7634
result= true; // Diff in NULL value
7635
goto record_compare_exit;
7638
/* Compare updated fields */
7639
for (Field **ptr=table->field ; *ptr ; ptr++)
7641
if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
7644
goto record_compare_exit;
7648
record_compare_exit:
7650
Restore the saved bytes.
7652
TODO[record format ndb]: Remove this code once NDB returns the
7653
correct record format.
7655
if (table->s->null_bytes > 0)
7657
for (int i = 0 ; i < 2 ; ++i)
7659
table->record[i][0]= saved_x[i];
7660
table->record[i][table->s->null_bytes - 1]= saved_filler[i];
7668
Locate the current row in event's table.
7670
The current row is pointed by @c m_curr_row. Member @c m_width tells how many
7671
columns are there in the row (this can be differnet from the number of columns
7672
in the table). It is assumed that event's table is already open and pointed
7675
If a corresponding record is found in the table it is stored in
7676
@c m_table->record[0]. Note that when record is located based on a primary
7677
key, it is possible that the record found differs from the row being located.
7679
If no key is specified or table does not have keys, a table scan is used to
7680
find the row. In that case the row should be complete and contain values for
7681
all columns. However, it can still be shorter than the table, i.e. the table
7682
can contain extra columns not present in the row. It is also possible that
7683
the table has fewer columns than the row being located.
7685
@returns Error code on failure, 0 on success.
7687
@post In case of success @c m_table->record[0] contains the record found.
7688
Also, the internal "cursor" of the table is positioned at the record found.
7690
@note If the engine allows random access of the records, a combination of
7691
@c position() and @c rnd_pos() will be used.
7694
int Rows_log_event::find_row(const Relay_log_info *rli)
7696
assert(m_table && m_table->in_use != NULL);
7698
TABLE *table= m_table;
7701
/* unpack row - missing fields get default values */
7702
prepare_record(table, &m_cols, m_width, false/* don't check errors */);
7703
error= unpack_current_row(rli, &m_cols);
7705
// Temporary fix to find out why it fails [/Matz]
7706
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
7708
if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
7709
table->s->primary_key < MAX_KEY)
7712
Use a more efficient method to fetch the record given by
7713
table->record[0] if the engine allows it. We first compute a
7714
row reference using the position() member function (it will be
7715
stored in table->file->ref) and the use rnd_pos() to position
7716
the "cursor" (i.e., record[0] in this case) at the correct row.
7718
TODO: Add a check that the correct record has been fetched by
7719
comparing with the original record. Take into account that the
7720
record on the master and slave can be of different
7721
length. Something along these lines should work:
7723
ADD>>> store_record(table,record[1]);
7724
int error= table->file->rnd_pos(table->record[0], table->file->ref);
7725
ADD>>> assert(memcmp(table->record[1], table->record[0],
7726
table->s->reclength) == 0);
7729
int error= table->file->rnd_pos_by_record(table->record[0]);
7730
table->file->ha_rnd_end();
7733
table->file->print_error(error, MYF(0));
7738
// We can't use position() - try other methods.
7741
Save copy of the record in table->record[1]. It might be needed
7742
later if linear search is used to find exact match.
7744
store_record(table,record[1]);
7746
if (table->s->keys > 0)
7748
/* We have a key: search the table using the index */
7749
if (!table->file->inited && (error= table->file->ha_index_init(0, false)))
7751
table->file->print_error(error, MYF(0));
7755
/* Fill key data for the row */
7758
key_copy(m_key, table->record[0], table->key_info, 0);
7761
We need to set the null bytes to ensure that the filler bit are
7762
all set when returning. There are storage engines that just set
7763
the necessary bits on the bytes and don't set the filler bits
7766
my_ptrdiff_t const pos=
7767
table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
7768
table->record[0][pos]= 0xFF;
7770
if ((error= table->file->index_read_map(table->record[0], m_key,
7772
HA_READ_KEY_EXACT)))
7774
table->file->print_error(error, MYF(0));
7775
table->file->ha_index_end();
7780
Below is a minor "optimization". If the key (i.e., key number
7781
0) has the HA_NOSAME flag set, we know that we have found the
7782
correct record (since there can be no duplicates); otherwise, we
7783
have to compare the record with the one found to see if it is
7786
CAVEAT! This behaviour is essential for the replication of,
7787
e.g., the mysql.proc table since the correct record *shall* be
7788
found using the primary key *only*. There shall be no
7789
comparison of non-PK columns to decide if the correct record is
7790
found. I can see no scenario where it would be incorrect to
7791
chose the row to change only using a PK or an UNNI.
7793
if (table->key_info->flags & HA_NOSAME)
7795
table->file->ha_index_end();
7800
In case key is not unique, we still have to iterate over records found
7801
and find the one which is identical to the row given. A copy of the
7802
record we are looking for is stored in record[1].
7804
while (record_compare(table))
7807
We need to set the null bytes to ensure that the filler bit
7808
are all set when returning. There are storage engines that
7809
just set the necessary bits on the bytes and don't set the
7810
filler bits correctly.
7812
TODO[record format ndb]: Remove this code once NDB returns the
7813
correct record format.
7815
if (table->s->null_bytes > 0)
7817
table->record[0][table->s->null_bytes - 1]|=
7818
256U - (1U << table->s->last_null_bit_pos);
7821
if ((error= table->file->index_next(table->record[0])))
7823
table->file->print_error(error, MYF(0));
7824
table->file->ha_index_end();
7830
Have to restart the scan to be able to fetch the next row.
7832
table->file->ha_index_end();
7836
int restart_count= 0; // Number of times scanning has restarted from top
7838
/* We don't have a key: search the table using rnd_next() */
7839
if ((error= table->file->ha_rnd_init(1)))
7841
table->file->print_error(error, MYF(0));
7845
/* Continue until we find the right record or have made a full loop */
7848
error= table->file->rnd_next(table->record[0]);
7853
case HA_ERR_RECORD_DELETED:
7856
case HA_ERR_END_OF_FILE:
7857
if (++restart_count < 2)
7858
table->file->ha_rnd_init(1);
7862
table->file->print_error(error, MYF(0));
7863
table->file->ha_rnd_end();
7867
while (restart_count < 2 && record_compare(table));
7870
Note: above record_compare will take into accout all record fields
7871
which might be incorrect in case a partial row was given in the event
7873
table->file->ha_rnd_end();
7875
assert(error == HA_ERR_END_OF_FILE || error == HA_ERR_RECORD_DELETED || error == 0);
7879
table->default_column_bitmaps();
7882
table->default_column_bitmaps();
7889
Constructor used to build an event for writing to the binary log.
7892
#ifndef MYSQL_CLIENT
7893
Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
7895
bool is_transactional)
7896
: Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
7899
#endif /* #if !defined(MYSQL_CLIENT) */
7902
Constructor used by slave to read the event from the binary log.
7904
#ifdef HAVE_REPLICATION
7905
Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len,
7906
const Format_description_log_event
7908
: Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
7913
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
7916
Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
7918
if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
7919
m_table->s->primary_key < MAX_KEY)
7922
We don't need to allocate any memory for m_key since it is not used.
7927
if (m_table->s->keys > 0)
7929
// Allocate buffer for key searches
7930
m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
7932
return HA_ERR_OUT_OF_MEM;
7939
Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
7942
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
7943
m_table->file->ha_index_or_rnd_end();
7944
my_free(m_key, MYF(MY_ALLOW_ZERO_PTR));
7950
int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
7953
assert(m_table != NULL);
7955
if (!(error= find_row(rli)))
7958
Delete the record found, located in record[0]
7960
error= m_table->file->ha_delete_row(m_table->record[0]);
7965
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
7968
void Delete_rows_log_event::print(FILE *file,
7969
PRINT_EVENT_INFO* print_event_info)
7971
Rows_log_event::print_helper(file, print_event_info, "Delete_rows");
7976
/**************************************************************************
7977
Update_rows_log_event member functions
7978
**************************************************************************/
7981
Constructor used to build an event for writing to the binary log.
7983
#if !defined(MYSQL_CLIENT)
7984
Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
7986
bool is_transactional)
7987
: Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
7989
init(tbl_arg->write_set);
7992
void Update_rows_log_event::init(MY_BITMAP const *cols)
7994
/* if bitmap_init fails, caught in is_valid() */
7995
if (likely(!bitmap_init(&m_cols_ai,
7996
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
8000
/* Cols can be zero if this is a dummy binrows event */
8001
if (likely(cols != NULL))
8003
memcpy(m_cols_ai.bitmap, cols->bitmap, no_bytes_in_map(cols));
8004
create_last_word_mask(&m_cols_ai);
8008
#endif /* !defined(MYSQL_CLIENT) */
8011
Update_rows_log_event::~Update_rows_log_event()
8013
if (m_cols_ai.bitmap == m_bitbuf_ai) // no my_malloc happened
8014
m_cols_ai.bitmap= 0; // so no my_free in bitmap_free
8015
bitmap_free(&m_cols_ai); // To pair with bitmap_init().
8020
Constructor used by slave to read the event from the binary log.
8022
#ifdef HAVE_REPLICATION
8023
Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len,
8025
Format_description_log_event
8027
: Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
8032
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
8035
Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
8037
if (m_table->s->keys > 0)
8039
// Allocate buffer for key searches
8040
m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
8042
return HA_ERR_OUT_OF_MEM;
8045
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
8051
Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
8054
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
8055
m_table->file->ha_index_or_rnd_end();
8056
my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc
8063
Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
8065
assert(m_table != NULL);
8067
int error= find_row(rli);
8071
We need to read the second image in the event of error to be
8072
able to skip to the next pair of updates
8074
m_curr_row= m_curr_row_end;
8075
unpack_current_row(rli, &m_cols_ai);
8080
This is the situation after locating BI:
8082
===|=== before image ====|=== after image ===|===
8084
m_curr_row m_curr_row_end
8086
BI found in the table is stored in record[0]. We copy it to record[1]
8087
and unpack AI to record[0].
8090
store_record(m_table,record[1]);
8092
m_curr_row= m_curr_row_end;
8093
error= unpack_current_row(rli, &m_cols_ai); // this also updates m_curr_row_end
8096
Now we have the right row to update. The old row (the one we're
8097
looking for) is in record[1] and the new row is in record[0].
8100
// Temporary fix to find out why it fails [/Matz]
8101
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
8102
memcpy(m_table->write_set->bitmap, m_cols_ai.bitmap, (m_table->write_set->n_bits + 7) / 8);
8104
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
8105
if (error == HA_ERR_RECORD_IS_THE_SAME)
8111
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
8114
void Update_rows_log_event::print(FILE *file,
8115
PRINT_EVENT_INFO* print_event_info)
8117
Rows_log_event::print_helper(file, print_event_info, "Update_rows");
8122
Incident_log_event::Incident_log_event(const char *buf, uint event_len,
8123
const Format_description_log_event *descr_event)
8124
: Log_event(buf, descr_event)
8126
uint8 const common_header_len=
8127
descr_event->common_header_len;
8128
uint8 const post_header_len=
8129
descr_event->post_header_len[INCIDENT_EVENT-1];
8131
m_incident= static_cast<Incident>(uint2korr(buf + common_header_len));
8132
char const *ptr= buf + common_header_len + post_header_len;
8133
char const *const str_end= buf + event_len;
8134
uint8 len= 0; // Assignment to keep compiler happy
8135
const char *str= NULL; // Assignment to keep compiler happy
8136
read_str(&ptr, str_end, &str, &len);
8137
m_message.str= const_cast<char*>(str);
8138
m_message.length= len;
8143
Incident_log_event::~Incident_log_event()
8149
Incident_log_event::description() const
8151
static const char *const description[]= {
8152
"NOTHING", // Not used
8156
assert(0 <= m_incident);
8157
assert((size_t) m_incident <= sizeof(description)/sizeof(*description));
8159
return description[m_incident];
8163
#ifndef MYSQL_CLIENT
8164
void Incident_log_event::pack_info(Protocol *protocol)
8168
if (m_message.length > 0)
8169
bytes= snprintf(buf, sizeof(buf), "#%d (%s)",
8170
m_incident, description());
8172
bytes= snprintf(buf, sizeof(buf), "#%d (%s): %s",
8173
m_incident, description(), m_message.str);
8174
protocol->store(buf, bytes, &my_charset_bin);
8181
Incident_log_event::print(FILE *file,
8182
PRINT_EVENT_INFO *print_event_info)
8184
if (print_event_info->short_form)
8187
Write_on_release_cache cache(&print_event_info->head_cache, file);
8188
print_header(&cache, print_event_info, false);
8189
my_b_printf(&cache, "\n# Incident: %s", description());
8193
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
8195
Incident_log_event::do_apply_event(Relay_log_info const *rli)
8197
rli->report(ERROR_LEVEL, ER_SLAVE_INCIDENT,
8198
ER(ER_SLAVE_INCIDENT),
8200
m_message.length > 0 ? m_message.str : "<none>");
8206
Incident_log_event::write_data_header(IO_CACHE *file)
8208
uchar buf[sizeof(int16)];
8209
int2store(buf, (int16) m_incident);
8210
return(my_b_safe_write(file, buf, sizeof(buf)));
8214
Incident_log_event::write_data_body(IO_CACHE *file)
8216
return(write_str(file, m_message.str, m_message.length));
8219
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
8220
Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len,
8221
const Format_description_log_event* description_event)
8222
:Log_event(buf, description_event)
8224
uint8 header_size= description_event->common_header_len;
8225
ident_len = event_len - header_size;
8226
set_if_smaller(ident_len,FN_REFLEN-1);
8227
log_ident= buf + header_size;
8234
The default values for these variables should be values that are
8235
*incorrect*, i.e., values that cannot occur in an event. This way,
8236
they will always be printed for the first event.
8238
st_print_event_info::st_print_event_info()
8239
:flags2_inited(0), sql_mode_inited(0),
8240
auto_increment_increment(0),auto_increment_offset(0), charset_inited(0),
8241
lc_time_names_number(~0),
8242
charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER),
8243
thread_id(0), thread_id_printed(false),
8244
base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(false)
8247
Currently we only use static PRINT_EVENT_INFO objects, so zeroed at
8248
program's startup, but these explicit bzero() is for the day someone
8249
creates dynamic instances.
8251
bzero(db, sizeof(db));
8252
bzero(charset, sizeof(charset));
8253
bzero(time_zone_str, sizeof(time_zone_str));
8256
myf const flags = MYF(MY_WME | MY_NABP);
8257
open_cached_file(&head_cache, NULL, NULL, 0, flags);
8258
open_cached_file(&body_cache, NULL, NULL, 0, flags);