1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008 Sun Microsystems
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; version 2 of the License.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20
#include <drizzled/server_includes.h>
23
#include "rpl_filter.h"
24
#include "rpl_utility.h"
25
#include "rpl_record.h"
26
#include <mysys/my_dir.h>
27
#include <drizzled/error.h>
28
#include <libdrizzle/pack.h>
29
#include <drizzled/sql_parse.h>
33
#include <mysys/base64.h>
34
#include <mysys/my_bitmap.h>
36
#include <drizzled/gettext.h>
37
#include <libdrizzle/libdrizzle.h>
38
#include <drizzled/error.h>
39
#include <drizzled/query_id.h>
40
#include <drizzled/tztime.h>
41
#include <drizzled/slave.h>
44
static const char *HA_ERR(int i)
47
case HA_ERR_KEY_NOT_FOUND: return "HA_ERR_KEY_NOT_FOUND";
48
case HA_ERR_FOUND_DUPP_KEY: return "HA_ERR_FOUND_DUPP_KEY";
49
case HA_ERR_RECORD_CHANGED: return "HA_ERR_RECORD_CHANGED";
50
case HA_ERR_WRONG_INDEX: return "HA_ERR_WRONG_INDEX";
51
case HA_ERR_CRASHED: return "HA_ERR_CRASHED";
52
case HA_ERR_WRONG_IN_RECORD: return "HA_ERR_WRONG_IN_RECORD";
53
case HA_ERR_OUT_OF_MEM: return "HA_ERR_OUT_OF_MEM";
54
case HA_ERR_NOT_A_TABLE: return "HA_ERR_NOT_A_TABLE";
55
case HA_ERR_WRONG_COMMAND: return "HA_ERR_WRONG_COMMAND";
56
case HA_ERR_OLD_FILE: return "HA_ERR_OLD_FILE";
57
case HA_ERR_NO_ACTIVE_RECORD: return "HA_ERR_NO_ACTIVE_RECORD";
58
case HA_ERR_RECORD_DELETED: return "HA_ERR_RECORD_DELETED";
59
case HA_ERR_RECORD_FILE_FULL: return "HA_ERR_RECORD_FILE_FULL";
60
case HA_ERR_INDEX_FILE_FULL: return "HA_ERR_INDEX_FILE_FULL";
61
case HA_ERR_END_OF_FILE: return "HA_ERR_END_OF_FILE";
62
case HA_ERR_UNSUPPORTED: return "HA_ERR_UNSUPPORTED";
63
case HA_ERR_TO_BIG_ROW: return "HA_ERR_TO_BIG_ROW";
64
case HA_WRONG_CREATE_OPTION: return "HA_WRONG_CREATE_OPTION";
65
case HA_ERR_FOUND_DUPP_UNIQUE: return "HA_ERR_FOUND_DUPP_UNIQUE";
66
case HA_ERR_UNKNOWN_CHARSET: return "HA_ERR_UNKNOWN_CHARSET";
67
case HA_ERR_WRONG_MRG_TABLE_DEF: return "HA_ERR_WRONG_MRG_TABLE_DEF";
68
case HA_ERR_CRASHED_ON_REPAIR: return "HA_ERR_CRASHED_ON_REPAIR";
69
case HA_ERR_CRASHED_ON_USAGE: return "HA_ERR_CRASHED_ON_USAGE";
70
case HA_ERR_LOCK_WAIT_TIMEOUT: return "HA_ERR_LOCK_WAIT_TIMEOUT";
71
case HA_ERR_LOCK_TABLE_FULL: return "HA_ERR_LOCK_TABLE_FULL";
72
case HA_ERR_READ_ONLY_TRANSACTION: return "HA_ERR_READ_ONLY_TRANSACTION";
73
case HA_ERR_LOCK_DEADLOCK: return "HA_ERR_LOCK_DEADLOCK";
74
case HA_ERR_CANNOT_ADD_FOREIGN: return "HA_ERR_CANNOT_ADD_FOREIGN";
75
case HA_ERR_NO_REFERENCED_ROW: return "HA_ERR_NO_REFERENCED_ROW";
76
case HA_ERR_ROW_IS_REFERENCED: return "HA_ERR_ROW_IS_REFERENCED";
77
case HA_ERR_NO_SAVEPOINT: return "HA_ERR_NO_SAVEPOINT";
78
case HA_ERR_NON_UNIQUE_BLOCK_SIZE: return "HA_ERR_NON_UNIQUE_BLOCK_SIZE";
79
case HA_ERR_NO_SUCH_TABLE: return "HA_ERR_NO_SUCH_TABLE";
80
case HA_ERR_TABLE_EXIST: return "HA_ERR_TABLE_EXIST";
81
case HA_ERR_NO_CONNECTION: return "HA_ERR_NO_CONNECTION";
82
case HA_ERR_NULL_IN_SPATIAL: return "HA_ERR_NULL_IN_SPATIAL";
83
case HA_ERR_TABLE_DEF_CHANGED: return "HA_ERR_TABLE_DEF_CHANGED";
84
case HA_ERR_NO_PARTITION_FOUND: return "HA_ERR_NO_PARTITION_FOUND";
85
case HA_ERR_RBR_LOGGING_FAILED: return "HA_ERR_RBR_LOGGING_FAILED";
86
case HA_ERR_DROP_INDEX_FK: return "HA_ERR_DROP_INDEX_FK";
87
case HA_ERR_FOREIGN_DUPLICATE_KEY: return "HA_ERR_FOREIGN_DUPLICATE_KEY";
88
case HA_ERR_TABLE_NEEDS_UPGRADE: return "HA_ERR_TABLE_NEEDS_UPGRADE";
89
case HA_ERR_TABLE_READONLY: return "HA_ERR_TABLE_READONLY";
90
case HA_ERR_AUTOINC_READ_FAILED: return "HA_ERR_AUTOINC_READ_FAILED";
91
case HA_ERR_AUTOINC_ERANGE: return "HA_ERR_AUTOINC_ERANGE";
92
case HA_ERR_GENERIC: return "HA_ERR_GENERIC";
93
case HA_ERR_RECORD_IS_THE_SAME: return "HA_ERR_RECORD_IS_THE_SAME";
94
case HA_ERR_LOGGING_IMPOSSIBLE: return "HA_ERR_LOGGING_IMPOSSIBLE";
95
case HA_ERR_CORRUPT_EVENT: return "HA_ERR_CORRUPT_EVENT";
96
case HA_ERR_ROWS_EVENT_APPLY : return "HA_ERR_ROWS_EVENT_APPLY";
102
Error reporting facility for Rows_log_event::do_apply_event
104
@param level error, warning or info
105
@param ha_error HA_ERR_ code
106
@param rli pointer to the active Relay_log_info instance
107
@param session pointer to the slave thread's session
108
@param table pointer to the event's table object
109
@param type the type of the event
110
@param log_name the master binlog file name
111
@param pos the master binlog file pos (the next after the event)
114
static void inline slave_rows_error_report(enum loglevel level, int ha_error,
115
Relay_log_info const *rli, Session *session,
116
Table *table, const char * type,
117
const char *log_name, ulong pos)
119
const char *handler_error= HA_ERR(ha_error);
120
char buff[MAX_SLAVE_ERRMSG], *slider;
121
const char *buff_end= buff + sizeof(buff);
123
List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
127
for (err= it++, slider= buff; err && slider < buff_end - 1;
128
slider += len, err= it++)
130
len= snprintf(slider, buff_end - slider,
131
_(" %s, Error_code: %d;"), err->msg, err->code);
134
rli->report(level, session->is_error()? session->main_da.sql_errno() : 0,
135
_("Could not execute %s event on table %s.%s;"
136
"%s handler error %s; "
137
"the event's master log %s, end_log_pos %lu"),
138
type, table->s->db.str,
139
table->s->table_name.str,
141
handler_error == NULL? _("<unknown>") : handler_error,
147
Cache that will automatically be written to a dedicated file on
153
class Write_on_release_cache
161
typedef unsigned short flag_set;
167
Write_on_release_cache
168
cache Pointer to cache to use
169
file File to write cache to upon destruction
170
flags Flags for the cache
174
Class used to guarantee copy of cache to file before exiting the
175
current block. On successful copy of the cache, the cache will
176
be reinited as a WRITE_CACHE.
178
Currently, a pointer to the cache is provided in the
179
constructor, but it would be possible to create a subclass
180
holding the IO_CACHE itself.
182
Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0)
183
: m_cache(cache), m_file(file), m_flags(flags)
185
reinit_io_cache(m_cache, WRITE_CACHE, 0L, false, true);
188
~Write_on_release_cache()
190
copy_event_cache_to_file_and_reinit(m_cache, m_file);
191
if (m_flags | FLUSH_F)
196
Return a pointer to the internal IO_CACHE.
203
Function to return a pointer to the internal cache, so that the
204
object can be treated as a IO_CACHE and used with the my_b_*
208
A pointer to the internal IO_CACHE.
210
IO_CACHE *operator&()
216
// Hidden, to prevent usage.
217
Write_on_release_cache(Write_on_release_cache const&);
224
uint32_t debug_not_change_ts_if_art_event= 1; // bug#29309 simulation
230
static void clear_all_errors(Session *session, Relay_log_info *rli)
232
session->is_slave_error = 0;
233
session->clear_error();
239
Ignore error code specified on command line.
242
inline int ignored_error_code(int err_code)
244
return ((err_code == ER_SLAVE_IGNORED_TABLE) ||
245
(use_slave_mask && bitmap_is_set(&slave_error_mask, err_code)));
253
static char *pretty_print_str(char *packet, const char *str, int len)
255
const char *end= str + len;
261
switch ((c=*str++)) {
262
case '\n': *pos++= '\\'; *pos++= 'n'; break;
263
case '\r': *pos++= '\\'; *pos++= 'r'; break;
264
case '\\': *pos++= '\\'; *pos++= '\\'; break;
265
case '\b': *pos++= '\\'; *pos++= 'b'; break;
266
case '\t': *pos++= '\\'; *pos++= 't'; break;
267
case '\'': *pos++= '\\'; *pos++= '\''; break;
268
case 0 : *pos++= '\\'; *pos++= '0'; break;
280
Creates a temporary file name for LOAD DATA INFILE.
282
@param buf Store new filename here
283
@param file_id File_id (part of file name)
284
@param event_server_id Event_id (part of file name)
285
@param ext Extension for file name
288
Pointer to start of extension
291
static char *slave_load_file_stem(char *buf, uint32_t file_id,
292
int event_server_id, const char *ext)
295
fn_format(buf,"SQL_LOAD-",slave_load_tmpdir, "", MY_UNPACK_FILENAME);
298
buf= strchr(buf, '\0');
299
buf= int10_to_str(::server_id, buf, 10);
301
buf= int10_to_str(event_server_id, buf, 10);
303
res= int10_to_str(file_id, buf, 10);
304
my_stpcpy(res, ext); // Add extension last
305
return res; // Pointer to extension
310
Delete all temporary files used for SQL_LOAD.
313
static void cleanup_load_tmpdir()
318
char fname[FN_REFLEN], prefbuf[31], *p;
320
if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
324
When we are deleting temporary files, we should only remove
325
the files associated with the server id of our server.
326
We don't use event_server_id here because since we've disabled
327
direct binlogging of Create_file/Append_file/Exec_load events
328
we cannot meet Start_log event in the middle of events from one
331
p= strmake(prefbuf, STRING_WITH_LEN("SQL_LOAD-"));
332
p= int10_to_str(::server_id, p, 10);
336
for (i=0 ; i < (uint)dirp->number_off_files; i++)
338
file=dirp->dir_entry+i;
339
if (is_prefix(file->name, prefbuf))
341
fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME);
342
my_delete(fname, MYF(0));
354
/**
  Write a string to an IO_CACHE, preceded by a one-byte length.

  Only the low 8 bits of @c length go into the prefix byte, while the
  full @c length bytes of @c str are written after it.

  @param file    cache to write to
  @param str     bytes to write after the prefix
  @param length  number of bytes of @c str to write

  @return true if either my_b_safe_write() call returned non-zero,
          false otherwise; the second write is not attempted when the
          first one returns non-zero
*/
static bool write_str(IO_CACHE *file, const char *str, uint32_t length)
{
  unsigned char length_byte= (unsigned char) length;

  if (my_b_safe_write(file, &length_byte, sizeof(length_byte)))
    return true;
  if (my_b_safe_write(file, (unsigned char*) str, length))
    return true;
  return false;
}
367
static inline int read_str(const char **buf, const char *buf_end,
368
const char **str, uint8_t *len)
370
if (*buf + ((uint) (unsigned char) **buf) >= buf_end)
372
*len= (uint8_t) **buf;
374
(*buf)+= (uint) *len+1;
380
Transforms a string into "" or its expression in 0x... form.
383
char *str_to_hex(char *to, const char *from, uint32_t len)
389
to= octet2hex(to, from, len);
392
to= my_stpcpy(to, "\"\"");
393
return to; // pointer to end 0 of 'to'
398
Append a version of the 'from' string suitable for use in a query to
399
the 'to' string. To generate a correct escaping, the character set
400
information in 'csinfo' is used.
404
append_query_string(const CHARSET_INFO * const csinfo,
405
String const *from, String *to)
408
uint32_t const orig_len= to->length();
409
if (to->reserve(orig_len + from->length()*2+3))
412
beg= to->c_ptr_quick() + to->length();
414
if (csinfo->escape_with_backslash_is_dangerous)
415
ptr= str_to_hex(ptr, from->ptr(), from->length());
419
ptr+= drizzle_escape_string(ptr, from->ptr(), from->length());
422
to->length(orig_len + ptr - beg);
427
/**************************************************************************
428
Log_event methods (= the parent class of all events)
429
**************************************************************************/
433
returns the human readable name of the event's type
436
/**
  Give the human readable name of an event type.

  @param type  event type code

  @return a string literal naming the type; "Unknown" for any code that
          has no entry below
*/
const char* Log_event::get_type_str(Log_event_type type)
{
  const char *name= "Unknown";                  /* impossible */

  switch (type) {
  case START_EVENT_V3:           name= "Start_v3";              break;
  case STOP_EVENT:               name= "Stop";                  break;
  case QUERY_EVENT:              name= "Query";                 break;
  case ROTATE_EVENT:             name= "Rotate";                break;
  case INTVAR_EVENT:             name= "Intvar";                break;
  case LOAD_EVENT:               name= "Load";                  break;
  case NEW_LOAD_EVENT:           name= "New_load";              break;
  case SLAVE_EVENT:              name= "Slave";                 break;
  case CREATE_FILE_EVENT:        name= "Create_file";           break;
  case APPEND_BLOCK_EVENT:       name= "Append_block";          break;
  case DELETE_FILE_EVENT:        name= "Delete_file";           break;
  case EXEC_LOAD_EVENT:          name= "Exec_load";             break;
  case RAND_EVENT:               name= "RAND";                  break;
  case XID_EVENT:                name= "Xid";                   break;
  case USER_VAR_EVENT:           name= "User var";              break;
  case FORMAT_DESCRIPTION_EVENT: name= "Format_desc";           break;
  case TABLE_MAP_EVENT:          name= "Table_map";             break;
  case PRE_GA_WRITE_ROWS_EVENT:  name= "Write_rows_event_old";  break;
  case PRE_GA_UPDATE_ROWS_EVENT: name= "Update_rows_event_old"; break;
  case PRE_GA_DELETE_ROWS_EVENT: name= "Delete_rows_event_old"; break;
  case WRITE_ROWS_EVENT:         name= "Write_rows";            break;
  case UPDATE_ROWS_EVENT:        name= "Update_rows";           break;
  case DELETE_ROWS_EVENT:        name= "Delete_rows";           break;
  case BEGIN_LOAD_QUERY_EVENT:   name= "Begin_load_query";      break;
  case EXECUTE_LOAD_QUERY_EVENT: name= "Execute_load_query";    break;
  case INCIDENT_EVENT:           name= "Incident";              break;
  default:                                                      break;
  }
  return name;
}
469
const char* Log_event::get_type_str()
471
return get_type_str(get_type_code());
476
Log_event::Log_event()
479
/**
  Construct an event on behalf of a session.

  log_pos, temp_buf and exec_time start at zero; server_id and the
  timestamp ('when') are copied from the session.

  @param session_arg  session the event belongs to; it is dereferenced
                      here, so it must not be null
  @param flags_arg    initial value of the event's flags
  @param using_trans  value stored in cache_stmt
*/
Log_event::Log_event(Session* session_arg, uint16_t flags_arg, bool using_trans)
  : log_pos(0),
    temp_buf(0),
    exec_time(0),
    flags(flags_arg),
    session(session_arg)
{
  cache_stmt= using_trans;
  when= session_arg->start_time;
  server_id= session_arg->server_id;
}
489
This minimal constructor is for when you are not even sure that there
490
is a valid Session. For example in the server when we are shutting down or
491
flushing logs after receiving a SIGHUP (then we must write a Rotate to
492
the binlog but we have no Session, so we need this minimal constructor).
495
Log_event::Log_event()
496
:temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
499
server_id= ::server_id;
501
We can't call my_time() here as this would cause a call before
510
Log_event::Log_event()
513
Log_event::Log_event(const char* buf,
514
const Format_description_log_event* description_event)
515
:temp_buf(0), cache_stmt(0)
518
when= uint4korr(buf);
519
server_id= uint4korr(buf + SERVER_ID_OFFSET);
520
data_written= uint4korr(buf + EVENT_LEN_OFFSET);
521
if (description_event->binlog_version==1)
528
log_pos= uint4korr(buf + LOG_POS_OFFSET);
530
If the log is 4.0 (so here it can only be a 4.0 relay log read by
531
the SQL thread or a 4.0 master binlog read by the I/O thread),
532
log_pos is the beginning of the event: we transform it into the end
533
of the event, which is more useful.
534
But how do you know that the log is 4.0: you know it if
535
description_event is version 3 *and* you are not reading a
536
Format_desc (remember that mysqlbinlog starts by assuming that 5.0
537
logs are in 4.0 format, until it finds a Format_desc).
539
if (description_event->binlog_version==3 &&
540
buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
543
If log_pos=0, don't change it. log_pos==0 is a marker to mean
544
"don't change rli->group_master_log_pos" (see
545
inc_group_relay_log_pos()). As it is unreal log_pos, adding the
546
event len's is nonsense. For example, a fake Rotate event should
547
not have its log_pos (which is 0) changed or it will modify
548
Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense
549
value of (a non-zero offset which does not exist in the master's
550
binlog, so which will cause problems if the user uses this value
553
log_pos+= data_written; /* purecov: inspected */
556
flags= uint2korr(buf + FLAGS_OFFSET);
557
if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
558
(buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
561
These events always have a header which stops here (i.e. their
565
Initialization to zero of all other Log_event members as they're
566
not specified. Currently there are no such members; in the future
567
there will be an event UID (but Format_description and Rotate
568
don't need this UID, as they are not propagated through
569
--log-slave-updates (remember the UID is used to not play a query
570
twice when you have two masters which are slaves of a 3rd master).
575
/* otherwise, go on with reading the header from buf (nothing now) */
579
int Log_event::do_update_pos(Relay_log_info *rli)
582
rli is null when (as far as I (Guilhem) know) the caller is
583
Load_log_event::do_apply_event *and* that one is called from
584
Execute_load_log_event::do_apply_event. In this case, we don't
585
do anything here ; Execute_load_log_event::do_apply_event will
586
call Log_event::do_apply_event again later with the proper rli.
587
Strictly speaking, if we were sure that rli is null only in the
588
case discussed above, 'if (rli)' is useless here. But as we are
589
not 100% sure, keep it for now.
591
Matz: I don't think we will need this check with this refactoring.
596
bug#29309 simulation: resetting the flag to force
597
wrong behaviour of artificial event to update
598
rli->last_master_timestamp for only one time -
599
the first FLUSH LOGS in the test.
601
if (debug_not_change_ts_if_art_event == 1
602
&& is_artificial_event())
603
debug_not_change_ts_if_art_event= 0;
604
rli->stmt_done(log_pos,
605
is_artificial_event() &&
606
debug_not_change_ts_if_art_event > 0 ? 0 : when);
607
if (debug_not_change_ts_if_art_event == 0)
608
debug_not_change_ts_if_art_event= 2;
610
return 0; // Cannot fail currently
614
/**
  Decide whether the slave should skip this event.

  @param rli  relay log info; its replicate_same_server_id flag,
              slave_skip_counter and is_in_group() are consulted

  @retval EVENT_SKIP_IGNORE  the event carries this server's own id and
                             replicate_same_server_id is off, or the
                             skip counter is 1 while inside a group
  @retval EVENT_SKIP_COUNT   otherwise, if the skip counter is positive
  @retval EVENT_SKIP_NOT     in every other case
*/
Log_event::enum_skip_reason
Log_event::do_shall_skip(Relay_log_info *rli)
{
  /* Event carries our own server id and replicating those is off. */
  if (server_id == ::server_id && !rli->replicate_same_server_id)
    return EVENT_SKIP_IGNORE;

  /* Skip counter is at 1 while a group is still open. */
  if (rli->slave_skip_counter == 1 && rli->is_in_group())
    return EVENT_SKIP_IGNORE;

  if (rli->slave_skip_counter > 0)
    return EVENT_SKIP_COUNT;

  return EVENT_SKIP_NOT;
}
627
Log_event::pack_info()
630
/**
  Base-class version of pack_info(): stores an empty string in the
  protocol, using the binary character set.

  @param protocol  protocol object that receives the value
*/
void Log_event::pack_info(Protocol *protocol)
{
  const char *no_info= "";
  protocol->store(no_info, &my_charset_bin);
}
637
init_show_field_list() prepares the column names and types for the
638
output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
642
void Log_event::init_show_field_list(List<Item>* field_list)
644
field_list->push_back(new Item_empty_string("Log_name", 20));
645
field_list->push_back(new Item_return_int("Pos", MY_INT32_NUM_DECIMAL_DIGITS,
646
DRIZZLE_TYPE_LONGLONG));
647
field_list->push_back(new Item_empty_string("Event_type", 20));
648
field_list->push_back(new Item_return_int("Server_id", 10,
650
field_list->push_back(new Item_return_int("End_log_pos",
651
MY_INT32_NUM_DECIMAL_DIGITS,
652
DRIZZLE_TYPE_LONGLONG));
653
field_list->push_back(new Item_empty_string("Info", 20));
660
bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
662
unsigned char header[LOG_EVENT_HEADER_LEN];
665
/* Store number of bytes that will be written by this event */
666
data_written= event_data_length + sizeof(header);
669
log_pos != 0 if this is relay-log event. In this case we should not
673
if (is_artificial_event())
676
We should not do any cleanup on slave when reading this. We
677
mark this by setting log_pos to 0. Start_log_event_v3() will
678
detect this on reading and set artificial_event=1 for the event.
685
Calculate position of end of event
687
Note that with a SEQ_READ_APPEND cache, my_b_tell() does not
688
work well. So this will give slightly wrong positions for the
689
Format_desc/Rotate/Stop events which the slave writes to its
690
relay log. For example, the initial Format_desc will have
691
end_log_pos=91 instead of 95. Because after writing the first 4
692
bytes of the relay log, my_b_tell() still reports 0. Because
693
my_b_append() does not update the counter which my_b_tell()
694
later uses (one should probably use my_b_append_tell() to work
695
around this). To get right positions even when writing to the
696
relay log, we use the (new) my_b_safe_tell().
698
Note that this raises a question on the correctness of all these
699
assert(my_b_tell()=rli->event_relay_log_pos).
701
If in a transaction, the log_pos which we calculate below is not
702
very good (because then my_b_safe_tell() returns start position
703
of the BEGIN, so it's like the statement was at the BEGIN's
704
place), but it's not a very serious problem (as the slave, when
705
it is in a transaction, does not take those end_log_pos into
706
account (as it calls inc_event_relay_log_pos()). To be fixed
707
later, so that it looks less strange. But not bug.
710
log_pos= my_b_safe_tell(file)+data_written;
713
now= (ulong) get_time(); // Query start time
716
Header will be of size LOG_EVENT_HEADER_LEN for all events, except for
717
FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT, where it will be
718
LOG_EVENT_MINIMAL_HEADER_LEN (remember these 2 have a frozen header,
719
because we read them before knowing the format).
722
int4store(header, now); // timestamp
723
header[EVENT_TYPE_OFFSET]= get_type_code();
724
int4store(header+ SERVER_ID_OFFSET, server_id);
725
int4store(header+ EVENT_LEN_OFFSET, data_written);
726
int4store(header+ LOG_POS_OFFSET, log_pos);
727
int2store(header+ FLAGS_OFFSET, flags);
729
return(my_b_safe_write(file, header, sizeof(header)) != 0);
734
This needn't be format-tolerant, because we only read
735
LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length).
738
int Log_event::read_log_event(IO_CACHE* file, String* packet,
739
pthread_mutex_t* log_lock)
743
char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
746
pthread_mutex_lock(log_lock);
747
if (my_b_read(file, (unsigned char*) buf, sizeof(buf)))
750
If the read hits eof, we must report it as eof so the caller
751
will know it can go into cond_wait to be woken up on the next
755
result= LOG_READ_EOF;
757
result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
760
data_len= uint4korr(buf + EVENT_LEN_OFFSET);
761
if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
762
data_len > current_session->variables.max_allowed_packet)
764
result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
769
/* Append the log event header to packet */
770
if (packet->append(buf, sizeof(buf)))
772
/* Failed to allocate packet */
773
result= LOG_READ_MEM;
776
data_len-= LOG_EVENT_MINIMAL_HEADER_LEN;
779
/* Append rest of event, read directly from file into packet */
780
if (packet->append(file, data_len))
783
Fatal error occurred when appending rest of the event
784
to packet, possible failures:
785
1. EOF occurred when reading from file, it's really an error
786
as data_len is >=0 there's supposed to be more bytes available.
787
file->error will have been set to number of bytes left to read
788
2. Read was interrupted, file->error would normally be set to -1
789
3. Failed to allocate memory for packet, my_errno
790
will be ENOMEM (file->error should be 0, but since the
791
memory allocation occurs before the call to read it might
794
result= (my_errno == ENOMEM ? LOG_READ_MEM :
795
(file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
796
/* Implicit goto end; */
802
pthread_mutex_unlock(log_lock);
806
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
807
#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
811
Allocates memory; The caller is responsible for clean-up.
813
Log_event* Log_event::read_log_event(IO_CACHE* file,
814
pthread_mutex_t* log_lock,
815
const Format_description_log_event
818
assert(description_event != 0);
819
char head[LOG_EVENT_MINIMAL_HEADER_LEN];
821
First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
822
check the event for sanity and to know its length; no need to really parse
823
it. We say "at most" because this could be a 3.23 master, which has header
824
of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's
825
"minimal" over the set {MySQL >=4.0}).
827
uint32_t header_size= cmin(description_event->common_header_len,
828
LOG_EVENT_MINIMAL_HEADER_LEN);
831
if (my_b_read(file, (unsigned char *) head, header_size))
835
No error here; it could be that we are at the file's end. However
836
if the next my_b_read() fails (below), it will be an error as we
837
were able to read the first bytes.
841
uint32_t data_len = uint4korr(head + EVENT_LEN_OFFSET);
843
const char *error= 0;
845
#ifndef max_allowed_packet
846
Session *session=current_session;
847
uint32_t max_allowed_packet= session ? session->variables.max_allowed_packet : ~(ulong)0;
850
if (data_len > max_allowed_packet)
852
error = "Event too big";
856
if (data_len < header_size)
858
error = "Event too small";
862
// some events use the extra byte to null-terminate strings
863
if (!(buf = (char*) my_malloc(data_len+1, MYF(MY_WME))))
865
error = "Out of memory";
869
memcpy(buf, head, header_size);
870
if (my_b_read(file, (unsigned char*) buf + header_size, data_len - header_size))
872
error = "read error";
875
if ((res= read_log_event(buf, data_len, &error, description_event)))
876
res->register_temp_buf(buf);
883
sql_print_error(_("Error in Log_event::read_log_event(): "
884
"'%s', data_len: %d, event_type: %d"),
885
error,data_len,head[EVENT_TYPE_OFFSET]);
888
The SQL slave thread will check if file->error<0 to know
889
if there was an I/O error. Even if there is no "low-level" I/O errors
890
with 'file', any of the high-level above errors is worrying
891
enough to stop the SQL thread now ; as we are skipping the current event,
892
going on with reading and successfully executing other events can
893
only corrupt the slave's databases. So stop.
902
Binlog format tolerance is in (buf, event_len, description_event)
906
Log_event* Log_event::read_log_event(const char* buf, uint32_t event_len,
908
const Format_description_log_event *description_event)
911
assert(description_event != 0);
913
/* Check the integrity */
914
if (event_len < EVENT_LEN_OFFSET ||
915
buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
916
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
918
*error="Sanity check failed"; // Needed to free buffer
919
return(NULL); // general sanity check - will fail on a partial read
922
uint32_t event_type= buf[EVENT_TYPE_OFFSET];
923
if (event_type > description_event->number_of_event_types &&
924
event_type != FORMAT_DESCRIPTION_EVENT)
927
It is unsafe to use the description_event if its post_header_len
928
array does not include the event type.
935
In some previous versions (see comment in
936
Format_description_log_event::Format_description_log_event(char*,...)),
937
event types were assigned different id numbers than in the
938
present version. In order to replicate from such versions to the
939
present version, we must map those event type id's to our event
940
type id's. The mapping is done with the event_type_permutation
941
array, which was set up when the Format_description_log_event
944
if (description_event->event_type_permutation)
945
event_type= description_event->event_type_permutation[event_type];
949
ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
952
ev = new Load_log_event(buf, event_len, description_event);
955
ev = new Load_log_event(buf, event_len, description_event);
958
ev = new Rotate_log_event(buf, event_len, description_event);
960
case CREATE_FILE_EVENT:
961
ev = new Create_file_log_event(buf, event_len, description_event);
963
case APPEND_BLOCK_EVENT:
964
ev = new Append_block_log_event(buf, event_len, description_event);
966
case DELETE_FILE_EVENT:
967
ev = new Delete_file_log_event(buf, event_len, description_event);
969
case EXEC_LOAD_EVENT:
970
ev = new Execute_load_log_event(buf, event_len, description_event);
972
case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
973
ev = new Start_log_event_v3(buf, description_event);
976
ev = new Stop_log_event(buf, description_event);
979
ev = new Intvar_log_event(buf, description_event);
982
ev = new Xid_log_event(buf, description_event);
985
ev = new Rand_log_event(buf, description_event);
988
ev = new User_var_log_event(buf, description_event);
990
case FORMAT_DESCRIPTION_EVENT:
991
ev = new Format_description_log_event(buf, event_len, description_event);
993
case WRITE_ROWS_EVENT:
994
ev = new Write_rows_log_event(buf, event_len, description_event);
996
case UPDATE_ROWS_EVENT:
997
ev = new Update_rows_log_event(buf, event_len, description_event);
999
case DELETE_ROWS_EVENT:
1000
ev = new Delete_rows_log_event(buf, event_len, description_event);
1002
case TABLE_MAP_EVENT:
1003
ev = new Table_map_log_event(buf, event_len, description_event);
1005
case BEGIN_LOAD_QUERY_EVENT:
1006
ev = new Begin_load_query_log_event(buf, event_len, description_event);
1008
case EXECUTE_LOAD_QUERY_EVENT:
1009
ev= new Execute_load_query_log_event(buf, event_len, description_event);
1011
case INCIDENT_EVENT:
1012
ev = new Incident_log_event(buf, event_len, description_event);
1021
is_valid() are small event-specific sanity tests which are
1022
important; for example there are some my_malloc() in constructors
1023
(e.g. Query_log_event::Query_log_event(char*...)); when these
1024
my_malloc() fail we can't return an error out of the constructor
1025
(because constructor is "void") ; so instead we leave the pointer we
1026
wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
1027
Same for Format_description_log_event, member 'post_header_len'.
1029
if (!ev || !ev->is_valid())
1032
*error= "Found invalid event in binary log";
1038
/**
  Skip decision helper: returns EVENT_SKIP_IGNORE when the skip counter
  is exactly 1, and otherwise the result of Log_event::do_shall_skip().

  @param rli  relay log info whose slave_skip_counter is examined

  @return EVENT_SKIP_IGNORE if rli->slave_skip_counter == 1, else the
          result of the base-class do_shall_skip()
*/
inline Log_event::enum_skip_reason
Log_event::continue_group(Relay_log_info *rli)
{
  return (rli->slave_skip_counter == 1)
           ? Log_event::EVENT_SKIP_IGNORE
           : Log_event::do_shall_skip(rli);
}
1046
/**************************************************************************
1047
Query_log_event methods
1048
**************************************************************************/
1051
This (which is used only for SHOW BINLOG EVENTS) could be updated to
1052
print SET @@session_var=. But this is not urgent, as SHOW BINLOG EVENTS is
1053
only an information, it does not produce suitable queries to replay (for
1054
example it does not print LOAD DATA INFILE).
1059
void Query_log_event::pack_info(Protocol *protocol)
1061
// TODO: show the catalog ??
1063
if (!(buf= (char*) my_malloc(9 + db_len + q_len, MYF(MY_WME))))
1066
if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
1069
pos= my_stpcpy(buf, "use `");
1070
memcpy(pos, db, db_len);
1071
pos= my_stpcpy(pos+db_len, "`; ");
1075
memcpy(pos, query, q_len);
1078
protocol->store(buf, pos-buf, &my_charset_bin);
1084
Utility function for the next method (Query_log_event::write()) .
1086
static void write_str_with_code_and_len(char **dst, const char *src,
1087
int len, uint32_t code)
1091
*((*dst)++)= (unsigned char) len;
1092
memcpy(*dst, src, len);
1098
Query_log_event::write().
1101
In this event we have to modify the header to have the correct
1102
EVENT_LEN_OFFSET as we don't yet know how many status variables we
1106
/**
  Serializes this query event into the given IO_CACHE.

  Steps visible in this function:
  - returns 1 early for an event it considers wrong (the guarding condition
    is not visible in this copy);
  - fills the fixed post-header fields in buf at their Q_*_OFFSET positions:
    thread id (taken from slave_proxy_id), exec_time, db_len and error_code;
  - appends the status variables after QUERY_HEADER_LEN, each introduced by
    its Q_*_CODE byte and written in increasing order of code;
  - stores the length of the status block at Q_STATUS_VARS_LEN_OFFSET,
    computes event_length, then writes the common header, the fixed
    post-header, the derived-class post-header, the status variables, the
    database name with its trailing zero byte and finally the query text.

  NOTE(review): db_len is narrowed to a single byte and the catalog length
  is stored in a single byte as well (see the todo text below), so lengths
  above 255 would not round-trip.

  NOTE(review): this copy of the function is missing lines (braces, comment
  delimiters, some conditions and cursor advances). Verify against the
  upstream revision before changing the code.

  @param file  IO_CACHE the event is written to.
  @return 1 when the early check fails or any write call reports failure,
          0 otherwise.
*/
bool Query_log_event::write(IO_CACHE* file)
1109
@todo if catalog can be of length FN_REFLEN==512, then we are not
1110
replicating it correctly, since the length is stored in a byte
1113
unsigned char buf[QUERY_HEADER_LEN+
1114
1+4+ // code of flags2 and flags2
1115
1+8+ // code of sql_mode and sql_mode
1116
1+1+FN_REFLEN+ // code of catalog and catalog length and catalog
1117
1+4+ // code of autoinc and the 2 autoinc variables
1118
1+6+ // code of charset and charset
1119
1+1+MAX_TIME_ZONE_NAME_LENGTH+ // code of tz and tz length and tz name
1120
1+2+ // code of lc_time_names and lc_time_names_number
1121
1+2 // code of charset_database and charset_database_number
1122
], *start, *start_of_status;
1126
return 1; // Something wrong with event
1129
We want to store the thread id:
1130
(- as an information for the user when he reads the binlog)
1131
- if the query uses temporary table: for the slave SQL thread to know to
1132
which master connection the temp table belongs.
1133
Now imagine we (write()) are called by the slave SQL thread (we are
1134
logging a query executed by this thread; the slave runs with
1135
--log-slave-updates). Then this query will be logged with
1136
thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of
1137
the same name were created simultaneously on the master (in the master
1139
CREATE TEMPORARY TABLE t; (thread 1)
1140
CREATE TEMPORARY TABLE t; (thread 2)
1142
then in the slave's binlog there will be
1143
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1144
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1145
which is bad (same thread id!).
1147
To avoid this, we log the thread's thread id EXCEPT for the SQL
1148
slave thread for which we log the original (master's) thread id.
1149
Now this moves the bug: what happens if the thread id on the
1150
master was 10 and when the slave replicates the query, a
1151
connection number 10 is opened by a normal client on the slave,
1152
and updates a temp table of the same name? We get a problem
1153
again. To avoid this, in the handling of temp tables (sql_base.cc)
1154
we use thread_id AND server_id. TODO when this is merged into
1155
4.1: in 4.1, slave_proxy_id has been renamed to pseudo_thread_id
1156
and is a session variable: that's to make mysqlbinlog work with
1157
temp tables. We probably need to introduce
1159
SET PSEUDO_SERVER_ID
1160
for mysqlbinlog in 4.1. mysqlbinlog would print:
1161
SET PSEUDO_SERVER_ID=
1162
SET PSEUDO_THREAD_ID=
1163
for each query using temp tables.
1165
int4store(buf + Q_THREAD_ID_OFFSET, slave_proxy_id);
1166
int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
1167
buf[Q_DB_LEN_OFFSET] = (char) db_len;
1168
int2store(buf + Q_ERR_CODE_OFFSET, error_code);
1171
You MUST always write status vars in increasing order of code. This
1172
guarantees that a slightly older slave will be able to parse those he
1175
start_of_status= start= buf+QUERY_HEADER_LEN;
1178
*start++= Q_FLAGS2_CODE;
1179
int4store(start, flags2);
1182
if (sql_mode_inited)
1184
*start++= Q_SQL_MODE_CODE;
1185
int8store(start, (uint64_t)sql_mode);
1188
if (catalog_len) // i.e. this var is inited (false for 4.0 events)
1190
write_str_with_code_and_len((char **)(&start),
1191
catalog, catalog_len, Q_CATALOG_NZ_CODE);
1193
In 5.0.x where x<4 masters we used to store the end zero here. This was
1194
a waste of one byte so we don't do it in x>=4 masters. We change code to
1195
Q_CATALOG_NZ_CODE, because re-using the old code would make x<4 slaves
1196
of this x>=4 master segfault (expecting a zero when there is
1197
none). Remaining compatibility problems are: the older slave will not
1198
find the catalog; but it is will not crash, and it's not an issue
1199
that it does not find the catalog as catalogs were not used in these
1200
older MySQL versions (we store it in binlog and read it from relay log
1201
but do nothing useful with it). What is an issue is that the older slave
1202
will stop processing the Q_* blocks (and jumps to the db/query) as soon
1203
as it sees unknown Q_CATALOG_NZ_CODE; so it will not be able to read
1204
Q_AUTO_INCREMENT*, Q_CHARSET and so replication will fail silently in
1205
various ways. Documented that you should not mix alpha/beta versions if
1206
they are not exactly the same version, with example of 5.0.3->5.0.2 and
1207
5.0.4->5.0.3. If replication is from older to new, the new will
1208
recognize Q_CATALOG_CODE and have no problem.
1211
if (auto_increment_increment != 1 || auto_increment_offset != 1)
1213
*start++= Q_AUTO_INCREMENT;
1214
int2store(start, auto_increment_increment);
1215
int2store(start+2, auto_increment_offset);
1220
*start++= Q_CHARSET_CODE;
1221
memcpy(start, charset, 6);
1226
/* In the TZ sys table, column Name is of length 64 so this should be ok */
1227
assert(time_zone_len <= MAX_TIME_ZONE_NAME_LENGTH);
1228
*start++= Q_TIME_ZONE_CODE;
1229
*start++= time_zone_len;
1230
memcpy(start, time_zone_str, time_zone_len);
1231
start+= time_zone_len;
1233
if (lc_time_names_number)
1235
assert(lc_time_names_number <= 0xFFFF);
1236
*start++= Q_LC_TIME_NAMES_CODE;
1237
int2store(start, lc_time_names_number);
1240
if (charset_database_number)
1242
assert(charset_database_number <= 0xFFFF);
1243
*start++= Q_CHARSET_DATABASE_CODE;
1244
int2store(start, charset_database_number);
1248
Here there could be code like
1249
if (command-line-option-which-says-"log_this_variable" && inited)
1251
*start++= Q_THIS_VARIABLE_CODE;
1252
int4store(start, this_variable);
1257
/* Store length of status variables */
1258
status_vars_len= (uint) (start-start_of_status);
1259
assert(status_vars_len <= MAX_SIZE_LOG_EVENT_STATUS);
1260
int2store(buf + Q_STATUS_VARS_LEN_OFFSET, status_vars_len);
1263
Calculate length of whole event
1264
The "1" below is the \0 in the db's length
1266
event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
1268
return (write_header(file, event_length) ||
1269
my_b_safe_write(file, (unsigned char*) buf, QUERY_HEADER_LEN) ||
1270
write_post_header_for_derived(file) ||
1271
my_b_safe_write(file, (unsigned char*) start_of_status,
1272
(uint) (start-start_of_status)) ||
1273
my_b_safe_write(file, (db) ? (unsigned char*) db : (unsigned char*)"", db_len + 1) ||
1274
my_b_safe_write(file, (unsigned char*) query, q_len)) ? 1 : 0;
1278
The simplest constructor that could possibly work. This is used for
1279
creating static objects that have a special meaning and are invisible
1282
Query_log_event::Query_log_event()
1283
:Log_event(), data_buf(0)
/*
  SYNOPSIS
    Query_log_event::Query_log_event()
      session_arg       - thread handle
      query_arg         - array of char representing the query
      query_length      - size of the `query_arg' array
      using_trans       - there is a modified transactional table
      suppress_use      - suppress the generation of 'USE' statements
      killed_status_arg - optional, defaults to Session::KILLED_NO_VALUE;
                          when left at the default, the current
                          session->killed value is used instead.
                          A caller might need to masquerade session->killed
                          with Session::NOT_KILLED.
  DESCRIPTION
    Creates an event for binlogging.
    The value for the local `killed_status' can be supplied by the caller.
*/
1305
/**
  Builds a query event from the state of a live session, for binlogging.

  Grounded in the initializers and statements visible below:
  - query, catalog and db store the pointers handed in (query_arg and the
    session fields); nothing is copied here;
  - thread_id comes from the session and slave_proxy_id from
    variables.pseudo_thread_id;
  - when killed_status_arg equals Session::KILLED_NO_VALUE it is replaced
    by session_arg->killed; it then selects between the session error
    number (or 0 when there is no error) and killed_errno(). Presumably
    that value is stored in error_code; the assignment line is not visible
    in this copy, so confirm;
  - exec_time is end_time minus the session start_time;
  - catalog_len and db_len are the strlen() of the respective pointer, or 0
    for a NULL pointer;
  - charset_database_number stays 0 unless collation_database differs from
    the session db_charset;
  - flags2 keeps only the OPTIONS_WRITTEN_TO_BIN_LOG bits of the session
    options;
  - charset[] receives three 2-byte ids: character_set_client,
    collation_connection and collation_server;
  - the time zone name pointer and length are captured when
    session_arg->time_zone_used is set.

  NOTE(review): this copy of the constructor is missing lines (one
  parameter line, part of the initializer list, braces, comment delimiters
  and a few statements). Verify against the upstream revision before
  changing the code.
*/
Query_log_event::Query_log_event(Session* session_arg, const char* query_arg,
1306
ulong query_length, bool using_trans,
1308
Session::killed_state killed_status_arg)
1309
:Log_event(session_arg,
1310
(session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
1312
(suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1314
data_buf(0), query(query_arg), catalog(session_arg->catalog),
1315
db(session_arg->db), q_len((uint32_t) query_length),
1316
thread_id(session_arg->thread_id),
1317
/* save the original thread id; we already know the server id */
1318
slave_proxy_id(session_arg->variables.pseudo_thread_id),
1319
flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1321
auto_increment_increment(session_arg->variables.auto_increment_increment),
1322
auto_increment_offset(session_arg->variables.auto_increment_offset),
1323
lc_time_names_number(session_arg->variables.lc_time_names->number),
1324
charset_database_number(0)
1328
if (killed_status_arg == Session::KILLED_NO_VALUE)
1329
killed_status_arg= session_arg->killed;
1332
(killed_status_arg == Session::NOT_KILLED) ?
1333
(session_arg->is_error() ? session_arg->main_da.sql_errno() : 0) :
1334
(session_arg->killed_errno());
1337
exec_time = (ulong) (end_time - session_arg->start_time);
1339
@todo this means that if we have no catalog, then it is replicated
1340
as an existing catalog of length zero. is that safe? /sven
1342
catalog_len = (catalog) ? (uint32_t) strlen(catalog) : 0;
1343
/* status_vars_len is set just before writing the event */
1344
db_len = (db) ? (uint32_t) strlen(db) : 0;
1345
if (session_arg->variables.collation_database != session_arg->db_charset)
1346
charset_database_number= session_arg->variables.collation_database->number;
1349
If we don't use flags2 for anything else than options contained in
1350
session_arg->options, it would be more efficient to flags2=session_arg->options
1351
(OPTIONS_WRITTEN_TO_BIN_LOG would be used only at reading time).
1352
But it's likely that we don't want to use 32 bits for 3 bits; in the future
1353
we will probably want to reclaim the 29 bits. So we need the &.
1355
flags2= (uint32_t) (session_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
1356
assert(session_arg->variables.character_set_client->number < 256*256);
1357
assert(session_arg->variables.collation_connection->number < 256*256);
1358
assert(session_arg->variables.collation_server->number < 256*256);
1359
assert(session_arg->variables.character_set_client->mbminlen == 1);
1360
int2store(charset, session_arg->variables.character_set_client->number);
1361
int2store(charset+2, session_arg->variables.collation_connection->number);
1362
int2store(charset+4, session_arg->variables.collation_server->number);
1363
if (session_arg->time_zone_used)
1366
Note that our event becomes dependent on the Time_zone object
1367
representing the time zone. Fortunately such objects are never deleted
1368
or changed during mysqld's lifetime.
1370
time_zone_len= session_arg->variables.time_zone->get_name()->length();
1371
time_zone_str= session_arg->variables.time_zone->get_name()->ptr();
1378
/* 2 utility functions for the next method */
1381
Read a string with length from memory.
1383
This function reads the string-with-length stored at
1384
<code>src</code> and extract the length into <code>*len</code> and
1385
a pointer to the start of the string into <code>*dst</code>. The
1386
string can then be copied using <code>memcpy()</code> with the
1387
number of bytes given in <code>*len</code>.
1389
@param src Pointer to variable holding a pointer to the memory to
1390
read the string from.
1391
@param dst Pointer to variable holding a pointer where the actual
1392
string starts. Starting from this position, the string
1393
can be copied using @c memcpy().
1394
@param len Pointer to variable where the length will be stored.
1395
@param end One-past-the-end of the memory where the string is
1398
@return Zero if the entire string can be copied successfully,
1399
@c UINT_MAX if the length could not be read from memory
1400
(that is, if <code>*src >= end</code>), otherwise the
1401
number of bytes that are missing to read the full
1402
string, which happends <code>*dst + *len >= end</code>.
1405
get_str_len_and_pointer(const Log_event::Byte **src,
1408
const Log_event::Byte *end)
1411
return -1; // Will be UINT_MAX in two-complement arithmetics
1412
uint32_t length= **src;
1415
if (*src + length >= end)
1416
return *src + length - end + 1; // Number of bytes missing
1417
*dst= (char *)*src + 1; // Will be copied later
1424
static void copy_str_and_move(const char **src,
1425
Log_event::Byte **dst,
1428
memcpy(*dst, *src, len);
1429
*src= (const char *)*dst;
/**
   Macro to check that there is enough space to read from memory.

   When fewer than CNT bytes remain between PTR and END, the macro sets the
   enclosing object's @c query member to 0 and returns from the enclosing
   function, so it may only be used where @c query is in scope and a plain
   <code>return;</code> is valid. In builds with assertions enabled the
   assert fires before that fallback is reached.

   @param PTR Pointer to memory
   @param END End of memory
   @param CNT Number of bytes that should be read.
*/
#define CHECK_SPACE(PTR,END,CNT)                      \
  do {                                                \
    assert((PTR) + (CNT) <= (END));                   \
    if ((PTR) + (CNT) > (END)) {                      \
      query= 0;                                       \
      return;                                         \
    }                                                 \
  } while (0)
/**
  This is used by the SQL slave thread to prepare the event before execution.
*/
1455
/**
  Builds a query event by parsing its serialized form.

  Steps visible in this constructor:
  - takes common_header_len and the post-header length for event_type from
    description_event, compares event_len against their sum (the action
    taken on failure is not visible in this copy) and derives data_len;
  - reads thread id (also used for slave_proxy_id), exec_time, db_len and
    error_code from the fixed post-header;
  - reads status_vars_len and compares it with the smaller of data_len and
    MAX_SIZE_LOG_EVENT_STATUS before subtracting it from data_len;
  - walks the status variable block, using CHECK_SPACE or
    get_str_len_and_pointer() before each read; an unrecognized code ends
    the walk by moving pos to end;
  - allocates data_buf, copies catalog and time zone name into it, then
    copies data_len bytes holding db and query, writes a zero byte after
    them, points query at start + db_len + 1 and sets q_len to
    data_len - db_len - 1.

  NOTE(review): db_len comes straight from the event and is not checked
  against data_len in the lines visible here (see the TODO on that line);
  confirm whether a check exists in the missing lines.

  NOTE(review): this copy of the constructor is missing lines (one
  parameter line, the switch and its first case label, breaks, braces,
  early returns and part of the allocation expression). Verify against the
  upstream revision before changing the code.
*/
Query_log_event::Query_log_event(const char* buf, uint32_t event_len,
1456
const Format_description_log_event
1458
Log_event_type event_type)
1459
:Log_event(buf, description_event), data_buf(0), query(NULL),
1460
db(NULL), catalog_len(0), status_vars_len(0),
1461
flags2_inited(0), sql_mode_inited(0), charset_inited(0),
1462
auto_increment_increment(1), auto_increment_offset(1),
1463
time_zone_len(0), lc_time_names_number(0), charset_database_number(0)
1467
uint8_t common_header_len, post_header_len;
1468
Log_event::Byte *start;
1469
const Log_event::Byte *end;
1472
common_header_len= description_event->common_header_len;
1473
post_header_len= description_event->post_header_len[event_type-1];
1476
We test if the event's length is sensible, and if so we compute data_len.
1477
We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
1478
We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
1480
if (event_len < (uint)(common_header_len + post_header_len))
1482
data_len = event_len - (common_header_len + post_header_len);
1483
buf+= common_header_len;
1485
slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
1486
exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
1487
db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
1488
error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
1491
5.0 format starts here.
1492
Depending on the format, we may or not have affected/warnings etc
1493
The remnent post-header to be parsed has length:
1495
tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
1498
status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
1500
Check if status variable length is corrupt and will lead to very
1501
wrong data. We could be even more strict and require data_len to
1502
be even bigger, but this will suffice to catch most corruption
1503
errors that can lead to a crash.
1505
if (status_vars_len > cmin(data_len, (uint32_t)MAX_SIZE_LOG_EVENT_STATUS))
1510
data_len-= status_vars_len;
1514
We have parsed everything we know in the post header for QUERY_EVENT,
1515
the rest of post header is either comes from older version MySQL or
1516
dedicated to derived events (e.g. Execute_load_query...)
1519
/* variable-part: the status vars; only in MySQL 5.0 */
1521
start= (Log_event::Byte*) (buf+post_header_len);
1522
end= (const Log_event::Byte*) (start+status_vars_len);
1523
for (const Log_event::Byte* pos= start; pos < end;)
1527
CHECK_SPACE(pos, end, 4);
1529
flags2= uint4korr(pos);
1532
case Q_SQL_MODE_CODE:
1534
CHECK_SPACE(pos, end, 8);
1536
sql_mode= (ulong) uint8korr(pos); // QQ: Fix when sql_mode is uint64_t
1540
case Q_CATALOG_NZ_CODE:
1541
if (get_str_len_and_pointer(&pos, &catalog, &catalog_len, end))
1547
case Q_AUTO_INCREMENT:
1548
CHECK_SPACE(pos, end, 4);
1549
auto_increment_increment= uint2korr(pos);
1550
auto_increment_offset= uint2korr(pos+2);
1553
case Q_TIME_ZONE_CODE:
1555
if (get_str_len_and_pointer(&pos, &time_zone_str, &time_zone_len, end))
1562
case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
1563
CHECK_SPACE(pos, end, 1);
1564
if ((catalog_len= *pos))
1565
catalog= (char*) pos+1; // Will be copied later
1566
CHECK_SPACE(pos, end, catalog_len + 2);
1567
pos+= catalog_len+2; // leap over end 0
1568
catalog_nz= 0; // catalog has end 0 in event
1570
case Q_LC_TIME_NAMES_CODE:
1571
CHECK_SPACE(pos, end, 2);
1572
lc_time_names_number= uint2korr(pos);
1575
case Q_CHARSET_DATABASE_CODE:
1576
CHECK_SPACE(pos, end, 2);
1577
charset_database_number= uint2korr(pos);
1581
/* That's why you must write status vars in growing order of code */
1582
pos= (const unsigned char*) end; // Break loop
1586
if (!(start= data_buf = (Log_event::Byte*) my_malloc(catalog_len + 1 +
1591
if (catalog_len) // If catalog is given
1594
@todo we should clean up and do only copy_str_and_move; it
1595
works for both cases. Then we can remove the catalog_nz
1598
if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
1599
copy_str_and_move(&catalog, &start, catalog_len);
1602
memcpy(start, catalog, catalog_len+1); // copy end 0
1603
catalog= (const char *)start;
1604
start+= catalog_len+1;
1608
copy_str_and_move(&time_zone_str, &start, time_zone_len);
1611
if time_zone_len or catalog_len are 0, then time_zone and catalog
1612
are uninitialized at this point. shouldn't they point to the
1613
zero-length null-terminated strings we allocated space for in the
1614
my_alloc call above? /sven
1617
/* A 2nd variable part; this is common to all versions */
1618
memcpy(start, end, data_len); // Copy db and query
1619
start[data_len]= '\0'; // End query with \0 (For safetly)
1621
query= (char *)(start + db_len + 1);
1622
q_len= data_len - db_len -1;
1628
Query_log_event::do_apply_event()
1630
int Query_log_event::do_apply_event(Relay_log_info const *rli)
1632
return do_apply_event(rli, query, q_len);
/**
  @todo
  Compare the values of "affected rows" around here. Something
  like:
  @code
     if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
     {
     sql_print_error("Slave: did not get the expected number of affected \
     rows running query from master - expected %d, got %d (this numbers \
     should have matched modulo 4294967296).", 0, ...);
     session->query_error = 1;
     }
  @endcode
  We may also want an option to tell the slave to ignore "affected"
  mismatch. This mismatch could be implemented with a new ER_ code, and
  to ignore it you would use --slave-skip-errors...
*/
1653
/**
  Applies a query event on the slave using an explicitly supplied query
  text.

  What the visible statements do, in order:
  - point session->catalog at the event catalog (or at an empty string
    when catalog_len is 0) and set the session database to the value
    returned by rpl_filter->get_rewrite_db() for db;
  - copy auto_increment_increment and auto_increment_offset into the
    session variables;
  - store log_pos in rli->future_group_master_log_pos, then clear errors
    and the tables-to-lock list (rli is const here, hence the const_cast);
  - if rpl_filter->db_ok() accepts the session database: install the event
    time, query text and length, a fresh query id and pseudo_thread_id;
    when the test on ignored_error_code() and check_expected_error()
    passes, restore the OPTIONS_WRITTEN_TO_BIN_LOG bits from flags2, the
    time zone, lc_time_names and collation_database, then run
    mysql_parse() and log_slow_statement(); the other path test-parses the
    query and either clears the errors or reports through rli->report();
  - compare the error number produced locally with the one carried by the
    event and report through rli->report() when they differ and neither is
    an ignored error code;
  - under LOCK_thread_count reset session->catalog, the session database,
    query and query_length, because they may point into data_buf of this
    event; then close the thread tables, reset the
    first-successful-insert-id fields and free the session mem_root.

  NOTE(review): the report string used when master and slave errors differ
  spells the word different as differenxt. It is runtime text, so it is
  left untouched in this documentation-only change.

  NOTE(review): this copy of the function is missing lines (declarations,
  braces, else branches, labels, gotos and comment delimiters). Verify
  against the upstream revision before changing the code.

  @param rli        Relay log info used for position tracking and error
                    reporting.
  @param query_arg  Query text to execute.
  @param q_len_arg  Length of query_arg in bytes.
  @return session->is_slave_error as it stands at the end of the function.
*/
int Query_log_event::do_apply_event(Relay_log_info const *rli,
1654
const char *query_arg, uint32_t q_len_arg)
1657
int expected_error,actual_error= 0;
1658
Query_id &query_id= Query_id::get_query_id();
1660
Colleagues: please never free(session->catalog) in MySQL. This would
1661
lead to bugs as here session->catalog is a part of an alloced block,
1662
not an entire alloced block (see
1663
Query_log_event::do_apply_event()). Same for session->db. Thank
1666
session->catalog= catalog_len ? (char *) catalog : (char *)"";
1667
new_db.length= db_len;
1668
new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
1669
session->set_db(new_db.str, new_db.length); /* allocates a copy of 'db' */
1670
session->variables.auto_increment_increment= auto_increment_increment;
1671
session->variables.auto_increment_offset= auto_increment_offset;
1674
InnoDB internally stores the master log position it has executed so far,
1675
i.e. the position just after the COMMIT event.
1676
When InnoDB will want to store, the positions in rli won't have
1677
been updated yet, so group_master_log_* will point to old BEGIN
1678
and event_master_log* will point to the beginning of current COMMIT.
1679
But log_pos of the COMMIT Query event is what we want, i.e. the pos of the
1680
END of the current log event (COMMIT). We save it in rli so that InnoDB can
1683
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
1685
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
1686
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
1689
Note: We do not need to execute reset_one_shot_variables() if this
1691
Reason: The db stored in binlog events is the same for SET and for
1692
its companion query. If the SET is ignored because of
1693
db_ok(), the companion query will also be ignored, and if
1694
the companion query is ignored in the db_ok() test of
1695
::do_apply_event(), then the companion SET also have so
1696
we don't need to reset_one_shot_variables().
1698
if (rpl_filter->db_ok(session->db))
1700
session->set_time((time_t)when);
1701
session->query_length= q_len_arg;
1702
session->query= (char*)query_arg;
1703
session->query_id= query_id.next();
1704
session->variables.pseudo_thread_id= thread_id; // for temp tables
1706
if (ignored_error_code((expected_error= error_code)) ||
1707
!check_expected_error(session,rli,expected_error))
1711
all bits of session->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
1712
must take their value from flags2.
1714
session->options= flags2|(session->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
1717
String tmp(time_zone_str, time_zone_len, &my_charset_bin);
1718
if (!(session->variables.time_zone= my_tz_find(session, &tmp)))
1720
my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr());
1721
session->variables.time_zone= global_system_variables.time_zone;
1722
goto compare_errors;
1725
if (lc_time_names_number)
1727
if (!(session->variables.lc_time_names=
1728
my_locale_by_number(lc_time_names_number)))
1730
my_printf_error(ER_UNKNOWN_ERROR,
1731
"Unknown locale: '%d'", MYF(0), lc_time_names_number);
1732
session->variables.lc_time_names= &my_locale_en_US;
1733
goto compare_errors;
1737
session->variables.lc_time_names= &my_locale_en_US;
1738
if (charset_database_number)
1740
const CHARSET_INFO *cs;
1741
if (!(cs= get_charset(charset_database_number, MYF(0))))
1744
int10_to_str((int) charset_database_number, buf, -10);
1745
my_error(ER_UNKNOWN_COLLATION, MYF(0), buf);
1746
goto compare_errors;
1748
session->variables.collation_database= cs;
1751
session->variables.collation_database= session->db_charset;
1753
/* Execute the query (note that we bypass dispatch_command()) */
1754
const char* found_semicolon= NULL;
1755
mysql_parse(session, session->query, session->query_length, &found_semicolon);
1756
log_slow_statement(session);
1761
The query got a really bad error on the master (thread killed etc),
1762
which could be inconsistent. Parse it to test the table names: if the
1763
replicate-*-do|ignore-table rules say "this query must be ignored" then
1764
we exit gracefully; otherwise we warn about the bad error and tell DBA
1767
if (mysql_test_parse_for_slave(session, session->query, session->query_length))
1768
clear_all_errors(session, const_cast<Relay_log_info*>(rli)); /* Can ignore query */
1771
rli->report(ERROR_LEVEL, expected_error,
1772
_("Query partially completed on the master "
1773
"(error on master: %d) and was aborted. There is a "
1774
"chance that your master is inconsistent at this "
1775
"point. If you are sure that your master is ok, run "
1776
"this query manually on the slave and then restart the "
1777
"slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; "
1778
"START SLAVE; . Query: '%s'"),
1779
expected_error, session->query);
1780
session->is_slave_error= 1;
1788
If we expected a non-zero error code, and we don't get the same error
1789
code, and none of them should be ignored.
1791
actual_error= session->is_error() ? session->main_da.sql_errno() : 0;
1792
if ((expected_error != actual_error) &&
1794
!ignored_error_code(actual_error) &&
1795
!ignored_error_code(expected_error))
1797
rli->report(ERROR_LEVEL, 0,
1798
_("Query caused differenxt errors on master and slave.\n"
1799
"Error on master: '%s' (%d), Error on slave: '%s' (%d).\n"
1800
"Default database: '%s'. Query: '%s'"),
1803
actual_error ? session->main_da.message() : _("no error"),
1805
print_slave_db_safe(db), query_arg);
1806
session->is_slave_error= 1;
1809
If we get the same error code as expected, or they should be ignored.
1811
else if (expected_error == actual_error ||
1812
ignored_error_code(actual_error))
1814
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
1815
session->killed= Session::NOT_KILLED;
1818
Other cases: mostly we expected no error and get one.
1820
else if (session->is_slave_error || session->is_fatal_error)
1822
rli->report(ERROR_LEVEL, actual_error,
1823
_("Error '%s' on query. Default database: '%s'. Query: '%s'"),
1824
(actual_error ? session->main_da.message() :
1825
_("unexpected success or fatal error")),
1826
print_slave_db_safe(session->db), query_arg);
1827
session->is_slave_error= 1;
1831
TODO: compare the values of "affected rows" around here. Something
1833
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1835
sql_print_error("Slave: did not get the expected number of affected \
1836
rows running query from master - expected %d, got %d (this numbers \
1837
should have matched modulo 4294967296).", 0, ...);
1838
session->is_slave_error = 1;
1840
We may also want an option to tell the slave to ignore "affected"
1841
mismatch. This mismatch could be implemented with a new ER_ code, and
1842
to ignore it you would use --slave-skip-errors...
1844
To do the comparison we need to know the value of "affected" which the
1845
above mysql_parse() computed. And we need to know the value of
1846
"affected" in the master's binlog. Both will be implemented later. The
1847
important thing is that we now have the format ready to log the values
1848
of "affected" in the binlog. So we can release 5.0.0 before effectively
1849
logging "affected" and effectively comparing it.
1851
} /* End of if (db_ok(... */
1854
pthread_mutex_lock(&LOCK_thread_count);
1856
Probably we have set session->query, session->db, session->catalog to point to places
1857
in the data_buf of this event. Now the event is going to be deleted
1858
probably, so data_buf will be freed, so the session->... listed above will be
1859
pointers to freed memory.
1860
So we must set them to 0, so that those bad pointers values are not later
1861
used. Note that "cleanup" queries like automatic DROP TEMPORARY Table
1862
don't suffer from these assignments to 0 as DROP TEMPORARY
1863
Table uses the db.table syntax.
1865
session->catalog= 0;
1866
session->set_db(NULL, 0); /* will free the current database */
1867
session->query= 0; // just to be sure
1868
session->query_length= 0;
1869
pthread_mutex_unlock(&LOCK_thread_count);
1870
close_thread_tables(session);
1872
As a disk space optimization, future masters will not log an event for
1873
LAST_INSERT_ID() if that function returned 0 (and thus they will be able
1874
to replace the Session::stmt_depends_on_first_successful_insert_id_in_prev_stmt
1875
variable by (Session->first_successful_insert_id_in_prev_stmt > 0) ; with the
1876
resetting below we are ready to support that.
1878
session->first_successful_insert_id_in_prev_stmt_for_binlog= 0;
1879
session->first_successful_insert_id_in_prev_stmt= 0;
1880
session->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
1881
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
1882
return session->is_slave_error;
1885
int Query_log_event::do_update_pos(Relay_log_info *rli)
1888
Note that we will not increment group* positions if we are just
1889
after a SET ONE_SHOT, because SET ONE_SHOT should not be separated
1890
from its following updating query.
1892
if (session->one_shot_set)
1894
rli->inc_event_relay_log_pos();
1898
return Log_event::do_update_pos(rli);
1902
Log_event::enum_skip_reason
1903
Query_log_event::do_shall_skip(Relay_log_info *rli)
1905
assert(query && q_len > 0);
1907
if (rli->slave_skip_counter > 0)
1909
if (strcmp("BEGIN", query) == 0)
1911
session->options|= OPTION_BEGIN;
1912
return(Log_event::continue_group(rli));
1915
if (strcmp("COMMIT", query) == 0 || strcmp("ROLLBACK", query) == 0)
1917
session->options&= ~OPTION_BEGIN;
1918
return(Log_event::EVENT_SKIP_COUNT);
1921
return(Log_event::do_shall_skip(rli));
/**************************************************************************
        Start_log_event_v3 methods
**************************************************************************/
1929
Start_log_event_v3::Start_log_event_v3()
1930
:Log_event(), created(0), binlog_version(BINLOG_VERSION),
1931
artificial_event(0), dont_set_created(0)
1933
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
1937
Start_log_event_v3::pack_info()
1940
void Start_log_event_v3::pack_info(Protocol *protocol)
1942
char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos;
1943
pos= my_stpcpy(buf, "Server ver: ");
1944
pos= my_stpcpy(pos, server_version);
1945
pos= my_stpcpy(pos, ", Binlog ver: ");
1946
pos= int10_to_str(binlog_version, pos, 10);
1947
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
1952
Start_log_event_v3::Start_log_event_v3()
1955
Start_log_event_v3::Start_log_event_v3(const char* buf,
1956
const Format_description_log_event
1958
:Log_event(buf, description_event)
1960
buf+= description_event->common_header_len;
1961
binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET);
1962
memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
1964
// prevent overrun if log is corrupted on disk
1965
server_version[ST_SERVER_VER_LEN-1]= 0;
1966
created= uint4korr(buf+ST_CREATED_OFFSET);
1967
/* We use log_pos to mark if this was an artificial event or not */
1968
artificial_event= (log_pos == 0);
1969
dont_set_created= 1;
1974
Start_log_event_v3::write()
1977
bool Start_log_event_v3::write(IO_CACHE* file)
1979
char buff[START_V3_HEADER_LEN];
1980
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
1981
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
1982
if (!dont_set_created)
1983
created= when= get_time();
1984
int4store(buff + ST_CREATED_OFFSET,created);
1985
return (write_header(file, sizeof(buff)) ||
1986
my_b_safe_write(file, (unsigned char*) buff, sizeof(buff)));
1991
Start_log_event_v3::do_apply_event() .
1995
- To handle the case where the master died without having time to write
1996
DROP TEMPORARY Table, DO RELEASE_LOCK (prepared statements' deletion is
1997
TODO), we clean up all temporary tables that we got, if we are sure we
2001
- Remove all active user locks.
2002
Guilhem 2003-06: this is true but not urgent: the worst it can cause is
2003
the use of a bit of memory for a user lock which will not be used
2004
anymore. If the user lock is later used, the old one will be released. In
2005
other words, no deadlock problem.
2008
int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
2010
switch (binlog_version)
2015
This can either be 4.x (then a Start_log_event_v3 is only at master
2016
startup so we are sure the master has restarted and cleared his temp
2017
tables; the event always has 'created'>0) or 5.0 (then we have to test
2022
close_temporary_tables(session);
2023
cleanup_load_tmpdir();
2028
Now the older formats; in that case load_tmpdir is cleaned up by the I/O
2032
if (strncmp(rli->relay_log.description_event_for_exec->server_version,
2033
"3.23.57",7) >= 0 && created)
2036
Can distinguish, based on the value of 'created': this event was
2037
generated at master startup.
2039
close_temporary_tables(session);
2042
Otherwise, can't distinguish a Start_log_event generated at
2043
master startup and one generated by master FLUSH LOGS, so cannot
2044
be sure temp tables have to be dropped. So do nothing.
2048
/* this case is impossible */
/***************************************************************************
       Format_description_log_event methods
****************************************************************************/
2059
Format_description_log_event 1st ctor.
2061
Ctor. Can be used to create the event to write to the binary log (when the
2062
server starts or when FLUSH LOGS), or to create artificial events to parse
2063
binlogs from MySQL 3.23 or 4.x.
2064
When in a client, only the 2nd use is possible.
2066
@param binlog_version the binlog version for which we want to build
2067
an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x
2068
x>=2 and 4.1) or 4 (MySQL 5.0). Note that the
2069
old 4.0 (binlog version 2) is not supported;
2070
it should not be used for replication with
2074
// First constructor: builds a format description for a given binlog version.
// For version 4 it copies the running ::server_version, uses
// LOG_EVENT_HEADER_LEN as the common header length and fills post_header_len[]
// with one entry per event type (index = event type - 1).
// For the older versions handled below it stores an artificial server version
// (server_ver when given, otherwise a 3.23 or 4.0 placeholder), picks
// OLD_HEADER_LEN or LOG_EVENT_MINIMAL_HEADER_LEN depending on binlog_ver, and
// describes only the event types that precede FORMAT_DESCRIPTION_EVENT.
// Any other version leaves post_header_len null so that is_valid() fails.
// Finishes by calling calc_server_version_split().
// @param binlog_ver  binlog format version to describe.
// @param server_ver  optional server version string for the artificial events.
// NOTE(review): allocation failure is not handled here; per the inline comment
// it is expected to be detected later by is_valid().
Format_description_log_event::
2075
Format_description_log_event(uint8_t binlog_ver, const char* server_ver)
2076
:Start_log_event_v3(), event_type_permutation(0)
2078
binlog_version= binlog_ver;
2079
switch (binlog_ver) {
2080
case 4: /* MySQL 5.0 */
2081
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
2082
common_header_len= LOG_EVENT_HEADER_LEN;
2083
number_of_event_types= LOG_EVENT_TYPES;
2084
/* we'll catch my_malloc() error in is_valid() */
2085
post_header_len=(uint8_t*) my_malloc(number_of_event_types*sizeof(uint8_t),
2088
This long list of assignments is not beautiful, but I see no way to
2089
make it nicer, as the right members are #defines, not array members, so
2090
it's impossible to write a loop.
2092
if (post_header_len)
2094
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2095
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN;
2096
post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
2097
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2098
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2099
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2100
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2101
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2102
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
2103
post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
2104
post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN;
2105
post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2106
post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2107
post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2108
post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1];
2109
post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
2110
post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
2111
post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
2116
case 3: /* 4.0.x x>=2 */
2118
We build an artificial (i.e. not sent by the master) event, which
2119
describes what those old master versions send.
2122
my_stpcpy(server_version, server_ver ? server_ver : "3.23");
2124
my_stpcpy(server_version, server_ver ? server_ver : "4.0");
2125
common_header_len= binlog_ver==1 ? OLD_HEADER_LEN :
2126
LOG_EVENT_MINIMAL_HEADER_LEN;
2128
The first new event in binlog version 4 is Format_desc. So any event type
2129
after that does not exist in older versions. We use the events known by
2130
version 3, even if version 1 had only a subset of them (this is not a
2131
problem: it uses a few bytes for nothing but unifies code; it does not
2132
make the slave detect less corruptions).
2134
number_of_event_types= FORMAT_DESCRIPTION_EVENT - 1;
2135
post_header_len=(uint8_t*) my_malloc(number_of_event_types*sizeof(uint8_t),
2137
if (post_header_len)
2139
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2140
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_MINIMAL_LEN;
2141
post_header_len[STOP_EVENT-1]= 0;
2142
post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN;
2143
post_header_len[INTVAR_EVENT-1]= 0;
2144
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2145
post_header_len[SLAVE_EVENT-1]= 0;
2146
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2147
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2148
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2149
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2150
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
2151
post_header_len[RAND_EVENT-1]= 0;
2152
post_header_len[USER_VAR_EVENT-1]= 0;
2155
default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
2156
post_header_len= 0; /* will make is_valid() fail */
2159
calc_server_version_split();
2164
The problem with this constructor is that the fixed header may have a
2165
length different from this version, but we don't know this length as we
2166
have not read the Format_description_log_event which says it, yet. This
2167
length is in the post-header of the event, but we don't know where the
2170
So this type of event HAS to:
2171
- either have the header's length at the beginning (in the header, at a
2172
fixed position which will never be changed), not in the post-header. That
2173
would make the header be "shifted" compared to other events.
2174
- or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future
2175
versions, so that we know for sure.
2177
I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because
2178
it is sent before Format_description_log_event).
2181
// Second constructor: builds the description from a serialized event buffer.
// After skipping LOG_EVENT_MINIMAL_HEADER_LEN bytes it reads common_header_len
// (returning early if it is below OLD_HEADER_LEN), derives
// number_of_event_types from event_len, and duplicates the per-event
// post-header lengths with my_memdup(). It then calls
// calc_server_version_split().
// For the pre-GA 5.1 / 5.2 a_drop version strings matched by the if below it
// installs event_type_permutation and permutes post_header_len[] to match.
// If such a master does not announce exactly 22 event types, post_header_len
// is freed and nulled so that is_valid() fails.
// NOTE(review): number_of_event_types is computed from event_len with no
// visible lower-bound check -- confirm that callers validate event_len.
// NOTE(review): part of the parameter list and some entries of the perm table
// are not visible in this listing.
Format_description_log_event::
2182
Format_description_log_event(const char* buf,
2185
Format_description_log_event*
2187
:Start_log_event_v3(buf, description_event), event_type_permutation(0)
2189
buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
2190
if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
2191
return; /* sanity check */
2192
number_of_event_types=
2193
event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1);
2194
/* If alloc fails, we'll detect it in is_valid() */
2195
post_header_len= (uint8_t*) my_memdup((unsigned char*)buf+ST_COMMON_HEADER_LEN_OFFSET+1,
2196
number_of_event_types*
2197
sizeof(*post_header_len), MYF(0));
2198
calc_server_version_split();
2201
In some previous versions, the events were given other event type
2202
id numbers than in the present version. When replicating from such
2203
a version, we therefore set up an array that maps those id numbers
2204
to the id numbers of the present server.
2206
If post_header_len is null, it means malloc failed, and is_valid
2207
will fail, so there is no need to do anything.
2209
The trees in which events have wrong id's are:
2211
mysql-5.1-wl1012.old mysql-5.1-wl2325-5.0-drop6p13-alpha
2212
mysql-5.1-wl2325-5.0-drop6 mysql-5.1-wl2325-5.0
2213
mysql-5.1-wl2325-no-dd
2215
(this was found by grepping for two lines in sequence where the
2216
first matches "FORMAT_DESCRIPTION_EVENT," and the second matches
2217
"TABLE_MAP_EVENT," in log_event.h in all trees)
2219
In these trees, the following server_versions existed since
2220
TABLE_MAP_EVENT was introduced:
2222
5.1.1-a_drop5p3 5.1.1-a_drop5p4 5.1.1-alpha
2223
5.1.2-a_drop5p10 5.1.2-a_drop5p11 5.1.2-a_drop5p12
2224
5.1.2-a_drop5p13 5.1.2-a_drop5p14 5.1.2-a_drop5p15
2225
5.1.2-a_drop5p16 5.1.2-a_drop5p16b 5.1.2-a_drop5p16c
2226
5.1.2-a_drop5p17 5.1.2-a_drop5p4 5.1.2-a_drop5p5
2227
5.1.2-a_drop5p6 5.1.2-a_drop5p7 5.1.2-a_drop5p8
2228
5.1.2-a_drop5p9 5.1.3-a_drop5p17 5.1.3-a_drop5p17b
2229
5.1.3-a_drop5p17c 5.1.4-a_drop5p18 5.1.4-a_drop5p19
2230
5.1.4-a_drop5p20 5.1.4-a_drop6p0 5.1.4-a_drop6p1
2231
5.1.4-a_drop6p2 5.1.5-a_drop5p20 5.2.0-a_drop6p3
2232
5.2.0-a_drop6p4 5.2.0-a_drop6p5 5.2.0-a_drop6p6
2233
5.2.1-a_drop6p10 5.2.1-a_drop6p11 5.2.1-a_drop6p12
2234
5.2.1-a_drop6p6 5.2.1-a_drop6p7 5.2.1-a_drop6p8
2235
5.2.2-a_drop6p13 5.2.2-a_drop6p13-alpha 5.2.2-a_drop6p13b
2238
(this was found by grepping for "mysql," in all historical
2239
versions of configure.in in the trees listed above).
2241
There are 5.1.1-alpha versions that use the new event id's, so we
2242
do not test that version string. So replication from 5.1.1-alpha
2243
with the other event id's to a new version does not work.
2244
Moreover, we can safely ignore the part after drop[56]. This
2245
allows us to simplify the big list above to the following regexes:
2247
5\.1\.[1-5]-a_drop5.*
2249
5\.2\.[0-2]-a_drop6.*
2251
This is what we test for in the 'if' below.
2253
if (post_header_len &&
2254
server_version[0] == '5' && server_version[1] == '.' &&
2255
server_version[3] == '.' &&
2256
strncmp(server_version + 5, "-a_drop", 7) == 0 &&
2257
((server_version[2] == '1' &&
2258
server_version[4] >= '1' && server_version[4] <= '5' &&
2259
server_version[12] == '5') ||
2260
(server_version[2] == '1' &&
2261
server_version[4] == '4' &&
2262
server_version[12] == '6') ||
2263
(server_version[2] == '2' &&
2264
server_version[4] >= '0' && server_version[4] <= '2' &&
2265
server_version[12] == '6')))
2267
if (number_of_event_types != 22)
2269
/* this makes is_valid() return false. */
2270
free(post_header_len);
2271
post_header_len= NULL;
2274
static const uint8_t perm[23]=
2276
UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
2277
INTVAR_EVENT, LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
2278
APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
2280
RAND_EVENT, USER_VAR_EVENT,
2281
FORMAT_DESCRIPTION_EVENT,
2283
PRE_GA_WRITE_ROWS_EVENT,
2284
PRE_GA_UPDATE_ROWS_EVENT,
2285
PRE_GA_DELETE_ROWS_EVENT,
2287
BEGIN_LOAD_QUERY_EVENT,
2288
EXECUTE_LOAD_QUERY_EVENT,
2290
event_type_permutation= perm;
2292
Since we use (permuted) event id's to index the post_header_len
2293
array, we need to permute the post_header_len array too.
2295
uint8_t post_header_len_temp[23];
2296
for (int i= 1; i < 23; i++)
2297
post_header_len_temp[perm[i] - 1]= post_header_len[i - 1];
2298
for (int i= 0; i < 22; i++)
2299
post_header_len[i] = post_header_len_temp[i];
2304
// Serializes this event to file. Fills a FORMAT_DESCRIPTION_HEADER_LEN byte
// buffer with the binlog version, the server version string, the creation
// time, the common header length and the post_header_len table, then writes
// the common header followed by that buffer.
// Unless dont_set_created is set, created and when are both refreshed from
// get_time() before being stored.
// @param file  IO_CACHE to write to.
// @return the logical OR of the write_header() and my_b_safe_write() results
//         (presumably true on failure -- confirm against those helpers).
// NOTE(review): the length argument of the final memcpy is not visible in
// this listing.
bool Format_description_log_event::write(IO_CACHE* file)
2307
We don't call Start_log_event_v3::write() because this would make 2
2310
unsigned char buff[FORMAT_DESCRIPTION_HEADER_LEN];
2311
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
2312
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
2313
if (!dont_set_created)
2314
created= when= get_time();
2315
int4store(buff + ST_CREATED_OFFSET,created);
2316
buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
2317
memcpy(buff+ST_COMMON_HEADER_LEN_OFFSET+1, post_header_len,
2319
return (write_header(file, sizeof(buff)) ||
2320
my_b_safe_write(file, buff, sizeof(buff)));
2324
/**
  Apply a format description event on the slave.

  A transaction never spans two binlogs, so if a transaction is still open
  when a real (non-artificial) format description with a non-zero 'created'
  arrives, the master must have died while flushing its binlog cache and has
  rolled the transaction back; the slave does the same.

  Events that originate from this very server only refresh the log's
  description event, so the Start_log_event_v3 cleanup is skipped for them.

  @param rli  Relay log info used for reporting and context cleanup.

  @return 0 when nothing else has to be done, otherwise the result of
          Start_log_event_v3::do_apply_event().
*/
int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
{
  const bool unfinished_transaction=
    !artificial_event && created && session->transaction.all.ha_list;

  if (unfinished_transaction)
  {
    /* Informational only: XA guarantees the master rolled back as well. */
    rli->report(INFORMATION_LEVEL, 0,
                _("Rolling back unfinished transaction (no COMMIT "
                  "or ROLLBACK in relay log). A probable cause is that "
                  "the master died while writing the transaction to "
                  "its binary log, thus rolled back too."));
    const_cast<Relay_log_info*>(rli)->cleanup_context(session, 1);
  }

  /* Our own event: there is no cleaning task to perform. */
  if (server_id == ::server_id)
    return 0;

  /*
    An event the slave did not ask for (the master sent it although a
    position >4 was requested) makes rli->group_master_log_pos advance to
    the end of this event before jumping to the first requested event,
    which lies further on. That is harmless.
  */
  return Start_log_event_v3::do_apply_event(rli);
}
2368
/**
  Install this event as the description event used for execution and advance
  the relay log position.

  The previous rli->relay_log.description_event_for_exec is deleted and
  replaced by this object.

  @param rli  Relay log info to update.

  @return 0 for events that originate from this server, otherwise the result
          of Log_event::do_update_pos().
*/
int Format_description_log_event::do_update_pos(Relay_log_info *rli)
{
  /* Remember the information describing this binlog. */
  delete rli->relay_log.description_event_for_exec;
  rli->relay_log.description_event_for_exec= this;

  if (server_id != ::server_id)
    return Log_event::do_update_pos(rli);

  /*
    The event is ours and is being skipped: only step the relay log
    position. The group_* variables stay untouched and the relay log info is
    not flushed; after a crash the events are simply skipped again.

    Stepping the group position here could (if the server id was changed
    across a restart) make execution start at an invalid position, e.g. at a
    Rows_log_event or at a Query_log_event preceded by an Intvar_log_event,
    instead of at the Table_map_log_event or the Intvar_log_event
    respectively.
  */
  rli->inc_event_relay_log_pos();
  return 0;
}
2398
/**
  A format description event is never skipped, whatever the state of the
  relay log info passed in.

  @return Log_event::EVENT_SKIP_NOT, unconditionally.
*/
Log_event::enum_skip_reason
Format_description_log_event::do_shall_skip(Relay_log_info *)
{
  return Log_event::EVENT_SKIP_NOT;
}
2406
Splits the event's 'server_version' string into three numeric pieces stored
2407
into 'server_version_split':
2408
X.Y.Zabc (X, Y and Z are numbers; 'a' is any non-digit character) -> {X,Y,Z}
2411
'server_version_split' is then used for lookups to find if the server which
2412
created this event has some known bug.
2414
// Parses the leading numeric components of server_version with strtoul()
// (base 10) and stores each one, narrowed to unsigned char, into
// server_version_split[0..2]. The asserts record the expectation that every
// component is below 256 and that the first component is followed by a dot.
// NOTE(review): the value is narrowed before the range assert runs, so if the
// assert is compiled out a component of 256 or more is silently truncated.
// NOTE(review): the statements that advance p between components are not
// fully visible in this listing.
void Format_description_log_event::calc_server_version_split()
2416
char *p= server_version, *r;
2418
for (uint32_t i= 0; i<=2; i++)
2420
number= strtoul(p, &r, 10);
2421
server_version_split[i]= (unsigned char)number;
2422
assert(number < 256); // fit in unsigned char
2424
assert(!((i == 0) && (*r != '.'))); // should be true in practice
2426
p++; // skip the dot
2431
/**************************************************************************
2432
Load_log_event methods
2433
General note about Load_log_event: the binlogging of LOAD DATA INFILE is
2434
going to be changed in 5.0 (or maybe in 5.1; not decided yet).
2435
However, the 5.0 slave could still have to read such events (from a 4.x
2436
master), convert them (which just means maybe expand the header, when 5.0
2437
servers have a UID in events) (remember that whatever is after the header
2438
will be like in 4.x, as this event's format is not modified in 5.0 as we
2439
will use new types of events to log the new LOAD DATA INFILE features).
2440
To be able to read/convert, we just need to not assume that the common
2441
header is of length LOG_EVENT_HEADER_LEN (we must use the description
2443
Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE
2444
between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the
2445
positions displayed in SHOW SLAVE STATUS then are fine too).
2446
**************************************************************************/
2449
Load_log_event::pack_info()
2452
// Upper bound, in bytes, for the LOAD DATA text that print_query() renders for
// this event; pack_info() and do_apply_event() size their buffers with it.
// Each addend is annotated with the query fragment it budgets for. Delimiter
// strings are budgeted at four output bytes per input byte plus two, and the
// table name at two bytes per input byte.
// NOTE(review): the original line numbering of this listing skips entries, so
// the sum shown may be missing a term (print_query can also emit a LOCAL
// keyword) -- verify against the full source before treating it as exact.
uint32_t Load_log_event::get_query_buffer_length()
2455
5 + db_len + 3 + // "use DB; "
2456
18 + fname_len + 2 + // "LOAD DATA INFILE 'file''"
2458
9 + // " REPLACE or IGNORE "
2459
13 + table_name_len*2 + // "INTO Table `table`"
2460
21 + sql_ex.field_term_len*4 + 2 + // " FIELDS TERMINATED BY 'str'"
2461
23 + sql_ex.enclosed_len*4 + 2 + // " OPTIONALLY ENCLOSED BY 'str'"
2462
12 + sql_ex.escaped_len*4 + 2 + // " ESCAPED BY 'str'"
2463
21 + sql_ex.line_term_len*4 + 2 + // " LINES TERMINATED BY 'str'"
2464
19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'"
2465
15 + 22 + // " IGNORE xxx LINES"
2466
3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)"
2470
// Renders this event as LOAD DATA ... INFILE statement text into buf.
// The pieces are appended in order: optional use-database prefix, LOAD DATA,
// LOCAL (only when check_fname_outside_temp_buf() is true), the file name,
// REPLACE or IGNORE according to sql_ex.opt_flags, the table name, the
// FIELDS / ENCLOSED / ESCAPED / LINES clauses via pretty_print_str(), an
// optional IGNORE n LINES clause, and the parenthesised field list.
// @param need_db   when true, and db is non-empty, emit the use prefix.
// @param buf       destination buffer; callers size it with
//                  get_query_buffer_length(). Nothing here checks its bounds.
// @param end       presumably receives the end of the written text -- the
//                  store itself is not visible in this listing.
// @param fn_start  presumably receives the start of the file name part --
//                  TODO confirm, the store is not visible here.
// @param fn_end    presumably receives the end of the file name part --
//                  TODO confirm, the store is not visible here.
void Load_log_event::print_query(bool need_db, char *buf,
2471
char **end, char **fn_start, char **fn_end)
2475
if (need_db && db && db_len)
2477
pos= my_stpcpy(pos, "use `");
2478
memcpy(pos, db, db_len);
2479
pos= my_stpcpy(pos+db_len, "`; ");
2482
pos= my_stpcpy(pos, "LOAD DATA ");
2487
if (check_fname_outside_temp_buf())
2488
pos= my_stpcpy(pos, "LOCAL ");
2489
pos= my_stpcpy(pos, "INFILE '");
2490
memcpy(pos, fname, fname_len);
2491
pos= my_stpcpy(pos+fname_len, "' ");
2493
if (sql_ex.opt_flags & REPLACE_FLAG)
2494
pos= my_stpcpy(pos, " REPLACE ");
2495
else if (sql_ex.opt_flags & IGNORE_FLAG)
2496
pos= my_stpcpy(pos, " IGNORE ");
2498
pos= my_stpcpy(pos ,"INTO");
2503
pos= my_stpcpy(pos ," Table `");
2504
memcpy(pos, table_name, table_name_len);
2505
pos+= table_name_len;
2507
/* We have to create all optional fields as the default is not empty */
2508
pos= my_stpcpy(pos, "` FIELDS TERMINATED BY ");
2509
pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len);
2510
if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
2511
pos= my_stpcpy(pos, " OPTIONALLY ");
2512
pos= my_stpcpy(pos, " ENCLOSED BY ");
2513
pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len);
2515
pos= my_stpcpy(pos, " ESCAPED BY ");
2516
pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
2518
pos= my_stpcpy(pos, " LINES TERMINATED BY ");
2519
pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
2520
if (sql_ex.line_start_len)
2522
pos= my_stpcpy(pos, " STARTING BY ");
2523
pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
2526
if ((long) skip_lines > 0)
2528
pos= my_stpcpy(pos, " IGNORE ");
2529
pos= int64_t10_to_str((int64_t) skip_lines, pos, 10);
2530
pos= my_stpcpy(pos," LINES ");
2536
const char *field= fields;
2537
pos= my_stpcpy(pos, " (");
2538
for (i = 0; i < num_fields; i++)
2545
memcpy(pos, field, field_lens[i]);
2546
pos+= field_lens[i];
2547
field+= field_lens[i] + 1;
2556
/**
  Send the textual form of this event to the client.

  A scratch buffer of get_query_buffer_length() bytes is allocated, filled by
  print_query() (including the database prefix) and handed to the protocol
  as a binary string. Nothing is sent if the allocation fails.

  @param protocol  Protocol object that receives the rendered query text.
*/
void Load_log_event::pack_info(Protocol *protocol)
{
  char *query_text= (char*) my_malloc(get_query_buffer_length(), MYF(MY_WME));
  if (query_text == NULL)
    return;

  char *query_end;
  print_query(true, query_text, &query_end, 0, 0);
  protocol->store(query_text, query_end - query_text, &my_charset_bin);

  /* my_malloc()'ed memory is released with free(), as elsewhere in this file */
  free(query_text);
}
2569
Load_log_event::write_data_header()
2572
/**
  Write the fixed-size post-header of a Load event.

  Layout (offsets are the L_*_OFFSET constants): thread id, execution time
  and number of lines to skip as 4-byte integers, the table name length and
  database name length as single bytes, and the number of fields as a 4-byte
  integer.

  @param file  IO_CACHE to write to.

  @return true if my_b_safe_write() reported a non-zero result.
*/
bool Load_log_event::write_data_header(IO_CACHE* file)
{
  unsigned char header[LOAD_HEADER_LEN];

  int4store(header + L_THREAD_ID_OFFSET, slave_proxy_id);
  int4store(header + L_EXEC_TIME_OFFSET, exec_time);
  int4store(header + L_SKIP_LINES_OFFSET, skip_lines);
  /* name lengths occupy one byte each in the header */
  header[L_TBL_LEN_OFFSET]= (unsigned char) table_name_len;
  header[L_DB_LEN_OFFSET]= (unsigned char) db_len;
  int4store(header + L_NUM_FIELDS_OFFSET, num_fields);

  const bool write_failed= my_b_safe_write(file, header, sizeof(header)) != 0;
  return write_failed;
}
2586
Load_log_event::write_data_body()
2589
/**
  Write the variable-size body of a Load event.

  Order on disk: the sql_ex block, then (only when there are fields) the
  array of field-name lengths followed by the field-name block, then the
  table name and the database name including their terminating NUL bytes,
  and finally the file name without a terminator.

  @param file  IO_CACHE to write to.

  @return true as soon as any of the writes fails, false otherwise.
*/
bool Load_log_event::write_data_body(IO_CACHE* file)
{
  if (sql_ex.write_data(file))
    return true;

  const bool has_field_list= num_fields && fields && field_lens;
  if (has_field_list)
  {
    if (my_b_safe_write(file, (unsigned char*)field_lens, num_fields))
      return true;
    if (my_b_safe_write(file, (unsigned char*)fields, field_block_len))
      return true;
  }

  if (my_b_safe_write(file, (unsigned char*)table_name, table_name_len + 1))
    return true;
  if (my_b_safe_write(file, (unsigned char*)db, db_len + 1))
    return true;
  return my_b_safe_write(file, (unsigned char*)fname, fname_len) != 0;
}
2606
Load_log_event::Load_log_event()
2609
// Master-side constructor: captures a LOAD DATA statement for binary logging.
// Takes thread_id and slave_proxy_id from the session, records the db, table
// and file names with their lengths, mirrors the sql_exchange delimiter
// strings into sql_ex (each length narrowed to uint8_t), derives opt_flags and
// empty_flags from the exchange options and handle_dup, copies skip_lines,
// and packs the names of fields_arg into fields_buf (NUL separated) with
// their one-byte lengths in field_lens_buf.
// @param session_arg     session the statement runs in.
// @param ex              parsed LOAD DATA options; its strings are referenced,
//                        not copied, by sql_ex.
// @param db_arg          database name; strlen() is applied unconditionally,
//                        so it must not be null.
// @param table_name_arg  table name, or null for an empty name.
// @param fields_arg      column list of the statement.
// @param handle_dup      duplicate handling mode, mapped to opt_flags bits.
// NOTE(review): field name lengths are narrowed to unsigned char, so a name of
// 256 bytes or more would be recorded with a wrong length.
// NOTE(review): the uses of the ignore and using_trans parameters are not
// visible in this listing.
Load_log_event::Load_log_event(Session *session_arg, sql_exchange *ex,
2610
const char *db_arg, const char *table_name_arg,
2611
List<Item> &fields_arg,
2612
enum enum_duplicates handle_dup,
2613
bool ignore, bool using_trans)
2614
:Log_event(session_arg,
2615
session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
2617
thread_id(session_arg->thread_id),
2618
slave_proxy_id(session_arg->variables.pseudo_thread_id),
2619
num_fields(0),fields(0),
2620
field_lens(0),field_block_len(0),
2621
table_name(table_name_arg ? table_name_arg : ""),
2622
db(db_arg), fname(ex->file_name), local_fname(false)
2626
exec_time = (ulong) (end_time - session_arg->start_time);
2627
/* db can never be a zero pointer in 4.0 */
2628
db_len = (uint32_t) strlen(db);
2629
table_name_len = (uint32_t) strlen(table_name);
2630
fname_len = (fname) ? (uint) strlen(fname) : 0;
2631
sql_ex.field_term = (char*) ex->field_term->ptr();
2632
sql_ex.field_term_len = (uint8_t) ex->field_term->length();
2633
sql_ex.enclosed = (char*) ex->enclosed->ptr();
2634
sql_ex.enclosed_len = (uint8_t) ex->enclosed->length();
2635
sql_ex.line_term = (char*) ex->line_term->ptr();
2636
sql_ex.line_term_len = (uint8_t) ex->line_term->length();
2637
sql_ex.line_start = (char*) ex->line_start->ptr();
2638
sql_ex.line_start_len = (uint8_t) ex->line_start->length();
2639
sql_ex.escaped = (char*) ex->escaped->ptr();
2640
sql_ex.escaped_len = (uint8_t) ex->escaped->length();
2641
sql_ex.opt_flags = 0;
2642
sql_ex.cached_new_format = -1;
2645
sql_ex.opt_flags|= DUMPFILE_FLAG;
2646
if (ex->opt_enclosed)
2647
sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
2649
sql_ex.empty_flags= 0;
2651
switch (handle_dup) {
2653
sql_ex.opt_flags|= REPLACE_FLAG;
2655
case DUP_UPDATE: // Impossible here
2660
sql_ex.opt_flags|= IGNORE_FLAG;
2662
if (!ex->field_term->length())
2663
sql_ex.empty_flags |= FIELD_TERM_EMPTY;
2664
if (!ex->enclosed->length())
2665
sql_ex.empty_flags |= ENCLOSED_EMPTY;
2666
if (!ex->line_term->length())
2667
sql_ex.empty_flags |= LINE_TERM_EMPTY;
2668
if (!ex->line_start->length())
2669
sql_ex.empty_flags |= LINE_START_EMPTY;
2670
if (!ex->escaped->length())
2671
sql_ex.empty_flags |= ESCAPED_EMPTY;
2673
skip_lines = ex->skip_lines;
2675
List_iterator<Item> li(fields_arg);
2676
field_lens_buf.length(0);
2677
fields_buf.length(0);
2679
while ((item = li++))
2682
unsigned char len = (unsigned char) strlen(item->name);
2683
field_block_len += len + 1;
2684
fields_buf.append(item->name, len + 1);
2685
field_lens_buf.append((char*)&len, 1);
2688
field_lens = (const unsigned char*)field_lens_buf.ptr();
2689
fields = fields_buf.ptr();
2695
The caller must set buf[event_len] = 0 before it starts using the constructed event.
2698
// Deserializing constructor: initialises every member to empty and delegates
// the parsing of buf to copy_log_event(). The body offset handed over depends
// on the event type byte: for a plain LOAD_EVENT it is derived from the
// common_header_len of description_event, otherwise
// LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN is used. Derived classes call
// copy_log_event() themselves, per the closing comment.
// @param buf                serialized event; the caller is expected to have
//                           NUL-terminated it at buf[event_len].
// @param event_len          length of the serialized event in bytes.
// @param description_event  format description of the originating binlog.
// NOTE(review): the condition guarding the copy_log_event() call and the
// first addend of the LOAD_EVENT offset are not visible in this listing.
Load_log_event::Load_log_event(const char *buf, uint32_t event_len,
2699
const Format_description_log_event *description_event)
2700
:Log_event(buf, description_event), num_fields(0), fields(0),
2701
field_lens(0),field_block_len(0),
2702
table_name(0), db(0), fname(0), local_fname(false)
2705
I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0,
2706
4.0->5.0 and 5.0->5.0 and it works.
2709
copy_log_event(buf, event_len,
2710
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
2712
description_event->common_header_len :
2713
LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
2715
/* otherwise it's a derived class, will call copy_log_event() itself */
2721
Load_log_event::copy_log_event()
2724
// Parses a serialized Load event in place. Reads the fixed post-header found
// common_header_len bytes into buf (thread id, exec time, skip_lines, table
// and db name lengths, field count), initialises sql_ex from the body at
// body_offset, sums the field name lengths into field_block_len, and then
// sets field_lens, fields, table_name, db and fname to point INTO buf -- no
// data is copied, so buf must outlive this object.
// fname_len is taken with strlen(), which relies on the caller having
// NUL-terminated the buffer at buf[event_len].
// @param buf                serialized event.
// @param event_len          length of the serialized event in bytes.
// @param description_event  supplies common_header_len.
// NOTE(review): data_head is const char*, so where char is signed a length
// byte above 127 converts to a very large unsigned value.
// NOTE(review): the only sanity checks visible are event_len < body_offset and
// num_fields > data_len; the derived pointers are not compared against
// buf_end in the lines shown. Return statements are not visible either.
int Load_log_event::copy_log_event(const char *buf, ulong event_len,
2726
const Format_description_log_event *description_event)
2729
char* buf_end = (char*)buf + event_len;
2730
/* this is the beginning of the post-header */
2731
const char* data_head = buf + description_event->common_header_len;
2732
slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
2733
exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
2734
skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
2735
table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
2736
db_len = (uint)data_head[L_DB_LEN_OFFSET];
2737
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
2739
if ((int) event_len < body_offset)
2742
Sql_ex.init() on success returns the pointer to the first byte after
2743
the sql_ex structure, which is the start of field lengths array.
2745
if (!(field_lens= (unsigned char*)sql_ex.init((char*)buf + body_offset,
2747
buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
2750
data_len = event_len - body_offset;
2751
if (num_fields > data_len) // simple sanity check against corruption
2753
for (uint32_t i = 0; i < num_fields; i++)
2754
field_block_len += (uint)field_lens[i] + 1;
2756
fields = (char*)field_lens + num_fields;
2757
table_name = fields + field_block_len;
2758
db = table_name + table_name_len + 1;
2759
fname = db + db_len + 1;
2760
fname_len = strlen(fname);
2761
// null termination is accomplished by the caller doing buf[event_len]=0
2768
Load_log_event::set_fields()
2771
This function can not use the member variable
2772
for the database, since LOAD DATA INFILE on the slave
2773
can be for a different database than the current one.
2774
This is the reason for the affected_db argument to this method.
2777
void Load_log_event::set_fields(const char* affected_db,
2778
List<Item> &field_list,
2779
Name_resolution_context *context)
2782
const char* field = fields;
2783
for (i= 0; i < num_fields; i++)
2785
field_list.push_back(new Item_field(context,
2786
affected_db, table_name, field));
2787
field+= field_lens[i] + 1;
2793
Does the data loading job when executing a LOAD DATA on the slave.
2797
@param use_rli_only_for_errors If set to 1, rli is provided to
2798
Load_log_event::exec_event only for this
2799
function to have RPL_LOG_NAME and
2800
rli->last_slave_error, both being used by
2801
error reports. rli's position advancing
2802
is skipped (done by the caller which is
2803
Execute_load_log_event::exec_event).
2804
If set to 0, rli is provided for full use,
2805
i.e. for error reports and position
2809
fix this; this can be done by testing rules in
2810
Create_file_log_event::exec_event() and then discarding Append_block and
2813
this is a bug - this needs to be moved to the I/O thread
2821
// Executes a replicated LOAD DATA INFILE on the slave.
// Switches the session to the database returned by
// rpl_filter->get_rewrite_db(), clears previous error state and resets the
// session for a new command. When rpl_filter->db_ok() accepts the database and
// the table filter accepts the table, it rebuilds the statement text with
// print_query() (stored as session->query), reconstructs the sql_exchange
// options from sql_ex, builds the field list with set_fields() and runs
// mysql_load(); a non-zero result sets session->is_slave_error. Filtered-out
// events go through skip_load_data_infile(net) instead.
// Afterwards the session catalog, db and query length are cleared under
// LOCK_thread_count, tables are closed, errors are reported through
// rli->report(), and the session mem_root is freed.
// @param net  master connection; session->net takes its vio and pkt_nr for
//             mysql_load and pkt_nr is copied back afterwards.
// @param rli  relay log info used for error reporting and, unless
//             use_rli_only_for_errors is set, for recording
//             future_group_master_log_pos.
// @param use_rli_only_for_errors  when set, the function returns 0 at the end
//             instead of chaining to Log_event::do_apply_event(rli).
// NOTE(review): several branch heads and early returns are not visible in
// this listing, so the exact error-path return values cannot be confirmed.
int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
2822
bool use_rli_only_for_errors)
2825
Query_id &query_id= Query_id::get_query_id();
2826
new_db.length= db_len;
2827
new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
2828
session->set_db(new_db.str, new_db.length);
2829
assert(session->query == 0);
2830
session->query_length= 0; // Should not be needed
2831
session->is_slave_error= 0;
2832
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
2834
/* see Query_log_event::do_apply_event() and BUG#13360 */
2835
assert(!rli->m_table_map.count());
2837
Usually lex_start() is called by mysql_parse(), but we need it here
2838
as the present method does not call mysql_parse().
2841
mysql_reset_session_for_next_command(session);
2843
if (!use_rli_only_for_errors)
2846
Saved for InnoDB, see comment in
2847
Query_log_event::do_apply_event()
2849
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
2853
We test replicate_*_db rules. Note that we have already prepared
2854
the file to load, even if we are going to ignore and delete it
2855
now. So it is possible that we did a lot of disk writes for
2856
nothing. In other words, a big LOAD DATA INFILE on the master will
2857
still consume a lot of space on the slave (space in the relay log
2858
+ space of temp files: twice the space of the file to load...)
2859
even if it will finally be ignored. TODO: fix this; this can be
2860
done by testing rules in Create_file_log_event::do_apply_event()
2861
and then discarding Append_block and al. Another way is do the
2862
filtering in the I/O thread (more efficient: no disk writes at
2866
Note: We do not need to execute reset_one_shot_variables() if this
2868
Reason: The db stored in binlog events is the same for SET and for
2869
its companion query. If the SET is ignored because of
2870
db_ok(), the companion query will also be ignored, and if
2871
the companion query is ignored in the db_ok() test of
2872
::do_apply_event(), then the companion SET also have so
2873
we don't need to reset_one_shot_variables().
2875
if (rpl_filter->db_ok(session->db))
2877
session->set_time((time_t)when);
2878
session->query_id = query_id.next();
2880
Initing session->row_count is not necessary in theory as this variable has no
2881
influence in the case of the slave SQL thread (it is used to generate a
2882
"data truncated" warning but which is absorbed and never gets to the
2883
error log); still we init it to avoid a Valgrind message.
2885
drizzle_reset_errors(session, 0);
2888
memset(&tables, 0, sizeof(tables));
2889
tables.db= session->strmake(session->db, session->db_length);
2890
tables.alias = tables.table_name = (char*) table_name;
2891
tables.lock_type = TL_WRITE;
2894
// the table will be opened in mysql_load
2895
if (rpl_filter->is_on() && !rpl_filter->tables_ok(session->db, &tables))
2897
// TODO: this is a bug - this needs to be moved to the I/O thread
2899
skip_load_data_infile(net);
2905
enum enum_duplicates handle_dup;
2907
char *load_data_query;
2910
Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST
2911
and written to slave's binlog if binlogging is on.
2913
if (!(load_data_query= (char *)session->alloc(get_query_buffer_length() + 1)))
2916
This will set session->fatal_error in case of OOM. So we surely will notice
2917
that something is wrong.
2922
print_query(false, load_data_query, &end, (char **)&session->lex->fname_start,
2923
(char **)&session->lex->fname_end);
2925
session->query_length= end - load_data_query;
2926
session->query= load_data_query;
2928
if (sql_ex.opt_flags & REPLACE_FLAG)
2930
handle_dup= DUP_REPLACE;
2932
else if (sql_ex.opt_flags & IGNORE_FLAG)
2935
handle_dup= DUP_ERROR;
2940
When replication is running fine, if it was DUP_ERROR on the
2941
master then we could choose IGNORE here, because if DUP_ERROR
2942
suceeded on master, and data is identical on the master and slave,
2943
then there should be no uniqueness errors on slave, so IGNORE is
2944
the same as DUP_ERROR. But in the unlikely case of uniqueness errors
2945
(because the data on the master and slave happen to be different
2946
(user error or bug), we want LOAD DATA to print an error message on
2947
the slave to discover the problem.
2949
If reading from net (a 3.23 master), mysql_load() will change this
2952
handle_dup= DUP_ERROR;
2955
We need to set session->lex->sql_command and session->lex->duplicates
2956
since InnoDB tests these variables to decide if this is a LOAD
2957
DATA ... REPLACE INTO ... statement even though mysql_parse()
2958
is not called. This is not needed in 5.0 since there the LOAD
2959
DATA ... statement is replicated using mysql_parse(), which
2960
sets the session->lex fields correctly.
2962
session->lex->sql_command= SQLCOM_LOAD;
2963
session->lex->duplicates= handle_dup;
2965
sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
2966
String field_term(sql_ex.field_term,sql_ex.field_term_len,&my_charset_utf8_general_ci);
2967
String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,&my_charset_utf8_general_ci);
2968
String line_term(sql_ex.line_term,sql_ex.line_term_len,&my_charset_utf8_general_ci);
2969
String line_start(sql_ex.line_start,sql_ex.line_start_len,&my_charset_utf8_general_ci);
2970
String escaped(sql_ex.escaped,sql_ex.escaped_len, &my_charset_utf8_general_ci);
2971
ex.field_term= &field_term;
2972
ex.enclosed= &enclosed;
2973
ex.line_term= &line_term;
2974
ex.line_start= &line_start;
2975
ex.escaped= &escaped;
2977
ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
2978
if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
2979
ex.field_term->length(0);
2981
ex.skip_lines = skip_lines;
2982
List<Item> field_list;
2983
session->lex->select_lex.context.resolve_in_table_list_only(&tables);
2984
set_fields(tables.db, field_list, &session->lex->select_lex.context);
2985
session->variables.pseudo_thread_id= thread_id;
2988
// mysql_load will use session->net to read the file
2989
session->net.vio = net->vio;
2991
Make sure the client does not get confused about the packet sequence
2993
session->net.pkt_nr = net->pkt_nr;
2996
It is safe to use tmp_list twice because we are not going to
2997
update it inside mysql_load().
2999
List<Item> tmp_list;
3000
if (mysql_load(session, &ex, &tables, field_list, tmp_list, tmp_list,
3001
handle_dup, ignore, net != 0))
3002
session->is_slave_error= 1;
3003
if (session->cuted_fields)
3005
/* log_pos is the position of the LOAD event in the master log */
3006
sql_print_warning(_("Slave: load data infile on table '%s' at "
3007
"log position %s in log '%s' produced %ld "
3008
"warning(s). Default database: '%s'"),
3010
llstr(log_pos,llbuff), RPL_LOG_NAME,
3011
(ulong) session->cuted_fields,
3012
print_slave_db_safe(session->db));
3015
net->pkt_nr= session->net.pkt_nr;
3021
We will just ask the master to send us /dev/null if we do not
3022
want to load the data.
3023
TODO: this a bug - needs to be done in I/O thread
3026
skip_load_data_infile(net);
3030
session->net.vio = 0;
3031
const char *remember_db= session->db;
3032
pthread_mutex_lock(&LOCK_thread_count);
3033
session->catalog= 0;
3034
session->set_db(NULL, 0); /* will free the current database */
3036
session->query_length= 0;
3037
pthread_mutex_unlock(&LOCK_thread_count);
3038
close_thread_tables(session);
3040
if (session->is_slave_error)
3042
/* this err/sql_errno code is copy-paste from net_send_error() */
3045
if (session->is_error())
3047
err= session->main_da.message();
3048
sql_errno= session->main_da.sql_errno();
3052
sql_errno=ER_UNKNOWN_ERROR;
3055
rli->report(ERROR_LEVEL, sql_errno,
3056
_("Error '%s' running LOAD DATA INFILE on table '%s'. "
3057
"Default database: '%s'"),
3058
err, (char*)table_name, print_slave_db_safe(remember_db));
3059
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
3062
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
3064
if (session->is_fatal_error)
3067
snprintf(buf, sizeof(buf),
3068
_("Running LOAD DATA INFILE on table '%-.64s'."
3069
" Default database: '%-.64s'"),
3071
print_slave_db_safe(remember_db));
3073
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
3074
ER(ER_SLAVE_FATAL_ERROR), buf);
3078
return ( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) );
3082
/**************************************************************************
3083
Rotate_log_event methods
3084
**************************************************************************/
3087
Rotate_log_event::pack_info()
3090
/*
  Produce the textual description of this rotate event and pass it to
  protocol->store(): the new log identifier, then ";pos=", then the
  position as formatted by llstr().
*/
void Rotate_log_event::pack_info(Protocol *protocol)
3092
// buf1 is handed to the String below; buf is scratch space for llstr().
char buf1[256], buf[22];
3093
String tmp(buf1, sizeof(buf1), &my_charset_utf8_general_ci);
3095
tmp.append(new_log_ident, ident_len);
3096
tmp.append(STRING_WITH_LEN(";pos="));
3097
tmp.append(llstr(pos,buf));
3098
protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
3103
Rotate_log_event::Rotate_log_event() (2 constructors)
3107
/*
  Build a rotate event from an in-memory log name and position.

  A zero ident_len_arg means the length is taken from new_log_ident_arg with
  strlen(). When DUP_NAME is set in flags, the name is copied with
  my_strndup(); otherwise new_log_ident keeps pointing at the buffer passed
  in by the caller.
*/
Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
3108
uint32_t ident_len_arg, uint64_t pos_arg,
3110
:Log_event(), new_log_ident(new_log_ident_arg),
3111
pos(pos_arg),ident_len(ident_len_arg ? ident_len_arg :
3112
(uint) strlen(new_log_ident_arg)), flags(flags_arg)
3114
if (flags & DUP_NAME)
3115
// NOTE(review): the my_strndup() result is not checked on the visible lines.
new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME));
3120
/*
  Build a rotate event from a serialized event buffer.

  flags is set to DUP_NAME because the identifier is always duplicated with
  my_strndup() here. pos is read from R_POS_OFFSET when the format has a
  post-header, and defaults to 4 otherwise. ident_len is whatever remains of
  event_len after the common header and the post-header, capped at
  FN_REFLEN-1.

  NOTE(review): the only visible length check is event_len < header_size.
  If event_len is smaller than header_size + post_header_len, the unsigned
  subtraction below would wrap and then be capped to FN_REFLEN-1 - confirm
  that callers rule this out.
*/
Rotate_log_event::Rotate_log_event(const char* buf, uint32_t event_len,
3121
const Format_description_log_event* description_event)
3122
:Log_event(buf, description_event) ,new_log_ident(0), flags(DUP_NAME)
3124
// The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
3125
uint8_t header_size= description_event->common_header_len;
3126
// post_header_len[] is indexed by event type minus one.
uint8_t post_header_len= description_event->post_header_len[ROTATE_EVENT-1];
3127
uint32_t ident_offset;
3128
if (event_len < header_size)
3131
pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
3132
ident_len = (uint)(event_len -
3133
(header_size+post_header_len));
3134
ident_offset = post_header_len;
3135
// Cap the identifier length at FN_REFLEN-1.
set_if_smaller(ident_len,FN_REFLEN-1);
3136
new_log_ident= my_strndup(buf + ident_offset, (uint) ident_len, MYF(MY_WME));
3142
Rotate_log_event::write()
3145
/*
  Serialize this event to file: the common header (announcing
  ROTATE_HEADER_LEN + ident_len bytes of data), then the post-header with pos
  stored at R_POS_OFFSET, then the ident_len bytes of the new log name.

  The three writes are chained with ||, so the first one returning nonzero
  stops the sequence and makes the function return true.
*/
bool Rotate_log_event::write(IO_CACHE* file)
3147
char buf[ROTATE_HEADER_LEN];
3148
int8store(buf + R_POS_OFFSET, pos);
3149
return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
3150
my_b_safe_write(file, (unsigned char*)buf, ROTATE_HEADER_LEN) ||
3151
my_b_safe_write(file, (unsigned char*)new_log_ident, (uint) ident_len));
3156
Got a rotate log event from the master.
3158
This is mainly used so that we can later figure out the logname and
3159
position for the master.
3161
We can't rotate the slave's BINlog as this will cause infinite rotations
3162
in a A -> B -> A setup.
3163
The NOTES below is a wrong comment which will disappear when 4.1 is merged.
3168
/*
  Update the relay log coordinates in rli after a rotate event.

  With rli->data_lock held, event_relay_log_pos is set to the current read
  position of rli->cur_log. If the event comes from another server (or
  replicate_same_server_id is set) and the applier is not inside a group,
  the group master log name and position are switched to the log named by
  this event, and the group relay log name and position are set to the
  current event coordinates. set_slave_thread_options() is called and both
  auto_increment variables are reset to 1. After the lock is released,
  data_cond is broadcast and the relay log info is flushed.

  NOTE(review): the braces are not visible here, so it cannot be told from
  these lines which of the statements after the if are conditional.

  NOTE(review): group_master_log_name.assign(new_log_ident, ident_len+1)
  copies one byte more than ident_len. If group_master_log_name is a
  std::string this would store the terminating NUL as part of the value -
  confirm the member type and the intended length.
*/
int Rotate_log_event::do_update_pos(Relay_log_info *rli)
3170
pthread_mutex_lock(&rli->data_lock);
3171
rli->event_relay_log_pos= my_b_tell(rli->cur_log);
3173
If we are in a transaction or in a group: the only normal case is
3174
when the I/O thread was copying a big transaction, then it was
3175
stopped and restarted: we have this in the relay log:
3183
In that case, we don't want to touch the coordinates which
3184
correspond to the beginning of the transaction. Starting from
3185
5.0.0, there also are some rotates from the slave itself, in the
3186
relay log, which shall not change the group positions.
3188
if ((server_id != ::server_id || rli->replicate_same_server_id) &&
3189
!rli->is_in_group())
3191
rli->group_master_log_name.assign(new_log_ident, ident_len+1);
3192
rli->notify_group_master_log_name_update();
3193
rli->group_master_log_pos= pos;
3194
rli->group_relay_log_name.assign(rli->event_relay_log_name);
3195
rli->notify_group_relay_log_name_update();
3196
rli->group_relay_log_pos= rli->event_relay_log_pos;
3198
Reset session->options and sql_mode etc, because this could be the signal of
3199
a master's downgrade from 5.0 to 4.0.
3200
However, no need to reset description_event_for_exec: indeed, if the next
3201
master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
3202
master is 4.0 then the events are in the slave's format (conversion).
3204
set_slave_thread_options(session);
3205
session->variables.auto_increment_increment=
3206
session->variables.auto_increment_offset= 1;
3208
pthread_mutex_unlock(&rli->data_lock);
3209
pthread_cond_broadcast(&rli->data_cond);
3210
flush_relay_log_info(rli);
3216
/*
  Decide whether this rotate event is skipped.

  Starts from the decision of Log_event::do_shall_skip() and remaps it:
  EVENT_SKIP_NOT and EVENT_SKIP_COUNT both become EVENT_SKIP_NOT, while
  EVENT_SKIP_IGNORE is passed through unchanged.
*/
Log_event::enum_skip_reason
3217
Rotate_log_event::do_shall_skip(Relay_log_info *rli)
3219
enum_skip_reason reason= Log_event::do_shall_skip(rli);
3222
case Log_event::EVENT_SKIP_NOT:
3223
case Log_event::EVENT_SKIP_COUNT:
3224
return Log_event::EVENT_SKIP_NOT;
3226
case Log_event::EVENT_SKIP_IGNORE:
3227
return Log_event::EVENT_SKIP_IGNORE;
3230
return Log_event::EVENT_SKIP_NOT; // To keep compiler happy
3234
/**************************************************************************
3235
Intvar_log_event methods
3236
**************************************************************************/
3239
Intvar_log_event::pack_info()
3242
/*
  Produce the textual description of this event and pass it to
  protocol->store(): the variable type name followed by val as formatted by
  int64_t10_to_str().
*/
void Intvar_log_event::pack_info(Protocol *protocol)
3244
char buf[256], *pos;
3245
// The name is limited to sizeof(buf)-23 bytes, leaving room for what follows.
pos= strmake(buf, get_var_type_name(), sizeof(buf)-23);
3247
// NOTE(review): radix -10 presumably selects signed decimal - confirm.
pos= int64_t10_to_str(val, pos, -10);
3248
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3253
Intvar_log_event::Intvar_log_event()
3256
/*
  Build an Intvar event from a serialized event buffer: after skipping the
  common header, the type is the byte at I_TYPE_OFFSET and val is the 8-byte
  integer read with uint8korr() at I_VAL_OFFSET.
*/
Intvar_log_event::Intvar_log_event(const char* buf,
3257
const Format_description_log_event* description_event)
3258
:Log_event(buf, description_event)
3260
buf+= description_event->common_header_len;
3261
type= buf[I_TYPE_OFFSET];
3262
val= uint8korr(buf+I_VAL_OFFSET);
3267
Intvar_log_event::get_var_type_name()
3270
/*
  Return a printable name for the variable type of this event:
  "LAST_INSERT_ID", "INSERT_ID", or "UNKNOWN" for any other value.
*/
const char* Intvar_log_event::get_var_type_name()
3273
case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
3274
case INSERT_ID_EVENT: return "INSERT_ID";
3275
default: /* impossible */ return "UNKNOWN";
3281
Intvar_log_event::write()
3284
/*
  Serialize this event to file: the common header followed by a 9-byte body
  holding the type byte at I_TYPE_OFFSET and the 8-byte value at
  I_VAL_OFFSET. Returns true if either write returns nonzero.
*/
bool Intvar_log_event::write(IO_CACHE* file)
3286
unsigned char buf[9];
3287
buf[I_TYPE_OFFSET]= (unsigned char) type;
3288
int8store(buf + I_VAL_OFFSET, val);
3289
return (write_header(file, sizeof(buf)) ||
3290
my_b_safe_write(file, buf, sizeof(buf)));
3295
Intvar_log_event::print()
3299
Intvar_log_event::do_apply_event()
3302
/*
  Apply this event to the session.

  Sets the IN_STMT flag on rli (which is why constness is cast away), then
  acts on the event type: LAST_INSERT_ID_EVENT records val as
  first_successful_insert_id_in_prev_stmt and marks the statement as
  depending on it; INSERT_ID_EVENT forces val as the next auto-increment
  interval through force_one_auto_inc_interval().

  NOTE(review): the switch header, the break statements and the return are
  not visible on these lines.
*/
int Intvar_log_event::do_apply_event(Relay_log_info const *rli)
3305
We are now in a statement until the associated query log event has
3308
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3311
case LAST_INSERT_ID_EVENT:
3312
session->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1;
3313
session->first_successful_insert_id_in_prev_stmt= val;
3315
case INSERT_ID_EVENT:
3316
session->force_one_auto_inc_interval(val);
3322
/*
  Advance only the event relay log position in rli; no group position is
  touched on the visible lines.
*/
int Intvar_log_event::do_update_pos(Relay_log_info *rli)
3324
rli->inc_event_relay_log_pos();
3329
/*
  Decide whether this event is skipped by delegating to continue_group(),
  for the reason given in the explanation inside the function.
*/
Log_event::enum_skip_reason
3330
Intvar_log_event::do_shall_skip(Relay_log_info *rli)
3333
It is a common error to set the slave skip counter to 1 instead of
3334
2 when recovering from an insert which used a auto increment,
3335
rand, or user var. Therefore, if the slave skip counter is 1, we
3336
just say that this event should be skipped by ignoring it, meaning
3337
that we do not change the value of the slave skip counter since it
3338
will be decreased by the following insert event.
3340
return continue_group(rli);
3344
/**************************************************************************
3345
Rand_log_event methods
3346
**************************************************************************/
3348
/*
  Produce the textual description of this event and pass it to
  protocol->store(): "rand_seed1=<seed1>,rand_seed2=<seed2>".

  NOTE(review): both seeds are cast to long before formatting; on platforms
  where long is narrower than the seed type this would truncate - confirm.
*/
void Rand_log_event::pack_info(Protocol *protocol)
3350
char buf1[256], *pos;
3351
pos= my_stpcpy(buf1,"rand_seed1=");
3352
pos= int10_to_str((long) seed1, pos, 10);
3353
pos= my_stpcpy(pos, ",rand_seed2=");
3354
pos= int10_to_str((long) seed2, pos, 10);
3355
protocol->store(buf1, (uint) (pos-buf1), &my_charset_bin);
3359
/*
  Build a Rand event from a serialized event buffer: after skipping the
  common header, seed1 and seed2 are read as 8-byte integers with
  uint8korr() at RAND_SEED1_OFFSET and RAND_SEED2_OFFSET.
*/
Rand_log_event::Rand_log_event(const char* buf,
3360
const Format_description_log_event* description_event)
3361
:Log_event(buf, description_event)
3363
buf+= description_event->common_header_len;
3364
seed1= uint8korr(buf+RAND_SEED1_OFFSET);
3365
seed2= uint8korr(buf+RAND_SEED2_OFFSET);
3369
/*
  Serialize this event to file: the common header followed by a 16-byte body
  holding seed1 and seed2 as 8-byte integers at their respective offsets.
  Returns true if either write returns nonzero.
*/
bool Rand_log_event::write(IO_CACHE* file)
3371
unsigned char buf[16];
3372
int8store(buf + RAND_SEED1_OFFSET, seed1);
3373
int8store(buf + RAND_SEED2_OFFSET, seed2);
3374
return (write_header(file, sizeof(buf)) ||
3375
my_b_safe_write(file, buf, sizeof(buf)));
3379
/*
  Apply this event to the session: set the IN_STMT flag on rli (constness is
  cast away for that) and load both seeds into session->rand.
*/
int Rand_log_event::do_apply_event(Relay_log_info const *rli)
3382
We are now in a statement until the associated query log event has
3385
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3387
session->rand.seed1= (ulong) seed1;
3388
session->rand.seed2= (ulong) seed2;
3392
/*
  Advance only the event relay log position in rli; no group position is
  touched on the visible lines.
*/
int Rand_log_event::do_update_pos(Relay_log_info *rli)
3394
rli->inc_event_relay_log_pos();
3399
/*
  Decide whether this event is skipped by delegating to continue_group(),
  for the reason given in the explanation inside the function.
*/
Log_event::enum_skip_reason
3400
Rand_log_event::do_shall_skip(Relay_log_info *rli)
3403
It is a common error to set the slave skip counter to 1 instead of
3404
2 when recovering from an insert which used a auto increment,
3405
rand, or user var. Therefore, if the slave skip counter is 1, we
3406
just say that this event should be skipped by ignoring it, meaning
3407
that we do not change the value of the slave skip counter since it
3408
will be decreased by the following insert event.
3410
return continue_group(rli);
3414
/**************************************************************************
3415
Xid_log_event methods
3416
**************************************************************************/
3418
/*
  Produce the textual description of this event and pass it to
  protocol->store(): the text COMMIT followed by the xid inside a
  C-style comment.
*/
void Xid_log_event::pack_info(Protocol *protocol)
3420
char buf[128], *pos;
3421
pos= my_stpcpy(buf, "COMMIT /* xid=");
3422
pos= int64_t10_to_str(xid, pos, 10);
3423
pos= my_stpcpy(pos, " */");
3424
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3429
It's ok not to use int8store here,
3430
as long as XID::set(uint64_t) and
3431
XID::get_my_xid don't do it either.
3432
We don't care about actual values of xids as long as
3433
identical numbers compare identically
3437
// Build an Xid event from a serialized event buffer: after skipping the
// common header, sizeof(xid) raw bytes are copied into xid with memcpy(),
// i.e. without any byte-order conversion.
Xid_log_event(const char* buf,
3438
const Format_description_log_event *description_event)
3439
:Log_event(buf, description_event)
3441
buf+= description_event->common_header_len;
3442
memcpy(&xid, buf, sizeof(xid));
3446
/*
  Serialize this event to file: the common header followed by the raw bytes
  of xid, written as they are laid out in memory (mirroring the memcpy() in
  the constructor that reads from a buffer). Returns true if either write
  returns nonzero.
*/
bool Xid_log_event::write(IO_CACHE* file)
3448
return write_header(file, sizeof(xid)) ||
3449
my_b_safe_write(file, (unsigned char*) &xid, sizeof(xid));
3453
/*
  Apply this event by ending the current transaction of the session with
  COMMIT; the result of end_trans() is returned. The Relay_log_info
  argument is unused.
*/
int Xid_log_event::do_apply_event(const Relay_log_info *)
3455
return end_trans(session, COMMIT);
3458
/*
  Decide whether this event is skipped.

  While the slave skip counter is positive, the event is skipped with
  EVENT_SKIP_COUNT and OPTION_BEGIN is cleared from the session options.
  Otherwise the decision is left to Log_event::do_shall_skip().
*/
Log_event::enum_skip_reason
3459
Xid_log_event::do_shall_skip(Relay_log_info *rli)
3461
if (rli->slave_skip_counter > 0) {
3462
session->options&= ~OPTION_BEGIN;
3463
return(Log_event::EVENT_SKIP_COUNT);
3465
return(Log_event::do_shall_skip(rli));
3469
/**************************************************************************
3470
User_var_log_event methods
3471
**************************************************************************/
3473
/*
  Produce the textual description of this event and pass it to
  protocol->store().

  A buffer is allocated with room for a 4-byte frame around the name
  (val_offset = 4 + name_len) plus the rendered value, and the value is
  written at buf + val_offset. The visible branches render: the text NULL;
  a double through my_gcvt(); an integer through int64_t10_to_str(); a
  decimal through binary2my_decimal() and my_decimal2string(); and a string
  as _<charset> <hex> COLLATE <collation>, or ??? when the charset number
  is unknown. Finally the name is copied to buf+2 and followed by a
  backquote and an equals sign.

  NOTE(review): the branch headers, the writes to buf[0] and buf[1], the
  allocation failure paths and the release of buf are not visible on these
  lines.
*/
void User_var_log_event::pack_info(Protocol* protocol)
3476
uint32_t val_offset= 4 + name_len;
3477
uint32_t event_len= val_offset;
3481
// 5 extra bytes: the four characters of NULL plus the terminator.
if (!(buf= (char*) my_malloc(val_offset + 5, MYF(MY_WME))))
3483
my_stpcpy(buf + val_offset, "NULL");
3484
event_len= val_offset + 4;
3491
float8get(real_val, val);
3492
if (!(buf= (char*) my_malloc(val_offset + MY_GCVT_MAX_FIELD_WIDTH + 1,
3495
event_len+= my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, MY_GCVT_MAX_FIELD_WIDTH,
3496
buf + val_offset, NULL);
3499
if (!(buf= (char*) my_malloc(val_offset + 22, MYF(MY_WME))))
3501
// The end pointer returned by the conversion, minus buf, is the new length.
event_len= int64_t10_to_str(uint8korr(val), buf + val_offset,-10)-buf;
3503
case DECIMAL_RESULT:
3505
if (!(buf= (char*) my_malloc(val_offset + DECIMAL_MAX_STR_LENGTH,
3508
String str(buf+val_offset, DECIMAL_MAX_STR_LENGTH, &my_charset_bin);
3510
// val[0] is passed as the first size argument; val+2 is the binary decimal.
binary2my_decimal(E_DEC_FATAL_ERROR, (unsigned char*) (val+2), &dec, val[0],
3512
my_decimal2string(E_DEC_FATAL_ERROR, &dec, 0, 0, 0, &str);
3513
event_len= str.length() + val_offset;
3517
/* 15 is for 'COLLATE' and other chars */
3518
buf= (char*) my_malloc(event_len+val_len*2+1+2*MY_CS_NAME_SIZE+15,
3520
const CHARSET_INFO *cs;
3523
if (!(cs= get_charset(charset_number, MYF(0))))
3525
my_stpcpy(buf+val_offset, "???");
3530
char *p= strxmov(buf + val_offset, "_", cs->csname, " ", NULL);
3531
p= str_to_hex(p, val, val_len);
3532
p= strxmov(p, " COLLATE ", cs->name, NULL);
3544
memcpy(buf+2, name, name_len);
3545
buf[2+name_len]= '`';
3546
buf[3+name_len]= '=';
3547
protocol->store(buf, event_len, &my_charset_bin);
3552
/*
  Build a User_var event from a serialized event buffer.

  Layout read after the common header: a 4-byte name length, the name bytes,
  one byte used as the is_null flag, then the value type byte, a 4-byte
  charset number, a 4-byte value length and the value bytes.

  name and val point into buf; nothing is copied on the visible lines, so
  buf has to outlive this event.

  The assignment of STRING_RESULT and the binary charset is presumably the
  branch taken for a NULL value - the branch headers are not visible here.

  NOTE(review): no length validation against the event size is visible on
  these lines.
*/
User_var_log_event::
3553
User_var_log_event(const char* buf,
3554
const Format_description_log_event* description_event)
3555
:Log_event(buf, description_event)
3557
buf+= description_event->common_header_len;
3558
name_len= uint4korr(buf);
3559
name= (char *) buf + UV_NAME_LEN_SIZE;
3560
buf+= UV_NAME_LEN_SIZE + name_len;
3561
is_null= (bool) *buf;
3564
type= STRING_RESULT;
3565
charset_number= my_charset_bin.number;
3571
type= (Item_result) buf[UV_VAL_IS_NULL];
3572
charset_number= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE);
3573
val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3574
UV_CHARSET_NUMBER_SIZE);
3575
val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3576
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE);
3581
/*
  Serialize this event to file as five consecutive writes: the common
  header, the 4-byte name length (buf), the name, the fixed-size value
  descriptor (buf1) and finally val_len bytes of value taken from pos.

  buf1[0] receives is_null. For a NULL value val_len is forced to 0. For
  other values the charset number is stored at buf1+2 and val_len after it.
  Doubles, integers and decimals are first converted into buf2 (pos points
  at buf2 by default); on the branch where pos is reassigned to val, the
  value bytes are written straight from val.

  buf2 is sized to the larger of 8 bytes and DECIMAL_MAX_FIELD_SIZE + 2,
  the two extra bytes holding the total digit count and the scale that
  precede the binary decimal.

  NOTE(review): the switch on the value type, the store of the type byte and
  the assignments to buf1_length are not visible on these lines.
*/
bool User_var_log_event::write(IO_CACHE* file)
3583
char buf[UV_NAME_LEN_SIZE];
3584
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3585
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
3586
unsigned char buf2[(8 > DECIMAL_MAX_FIELD_SIZE + 2) ? 8 : DECIMAL_MAX_FIELD_SIZE +2], *pos= buf2;
3587
uint32_t buf1_length;
3590
int4store(buf, name_len);
3592
if ((buf1[0]= is_null))
3595
val_len= 0; // Length of 'pos'
3600
int4store(buf1 + 2, charset_number);
3604
float8store(buf2, *(double*) val);
3607
int8store(buf2, *(int64_t*) val);
3609
case DECIMAL_RESULT:
3611
my_decimal *dec= (my_decimal *)val;
3612
dec->fix_buffer_pointer();
3613
// buf2[0] = total number of digits, buf2[1] = digits after the point.
buf2[0]= (char)(dec->intg + dec->frac);
3614
buf2[1]= (char)dec->frac;
3615
decimal2bin((decimal_t*)val, buf2+2, buf2[0], buf2[1]);
3616
val_len= decimal_bin_size(buf2[0], buf2[1]) + 2;
3620
pos= (unsigned char*) val;
3627
int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
3631
/* Length of the whole event */
3632
event_length= sizeof(buf)+ name_len + buf1_length + val_len;
3634
return (write_header(file, event_length) ||
3635
my_b_safe_write(file, (unsigned char*) buf, sizeof(buf)) ||
3636
my_b_safe_write(file, (unsigned char*) name, name_len) ||
3637
my_b_safe_write(file, (unsigned char*) buf1, buf1_length) ||
3638
my_b_safe_write(file, pos, val_len));
3644
User_var_log_event::do_apply_event()
3647
/*
  Apply this event by setting the user variable in the session.

  The charset is looked up from charset_number, the IN_STMT flag is set on
  rli (constness is cast away for that), and an Item matching the value is
  created: Item_null, Item_float, Item_int, Item_decimal or Item_string.
  For the numeric cases val is redirected to a local holding the value in
  native format (for decimals, to the my_decimal returned by the item, with
  val_len set to sizeof(my_decimal)). The variable is then updated through
  Item_func_set_user_var::update_hash() with DERIVATION_IMPLICIT, and the
  session mem_root is freed.

  NOTE(review): the switch on the value type, the handling of a failed
  charset lookup and the return statements are not visible on these lines.
  NOTE(review): the results of fix_fields() and update_hash() are not
  checked on the visible lines.
*/
int User_var_log_event::do_apply_event(Relay_log_info const *rli)
3650
const CHARSET_INFO *charset;
3651
if (!(charset= get_charset(charset_number, MYF(MY_WME))))
3653
LEX_STRING user_var_name;
3654
user_var_name.str= name;
3655
user_var_name.length= name_len;
3660
We are now in a statement until the associated query log event has
3663
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3667
it= new Item_null();
3673
float8get(real_val, val);
3674
it= new Item_float(real_val, 0);
3675
val= (char*) &real_val; // Pointer to value in native format
3679
int_val= (int64_t) uint8korr(val);
3680
it= new Item_int(int_val);
3681
val= (char*) &int_val; // Pointer to value in native format
3684
case DECIMAL_RESULT:
3686
Item_decimal *dec= new Item_decimal((unsigned char*) val+2, val[0], val[1]);
3688
val= (char *)dec->val_decimal(NULL);
3689
val_len= sizeof(my_decimal);
3693
it= new Item_string(val, val_len, charset);
3701
Item_func_set_user_var e(user_var_name, it);
3703
Item_func_set_user_var can't substitute something else on its place =>
3704
0 can be passed as last argument (reference on item)
3706
e.fix_fields(session, 0);
3708
A variable can just be considered as a table with
3709
a single record and with a single column. Thus, like
3710
a column value, it could always have IMPLICIT derivation.
3712
e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0);
3713
free_root(session->mem_root,0);
3718
/*
  Advance only the event relay log position in rli; no group position is
  touched on the visible lines.
*/
int User_var_log_event::do_update_pos(Relay_log_info *rli)
3720
rli->inc_event_relay_log_pos();
3724
/*
  Decide whether this event is skipped by delegating to continue_group(),
  for the reason given in the explanation inside the function.
*/
Log_event::enum_skip_reason
3725
User_var_log_event::do_shall_skip(Relay_log_info *rli)
3728
It is a common error to set the slave skip counter to 1 instead
3729
of 2 when recovering from an insert which used a auto increment,
3730
rand, or user var. Therefore, if the slave skip counter is 1, we
3731
just say that this event should be skipped by ignoring it, meaning
3732
that we do not change the value of the slave skip counter since it
3733
will be decreased by the following insert event.
3735
return continue_group(rli);
3739
/**************************************************************************
3740
Slave_log_event methods
3741
**************************************************************************/
3743
/*
  Produce the textual description of this event and pass it to
  protocol->store(): host=<host>,port=<port>,log=<log>,pos=<pos>.

  The host is copied with my_stpncpy() limited to HOSTNAME_LENGTH bytes.

  NOTE(review): master_log is copied with an unbounded my_stpcpy() into a
  buffer of 256+HOSTNAME_LENGTH bytes - confirm that the log name length is
  bounded elsewhere.
*/
void Slave_log_event::pack_info(Protocol *protocol)
3745
char buf[256+HOSTNAME_LENGTH], *pos;
3746
pos= my_stpcpy(buf, "host=");
3747
pos= my_stpncpy(pos, master_host.c_str(), HOSTNAME_LENGTH);
3748
pos= my_stpcpy(pos, ",port=");
3749
pos= int10_to_str((long) master_port, pos, 10);
3750
pos= my_stpcpy(pos, ",log=");
3751
pos= my_stpcpy(pos, master_log.c_str());
3752
pos= my_stpcpy(pos, ",pos=");
3753
pos= int64_t10_to_str(master_pos, pos, 10);
3754
protocol->store(buf, pos-buf, &my_charset_bin);
3760
re-write this better without holding both locks at the same time
3762
/*
  Build a Slave event describing the master that rli is currently
  replicating from.

  With both mi->data_lock and rli->data_lock held (taken in that order and
  released in reverse), mem_pool is allocated and the master host, log name,
  port and position are copied from the Master_info and Relay_log_info. If
  the allocation fails, an out-of-memory error is printed instead.

  NOTE(review): master_host is initialized from the literal 0. If
  master_host is a std::string this selects the const char* constructor with
  a null pointer, which is undefined behaviour - confirm the member type.

  NOTE(review): get_data_size() is evaluated for the allocation before
  master_host and master_log are assigned, so the size is computed from
  their state at that point - confirm this is intended.

  NOTE(review): the body of the rli->inited check and the else keyword before
  the error message are not visible on these lines.
*/
Slave_log_event::Slave_log_event(Session* session_arg,
3763
Relay_log_info* rli)
3764
:Log_event(session_arg, 0, 0) , mem_pool(0), master_host(0)
3766
if (!rli->inited) // QQ When can this happen ?
3769
Master_info* mi = rli->mi;
3770
// TODO: re-write this better without holding both locks at the same time
3771
pthread_mutex_lock(&mi->data_lock);
3772
pthread_mutex_lock(&rli->data_lock);
3773
// on OOM, just do not initialize the structure and print the error
3774
if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
3777
master_host.assign(mi->getHostname());
3778
master_log.assign(rli->group_master_log_name);
3779
master_port = mi->getPort();
3780
master_pos = rli->group_master_log_pos;
3783
sql_print_error(_("Out of memory while recording slave event"));
3784
pthread_mutex_unlock(&rli->data_lock);
3785
pthread_mutex_unlock(&mi->data_lock);
3790
// Destructor. NOTE(review): the body is not visible here; presumably it
// releases mem_pool allocated by the constructor - confirm.
Slave_log_event::~Slave_log_event()
3796
/*
  Return the size of the event body: the fixed part up to
  SL_MASTER_HOST_OFFSET, the current lengths of master_host and master_log,
  and one extra byte.
*/
int Slave_log_event::get_data_size()
3798
return master_host.length() + master_log.length() + 1 + SL_MASTER_HOST_OFFSET;
3802
/*
  Serialize this event to file: the master position and port are stored into
  mem_pool at their offsets, then the common header and get_data_size()
  bytes of mem_pool are written. Returns true if either write returns
  nonzero.

  NOTE(review): mem_pool is dereferenced without a null check here, while
  the constructor leaves it null when its allocation fails - confirm that
  callers never write such an event.
*/
bool Slave_log_event::write(IO_CACHE* file)
3804
ulong event_length= get_data_size();
3805
int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
3806
int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
3807
// log and host are already there
3809
return (write_header(file, event_length) ||
3810
my_b_safe_write(file, (unsigned char*) mem_pool, event_length));
3814
/*
  Populate the members from the serialized bytes in mem_pool: the 8-byte
  master position, the 2-byte master port, and the host name starting at
  SL_MASTER_HOST_OFFSET.

  NOTE(review): master_log.assign() is called with no arguments, so the log
  name is not taken from mem_pool here (the existing comment asks for these
  to be assigned correctly). If master_log is a std::string this call has no
  matching overload - confirm whether these lines are compiled at all.
*/
void Slave_log_event::init_from_mem_pool()
3816
master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
3817
master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
3819
/* Assign these correctly */
3820
master_host.assign(mem_pool + SL_MASTER_HOST_OFFSET);
3821
master_log.assign();
3826
/*
  Apply this event by writing it to the binary log of this server when that
  log is open. The Relay_log_info argument is unused, and the result of
  mysql_bin_log.write() is not checked on the visible lines.
*/
int Slave_log_event::do_apply_event(const Relay_log_info *)
3828
if (mysql_bin_log.is_open())
3829
mysql_bin_log.write(this);
3834
/**************************************************************************
3835
Stop_log_event methods
3836
**************************************************************************/
3839
The master stopped. We used to clean up all temporary tables but
3840
this is useless as, as the master has shut down properly, it has
3841
written all DROP TEMPORARY Table (prepared statements' deletion is
3842
TODO only when we binlog prep stmts). We used to clean up
3843
slave_load_tmpdir, but this is useless as it has been cleared at the
3844
end of LOAD DATA INFILE. So we have nothing to do here. The place
3845
where we must do this cleaning is in
3846
Start_log_event_v3::do_apply_event(), not here. Because if we come
3847
here, the master was sane.
3849
/*
  Update the relay log coordinates in rli after a stop event.

  When the session has OPTION_BEGIN set, only the event relay log position
  is advanced. The call to inc_group_relay_log_pos(0) followed by
  flush_relay_log_info() is presumably the alternative branch - the else
  keyword is not visible on these lines.
*/
int Stop_log_event::do_update_pos(Relay_log_info *rli)
3852
We do not want to update master_log pos because we get a rotate event
3853
before stop, so by now group_master_log_name is set to the next log.
3854
If we updated it, we will have incorrect master coordinates and this
3855
could give false triggers in MASTER_POS_WAIT() that we have reached
3856
the target position when in fact we have not.
3858
if (session->options & OPTION_BEGIN)
3859
rli->inc_event_relay_log_pos();
3862
rli->inc_group_relay_log_pos(0);
3863
flush_relay_log_info(rli);
3869
/**************************************************************************
3870
Create_file_log_event methods
3871
**************************************************************************/
3874
Create_file_log_event ctor
3877
/*
  Build a Create_file event for a LOAD DATA statement being logged.

  The Load_log_event base is built from the statement arguments. The first
  data block is referenced through block_arg and block_len_arg (the pointer
  is stored, not copied). A fresh file id is taken from
  mysql_bin_log.next_file_id() and stored both in this event and in
  session_arg->file_id. The new sql_ex format is then forced.
*/
Create_file_log_event::
3878
Create_file_log_event(Session* session_arg, sql_exchange* ex,
3879
const char* db_arg, const char* table_name_arg,
3880
List<Item>& fields_arg, enum enum_duplicates handle_dup,
3882
unsigned char* block_arg, uint32_t block_len_arg, bool using_trans)
3883
:Load_log_event(session_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
3885
fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg),
3886
file_id(session_arg->file_id = mysql_bin_log.next_file_id())
3888
sql_ex.force_new_format();
3894
Create_file_log_event::write_data_body()
3897
/*
  Write the event body: first the Load_log_event body, then a single zero
  byte followed by block_len bytes of the data block.

  When the base class write fails or fake_base is set, the function takes
  the early branch instead of writing the separator and the block.
  NOTE(review): the statement of that branch is not visible on these lines;
  presumably it returns res.
*/
bool Create_file_log_event::write_data_body(IO_CACHE* file)
3900
if ((res= Load_log_event::write_data_body(file)) || fake_base)
3902
return (my_b_safe_write(file, (unsigned char*) "", 1) ||
3903
my_b_safe_write(file, (unsigned char*) block, block_len));
3908
Create_file_log_event::write_data_header()
3911
/*
  Write the event post-header: first the Load_log_event header, then
  CREATE_FILE_HEADER_LEN bytes carrying file_id at CF_FILE_ID_OFFSET.

  When the base class write fails or fake_base is set, the function takes
  the early branch instead of writing the file id.
  NOTE(review): the statement of that branch is not visible on these lines;
  presumably it returns res.
*/
bool Create_file_log_event::write_data_header(IO_CACHE* file)
3914
unsigned char buf[CREATE_FILE_HEADER_LEN];
3915
if ((res= Load_log_event::write_data_header(file)) || fake_base)
3917
int4store(buf + CF_FILE_ID_OFFSET, file_id);
3918
return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0;
3923
Create_file_log_event::write_base()
3926
/*
  Write this event with fake_base set, which makes write_data_header() and
  write_data_body() stop after the Load_log_event part.
  NOTE(review): the write call and the reset of fake_base are not visible on
  these lines.
*/
bool Create_file_log_event::write_base(IO_CACHE* file)
3929
fake_base= 1; // pretend we are Load event
3936
Create_file_log_event ctor
3939
/*
  Build a Create_file event from a serialized event buffer.

  The whole event is duplicated into event_buf with my_memdup() and decoded
  by copy_log_event(), with the body offset depending on whether the event
  type byte says LOAD_EVENT or not. For binlog versions other than 1 the
  file id is read from the Create_file post-header and the data block is
  located after the Load event data plus one byte for the terminator of the
  file name. On the remaining visible path the new sql_ex format is forced
  and inited_from_old is set.

  block points into the buffer passed by the caller (buf), not into the
  event_buf copy, so buf has to stay valid while block is used.

  NOTE(review): the early returns after the failure checks and the else
  keyword before force_new_format() are not visible on these lines.
*/
Create_file_log_event::Create_file_log_event(const char* buf, uint32_t len,
3940
const Format_description_log_event* description_event)
3941
:Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0)
3943
uint32_t block_offset;
3944
uint32_t header_len= description_event->common_header_len;
3945
uint8_t load_header_len= description_event->post_header_len[LOAD_EVENT-1];
3946
uint8_t create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
3947
if (!(event_buf= (char*) my_memdup(buf, len, MYF(MY_WME))) ||
3948
copy_log_event(event_buf,len,
3949
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
3950
load_header_len + header_len :
3951
(fake_base ? (header_len+load_header_len) :
3952
(header_len+load_header_len) +
3953
create_file_header_len)),
3956
if (description_event->binlog_version!=1)
3958
file_id= uint4korr(buf +
3960
load_header_len + CF_FILE_ID_OFFSET);
3962
Note that it's ok to use get_data_size() below, because it is computed
3963
with values we have already read from this event (because we called
3964
copy_log_event()); we are not using slave's format info to decode
3965
master's format, we are really using master's format info.
3966
Anyway, both formats should be identical (except the common_header_len)
3967
as these Load events are not changed between 4.0 and 5.0 (as logging of
3968
LOAD DATA INFILE does not use Load_log_event in 5.0).
3970
The + 1 is for \0 terminating fname
3972
block_offset= (description_event->common_header_len +
3973
Load_log_event::get_data_size() +
3974
create_file_header_len + 1);
3975
if (len < block_offset)
3977
block = (unsigned char*)buf + block_offset;
3978
block_len = len - block_offset;
3982
sql_ex.force_new_format();
3983
inited_from_old = 1;
3990
Create_file_log_event::pack_info()
3993
/*
  Produce the textual description of this event and pass it to
  protocol->store():
  db=<db>;table=<table>;file_id=<id>;block_len=<len>.

  The buffer is sized for two names of NAME_LEN bytes, the fixed labels and
  two decimal numbers.

  NOTE(review): db and table_name are copied with memcpy() using db_len and
  table_name_len without a bounds check on the visible lines - confirm that
  both lengths are limited to NAME_LEN elsewhere.
*/
void Create_file_log_event::pack_info(Protocol *protocol)
3995
char buf[NAME_LEN*2 + 30 + 21*2], *pos;
3996
pos= my_stpcpy(buf, "db=");
3997
memcpy(pos, db, db_len);
3998
pos= my_stpcpy(pos + db_len, ";table=");
3999
memcpy(pos, table_name, table_name_len);
4000
pos= my_stpcpy(pos + table_name_len, ";file_id=");
4001
pos= int10_to_str((long) file_id, pos, 10);
4002
pos= my_stpcpy(pos, ";block_len=");
4003
pos= int10_to_str((long) block_len, pos, 10);
4004
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
4009
Create_file_log_event::do_apply_event()
4012
/*
  Apply this event by creating the two temporary files used to replay a
  LOAD DATA statement.

  The file name is built by slave_load_file_stem() directly after the
  "Making temp file " prefix inside proc_info, so one buffer serves both as
  the process info text and as the file name. First the .info file is
  (re)created and the Load event part of this event is written to it with
  write_base(). Then the extension is switched to .data in place, that file
  is (re)created and the first data block is written to it. Each failure is
  reported through rli->report() with my_errno.

  NOTE(review): the local declarations, the jumps to the cleanup code and
  the final return are not visible on these lines, and the last argument of
  each report call is cut off.
*/
int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
4014
// 17 is the length of the "Making temp file " prefix.
char proc_info[17+FN_REFLEN+10], *fname_buf;
4020
memset(&file, 0, sizeof(file));
4021
fname_buf= my_stpcpy(proc_info, "Making temp file ");
4022
ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
4023
session->set_proc_info(proc_info);
4024
my_delete(fname_buf, MYF(0)); // old copy may exist already
4025
if ((fd= my_create(fname_buf, CREATE_MODE,
4027
MYF(MY_WME))) < 0 ||
4028
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
4029
MYF(MY_WME|MY_NABP)))
4031
rli->report(ERROR_LEVEL, my_errno,
4032
_("Error in Create_file event: could not open file '%s'"),
4037
// a trick to avoid allocating another buffer
4039
fname_len= (uint) (my_stpcpy(ext, ".data") - fname);
4040
if (write_base(&file))
4042
my_stpcpy(ext, ".info"); // to have it right in the error message
4043
rli->report(ERROR_LEVEL, my_errno,
4044
_("Error in Create_file event: could not write to file '%s'"),
4048
end_io_cache(&file);
4049
my_close(fd, MYF(0));
4051
// fname_buf now already has .data, not .info, because we did our trick
4052
my_delete(fname_buf, MYF(0)); // old copy may exist already
4053
if ((fd= my_create(fname_buf, CREATE_MODE,
4057
rli->report(ERROR_LEVEL, my_errno,
4058
_("Error in Create_file event: could not open file '%s'"),
4062
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
4064
rli->report(ERROR_LEVEL, my_errno,
4065
_("Error in Create_file event: write to '%s' failed"),
4069
error=0; // Everything is ok
4073
end_io_cache(&file);
4075
my_close(fd, MYF(0));
4076
session->set_proc_info(0);
4081
/**************************************************************************
4082
Append_block_log_event methods
4083
**************************************************************************/
4086
Append_block_log_event ctor
4089
/*
  Build an Append_block event for a further data block of a LOAD DATA
  statement. The block pointer and length are stored as given (no copy), and
  the file id is taken from session_arg->file_id.
*/
Append_block_log_event::Append_block_log_event(Session *session_arg,
4091
unsigned char *block_arg,
4092
uint32_t block_len_arg,
4094
:Log_event(session_arg,0, using_trans), block(block_arg),
4095
block_len(block_len_arg), file_id(session_arg->file_id), db(db_arg)
4101
Append_block_log_event ctor
4104
/*
  Build an Append_block event from a serialized event buffer.

  The file id is read from the post-header at AB_FILE_ID_OFFSET; the data
  block is everything after the common header and the post-header. block
  points into buf (nothing is copied), so buf has to outlive this event.
  When len is shorter than both headers, block stays 0 as set by the
  initializer list (presumably through an early return not visible here).
*/
Append_block_log_event::Append_block_log_event(const char* buf, uint32_t len,
4105
const Format_description_log_event* description_event)
4106
:Log_event(buf, description_event),block(0)
4108
uint8_t common_header_len= description_event->common_header_len;
4109
uint8_t append_block_header_len=
4110
description_event->post_header_len[APPEND_BLOCK_EVENT-1];
4111
uint32_t total_header_len= common_header_len+append_block_header_len;
4112
if (len < total_header_len)
4114
file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET);
4115
block= (unsigned char*)buf + total_header_len;
4116
block_len= len - total_header_len;
4122
Append_block_log_event::write()
4125
/*
  Serialize this event to file: the common header, the post-header carrying
  file_id at AB_FILE_ID_OFFSET, then block_len bytes of the data block.
  Returns true if any of the writes returns nonzero.
*/
bool Append_block_log_event::write(IO_CACHE* file)
4127
unsigned char buf[APPEND_BLOCK_HEADER_LEN];
4128
int4store(buf + AB_FILE_ID_OFFSET, file_id);
4129
return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) ||
4130
my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
4131
my_b_safe_write(file, (unsigned char*) block, block_len));
4136
Append_block_log_event::pack_info()
4139
/*
  Produce the textual description of this event and pass it to
  protocol->store(): ;file_id=<id>;block_len=<len>.
  NOTE(review): the declaration of buf and the last sprintf() argument are
  not visible on these lines, so the buffer size cannot be checked here.
*/
void Append_block_log_event::pack_info(Protocol *protocol)
4143
length= (uint) sprintf(buf, ";file_id=%u;block_len=%u", file_id,
4145
protocol->store(buf, length, &my_charset_bin);
4150
Append_block_log_event::get_create_or_append()
4153
/*
  Tell do_apply_event() how to open the data file: 0 selects the branch that
  opens the existing file for appending, a nonzero value the branch that
  deletes and recreates it.
*/
int Append_block_log_event::get_create_or_append() const
4155
return 0; /* append to the file, fail if not exists */
4159
Append_block_log_event::do_apply_event()
4162
/*
  Apply this event by writing the data block to the temporary .data file of
  the LOAD DATA statement.

  The file name is built by slave_load_file_stem() directly after the
  "Making temp file " prefix inside proc_info. Depending on
  get_create_or_append(), the file is either deleted and created anew, or
  opened with O_WRONLY | O_APPEND. The block is then written with my_write()
  and the descriptor closed. Each failure is reported through rli->report()
  with my_errno and the event type name from get_type_str().

  NOTE(review): the local declarations, the jumps to the cleanup code and
  the final return are not visible on these lines.
*/
int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
4164
// 17 is the length of the "Making temp file " prefix.
char proc_info[17+FN_REFLEN+10], *fname= proc_info+17;
4168
fname= my_stpcpy(proc_info, "Making temp file ");
4169
slave_load_file_stem(fname, file_id, server_id, ".data");
4170
session->set_proc_info(proc_info);
4171
if (get_create_or_append())
4173
my_delete(fname, MYF(0)); // old copy may exist already
4174
if ((fd= my_create(fname, CREATE_MODE,
4178
rli->report(ERROR_LEVEL, my_errno,
4179
_("Error in %s event: could not create file '%s'"),
4180
get_type_str(), fname);
4184
else if ((fd = my_open(fname, O_WRONLY | O_APPEND,
4187
rli->report(ERROR_LEVEL, my_errno,
4188
_("Error in %s event: could not open file '%s'"),
4189
get_type_str(), fname);
4192
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
4194
rli->report(ERROR_LEVEL, my_errno,
4195
_("Error in %s event: write to '%s' failed"),
4196
get_type_str(), fname);
4203
my_close(fd, MYF(0));
4204
session->set_proc_info(0);
4209
/**************************************************************************
4210
Delete_file_log_event methods
4211
**************************************************************************/
4214
Delete_file_log_event ctor
4217
/*
  Build a Delete_file event for the load file of the current session; the
  file id is taken from session_arg->file_id.
*/
Delete_file_log_event::Delete_file_log_event(Session *session_arg, const char* db_arg,
4219
:Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
4224
Delete_file_log_event ctor
4227
/*
  Build a Delete_file event from a serialized event buffer. The file id is
  read from the post-header at DF_FILE_ID_OFFSET. When len is shorter than
  both headers, file_id stays 0 as set by the initializer list (presumably
  through an early return not visible here).
*/
Delete_file_log_event::Delete_file_log_event(const char* buf, uint32_t len,
4228
const Format_description_log_event* description_event)
4229
:Log_event(buf, description_event),file_id(0)
4231
uint8_t common_header_len= description_event->common_header_len;
4232
uint8_t delete_file_header_len= description_event->post_header_len[DELETE_FILE_EVENT-1];
4233
if (len < (uint)(common_header_len + delete_file_header_len))
4235
file_id= uint4korr(buf + common_header_len + DF_FILE_ID_OFFSET);
4240
Delete_file_log_event::write()
4243
/*
  Serialize this event to file: the common header followed by the
  post-header carrying file_id at DF_FILE_ID_OFFSET. Returns true if either
  write returns nonzero.
*/
bool Delete_file_log_event::write(IO_CACHE* file)
4245
unsigned char buf[DELETE_FILE_HEADER_LEN];
4246
int4store(buf + DF_FILE_ID_OFFSET, file_id);
4247
return (write_header(file, sizeof(buf)) ||
4248
my_b_safe_write(file, buf, sizeof(buf)));
4253
Delete_file_log_event::pack_info()
4256
/*
  Produce the textual description of this event and pass it to
  protocol->store(): ;file_id=<id>.
  NOTE(review): the declaration of buf is not visible on these lines.
*/
void Delete_file_log_event::pack_info(Protocol *protocol)
4260
length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
4261
protocol->store(buf, (int32_t) length, &my_charset_bin);
4265
Delete_file_log_event::do_apply_event()
4268
/*
  Apply this event by deleting both temporary files of the LOAD DATA
  statement: first the .data file, then, after rewriting the extension in
  place, the .info file. The results of both my_delete() calls are
  deliberately discarded. The Relay_log_info argument is unused.
*/
int Delete_file_log_event::do_apply_event(const Relay_log_info *)
4270
char fname[FN_REFLEN+10];
4271
// ext is the position of the extension inside fname.
char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
4272
(void) my_delete(fname, MYF(MY_WME));
4273
my_stpcpy(ext, ".info");
4274
(void) my_delete(fname, MYF(MY_WME));
4279
/**************************************************************************
4280
Execute_load_log_event methods
4281
**************************************************************************/
4284
Execute_load_log_event ctor
4287
Execute_load_log_event::Execute_load_log_event(Session *session_arg,
4290
:Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
4296
Execute_load_log_event ctor
4299
Execute_load_log_event::Execute_load_log_event(const char* buf, uint32_t len,
4300
const Format_description_log_event* description_event)
4301
:Log_event(buf, description_event), file_id(0)
4303
uint8_t common_header_len= description_event->common_header_len;
4304
uint8_t exec_load_header_len= description_event->post_header_len[EXEC_LOAD_EVENT-1];
4305
if (len < (uint)(common_header_len+exec_load_header_len))
4307
file_id= uint4korr(buf + common_header_len + EL_FILE_ID_OFFSET);
4312
Execute_load_log_event::write()
4315
/**
  Serialize this Execute_load event into a log cache.

  The post-header consists of the file id, stored with int4store() at
  EL_FILE_ID_OFFSET inside an EXEC_LOAD_HEADER_LEN sized buffer.

  @param file  IO_CACHE that receives the common header followed by the
               post-header.

  @return false when both writes return zero, true as soon as one of
          them returns non-zero.
*/
bool Execute_load_log_event::write(IO_CACHE* file)
{
  unsigned char post_header[EXEC_LOAD_HEADER_LEN];

  int4store(post_header + EL_FILE_ID_OFFSET, file_id);

  /* write_header() is told how many payload bytes follow it. */
  if (write_header(file, sizeof(post_header)))
    return true;

  return my_b_safe_write(file, post_header, sizeof(post_header)) != 0;
}
4325
Execute_load_log_event::pack_info()
4328
void Execute_load_log_event::pack_info(Protocol *protocol)
4332
length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
4333
protocol->store(buf, (int32_t) length, &my_charset_bin);
4338
Execute_load_log_event::do_apply_event()
4341
int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
4343
char fname[FN_REFLEN+10];
4348
Load_log_event *lev= 0;
4350
ext= slave_load_file_stem(fname, file_id, server_id, ".info");
4351
if ((fd = my_open(fname, O_RDONLY,
4352
MYF(MY_WME))) < 0 ||
4353
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
4354
MYF(MY_WME|MY_NABP)))
4356
rli->report(ERROR_LEVEL, my_errno,
4357
_("Error in Exec_load event: could not open file '%s'"),
4361
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
4362
(pthread_mutex_t*)0,
4363
rli->relay_log.description_event_for_exec)) ||
4364
lev->get_type_code() != NEW_LOAD_EVENT)
4366
rli->report(ERROR_LEVEL, 0,
4367
_("Error in Exec_load event: "
4368
"file '%s' appears corrupted"),
4373
lev->session = session;
4375
lev->do_apply_event should use rli only for errors i.e. should
4376
not advance rli's position.
4378
lev->do_apply_event is the place where the table is loaded (it
4379
calls mysql_load()).
4382
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
4383
if (lev->do_apply_event(0,rli,1))
4386
We want to indicate the name of the file that could not be loaded
4388
But as we are here we are sure the error is in rli->last_slave_error and
4389
rli->last_slave_errno (example of error: duplicate entry for key), so we
4390
don't want to overwrite it with the filename.
4391
What we want instead is add the filename to the current error message.
4393
char *tmp= my_strdup(rli->last_error().message, MYF(MY_WME));
4396
rli->report(ERROR_LEVEL, rli->last_error().number,
4397
_("%s. Failed executing load from '%s'"),
4404
We have an open file descriptor to the .info file; we need to close it
4405
or Windows will refuse to delete the file in my_delete().
4409
my_close(fd, MYF(0));
4410
end_io_cache(&file);
4413
(void) my_delete(fname, MYF(MY_WME));
4414
memcpy(ext, ".data", 6);
4415
(void) my_delete(fname, MYF(MY_WME));
4422
my_close(fd, MYF(0));
4423
end_io_cache(&file);
4429
/**************************************************************************
4430
Begin_load_query_log_event methods
4431
**************************************************************************/
4433
Begin_load_query_log_event::
4434
Begin_load_query_log_event(Session* session_arg, const char* db_arg, unsigned char* block_arg,
4435
uint32_t block_len_arg, bool using_trans)
4436
:Append_block_log_event(session_arg, db_arg, block_arg, block_len_arg,
4439
file_id= session_arg->file_id= mysql_bin_log.next_file_id();
4443
/**
  Reading constructor.

  All decoding is delegated to the Append_block_log_event constructor;
  this class adds nothing of its own here.

  @param buf         Raw event bytes.
  @param len         Length of the event in bytes.
  @param desc_event  Format description used to interpret the buffer.
*/
Begin_load_query_log_event::Begin_load_query_log_event(
    const char* buf,
    uint32_t len,
    const Format_description_log_event* desc_event)
  : Append_block_log_event(buf, len, desc_event)
{
}
4451
int Begin_load_query_log_event::get_create_or_append() const
4453
return 1; /* create the file */
4457
/**
  Decide whether the slave should skip this event.

  @param rli  Relay log info handed through to continue_group().

  @return whatever continue_group() reports for this event.
*/
Log_event::enum_skip_reason
Begin_load_query_log_event::do_shall_skip(Relay_log_info *rli)
{
  /*
    If the slave skip counter is 1, then we should not start executing
    on the event that follows this one.
    NOTE(review): presumably continue_group() implements exactly that
    rule -- confirm against its definition.
  */
  const Log_event::enum_skip_reason verdict= continue_group(rli);
  return verdict;
}
4468
/**************************************************************************
4469
Execute_load_query_log_event methods
4470
**************************************************************************/
4473
Execute_load_query_log_event::
4474
Execute_load_query_log_event(Session *session_arg, const char* query_arg,
4475
ulong query_length_arg, uint32_t fn_pos_start_arg,
4476
uint32_t fn_pos_end_arg,
4477
enum_load_dup_handling dup_handling_arg,
4478
bool using_trans, bool suppress_use,
4479
Session::killed_state killed_err_arg):
4480
Query_log_event(session_arg, query_arg, query_length_arg, using_trans,
4481
suppress_use, killed_err_arg),
4482
file_id(session_arg->file_id), fn_pos_start(fn_pos_start_arg),
4483
fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
4488
Execute_load_query_log_event::
4489
Execute_load_query_log_event(const char* buf, uint32_t event_len,
4490
const Format_description_log_event* desc_event):
4491
Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT),
4492
file_id(0), fn_pos_start(0), fn_pos_end(0)
4494
if (!Query_log_event::is_valid())
4497
buf+= desc_event->common_header_len;
4499
fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET);
4500
fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET);
4501
dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET));
4503
if (fn_pos_start > q_len || fn_pos_end > q_len ||
4504
dup_handling > LOAD_DUP_REPLACE)
4507
file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET);
4511
/**
  Report how many extra post-header bytes this derived event contributes.

  @return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN.
*/
ulong Execute_load_query_log_event::get_post_header_size_for_derived()
{
  const ulong extra_header_bytes= EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN;
  return extra_header_bytes;
}
4518
Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
4520
unsigned char buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
4521
int4store(buf, file_id);
4522
int4store(buf + 4, fn_pos_start);
4523
int4store(buf + 4 + 4, fn_pos_end);
4524
*(buf + 4 + 4 + 4)= (unsigned char) dup_handling;
4525
return my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
4529
void Execute_load_query_log_event::pack_info(Protocol *protocol)
4532
if (!(buf= (char*) my_malloc(9 + db_len + q_len + 10 + 21, MYF(MY_WME))))
4537
pos= my_stpcpy(buf, "use `");
4538
memcpy(pos, db, db_len);
4539
pos= my_stpcpy(pos+db_len, "`; ");
4543
memcpy(pos, query, q_len);
4546
pos= my_stpcpy(pos, " ;file_id=");
4547
pos= int10_to_str((long) file_id, pos, 10);
4548
protocol->store(buf, pos-buf, &my_charset_bin);
4554
Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli)
4562
buf= (char*) my_malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
4563
(FN_REFLEN + 10) + 10 + 8 + 5, MYF(MY_WME));
4565
/* Replace filename and LOCAL keyword in query before executing it */
4568
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
4569
ER(ER_SLAVE_FATAL_ERROR),
4570
_("Not enough memory"));
4575
memcpy(p, query, fn_pos_start);
4577
fname= (p= strmake(p, STRING_WITH_LEN(" INFILE \'")));
4578
p= slave_load_file_stem(p, file_id, server_id, ".data");
4579
fname_end= p= strchr(p, '\0'); // Safer than p=p+5
4581
switch (dup_handling) {
4582
case LOAD_DUP_IGNORE:
4583
p= strmake(p, STRING_WITH_LEN(" IGNORE"));
4585
case LOAD_DUP_REPLACE:
4586
p= strmake(p, STRING_WITH_LEN(" REPLACE"));
4589
/* Ordinary load data */
4592
p= strmake(p, STRING_WITH_LEN(" INTO"));
4593
p= strmake(p, query+fn_pos_end, q_len-fn_pos_end);
4595
error= Query_log_event::do_apply_event(rli, buf, p-buf);
4597
/* Forging file name for deletion in same buffer */
4601
If there was an error the slave is going to stop, leave the
4602
file so that we can re-execute this event at START SLAVE.
4605
(void) my_delete(fname, MYF(MY_WME));
4612
/**************************************************************************
4614
**************************************************************************/
4617
sql_ex_info::write_data()
4620
bool sql_ex_info::write_data(IO_CACHE* file)
4624
return (write_str(file, field_term, (uint) field_term_len) ||
4625
write_str(file, enclosed, (uint) enclosed_len) ||
4626
write_str(file, line_term, (uint) line_term_len) ||
4627
write_str(file, line_start, (uint) line_start_len) ||
4628
write_str(file, escaped, (uint) escaped_len) ||
4629
my_b_safe_write(file,(unsigned char*) &opt_flags,1));
4634
@todo This is sensitive to field padding. We should write a
4635
char[7], not an old_sql_ex. /sven
4638
old_ex.field_term= *field_term;
4639
old_ex.enclosed= *enclosed;
4640
old_ex.line_term= *line_term;
4641
old_ex.line_start= *line_start;
4642
old_ex.escaped= *escaped;
4643
old_ex.opt_flags= opt_flags;
4644
old_ex.empty_flags=empty_flags;
4645
return my_b_safe_write(file, (unsigned char*) &old_ex, sizeof(old_ex)) != 0;
4654
const char *sql_ex_info::init(const char *buf, const char *buf_end,
4655
bool use_new_format)
4657
cached_new_format = use_new_format;
4662
The code below assumes that buf will not disappear from
4663
under our feet during the lifetime of the event. This assumption
4664
holds true in the slave thread if the log is in new format, but is not
4665
the case when we have old format because we will be reusing net buffer
4666
to read the actual file before we write out the Create_file event.
4668
if (read_str(&buf, buf_end, &field_term, &field_term_len) ||
4669
read_str(&buf, buf_end, &enclosed, &enclosed_len) ||
4670
read_str(&buf, buf_end, &line_term, &line_term_len) ||
4671
read_str(&buf, buf_end, &line_start, &line_start_len) ||
4672
read_str(&buf, buf_end, &escaped, &escaped_len))
4678
field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
4679
field_term = buf++; // Use first byte in string
4685
empty_flags= *buf++;
4686
if (empty_flags & FIELD_TERM_EMPTY)
4688
if (empty_flags & ENCLOSED_EMPTY)
4690
if (empty_flags & LINE_TERM_EMPTY)
4692
if (empty_flags & LINE_START_EMPTY)
4694
if (empty_flags & ESCAPED_EMPTY)
4701
/**************************************************************************
4702
Rows_log_event member functions
4703
**************************************************************************/
4705
Rows_log_event::Rows_log_event(Session *session_arg, Table *tbl_arg, ulong tid,
4706
MY_BITMAP const *cols, bool is_transactional)
4707
: Log_event(session_arg, 0, is_transactional),
4711
m_width(tbl_arg ? tbl_arg->s->fields : 1),
4712
m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
4713
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4716
We allow a special form of dummy event when the table, and cols
4717
are null and the table id is UINT32_MAX. This is a temporary
4718
solution, to be able to terminate a started statement in the
4719
binary log: the extraneous events will be removed in the future.
4721
assert((tbl_arg && tbl_arg->s && tid != UINT32_MAX) || (!tbl_arg && !cols && tid == UINT32_MAX));
4723
if (session_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
4724
set_flags(NO_FOREIGN_KEY_CHECKS_F);
4725
if (session_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
4726
set_flags(RELAXED_UNIQUE_CHECKS_F);
4727
/* if bitmap_init fails, caught in is_valid() */
4728
if (likely(!bitmap_init(&m_cols,
4729
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4733
/* Cols can be zero if this is a dummy binrows event */
4734
if (likely(cols != NULL))
4736
memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
4737
create_last_word_mask(&m_cols);
4742
// Needed because bitmap_init() does not set it to null on failure
4748
Rows_log_event::Rows_log_event(const char *buf, uint32_t event_len,
4749
Log_event_type event_type,
4750
const Format_description_log_event
4752
: Log_event(buf, description_event),
4755
m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
4756
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4758
uint8_t const common_header_len= description_event->common_header_len;
4759
uint8_t const post_header_len= description_event->post_header_len[event_type-1];
4761
const char *post_start= buf + common_header_len;
4762
post_start+= RW_MAPID_OFFSET;
4763
if (post_header_len == 6)
4765
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
4766
m_table_id= uint4korr(post_start);
4771
m_table_id= (ulong) uint6korr(post_start);
4772
post_start+= RW_FLAGS_OFFSET;
4775
m_flags= uint2korr(post_start);
4777
unsigned char const *const var_start=
4778
(const unsigned char *)buf + common_header_len + post_header_len;
4779
unsigned char const *const ptr_width= var_start;
4780
unsigned char *ptr_after_width= (unsigned char*) ptr_width;
4781
m_width = net_field_length(&ptr_after_width);
4782
/* if bitmap_init fails, caught in is_valid() */
4783
if (likely(!bitmap_init(&m_cols,
4784
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4788
memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
4789
create_last_word_mask(&m_cols);
4790
ptr_after_width+= (m_width + 7) / 8;
4794
// Needed because bitmap_init() does not set it to null on failure
4795
m_cols.bitmap= NULL;
4799
m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
4801
if (event_type == UPDATE_ROWS_EVENT)
4803
/* if bitmap_init fails, caught in is_valid() */
4804
if (likely(!bitmap_init(&m_cols_ai,
4805
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
4809
memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
4810
create_last_word_mask(&m_cols_ai);
4811
ptr_after_width+= (m_width + 7) / 8;
4815
// Needed because bitmap_init() does not set it to null on failure
4816
m_cols_ai.bitmap= 0;
4821
const unsigned char* const ptr_rows_data= (const unsigned char*) ptr_after_width;
4823
size_t const data_size= event_len - (ptr_rows_data - (const unsigned char *) buf);
4825
m_rows_buf= (unsigned char*) my_malloc(data_size, MYF(MY_WME));
4826
if (likely((bool)m_rows_buf))
4828
m_curr_row= m_rows_buf;
4829
m_rows_end= m_rows_buf + data_size;
4830
m_rows_cur= m_rows_end;
4831
memcpy(m_rows_buf, ptr_rows_data, data_size);
4834
m_cols.bitmap= 0; // to not free it
4839
/**
  Release the column bitmap and the row buffer owned by the event.
*/
Rows_log_event::~Rows_log_event()
{
  /*
    When the bitmap points at the in-object m_bitbuf, no my_malloc
    happened; clear the pointer so bitmap_free() has nothing to free.
  */
  const bool bitmap_is_inline= (m_cols.bitmap == m_bitbuf);
  if (bitmap_is_inline)
  {
    m_cols.bitmap= 0;
  }

  /* Pairs with the bitmap_init() done by the constructors. */
  bitmap_free(&m_cols);

  free((unsigned char*) m_rows_buf);
}
4847
int Rows_log_event::get_data_size()
4849
int const type_code= get_type_code();
4851
unsigned char buf[sizeof(m_width)+1];
4852
unsigned char *end= net_store_length(buf, (m_width + 7) / 8);
4854
int data_size= ROWS_HEADER_LEN;
4855
data_size+= no_bytes_in_map(&m_cols);
4856
data_size+= end - buf;
4858
if (type_code == UPDATE_ROWS_EVENT)
4859
data_size+= no_bytes_in_map(&m_cols_ai);
4861
data_size+= (m_rows_cur - m_rows_buf);
4866
int Rows_log_event::do_add_row_data(unsigned char *row_data, size_t length)
4869
When the table has a primary key, we would probably want, by default, to
4870
log only the primary key value instead of the entire "before image". This
4871
would save binlog space. TODO
4875
If length is zero, there is nothing to write, so we just
4876
return. Note that this is not an optimization, since calling
4877
realloc() with size 0 means free().
4885
assert(m_rows_buf <= m_rows_cur);
4886
assert(!m_rows_buf || (m_rows_end && m_rows_buf <= m_rows_end));
4887
assert(m_rows_cur <= m_rows_end);
4889
/* The cast will always work since m_rows_cur <= m_rows_end */
4890
if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
4892
size_t const block_size= 1024;
4893
my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
4894
my_ptrdiff_t const new_alloc=
4895
block_size * ((cur_size + length + block_size - 1) / block_size);
4897
unsigned char* const new_buf= (unsigned char*)my_realloc((unsigned char*)m_rows_buf, (uint) new_alloc,
4898
MYF(MY_ALLOW_ZERO_PTR|MY_WME));
4899
if (unlikely(!new_buf))
4900
return(HA_ERR_OUT_OF_MEM);
4902
/* If the memory moved, we need to move the pointers */
4903
if (new_buf != m_rows_buf)
4905
m_rows_buf= new_buf;
4906
m_rows_cur= m_rows_buf + cur_size;
4910
The end pointer should always be changed to point to the end of
4911
the allocated memory.
4913
m_rows_end= m_rows_buf + new_alloc;
4916
assert(m_rows_cur + length <= m_rows_end);
4917
memcpy(m_rows_cur, row_data, length);
4918
m_rows_cur+= length;
4923
int Rows_log_event::do_apply_event(Relay_log_info const *rli)
4927
If m_table_id == UINT32_MAX, then we have a dummy event that does not
4928
contain any data. In that case, we just remove all tables in the
4929
tables_to_lock list, close the thread tables, and return with
4932
if (m_table_id == UINT32_MAX)
4935
This one is supposed to be set: just an extra check so that
4936
nothing strange has happened.
4938
assert(get_flags(STMT_END_F));
4940
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4941
close_thread_tables(session);
4942
session->clear_error();
4947
'session' has been set by exec_relay_log_event(), just before calling
4948
do_apply_event(). We still check here to prevent future coding
4951
assert(rli->sql_session == session);
4954
If there are no locks taken, this is the first binrow event seen
4955
after the table map events. We should then lock all the tables
4956
used in the transaction and proceed with execution of the actual
4961
bool need_reopen= 1; /* To execute the first lap of the loop below */
4964
lock_tables() reads the contents of session->lex, so they must be
4965
initialized. Contrary to in
4966
Table_map_log_event::do_apply_event() we don't call
4967
mysql_init_query() as that may reset the binlog format.
4972
There are a few flags that are replicated with each row event.
4973
Make sure to set/clear them before executing the main body of
4976
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
4977
session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
4979
session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
4981
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
4982
session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
4984
session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
4985
/* A small test to verify that objects have consistent types */
4986
assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
4989
while ((error= lock_tables(session, rli->tables_to_lock,
4990
rli->tables_to_lock_count, &need_reopen)))
4994
if (session->is_slave_error || session->is_fatal_error)
4997
Error reporting borrowed from Query_log_event with many excessive
4998
simplifications (we don't honour --slave-skip-errors)
5000
uint32_t actual_error= session->main_da.sql_errno();
5001
rli->report(ERROR_LEVEL, actual_error,
5002
_("Error '%s' in %s event: when locking tables"),
5004
? session->main_da.message()
5005
: _("unexpected success or fatal error")),
5007
session->is_fatal_error= 1;
5011
rli->report(ERROR_LEVEL, error,
5012
_("Error in %s event: when locking tables"),
5015
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5020
So we need to reopen the tables.
5022
We need to flush the pending RBR event, since it keeps a
5023
pointer to an open table.
5025
ALTERNATIVE SOLUTION (not implemented): Extract a pointer to
5026
the pending RBR event and reset the table pointer after the
5027
tables have been reopened.
5029
NOTE: For this new scheme there should be no pending event:
5030
need to add code to assert that is the case.
5032
session->binlog_flush_pending_rows_event(false);
5033
TableList *tables= rli->tables_to_lock;
5034
close_tables_for_reopen(session, &tables);
5036
uint32_t tables_count= rli->tables_to_lock_count;
5037
if ((error= open_tables(session, &tables, &tables_count, 0)))
5039
if (session->is_slave_error || session->is_fatal_error)
5042
Error reporting borrowed from Query_log_event with many excessive
5043
simplifications (we don't honour --slave-skip-errors)
5045
uint32_t actual_error= session->main_da.sql_errno();
5046
rli->report(ERROR_LEVEL, actual_error,
5047
_("Error '%s' on reopening tables"),
5049
? session->main_da.message()
5050
: _("unexpected success or fatal error")));
5051
session->is_slave_error= 1;
5053
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5059
When the open and locking succeeded, we check all tables to
5060
ensure that they still have the correct type.
5062
We can use a down cast here since we know that every table added
5063
to the tables_to_lock is a RPL_TableList.
5067
RPL_TableList *ptr= rli->tables_to_lock;
5068
for ( ; ptr ; ptr= static_cast<RPL_TableList*>(ptr->next_global))
5070
if (ptr->m_tabledef.compatible_with(rli, ptr->table))
5072
mysql_unlock_tables(session, session->lock);
5074
session->is_slave_error= 1;
5075
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5076
return(ERR_BAD_TABLE_DEF);
5082
... and then we add all the tables to the table map and remove
5083
them from tables to lock.
5085
We also invalidate the query cache for all the tables, since
5086
they will now be changed.
5088
TODO [/Matz]: Maybe the query cache should not be invalidated
5089
here? It might be that a table is not changed, even though it
5090
was locked for the statement. We do know that each
5091
Rows_log_event contain at least one row, so after processing one
5092
Rows_log_event, we can invalidate the query cache for the
5095
for (TableList *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
5097
const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
5103
m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
5108
table == NULL means that this table should not be replicated
5109
(this was set up by Table_map_log_event::do_apply_event()
5110
which tested replicate-* rules).
5114
It's not needed to set_time() but
5115
1) it continues the property that "Time" in SHOW PROCESSLIST shows how
5116
much slave is behind
5117
2) it will be needed when we allow replication from a table with no
5118
TIMESTAMP column to a table with one.
5119
So we call set_time(), like in SBR. Presently it changes nothing.
5121
session->set_time((time_t)when);
5123
There are a few flags that are replicated with each row event.
5124
Make sure to set/clear them before executing the main body of
5127
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
5128
session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
5130
session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
5132
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
5133
session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
5135
session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
5137
if (slave_allow_batching)
5138
session->options|= OPTION_ALLOW_BATCH;
5140
session->options&= ~OPTION_ALLOW_BATCH;
5142
/* A small test to verify that objects have consistent types */
5143
assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
5146
Now we are in a statement and will stay in a statement until we
5149
We set this flag here, before actually applying any rows, in
5150
case the SQL thread is stopped and we need to detect that we're
5151
inside a statement and halting abruptly might cause problems
5154
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
5156
if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
5157
set_flags(COMPLETE_ROWS_F);
5160
Set tables write and read sets.
5162
Read_set contains all slave columns (in case we are going to fetch
5163
a complete record from slave)
5165
Write_set equals the m_cols bitmap sent from master but it can be
5166
longer if slave has extra columns.
5169
bitmap_set_all(table->read_set);
5170
bitmap_set_all(table->write_set);
5171
if (!get_flags(COMPLETE_ROWS_F))
5172
bitmap_intersect(table->write_set,&m_cols);
5174
this->slave_exec_mode= slave_exec_mode_options; // fix the mode
5176
// Do event specific preparations
5177
error= do_before_row_operations(rli);
5179
// row processing loop
5181
while (error == 0 && m_curr_row < m_rows_end)
5183
/* in_use can have been set to NULL in close_tables_for_reopen */
5184
Session* old_session= table->in_use;
5186
table->in_use= session;
5188
error= do_exec_row(rli);
5190
table->in_use = old_session;
5196
The following list of "idempotent" errors
5197
means that an error from the list might happen
5198
because of idempotent (more than once)
5199
applying of a binlog file.
5200
Notice that if the binlog contains a DDL operation, its
5201
second applying may cause
5203
case HA_ERR_TABLE_DEF_CHANGED:
5204
case HA_ERR_CANNOT_ADD_FOREIGN:
5206
which are not included in the list.
5208
case HA_ERR_RECORD_CHANGED:
5209
case HA_ERR_RECORD_DELETED:
5210
case HA_ERR_KEY_NOT_FOUND:
5211
case HA_ERR_END_OF_FILE:
5212
case HA_ERR_FOUND_DUPP_KEY:
5213
case HA_ERR_FOUND_DUPP_UNIQUE:
5214
case HA_ERR_FOREIGN_DUPLICATE_KEY:
5215
case HA_ERR_NO_REFERENCED_ROW:
5216
case HA_ERR_ROW_IS_REFERENCED:
5217
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5219
if (global_system_variables.log_warnings)
5220
slave_rows_error_report(WARNING_LEVEL, error, rli, session, table,
5222
RPL_LOG_NAME, (ulong) log_pos);
5228
session->is_slave_error= 1;
5233
If m_curr_row_end was not set during event execution (e.g., because
5234
of errors) we can't proceed to the next row. If the error is transient
5235
(i.e., error==0 at this point) we must call unpack_current_row() to set
5238
if (!m_curr_row_end && !error)
5239
unpack_current_row(rli, &m_cols);
5241
// at this moment m_curr_row_end should be set
5242
assert(error || m_curr_row_end != NULL);
5243
assert(error || m_curr_row < m_curr_row_end);
5244
assert(error || m_curr_row_end <= m_rows_end);
5246
m_curr_row= m_curr_row_end;
5248
} // row processing loop
5250
error= do_after_row_operations(rli, error);
5253
session->options|= OPTION_KEEP_LOG;
5258
We need to delay this clear until here because unpack_current_row() uses
5259
master-side table definitions stored in rli.
5261
if (rli->tables_to_lock && get_flags(STMT_END_F))
5262
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5263
/* reset OPTION_ALLOW_BATCH as not affect later events */
5264
session->options&= ~OPTION_ALLOW_BATCH;
5267
{ /* error has occured during the transaction */
5268
slave_rows_error_report(ERROR_LEVEL, error, rli, session, table,
5269
get_type_str(), RPL_LOG_NAME, (ulong) log_pos);
5274
If one day we honour --skip-slave-errors in row-based replication, and
5275
the error should be skipped, then we would clear mappings, rollback,
5276
close tables, but the slave SQL thread would not stop and then may
5277
assume the mapping is still available, the tables are still open...
5278
So then we should clear mappings/rollback/close here only if this is a
5280
For now we code, knowing that error is not skippable and so slave SQL
5281
thread is certainly going to stop.
5282
rollback at the caller along with sbr.
5284
session->reset_current_stmt_binlog_row_based();
5285
const_cast<Relay_log_info*>(rli)->cleanup_context(session, error);
5286
session->is_slave_error= 1;
5291
This code would ideally be placed in do_update_pos() instead, but
5292
since we have no access to table there, we do the setting of
5293
last_event_start_time here instead.
5295
if (table && (table->s->primary_key == MAX_KEY) &&
5296
!cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS)
5299
------------ Temporary fix until WL#2975 is implemented ---------
5301
This event is not the last one (no STMT_END_F). If we stop now
5302
(in case of terminate_slave_thread()), how will we restart? We
5303
have to restart from Table_map_log_event, but as this table is
5304
not transactional, the rows already inserted will still be
5305
present, and idempotency is not guaranteed (no PK) so we risk
5306
that repeating leads to double insert. So we desperately try to
5307
continue, hope we'll eventually leave this buggy situation (by
5308
executing the final Rows_log_event). If we are in a hopeless
5309
wait (reached end of last relay log and nothing gets appended
5310
there), we timeout after one minute, and notify DBA about the
5311
problem. When WL#2975 is implemented, just remove the member
5312
Relay_log_info::last_event_start_time and all its occurrences.
5314
const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
5320
/**
  Decide whether the slave should skip this rows event.

  @param rli  Relay log info; its slave_skip_counter is inspected.

  @return Log_event::EVENT_SKIP_IGNORE when the skip counter is 1 and
          this event does not carry STMT_END_F, otherwise the result of
          Log_event::do_shall_skip().
*/
Log_event::enum_skip_reason
Rows_log_event::do_shall_skip(Relay_log_info *rli)
{
  /*
    Unless the slave skip counter is exactly 1 and this event does not
    end a statement, the normal skipping logic makes the decision.
  */
  if (rli->slave_skip_counter != 1 || get_flags(STMT_END_F))
    return Log_event::do_shall_skip(rli);

  /*
    Counter is 1 and the statement continues: we should not start
    executing on the next event.
  */
  return Log_event::EVENT_SKIP_IGNORE;
}
5335
Rows_log_event::do_update_pos(Relay_log_info *rli)
5339
if (get_flags(STMT_END_F))
5342
This is the end of a statement or transaction, so close (and
5343
unlock) the tables we opened when processing the
5344
Table_map_log_event starting the statement.
5346
OBSERVER. This will clear *all* mappings, not only those that
5347
are open for the table. There is no good handle for on-close
5350
NOTE. Even if we have no table ('table' == 0) we still need to be
5351
here, so that we increase the group relay log position. If we didn't, we
5352
could have a group relay log position which lags behind "forever"
5353
(assume the last master's transaction is ignored by the slave because of
5354
replicate-ignore rules).
5356
session->binlog_flush_pending_rows_event(true);
5359
If this event is not in a transaction, the call below will, if some
5360
transactional storage engines are involved, commit the statement into
5361
them and flush the pending event to binlog.
5362
If this event is in a transaction, the call will do nothing, but a
5363
Xid_log_event will come next which will, if some transactional engines
5364
are involved, commit the transaction and flush the pending event to the
5367
error= ha_autocommit_or_rollback(session, 0);
5370
Now what if this is not a transactional engine? we still need to
5371
flush the pending event to the binlog; we did it with
5372
session->binlog_flush_pending_rows_event(). Note that we imitate
5373
what is done for real queries: a call to
5374
ha_autocommit_or_rollback() (sometimes only if involves a
5375
transactional engine), and a call to be sure to have the pending
5379
session->reset_current_stmt_binlog_row_based();
5381
rli->cleanup_context(session, 0);
5385
Indicate that a statement is finished.
5386
Step the group log position if we are not in a transaction,
5387
otherwise increase the event log position.
5389
rli->stmt_done(log_pos, when);
5392
Clear any errors pushed in session->net.last_err* if for example "no key
5393
found" (as this is allowed). This is a safety measure; apparently
5394
those errors (e.g. when executing a Delete_rows_log_event of a
5395
non-existing row, like in rpl_row_mystery22.test,
5396
session->net.last_error = "Can't find record in 't1'" and last_errno=1032)
5397
do not become visible. We still prefer to wipe them out.
5399
session->clear_error();
5402
rli->report(ERROR_LEVEL, error,
5403
_("Error in %s event: commit of row events failed, "
5405
get_type_str(), m_table->s->db.str,
5406
m_table->s->table_name.str);
5410
rli->inc_event_relay_log_pos();
5416
/**
  Write the rows-event post-header: the table id (int6store at
  RW_MAPID_OFFSET) and the flags (int2store at RW_FLAGS_OFFSET).

  @param file  IO_CACHE that receives the ROWS_HEADER_LEN header bytes.

  @return the result of my_b_safe_write() converted to bool.
*/
bool Rows_log_event::write_data_header(IO_CACHE *file)
{
  /* Dummy events (table id UINT32_MAX) must never be written. */
  assert(m_table_id != UINT32_MAX);

  unsigned char header[ROWS_HEADER_LEN];  // No need to init the buffer

  int6store(header + RW_MAPID_OFFSET, (uint64_t)m_table_id);
  int2store(header + RW_FLAGS_OFFSET, m_flags);

  return my_b_safe_write(file, header, ROWS_HEADER_LEN);
}
5425
/*
  Write the variable part of a rows event.

  Written in order: the packed value of m_width, the m_cols bitmap, the
  m_cols_ai bitmap (only when the event is an UPDATE_ROWS_EVENT) and the
  row data between m_rows_buf and m_rows_cur.

  @param file  IO cache the body is written to.

  @return true as soon as one my_b_safe_write() call returns non-zero
          (later parts are then not written), false otherwise.
*/
bool Rows_log_event::write_data_body(IO_CACHE*file)
{
  /*
    Note that this should be the number of *bits*, not the number of
    bytes.
  */
  unsigned char width_buf[sizeof(m_width)];
  unsigned char *const width_end= net_store_length(width_buf, (size_t) m_width);
  assert(static_cast<size_t>(width_end - width_buf) <= sizeof(width_buf));

  my_ptrdiff_t const rows_len= m_rows_cur - m_rows_buf;

  if (my_b_safe_write(file, width_buf, (size_t) (width_end - width_buf)))
    return true;

  if (my_b_safe_write(file, (unsigned char*) m_cols.bitmap,
                      no_bytes_in_map(&m_cols)))
    return true;

  /*
    TODO[refactor write]: Remove the "down cast" here (and elsewhere).
  */
  if (get_type_code() == UPDATE_ROWS_EVENT)
  {
    if (my_b_safe_write(file, (unsigned char*) m_cols_ai.bitmap,
                        no_bytes_in_map(&m_cols_ai)))
      return true;
  }

  if (my_b_safe_write(file, m_rows_buf, (size_t) rows_len))
    return true;

  return false;
}
5456
/*
  Format a short description of this rows event ("table_id: N", followed
  by " flags: STMT_END_F" when that flag is set) and hand it to the
  protocol object.

  @param protocol  Receiver of the formatted text.
*/
void Rows_log_event::pack_info(Protocol *protocol)
{
  char text[256];

  char const *stmt_end_note= "";
  if (get_flags(STMT_END_F))
    stmt_end_note= " flags: STMT_END_F";

  size_t text_len= snprintf(text, sizeof(text),
                            "table_id: %lu%s", m_table_id, stmt_end_note);
  protocol->store(text, text_len, &my_charset_bin);
}
5467
/**************************************************************************
5468
Table_map_log_event member functions and support functions
5469
**************************************************************************/
5472
@page How replication of field metadata works.
5474
When a table map is created, the master first calls
5475
Table_map_log_event::save_field_metadata() which calculates how many
5476
values will be in the field metadata. Only those fields that require the
5477
extra data are added. The method also loops through all of the fields in
5478
the table calling the method Field::save_field_metadata() which returns the
5479
values for the field that will be saved in the metadata and replicated to
5480
the slave. Once all fields have been processed, the table map is written to
5481
the binlog adding the size of the field metadata and the field metadata to
5482
the end of the body of the table map.
5484
When a table map is read on the slave, the field metadata is read from the
5485
table map and passed to the table_def class constructor which saves the
5486
field metadata from the table map into an array based on the type of the
5487
field. Field metadata values not present (those fields that do not use extra
5488
data) in the table map are initialized as zero (0). The array size is the
5489
same as the columns for the table on the slave.
5491
Additionally, values saved for field metadata on the master are saved as a
5492
string of bytes (unsigned char) in the binlog. A field may require 1 or more bytes
5493
to store the information. In cases where values require multiple bytes
5494
(e.g. values > 255), the endian-safe methods are used to properly encode
5495
the values on the master and decode them on the slave. When the field
5496
metadata values are captured on the slave, they are stored in an array of
5497
type uint16_t. This allows the least number of casts to prevent casting bugs
5498
when the field metadata is used in comparisons of field attributes. When
5499
the field metadata is used for calculating addresses in pointer math, the
5500
type used is uint32_t.
5504
Save the field metadata based on the real_type of the field.
5505
The metadata saved depends on the type of the field. Some fields
5506
store a single byte for pack_length() while others store two bytes
5507
for field_length (max length).
5512
We may want to consider changing the encoding of the information.
5513
Currently, the code attempts to minimize the number of bytes written to
5514
the tablemap. There are at least two other alternatives; 1) using
5515
net_store_length() to store the data allowing it to choose the number of
5516
bytes that are appropriate thereby making the code much easier to
5517
maintain (only 1 place to change the encoding), or 2) use a fixed number
5518
of bytes for each field. The problem with option 1 is that net_store_length()
5519
will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
5520
for fields like CHAR which can be no larger than 255 characters, the method
5521
will use 3 bytes when the value is > 250. Further, every value that is
5522
encoded using 2 parts (e.g., pack_length, field_length) will be numerically
5523
> 250 therefore will use 3 bytes for each value. Option 2
5524
is less wasteful for space but does waste 1 byte for every field that does
5527
int Table_map_log_event::save_field_metadata()
5530
for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
5531
index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
5536
Constructor used to build an event for writing to the binary log.
5537
Mats says tbl->s lives longer than this event so it's ok to copy pointers
5538
(tbl->s->db etc) and not pointer content.
5540
/*
  Build a table map event from an open table (event-writing side).

  Visible work done here:
    - the database and table name pointers of tbl->s are copied (not
      their contents), together with their lengths and the column count;
    - m_data_size is accumulated from the header length, both names, the
      column count/types, the null bitmap and the field metadata;
    - m_memory is allocated with my_malloc() and filled with one type
      byte per column;
    - m_null_bits and m_field_metadata are allocated together with
      my_multi_malloc(); the metadata is produced by
      save_field_metadata() and the null bitmap gets one bit per column
      for which maybe_null() is true.

  NOTE(review): several member initialisers (and the use of the `tid` and
  `flags` parameters) are not visible in this view - presumably they set
  m_table, m_table_id and m_flags; confirm against the full file.
*/
Table_map_log_event::Table_map_log_event(Session *session, Table *tbl,
5541
ulong tid, bool, uint16_t flags)
5542
: Log_event(session, 0, true),
5544
m_dbnam(tbl->s->db.str),
5545
m_dblen(m_dbnam ? tbl->s->db.length : 0),
5546
m_tblnam(tbl->s->table_name.str),
5547
m_tbllen(tbl->s->table_name.length),
5548
m_colcnt(tbl->s->fields),
5553
m_field_metadata(0),
5554
m_field_metadata_size(0),
5558
assert(m_table_id != UINT32_MAX);
5560
In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
5561
table.cc / alloc_table_share():
5562
Use the fact the key is db/0/table_name/0
5563
As we rely on this let's assert it.
5565
assert((tbl->s->db.str == 0) ||
5566
(tbl->s->db.str[tbl->s->db.length] == 0));
5567
assert(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
5570
m_data_size= TABLE_MAP_HEADER_LEN;
5571
m_data_size+= m_dblen + 2; // Include length and terminating \0
5572
m_data_size+= m_tbllen + 2; // Include length and terminating \0
5573
m_data_size+= 1 + m_colcnt; // COLCNT and column types
5575
/* If malloc fails, caught in is_valid() */
5576
if ((m_memory= (unsigned char*) my_malloc(m_colcnt, MYF(MY_WME))))
5578
/* The column type array lives directly in the m_memory allocation. */
m_coltype= reinterpret_cast<unsigned char*>(m_memory);
5579
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
5580
m_coltype[i]= m_table->field[i]->type();
5584
Calculate a bitmap for the results of maybe_null() for all columns.
5585
The bitmap is used to determine when there is a column from the master
5586
that is not on the slave and is null and thus not in the row data during
5589
uint32_t num_null_bytes= (m_table->s->fields + 7) / 8;
5590
m_data_size+= num_null_bytes;
5591
/* One allocation holds both the null bitmap and the metadata array. */
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
5592
&m_null_bits, num_null_bytes,
5593
&m_field_metadata, (m_colcnt * 2),
5596
/*
  NOTE(review): no check of the my_multi_malloc() result is visible
  before m_field_metadata is written to here - confirm whether an
  allocation failure can reach this memset.
*/
memset(m_field_metadata, 0, (m_colcnt * 2));
5599
Create an array for the field metadata and store it.
5601
m_field_metadata_size= save_field_metadata();
5602
assert(m_field_metadata_size <= (m_colcnt * 2));
5605
Now set the size of the data to the size of the field metadata array
5606
plus one or two bytes for number of elements in the field metadata array.
5608
if (m_field_metadata_size > 255)
5609
m_data_size+= m_field_metadata_size + 2;
5611
m_data_size+= m_field_metadata_size + 1;
5613
memset(m_null_bits, 0, num_null_bytes);
5614
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
5615
if (m_table->field[i]->maybe_null())
5616
/* Bit (i % 8) of byte (i / 8) marks column i as nullable. */
m_null_bits[(i / 8)]+= 1 << (i % 8);
5622
Constructor used by slave to read the event from the binary log.
5624
/*
  Build a table map event from its serialized form (event-reading side).

  @param buf        Start of the event, including the common header.
  @param event_len  Total length of the event; used below to decide
                    whether field metadata follows the column types.
  @param description_event
                    Supplies common_header_len and the post-header length
                    for TABLE_MAP_EVENT.

  The post-header yields the table id and flags. The variable part is
  parsed as: one length byte + database name, one length byte + table
  name, a packed column count, the column type bytes and, if bytes
  remain, the packed metadata size, the field metadata and the null
  bitmap.

  NOTE(review): the lengths read from the buffer (m_dblen, m_tbllen,
  m_colcnt, m_field_metadata_size) are used for pointer arithmetic and
  copies; no comparison against event_len is visible before the
  bytes_read check - confirm how truncated or corrupt events are handled.
*/
Table_map_log_event::Table_map_log_event(const char *buf, uint32_t event_len,
5625
const Format_description_log_event
5628
: Log_event(buf, description_event),
5630
m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
5631
m_colcnt(0), m_coltype(0),
5632
m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
5633
m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
5634
m_null_bits(0), m_meta_memory(NULL)
5636
unsigned int bytes_read= 0;
5638
uint8_t common_header_len= description_event->common_header_len;
5639
uint8_t post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
5641
/* Read the post-header */
5642
const char *post_start= buf + common_header_len;
5644
post_start+= TM_MAPID_OFFSET;
5645
if (post_header_len == 6)
5647
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
5648
m_table_id= uint4korr(post_start);
5653
assert(post_header_len == TABLE_MAP_HEADER_LEN);
5654
m_table_id= (ulong) uint6korr(post_start);
5655
post_start+= TM_FLAGS_OFFSET;
5658
/*
  NOTE(review): the member is initialised to ULONG_MAX above but compared
  with UINT32_MAX here - confirm which sentinel is intended.
*/
assert(m_table_id != UINT32_MAX);
5660
m_flags= uint2korr(post_start);
5662
/* Read the variable part of the event */
5663
const char *const vpart= buf + common_header_len + post_header_len;
5665
/* Extract the length of the various parts from the buffer */
5666
unsigned char const *const ptr_dblen= (unsigned char const*)vpart + 0;
5667
m_dblen= *(unsigned char*) ptr_dblen;
5669
/* Length of database name + counter + terminating null */
5670
unsigned char const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
5671
m_tbllen= *(unsigned char*) ptr_tbllen;
5673
/* Length of table name + counter + terminating null */
5674
unsigned char const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
5675
unsigned char *ptr_after_colcnt= (unsigned char*) ptr_colcnt;
5676
/* net_field_length() is passed the cursor by address so it can update it. */
m_colcnt= net_field_length(&ptr_after_colcnt);
5678
/* Allocate mem for all fields in one go. If fails, caught in is_valid() */
5679
m_memory= (unsigned char*) my_multi_malloc(MYF(MY_WME),
5680
&m_dbnam, (uint) m_dblen + 1,
5681
&m_tblnam, (uint) m_tbllen + 1,
5682
&m_coltype, (uint) m_colcnt,
5687
/* Copy the different parts into their memory */
5688
strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
5689
strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
5690
memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
5692
ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
5693
bytes_read= ptr_after_colcnt - (unsigned char *)buf;
5694
/* Anything left after the column types is field metadata + null bitmap. */
if (bytes_read < event_len)
5696
m_field_metadata_size= net_field_length(&ptr_after_colcnt);
5697
assert(m_field_metadata_size <= (m_colcnt * 2));
5698
uint32_t num_null_bytes= (m_colcnt + 7) / 8;
5699
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
5700
&m_null_bits, num_null_bytes,
5701
&m_field_metadata, m_field_metadata_size,
5703
memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
5704
ptr_after_colcnt= (unsigned char*)ptr_after_colcnt + m_field_metadata_size;
5705
memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
5712
/*
  Destructor: releases the m_meta_memory allocation (the block that holds
  the null bitmap and the field metadata) with free().
*/
Table_map_log_event::~Table_map_log_event()
5714
free(m_meta_memory);
5719
Return value is an error code, one of:
5721
-1 Failure to open table [from open_tables()]
5723
1 No room for more tables [from set_table()]
5724
2 Out of memory [from set_table()]
5725
3 Wrong table definition
5726
4 Daisy-chaining RBR with SBR not possible
5729
/*
  Apply a table map event on the slave.

  Visible steps:
    - advance session->query_id;
    - allocate, in a single my_multi_malloc() call, an RPL_TableList
      entry plus buffers for the database and table names, and fill the
      entry in (write lock, table id, rewritten database name);
    - consult rpl_filter (db_ok() / tables_ok()) for the table;
    - reset the session for a new command, switch the statement to
      row-based binlogging when the conditions below hold, and open the
      table with open_tables();
    - on success, construct the table_def in place inside the list entry
      and push the entry onto rli->tables_to_lock.

  @param rli  Relay log info of the applying thread; its tables_to_lock
              list and counter are updated through const_cast.

  @return HA_ERR_OUT_OF_MEM if the allocation fails; see the error code
          list preceding this function for the other values.

  NOTE(review): the branch taken when the filters reject the table, and
  the declarations of `memory`, `dummy_len`, `count` and `error`, are not
  visible in this view.
*/
int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
5731
RPL_TableList *table_list;
5732
char *db_mem, *tname_mem;
5733
Query_id &query_id= Query_id::get_query_id();
5736
assert(rli->sql_session == session);
5738
/* Step the query id to mark what columns that are actually used. */
5739
session->query_id= query_id.next();
5741
if (!(memory= my_multi_malloc(MYF(MY_WME),
5742
&table_list, (uint) sizeof(RPL_TableList),
5743
&db_mem, (uint) NAME_LEN + 1,
5744
&tname_mem, (uint) NAME_LEN + 1,
5746
return(HA_ERR_OUT_OF_MEM);
5748
memset(table_list, 0, sizeof(*table_list));
5749
table_list->db = db_mem;
5750
table_list->alias= table_list->table_name = tname_mem;
5751
table_list->lock_type= TL_WRITE;
5752
table_list->next_global= table_list->next_local= 0;
5753
table_list->table_id= m_table_id;
5754
table_list->updating= 1;
5755
/*
  The database name goes through rpl_filter->get_rewrite_db() before it
  is copied. NOTE(review): both copies target NAME_LEN + 1 byte buffers
  and are unbounded - confirm the source lengths are limited elsewhere.
*/
my_stpcpy(table_list->db, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
5756
my_stpcpy(table_list->table_name, m_tblnam);
5760
if (!rpl_filter->db_ok(table_list->db) ||
5761
(rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list)))
5768
open_tables() reads the contents of session->lex, so they must be
5769
initialized, so we should call lex_start(); to be even safer, we
5770
call mysql_init_query() which does a more complete set of inits.
5773
mysql_reset_session_for_next_command(session);
5775
Check if the slave is set to use SBR. If so, it should switch
5776
to using RBR until the end of the "statement", i.e., next
5777
STMT_END_F or next error.
5779
if (!session->current_stmt_binlog_row_based &&
5780
mysql_bin_log.is_open() && (session->options & OPTION_BIN_LOG))
5782
session->set_current_stmt_binlog_row_based();
5786
Open the table if it is not already open and add the table to
5787
table map. Note that for any table that should not be
5788
replicated, a filter is needed.
5790
The creation of a new TableList is used to up-cast the
5791
table_list consisting of RPL_TableList items. This will work
5792
since the only case where the argument to open_tables() is
5793
changed, is when session->lex->query_tables == table_list, i.e.,
5794
when the statement requires prelocking. Since this is not
5795
executed when a statement is executed, this case will not occur.
5796
As a precaution, an assertion is added to ensure that the bad
5799
Either way, the memory in the list is *never* released
5800
internally in the open_tables() function, hence we take a copy
5801
of the pointer to make sure that it's not lost.
5804
assert(session->lex->query_tables != table_list);
5805
TableList *tmp_table_list= table_list;
5806
if ((error= open_tables(session, &tmp_table_list, &count, 0)))
5808
if (session->is_slave_error || session->is_fatal_error)
5811
Error reporting borrowed from Query_log_event with many excessive
5812
simplifications (we don't honour --slave-skip-errors)
5814
uint32_t actual_error= session->main_da.sql_errno();
5815
rli->report(ERROR_LEVEL, actual_error,
5816
_("Error '%s' on opening table `%s`.`%s`"),
5818
? session->main_da.message()
5819
: _("unexpected success or fatal error")),
5820
table_list->db, table_list->table_name);
5821
session->is_slave_error= 1;
5826
m_table= table_list->table;
5829
This will fail later otherwise, the 'in_use' field should be
5830
set to the current thread.
5832
assert(m_table->in_use);
5835
Use placement new to construct the table_def instance in the
5836
memory allocated for it inside table_list.
5838
The memory allocated by the table_def structure (i.e., not the
5839
memory allocated *for* the table_def structure) is released
5840
inside Relay_log_info::clear_tables_to_lock() by calling the
5841
table_def destructor explicitly.
5843
new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt,
5844
m_field_metadata, m_field_metadata_size, m_null_bits);
5845
table_list->m_tabledef_valid= true;
5848
We record in the slave's information that the table should be
5849
locked by linking the table into the list of tables to lock.
5851
/* Push the new entry at the head of rli->tables_to_lock. */
table_list->next_global= table_list->next_local= rli->tables_to_lock;
5852
const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
5853
const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
5854
/* 'memory' is freed in clear_tables_to_lock */
5864
/*
  Decide whether this table map event is to be skipped.

  The decision is delegated entirely to continue_group().

  @param rli  Relay log info handed on to continue_group().

  @return The skip reason computed by continue_group().
*/
Log_event::enum_skip_reason
Table_map_log_event::do_shall_skip(Relay_log_info *rli)
{
  /*
    If the slave skip counter is 1, then we should not start executing
    on the next event.
  */
  Log_event::enum_skip_reason const reason= continue_group(rli);
  return reason;
}
5874
/*
  Advance the relay log position past this event by calling
  inc_event_relay_log_pos() on the relay log info.

  @param rli  Relay log info whose event position is stepped.

  @return Always 0.
*/
int Table_map_log_event::do_update_pos(Relay_log_info *rli)
{
  Relay_log_info &relay_info= *rli;
  relay_info.inc_event_relay_log_pos();

  return 0;
}
5881
/*
  Write the post-header of a table map event to the given cache.

  The header holds the table id (stored with int6store() at
  TM_MAPID_OFFSET) and the flags (stored with int2store() at
  TM_FLAGS_OFFSET).

  @param file  IO cache the header is written to.

  @return The result of my_b_safe_write().
*/
bool Table_map_log_event::write_data_header(IO_CACHE *file)
{
  unsigned char header[TABLE_MAP_HEADER_LEN];

  assert(m_table_id != UINT32_MAX);

  int6store(header + TM_MAPID_OFFSET, (uint64_t)m_table_id);
  int2store(header + TM_FLAGS_OFFSET, m_flags);

  return (my_b_safe_write(file, header, TABLE_MAP_HEADER_LEN));
}
5890
/*
  Write the variable part of a table map event to the given cache.

  Written in order:
    - one length byte, then the database name with its terminating \0;
    - one length byte, then the table name with its terminating \0;
    - the packed column count, then m_colcnt column type bytes;
    - the packed field metadata size, then the field metadata;
    - the null bitmap, (m_colcnt + 7) / 8 bytes.

  @param file  IO cache the body is written to.

  @return true if any my_b_safe_write() call returns non-zero (the
          remaining parts are then not written), false otherwise.

  Every write is chained with ||. A comma operator between the last two
  writes would discard the result of all the preceding ones and report
  only the status of the null-bitmap write.
*/
bool Table_map_log_event::write_data_body(IO_CACHE *file)
{
  assert(m_dbnam != NULL);
  assert(m_tblnam != NULL);
  /* We use only one byte per length for storage in event: */
  assert(m_dblen < 128);
  assert(m_tbllen < 128);

  unsigned char const dbuf[]= { (unsigned char) m_dblen };
  unsigned char const tbuf[]= { (unsigned char) m_tbllen };

  unsigned char cbuf[sizeof(m_colcnt)];
  unsigned char *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
  assert(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));

  /*
    Store the size of the field metadata.
  */
  unsigned char mbuf[sizeof(m_field_metadata_size)];
  unsigned char *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
  assert(static_cast<size_t>(mbuf_end - mbuf) <= sizeof(mbuf));

  return (my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
          my_b_safe_write(file, (const unsigned char*)m_dbnam, m_dblen+1) ||
          my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
          my_b_safe_write(file, (const unsigned char*)m_tblnam, m_tbllen+1) ||
          my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
          my_b_safe_write(file, m_coltype, m_colcnt) ||
          my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
          my_b_safe_write(file, m_field_metadata, m_field_metadata_size) ||
          my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
}
5924
Print some useful information for the SHOW BINARY LOG information
5928
/*
  Format "table_id: N (db.table)" for this event and hand the text to
  the protocol object.

  @param protocol  Receiver of the formatted text.
*/
void Table_map_log_event::pack_info(Protocol *protocol)
{
  char text[256];

  size_t text_len= snprintf(text, sizeof(text), "table_id: %lu (%s.%s)",
                            m_table_id, m_dbnam, m_tblnam);

  protocol->store(text, text_len, &my_charset_bin);
}
5938
/**************************************************************************
5939
Write_rows_log_event member functions
5940
**************************************************************************/
5943
Constructor used to build an event for writing to the binary log.
5945
/*
  Event-writing constructor: forwards its arguments to the Rows_log_event
  constructor, passing tbl_arg->write_set as the fourth argument.
*/
Write_rows_log_event::Write_rows_log_event(Session *session_arg, Table *tbl_arg,
5947
bool is_transactional)
5948
: Rows_log_event(session_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
5953
Constructor used by slave to read the event from the binary log.
5955
/*
  Event-reading constructor: forwards the raw buffer, its length and the
  format description to the Rows_log_event constructor, tagging the event
  as WRITE_ROWS_EVENT.
*/
Write_rows_log_event::Write_rows_log_event(const char *buf, uint32_t event_len,
5956
const Format_description_log_event
5958
: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
5963
Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
5968
todo: to introduce a property for the event (handler?) which forces
5969
applying the event in the replace (idempotent) fashion.
5971
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5974
We are using REPLACE semantics and not INSERT IGNORE semantics
5975
when writing rows, that is: new rows replace old rows. We need to
5976
inform the storage engine that it should use this behaviour.
5979
/* Tell the storage engine that we are using REPLACE semantics. */
5980
session->lex->duplicates= DUP_REPLACE;
5983
Pretend we're executing a REPLACE command: this is needed for
5984
InnoDB since it is not (properly) checking the
5985
lex->duplicates flag.
5987
session->lex->sql_command= SQLCOM_REPLACE;
5989
Do not raise the error flag in case of hitting to an unique attribute
5991
m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
5994
m_table->file->ha_start_bulk_insert(0);
5996
We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
5997
any TIMESTAMP column with data from the row but instead will use
5998
the event's current time.
5999
As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
6000
columns, we know that all TIMESTAMP columns on slave will receive explicit
6001
data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
6002
When we allow a table without TIMESTAMP to be replicated to a table having
6003
more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
6004
column to be replicated into a BIGINT column and the slave's table has a
6005
TIMESTAMP column, then the slave's TIMESTAMP column will take its value
6006
from set_time() which we called earlier (consistent with SBR). And then in
6007
some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
6008
analyze if explicit data is provided for slave's TIMESTAMP columns).
6010
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
6016
Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6020
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
6022
m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
6023
m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
6025
resetting the extra with
6026
table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
6028
explanation: file->reset() performs this duty
6029
ultimately. Still todo: fix
6032
if ((local_error= m_table->file->ha_end_bulk_insert()))
6034
m_table->file->print_error(local_error, MYF(0));
6036
return error? error : local_error;
6041
Check if there are more UNIQUE keys after the given key.
6044
last_uniq_key(Table *table, uint32_t keyno)
6046
while (++keyno < table->s->keys)
6047
if (table->key_info[keyno].flags & HA_NOSAME)
6053
Check if an error is a duplicate key error.
6055
This function is used to check if an error code is one of the
6056
duplicate key errors, i.e., an error code for which it is sensible
6057
to do a <code>get_dup_key()</code> to retrieve the duplicate key.
6059
@param errcode The error code to check.
6061
@return <code>true</code> if the error code is such that
6062
<code>get_dup_key()</code> will return true, <code>false</code>
6066
is_duplicate_key_error(int errcode)
6070
case HA_ERR_FOUND_DUPP_KEY:
6071
case HA_ERR_FOUND_DUPP_UNIQUE:
6078
Write the current row into event's table.
6080
The row is located in the row buffer, pointed by @c m_curr_row member.
6081
Number of columns of the row is stored in @c m_width member (it can be
6082
different from the number of columns in the table to which we insert).
6083
Bitmap @c m_cols indicates which columns are present in the row. It is assumed
6084
that event's table is already open and pointed by @c m_table.
6086
If the same record already exists in the table it can be either overwritten
6087
or an error is reported depending on the value of @c overwrite flag
6088
(error reporting not yet implemented). Note that the matching record can be
6089
different from the row we insert if we use primary keys to identify records in
6092
The row to be inserted can contain values only for selected columns. The
6093
missing columns are filled with default values using @c prepare_record()
6094
function. If a matching record is found in the table and @c overwrite is
6095
true, the missing columns are taken from it.
6097
@param rli Relay log info (needed for row unpacking).
6099
Shall we overwrite if the row already exists or signal
6100
error (currently ignored).
6102
@returns Error code on failure, 0 on success.
6104
This method, if successful, sets @c m_curr_row_end pointer to point at the
6105
next row in the rows buffer. This is done when unpacking the row to be
6108
@note If a matching record is found, it is either updated using
6109
@c ha_update_row() or first deleted and then new record written.
6113
/*
  Write the current row of the event into m_table, resolving duplicate
  key conflicts.

  Visible flow: record[0] is prepared with prepare_record() and filled by
  unpack_current_row(); ha_write_row() is then retried in a loop. On a
  failure for which a duplicate key number can be obtained, the
  conflicting row is fetched into record[1] (via rnd_pos() when the
  engine reports HA_DUPLICATE_POS, otherwise via index_read_idx_map()
  with a key built from record[0]) and is either updated in place with
  ha_update_row() or deleted with ha_delete_row() so that the write can
  be retried.

  @param rli        Relay log info, needed for row unpacking.
  @param overwrite  See the description preceding this function.

  NOTE(review): the return statements, the declarations of `error` and
  `keynum` and the remainder of the first loop condition are not visible
  in this view.
*/
Rows_log_event::write_row(const Relay_log_info *const rli,
6114
const bool overwrite)
6116
assert(m_table != NULL && session != NULL);
6118
Table *table= m_table; // pointer to event's table
6121
/* Scratch buffer for the duplicate key; allocated lazily further down. */
auto_afree_ptr<char> key(NULL);
6123
/* fill table->record[0] with default values */
6126
We only check if the columns have default values for non-NDB
6127
engines, for NDB we ignore the check since updates are sent as
6128
writes, causing errors when trying to prepare the record.
6130
TODO[ndb]: Elimiate this hard-coded dependency on NDB. Ideally,
6131
the engine should be able to set a flag that it want the default
6132
values filled in and one flag to handle the case that the default
6133
values should be checked. Maybe these two flags can be combined.
6135
if ((error= prepare_record(table, &m_cols, m_width, true)))
6138
/* unpack row into table->record[0] */
6139
error= unpack_current_row(rli, &m_cols);
6141
// Temporary fix to find out why it fails [/Matz]
6142
memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
6145
Try to write record. If a corresponding record already exists in the table,
6146
we try to change it using ha_update_row() if possible. Otherwise we delete
6147
it and repeat the whole process again.
6149
TODO: Add safety measures against infinite looping.
6152
while ((error= table->file->ha_write_row(table->record[0])))
6154
if (error == HA_ERR_LOCK_DEADLOCK ||
6155
error == HA_ERR_LOCK_WAIT_TIMEOUT ||
6156
(keynum= table->file->get_dup_key(error)) < 0 ||
6160
Deadlock, waiting for lock or just an error from the handler
6161
such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
6162
Retrieval of the duplicate key number may fail
6163
- either because the error was not "duplicate key" error
6164
- or because the information which key is not available
6166
table->file->print_error(error, MYF(0));
6170
We need to retrieve the old row into record[1] to be able to
6171
either update or delete the offending record. We either:
6173
- use rnd_pos() with a row-id (available as dupp_row) to the
6174
offending row, if that is possible (MyISAM and Blackhole), or else
6176
- use index_read_idx() with the key that is duplicated, to
6177
retrieve the offending row.
6179
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
6181
if (table->file->inited && (error= table->file->ha_index_end()))
6183
if ((error= table->file->ha_rnd_init(false)))
6186
error= table->file->rnd_pos(table->record[1], table->file->dup_ref);
6187
table->file->ha_rnd_end();
6190
table->file->print_error(error, MYF(0));
6196
if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
6201
if (key.get() == NULL)
6203
key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
6204
if (key.get() == NULL)
6210
key_copy((unsigned char*)key.get(), table->record[0], table->key_info + keynum,
6212
error= table->file->index_read_idx_map(table->record[1], keynum,
6213
(const unsigned char*)key.get(),
6218
table->file->print_error(error, MYF(0));
6224
Now, record[1] should contain the offending row. That
6225
will enable us to update it or, alternatively, delete it (so
6226
that we can insert the new row afterwards).
6230
If row is incomplete we will use the record found to fill
6233
if (!get_flags(COMPLETE_ROWS_F))
6235
/* Start from the row that was found, then overlay the event's columns. */
restore_record(table,record[1]);
6236
error= unpack_current_row(rli, &m_cols);
6240
REPLACE is defined as either INSERT or DELETE + INSERT. If
6241
possible, we can replace it with an UPDATE, but that will not
6242
work on InnoDB if FOREIGN KEY checks are necessary.
6244
I (Matz) am not sure of the reason for the last_uniq_key()
6245
check as, but I'm guessing that it's something along the
6248
Suppose that we got the duplicate key to be a key that is not
6249
the last unique key for the table and we perform an update:
6250
then there might be another key for which the unique check will
6251
fail, so we're better off just deleting the row and inserting
6254
if (last_uniq_key(table, keynum) &&
6255
!table->file->referenced_by_foreign_key())
6257
error=table->file->ha_update_row(table->record[1],
6261
case HA_ERR_RECORD_IS_THE_SAME:
6268
table->file->print_error(error, MYF(0));
6275
if ((error= table->file->ha_delete_row(table->record[1])))
6277
table->file->print_error(error, MYF(0));
6280
/* Will retry ha_write_row() with the offending row removed. */
6288
int Rows_log_event::unpack_current_row(const Relay_log_info *const rli,
6289
MY_BITMAP const *cols)
6292
ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT);
6293
int const result= ::unpack_row(rli, m_table, m_width, m_curr_row, cols,
6294
&m_curr_row_end, &m_master_reclength);
6295
if (m_curr_row_end > m_rows_end)
6296
my_error(ER_SLAVE_CORRUPT_EVENT, MYF(0));
6297
ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT);
6303
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6305
assert(m_table != NULL);
6307
write_row(rli, /* if 1 then overwrite */
6308
bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
6310
if (error && !session->is_error())
6313
my_error(ER_UNKNOWN_ERROR, MYF(0));
6320
/**************************************************************************
6321
Delete_rows_log_event member functions
6322
**************************************************************************/
6325
Compares table->record[0] and table->record[1]
6327
Returns TRUE if different.
6329
/*
  Compare table->record[0] with table->record[1].

  While comparing, the first null byte gets its lowest bit set and the
  last null byte gets its filler bits set in both records; the original
  bytes are put back before returning.

  Tables without blob or varchar fields are compared with cmp_record().
  Otherwise the null flags are compared first and then every field is
  compared with cmp_binary_offset().

  @param table  Table whose two record buffers are compared.

  @return true if the records differ, false otherwise.
*/
static bool record_compare(Table *table)
{
  /*
    Need to set the X bit and the filler bits in both records since
    there are engines that do not set it correctly.

    In addition, since MyISAM checks that one hasn't tampered with the
    record, it is necessary to restore the old bytes into the record
    after doing the comparison.

    TODO[record format ndb]: Remove it once NDB returns correct
    records. Check that the other engines also return correct records.
  */
  bool differs= false;
  unsigned char first_null_byte[2], last_null_byte[2];

  if (table->s->null_bytes > 0)
  {
    for (int rec = 0 ; rec < 2 ; ++rec)
    {
      first_null_byte[rec]= table->record[rec][0];
      last_null_byte[rec]= table->record[rec][table->s->null_bytes - 1];
      table->record[rec][0]|= 1U;
      table->record[rec][table->s->null_bytes - 1]|=
        256U - (1U << table->s->last_null_bit_pos);
    }
  }

  if (table->s->blob_fields + table->s->varchar_fields == 0)
  {
    differs= cmp_record(table,record[1]);
  }
  else if (memcmp(table->null_flags,
                  table->null_flags+table->s->rec_buff_length,
                  table->s->null_bytes))
  {
    differs= true;                              // Diff in NULL value
  }
  else
  {
    /* Compare updated fields; stop at the first one that differs. */
    for (Field **fld=table->field ; *fld && !differs ; fld++)
    {
      if ((*fld)->cmp_binary_offset(table->s->rec_buff_length))
        differs= true;
    }
  }

  /*
    Restore the saved bytes.

    TODO[record format ndb]: Remove this code once NDB returns the
    correct record format.
  */
  if (table->s->null_bytes > 0)
  {
    for (int rec = 0 ; rec < 2 ; ++rec)
    {
      table->record[rec][0]= first_null_byte[rec];
      table->record[rec][table->s->null_bytes - 1]= last_null_byte[rec];
    }
  }

  return differs;
}
6402
Locate the current row in event's table.
6404
The current row is pointed by @c m_curr_row. Member @c m_width tells how many
6405
columns are there in the row (this can be different from the number of columns
6406
in the table). It is assumed that event's table is already open and pointed
6409
If a corresponding record is found in the table it is stored in
6410
@c m_table->record[0]. Note that when record is located based on a primary
6411
key, it is possible that the record found differs from the row being located.
6413
If no key is specified or table does not have keys, a table scan is used to
6414
find the row. In that case the row should be complete and contain values for
6415
all columns. However, it can still be shorter than the table, i.e. the table
6416
can contain extra columns not present in the row. It is also possible that
6417
the table has fewer columns than the row being located.
6419
@returns Error code on failure, 0 on success.
6421
@post In case of success @c m_table->record[0] contains the record found.
6422
Also, the internal "cursor" of the table is positioned at the record found.
6424
@note If the engine allows random access of the records, a combination of
6425
@c position() and @c rnd_pos() will be used.
6428
// Locate in m_table the row described by the current row image of this
// event. Three strategies are visible below: a positional lookup via
// rnd_pos_by_record(), an index lookup on key number 0, and a table scan
// with rnd_next(). See the function comment above for the contract.
int Rows_log_event::find_row(const Relay_log_info *rli)
6430
assert(m_table && m_table->in_use != NULL);
6432
Table *table= m_table;
6435
/* unpack row - missing fields get default values */
6436
prepare_record(table, &m_cols, m_width, false/* don't check errors */);
6437
// NOTE(review): the result is stored in 'error', but no check of it is
// visible before the lookup below proceeds -- confirm this is intended.
error= unpack_current_row(rli, &m_cols);
6439
// Temporary fix to find out why it fails [/Matz]
6440
// Overwrites the table's read_set bits with the event's column bitmap
// (m_cols); the byte count is derived from read_set->n_bits.
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
6442
// Positional path: taken when the engine reports
// HA_PRIMARY_KEY_REQUIRED_FOR_POSITION and the table has a primary key.
if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
6443
table->s->primary_key < MAX_KEY)
6446
Use a more efficient method to fetch the record given by
6447
table->record[0] if the engine allows it. We first compute a
6448
row reference using the position() member function (it will be
6449
stored in table->file->ref) and the use rnd_pos() to position
6450
the "cursor" (i.e., record[0] in this case) at the correct row.
6452
TODO: Add a check that the correct record has been fetched by
6453
comparing with the original record. Take into account that the
6454
record on the master and slave can be of different
6455
length. Something along these lines should work:
6457
ADD>>> store_record(table,record[1]);
6458
int error= table->file->rnd_pos(table->record[0], table->file->ref);
6459
ADD>>> assert(memcmp(table->record[1], table->record[0],
6460
table->s->reclength) == 0);
6463
// Fetch the row matching the unpacked image in record[0], then close the
// random-access scan; the error, if any, is reported through print_error().
int error= table->file->rnd_pos_by_record(table->record[0]);
6464
table->file->ha_rnd_end();
6467
table->file->print_error(error, MYF(0));
6472
// We can't use position() - try other methods.
6475
Save copy of the record in table->record[1]. It might be needed
6476
later if linear search is used to find exact match.
6478
store_record(table,record[1]);
6480
// Index path: used whenever the table has at least one key; key number 0
// is the one initialised and searched below.
if (table->s->keys > 0)
6482
/* We have a key: search the table using the index */
6483
if (!table->file->inited && (error= table->file->ha_index_init(0, false)))
6485
table->file->print_error(error, MYF(0));
6489
/* Fill key data for the row */
6492
// m_key receives the key-0 value extracted from the unpacked row.
key_copy(m_key, table->record[0], table->key_info, 0);
6495
We need to set the null bytes to ensure that the filler bit are
6496
all set when returning. There are storage engines that just set
6497
the necessary bits on the bytes and don't set the filler bits
6500
// NOTE(review): when null_bytes == 0, pos is 0 and the 0xFF below lands on
// the first byte of record[0] -- confirm that this is harmless.
my_ptrdiff_t const pos=
6501
table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
6502
table->record[0][pos]= 0xFF;
6504
// Exact-match read on the index using the key built above.
if ((error= table->file->index_read_map(table->record[0], m_key,
6506
HA_READ_KEY_EXACT)))
6508
table->file->print_error(error, MYF(0));
6509
table->file->ha_index_end();
6514
Below is a minor "optimization". If the key (i.e., key number
6515
0) has the HA_NOSAME flag set, we know that we have found the
6516
correct record (since there can be no duplicates); otherwise, we
6517
have to compare the record with the one found to see if it is
6520
CAVEAT! This behaviour is essential for the replication of,
6521
e.g., the mysql.proc table since the correct record *shall* be
6522
found using the primary key *only*. There shall be no
6523
comparison of non-PK columns to decide if the correct record is
6524
found. I can see no scenario where it would be incorrect to
6525
chose the row to change only using a PK or an UNNI.
6527
// Unique key: the row just read is accepted and the index scan is closed.
if (table->key_info->flags & HA_NOSAME)
6529
table->file->ha_index_end();
6534
In case key is not unique, we still have to iterate over records found
6535
and find the one which is identical to the row given. A copy of the
6536
record we are looking for is stored in record[1].
6538
// Keep advancing along the index for as long as record_compare() returns
// non-zero (presumably "records differ" -- confirm against its definition).
while (record_compare(table))
6541
We need to set the null bytes to ensure that the filler bit
6542
are all set when returning. There are storage engines that
6543
just set the necessary bits on the bytes and don't set the
6544
filler bits correctly.
6546
TODO[record format ndb]: Remove this code once NDB returns the
6547
correct record format.
6549
if (table->s->null_bytes > 0)
6551
// Sets the bits of the last null byte from last_null_bit_pos upwards.
table->record[0][table->s->null_bytes - 1]|=
6552
256U - (1U << table->s->last_null_bit_pos);
6555
if ((error= table->file->index_next(table->record[0])))
6557
table->file->print_error(error, MYF(0));
6558
table->file->ha_index_end();
6564
Have to restart the scan to be able to fetch the next row.
6566
table->file->ha_index_end();
6570
int restart_count= 0; // Number of times scanning has restarted from top
6572
/* We don't have a key: search the table using rnd_next() */
6573
if ((error= table->file->ha_rnd_init(1)))
6575
table->file->print_error(error, MYF(0));
6579
/* Continue until we find the right record or have made a full loop */
6582
error= table->file->rnd_next(table->record[0]);
6587
case HA_ERR_RECORD_DELETED:
6590
case HA_ERR_END_OF_FILE:
6591
// NOTE(review): the return value of this re-initialisation is not checked,
// unlike the first ha_rnd_init(1) call above.
if (++restart_count < 2)
6592
table->file->ha_rnd_init(1);
6596
table->file->print_error(error, MYF(0));
6597
table->file->ha_rnd_end();
6601
while (restart_count < 2 && record_compare(table));
6604
Note: above record_compare will take into accout all record fields
6605
which might be incorrect in case a partial row was given in the event
6607
table->file->ha_rnd_end();
6609
assert(error == HA_ERR_END_OF_FILE || error == HA_ERR_RECORD_DELETED || error == 0);
6613
// Reset the table's column bitmaps to their defaults (the read_set was
// overwritten near the top). Two calls are visible; presumably they sit on
// separate exit paths -- confirm against the full source.
table->default_column_bitmaps();
6616
table->default_column_bitmaps();
6622
Constructor used to build an event for writing to the binary log.
6625
// Binlog-writing constructor: forwards the session, the table, tid, the
// table's read_set and the is_transactional flag to Rows_log_event.
Delete_rows_log_event::Delete_rows_log_event(Session *session_arg, Table *tbl_arg,
6627
bool is_transactional)
6628
: Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
6633
Constructor used by slave to read the event from the binary log.
6635
// Decoding constructor: hands the raw buffer, its length and the format
// description to Rows_log_event, tagging the event as DELETE_ROWS_EVENT.
Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint32_t event_len,
6636
const Format_description_log_event
6638
: Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
6644
// Per-event setup before rows are applied: decides whether a key buffer
// (m_key) is needed for row lookup and allocates it if so.
Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6646
// Same condition as the positional-lookup path in find_row(): in that case
// no search key is built.
if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
6647
m_table->s->primary_key < MAX_KEY)
6650
We don't need to allocate any memory for m_key since it is not used.
6655
if (m_table->s->keys > 0)
6657
// Allocate buffer for key searches
6658
// The buffer is sized by the key_length of the first entry in key_info.
m_key= (unsigned char*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
6660
// Returned on allocation failure (the guarding check is not visible here).
return HA_ERR_OUT_OF_MEM;
6667
// Per-event teardown after rows are applied: closes whichever index or
// random-access scan is still open on the table.
// NOTE(review): the Update_rows counterpart visibly calls free(m_key); no
// such call is visible in this excerpt -- confirm that m_key is released.
Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6670
/*error= ToDo:find out what this should really be, this triggers close_scan in ndb, returning error?*/
6671
m_table->file->ha_index_or_rnd_end();
6678
// Apply one row of a delete event: locate the row with find_row() and, if
// that succeeds (returns 0), delete it through the storage engine.
int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6681
assert(m_table != NULL);
6683
// find_row() leaves the matching row in record[0] on success.
if (!(error= find_row(rli)))
6686
Delete the record found, located in record[0]
6688
error= m_table->file->ha_delete_row(m_table->record[0]);
6694
/**************************************************************************
6695
Update_rows_log_event member functions
6696
**************************************************************************/
6699
Constructor used to build an event for writing to the binary log.
6701
// Binlog-writing constructor: the base class receives the table's read_set
// as the column bitmap, and init() is then given the table's write_set for
// the after-image bitmap.
Update_rows_log_event::Update_rows_log_event(Session *session_arg, Table *tbl_arg,
6703
bool is_transactional)
6704
: Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
6706
init(tbl_arg->write_set);
6709
// Initialise the after-image column bitmap (m_cols_ai) and, when a source
// bitmap is supplied, copy its bits in.
//
// @param cols  Bitmap to copy from; may be NULL (see the comment below).
void Update_rows_log_event::init(MY_BITMAP const *cols)
6711
/* if bitmap_init fails, caught in is_valid() */
6712
// The in-object buffer m_bitbuf_ai is used when m_width bits fit in it;
// otherwise NULL is passed (presumably making bitmap_init allocate -- confirm).
if (likely(!bitmap_init(&m_cols_ai,
6713
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
6717
/* Cols can be zero if this is a dummy binrows event */
6718
if (likely(cols != NULL))
6720
// Copies no_bytes_in_map(cols) bytes of the source bitmap.
memcpy(m_cols_ai.bitmap, cols->bitmap, no_bytes_in_map(cols));
6721
create_last_word_mask(&m_cols_ai);
6727
/**
  Release the after-image column bitmap set up by init().

  When the bitmap lives in the in-object buffer m_bitbuf_ai nothing was
  allocated, so the pointer is cleared first to keep bitmap_free() from
  trying to free it.
*/
Update_rows_log_event::~Update_rows_log_event()
{
  const bool uses_inline_buffer= (m_cols_ai.bitmap == m_bitbuf_ai);
  if (uses_inline_buffer)
    m_cols_ai.bitmap= NULL;     // nothing was my_malloc'ed

  bitmap_free(&m_cols_ai);      // pairs with bitmap_init() in init()
}
6736
Constructor used by slave to read the event from the binary log.
6738
// Decoding constructor: hands the raw buffer, its length and the format
// description to Rows_log_event, tagging the event as UPDATE_ROWS_EVENT.
Update_rows_log_event::Update_rows_log_event(const char *buf, uint32_t event_len,
6740
Format_description_log_event
6742
: Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
6748
// Per-event setup before rows are applied: allocates the key buffer used for
// row lookup when the table has keys, and adjusts timestamp handling.
Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6750
if (m_table->s->keys > 0)
6752
// Allocate buffer for key searches
6753
// The buffer is sized by the key_length of the first entry in key_info.
m_key= (unsigned char*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
6755
// Returned on allocation failure (the guarding check is not visible here).
return HA_ERR_OUT_OF_MEM;
6758
// Presumably stops the table from auto-setting timestamp columns so that
// values come from the row image -- confirm.
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
6764
// Per-event teardown after rows are applied: closes whichever index or
// random-access scan is still open and releases the key buffer.
Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6767
/*error= ToDo:find out what this should really be, this triggers close_scan in ndb, returning error?*/
6768
m_table->file->ha_index_or_rnd_end();
6769
free(m_key); // Free for multi_malloc
6776
// Apply one before-image/after-image pair of an update event: locate the row
// matching the before image, unpack the after image over it, and hand both
// versions to the storage engine.
Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6778
assert(m_table != NULL);
6780
// Locates the before image; on success the row found is in record[0].
int error= find_row(rli);
6784
We need to read the second image in the event of error to be
6785
able to skip to the next pair of updates
6787
// Skip path: step over the after image without using the unpack result.
m_curr_row= m_curr_row_end;
6788
unpack_current_row(rli, &m_cols_ai);
6793
This is the situation after locating BI:
6795
===|=== before image ====|=== after image ===|===
6797
m_curr_row m_curr_row_end
6799
BI found in the table is stored in record[0]. We copy it to record[1]
6800
and unpack AI to record[0].
6803
store_record(m_table,record[1]);
6805
m_curr_row= m_curr_row_end;
6806
error= unpack_current_row(rli, &m_cols_ai); // this also updates m_curr_row_end
6809
Now we have the right row to update. The old row (the one we're
6810
looking for) is in record[1] and the new row is in record[0].
6813
// Temporary fix to find out why it fails [/Matz]
6814
// Overwrite the table's read_set / write_set bits with the event's
// before-image (m_cols) and after-image (m_cols_ai) column bitmaps.
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
6815
memcpy(m_table->write_set->bitmap, m_cols_ai.bitmap, (m_table->write_set->n_bits + 7) / 8);
6817
// Old row first, new row second.
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
6818
// HA_ERR_RECORD_IS_THE_SAME gets special handling; the handling itself is
// not visible in this excerpt.
if (error == HA_ERR_RECORD_IS_THE_SAME)
6825
/**
  Decode an incident event from its serialized form.

  The incident number is read as a 16-bit value located right after the
  common header; the message string follows the post-header and is handed
  to read_str(), which is bounded by the end of the event.

  @param buf          Start of the serialized event.
  @param event_len    Total length of the event in bytes.
  @param descr_event  Format description supplying the header lengths.
*/
Incident_log_event::Incident_log_event(const char *buf, uint32_t event_len,
                                       const Format_description_log_event *descr_event)
  : Log_event(buf, descr_event)
{
  const uint8_t header_bytes= descr_event->common_header_len;
  const uint8_t post_header_bytes=
    descr_event->post_header_len[INCIDENT_EVENT-1];

  const char *const payload_end= buf + event_len;
  const char *cursor= buf + header_bytes;

  m_incident= static_cast<Incident>(uint2korr(cursor));
  cursor+= post_header_bytes;

  /* Defaults are kept when read_str() does not fill them in. */
  const char *text= NULL;
  uint8_t text_len= 0;
  read_str(&cursor, payload_end, &text, &text_len);

  m_message.str= const_cast<char*>(text);
  m_message.length= text_len;
}
6846
// Destructor for Incident_log_event (its body is not visible in this excerpt).
Incident_log_event::~Incident_log_event()
6852
// Returns the entry of a static table of incident names selected by
// m_incident.
Incident_log_event::description() const
6854
static const char *const description[]= {
6855
"NOTHING", // Not used
6859
assert(0 <= m_incident);
6860
// NOTE(review): '<=' admits m_incident equal to the element count, which is
// one past the last valid index used below; '<' looks intended -- confirm.
assert((size_t) m_incident <= sizeof(description)/sizeof(*description));
6862
return description[m_incident];
6866
/**
  Store a one-line, human-readable summary of this incident in @c protocol.

  The text is "#<number> (<description>)", followed by ": <message>" when
  the event carries a non-empty message. This matches do_apply_event(),
  which also uses the message only when its length is greater than zero.

  @param protocol  Receiver of the formatted text.
*/
void Incident_log_event::pack_info(Protocol *protocol)
{
  char buf[256];
  buf[0]= '\0';

  int written;
  if (m_message.length > 0 && m_message.str != NULL)
  {
    /*
      Precision-bounded %s: the message is described by a pointer and a
      length, so do not rely on it being NUL-terminated.
    */
    written= snprintf(buf, sizeof(buf), "#%d (%s): %.*s",
                      static_cast<int>(m_incident), description(),
                      static_cast<int>(m_message.length), m_message.str);
  }
  else
  {
    written= snprintf(buf, sizeof(buf), "#%d (%s)",
                      static_cast<int>(m_incident), description());
  }

  /*
    snprintf() returns the length the full text would have had, which can
    exceed the buffer on truncation, and a negative value on error. Only
    the bytes actually present in buf may be handed on.
  */
  size_t bytes= 0;
  if (written > 0)
  {
    bytes= static_cast<size_t>(written);
    if (bytes >= sizeof(buf))
      bytes= sizeof(buf) - 1;
  }

  protocol->store(buf, bytes, &my_charset_bin);
}
6881
// Applying an incident event reports ER_SLAVE_INCIDENT at ERROR_LEVEL
// through the relay-log info object.
Incident_log_event::do_apply_event(Relay_log_info const *rli)
6883
rli->report(ERROR_LEVEL, ER_SLAVE_INCIDENT,
6884
ER(ER_SLAVE_INCIDENT),
6886
// The event's message is used only when non-empty; otherwise "<none>".
m_message.length > 0 ? m_message.str : "<none>");
6892
// Writes the event's post-header: the incident number as a 16-bit value.
// Returns whatever my_b_safe_write() returns.
Incident_log_event::write_data_header(IO_CACHE *file)
6894
unsigned char buf[sizeof(int16_t)];
6895
// Counterpart of the uint2korr() read in the decoding constructor.
int2store(buf, (int16_t) m_incident);
6896
return(my_b_safe_write(file, buf, sizeof(buf)));
6900
// Writes the event's body: the incident message, via write_str() with its
// pointer and length. Returns whatever write_str() returns.
Incident_log_event::write_data_body(IO_CACHE *file)
6902
return(write_str(file, m_message.str, m_message.length));
6905
Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint32_t event_len,
6906
const Format_description_log_event* description_event)
6907
:Log_event(buf, description_event)
6909
uint8_t header_size= description_event->common_header_len;
6910
ident_len = event_len - header_size;
6911
set_if_smaller(ident_len,FN_REFLEN-1);
6912
log_ident= buf + header_size;