1
/* Copyright (C) 2000-2004 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
#include <drizzled/server_includes.h>
20
#include "rpl_filter.h"
21
#include "rpl_utility.h"
22
#include "rpl_record.h"
23
#include <mysys/my_dir.h>
24
#include <drizzled/drizzled_error_messages.h>
28
#include <mysys/base64.h>
29
#include <mysys/my_bitmap.h>
31
#include <libdrizzle/gettext.h>
32
#include <libdrizzle/libdrizzle.h>
34
#define log_cs &my_charset_utf8_general_ci
37
static const char *HA_ERR(int i)
40
case HA_ERR_KEY_NOT_FOUND: return "HA_ERR_KEY_NOT_FOUND";
41
case HA_ERR_FOUND_DUPP_KEY: return "HA_ERR_FOUND_DUPP_KEY";
42
case HA_ERR_RECORD_CHANGED: return "HA_ERR_RECORD_CHANGED";
43
case HA_ERR_WRONG_INDEX: return "HA_ERR_WRONG_INDEX";
44
case HA_ERR_CRASHED: return "HA_ERR_CRASHED";
45
case HA_ERR_WRONG_IN_RECORD: return "HA_ERR_WRONG_IN_RECORD";
46
case HA_ERR_OUT_OF_MEM: return "HA_ERR_OUT_OF_MEM";
47
case HA_ERR_NOT_A_TABLE: return "HA_ERR_NOT_A_TABLE";
48
case HA_ERR_WRONG_COMMAND: return "HA_ERR_WRONG_COMMAND";
49
case HA_ERR_OLD_FILE: return "HA_ERR_OLD_FILE";
50
case HA_ERR_NO_ACTIVE_RECORD: return "HA_ERR_NO_ACTIVE_RECORD";
51
case HA_ERR_RECORD_DELETED: return "HA_ERR_RECORD_DELETED";
52
case HA_ERR_RECORD_FILE_FULL: return "HA_ERR_RECORD_FILE_FULL";
53
case HA_ERR_INDEX_FILE_FULL: return "HA_ERR_INDEX_FILE_FULL";
54
case HA_ERR_END_OF_FILE: return "HA_ERR_END_OF_FILE";
55
case HA_ERR_UNSUPPORTED: return "HA_ERR_UNSUPPORTED";
56
case HA_ERR_TO_BIG_ROW: return "HA_ERR_TO_BIG_ROW";
57
case HA_WRONG_CREATE_OPTION: return "HA_WRONG_CREATE_OPTION";
58
case HA_ERR_FOUND_DUPP_UNIQUE: return "HA_ERR_FOUND_DUPP_UNIQUE";
59
case HA_ERR_UNKNOWN_CHARSET: return "HA_ERR_UNKNOWN_CHARSET";
60
case HA_ERR_WRONG_MRG_TABLE_DEF: return "HA_ERR_WRONG_MRG_TABLE_DEF";
61
case HA_ERR_CRASHED_ON_REPAIR: return "HA_ERR_CRASHED_ON_REPAIR";
62
case HA_ERR_CRASHED_ON_USAGE: return "HA_ERR_CRASHED_ON_USAGE";
63
case HA_ERR_LOCK_WAIT_TIMEOUT: return "HA_ERR_LOCK_WAIT_TIMEOUT";
64
case HA_ERR_LOCK_TABLE_FULL: return "HA_ERR_LOCK_TABLE_FULL";
65
case HA_ERR_READ_ONLY_TRANSACTION: return "HA_ERR_READ_ONLY_TRANSACTION";
66
case HA_ERR_LOCK_DEADLOCK: return "HA_ERR_LOCK_DEADLOCK";
67
case HA_ERR_CANNOT_ADD_FOREIGN: return "HA_ERR_CANNOT_ADD_FOREIGN";
68
case HA_ERR_NO_REFERENCED_ROW: return "HA_ERR_NO_REFERENCED_ROW";
69
case HA_ERR_ROW_IS_REFERENCED: return "HA_ERR_ROW_IS_REFERENCED";
70
case HA_ERR_NO_SAVEPOINT: return "HA_ERR_NO_SAVEPOINT";
71
case HA_ERR_NON_UNIQUE_BLOCK_SIZE: return "HA_ERR_NON_UNIQUE_BLOCK_SIZE";
72
case HA_ERR_NO_SUCH_TABLE: return "HA_ERR_NO_SUCH_TABLE";
73
case HA_ERR_TABLE_EXIST: return "HA_ERR_TABLE_EXIST";
74
case HA_ERR_NO_CONNECTION: return "HA_ERR_NO_CONNECTION";
75
case HA_ERR_NULL_IN_SPATIAL: return "HA_ERR_NULL_IN_SPATIAL";
76
case HA_ERR_TABLE_DEF_CHANGED: return "HA_ERR_TABLE_DEF_CHANGED";
77
case HA_ERR_NO_PARTITION_FOUND: return "HA_ERR_NO_PARTITION_FOUND";
78
case HA_ERR_RBR_LOGGING_FAILED: return "HA_ERR_RBR_LOGGING_FAILED";
79
case HA_ERR_DROP_INDEX_FK: return "HA_ERR_DROP_INDEX_FK";
80
case HA_ERR_FOREIGN_DUPLICATE_KEY: return "HA_ERR_FOREIGN_DUPLICATE_KEY";
81
case HA_ERR_TABLE_NEEDS_UPGRADE: return "HA_ERR_TABLE_NEEDS_UPGRADE";
82
case HA_ERR_TABLE_READONLY: return "HA_ERR_TABLE_READONLY";
83
case HA_ERR_AUTOINC_READ_FAILED: return "HA_ERR_AUTOINC_READ_FAILED";
84
case HA_ERR_AUTOINC_ERANGE: return "HA_ERR_AUTOINC_ERANGE";
85
case HA_ERR_GENERIC: return "HA_ERR_GENERIC";
86
case HA_ERR_RECORD_IS_THE_SAME: return "HA_ERR_RECORD_IS_THE_SAME";
87
case HA_ERR_LOGGING_IMPOSSIBLE: return "HA_ERR_LOGGING_IMPOSSIBLE";
88
case HA_ERR_CORRUPT_EVENT: return "HA_ERR_CORRUPT_EVENT";
89
case HA_ERR_ROWS_EVENT_APPLY : return "HA_ERR_ROWS_EVENT_APPLY";
95
Error reporting facility for Rows_log_event::do_apply_event
97
@param level error, warning or info
98
@param ha_error HA_ERR_ code
99
@param rli pointer to the active Relay_log_info instance
100
@param thd pointer to the slave thread's thd
101
@param table pointer to the event's table object
102
@param type the type of the event
103
@param log_name the master binlog file name
104
@param pos the master binlog file pos (the next after the event)
107
static void inline slave_rows_error_report(enum loglevel level, int ha_error,
108
Relay_log_info const *rli, THD *thd,
109
Table *table, const char * type,
110
const char *log_name, ulong pos)
112
const char *handler_error= HA_ERR(ha_error);
113
char buff[MAX_SLAVE_ERRMSG], *slider;
114
const char *buff_end= buff + sizeof(buff);
116
List_iterator_fast<DRIZZLE_ERROR> it(thd->warn_list);
120
for (err= it++, slider= buff; err && slider < buff_end - 1;
121
slider += len, err= it++)
123
len= snprintf(slider, buff_end - slider,
124
_(" %s, Error_code: %d;"), err->msg, err->code);
127
rli->report(level, thd->is_error()? thd->main_da.sql_errno() : 0,
128
_("Could not execute %s event on table %s.%s;"
129
"%s handler error %s; "
130
"the event's master log %s, end_log_pos %lu"),
131
type, table->s->db.str,
132
table->s->table_name.str,
134
handler_error == NULL? _("<unknown>") : handler_error,
/**
  Cache that will automatically be written to a dedicated file on
  destruction.
*/
146
// Couples an IO_CACHE with a FILE for the lifetime of the object.
// The constructor calls reinit_io_cache(m_cache, WRITE_CACHE, ...); the
// destructor calls copy_event_cache_to_file_and_reinit(m_cache, m_file).
// The copy constructor is declared but hidden to prevent copying.
// NOTE(review): this listing appears to be missing lines (braces, member
// declarations, the definition of FLUSH_F, return statements) - confirm
// against the full file before editing.
class Write_on_release_cache
154
typedef unsigned short flag_set;
160
Write_on_release_cache
161
cache Pointer to cache to use
162
file File to write cache to upon destruction
163
flags Flags for the cache
167
Class used to guarantee copy of cache to file before exiting the
168
current block. On successful copy of the cache, the cache will
169
be reinited as a WRITE_CACHE.
171
Currently, a pointer to the cache is provided in the
172
constructor, but it would be possible to create a subclass
173
holding the IO_CACHE itself.
175
Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0)
176
: m_cache(cache), m_file(file), m_flags(flags)
178
reinit_io_cache(m_cache, WRITE_CACHE, 0L, false, true);
181
~Write_on_release_cache()
183
copy_event_cache_to_file_and_reinit(m_cache, m_file);
184
// NOTE(review): `|` is a bitwise OR, so this condition is true whenever
// m_flags or FLUSH_F is non-zero; a flag test is normally written with `&`.
// FLUSH_F's value and the guarded statement are not visible here - confirm
// the intent before changing it.
if (m_flags | FLUSH_F)
189
Return a pointer to the internal IO_CACHE.
196
Function to return a pointer to the internal cache, so that the
197
object can be treated as a IO_CACHE and used with the my_b_*
201
A pointer to the internal IO_CACHE.
203
IO_CACHE *operator&()
209
// Hidden, to prevent usage.
210
Write_on_release_cache(Write_on_release_cache const&);
217
// State for the bug#29309 simulation used by Log_event::do_update_pos().
// Starts at 1; do_update_pos() sets it to 0 when it sees an artificial event
// while the value is 1, and sets it to 2 once the value is 0.
uint32_t debug_not_change_ts_if_art_event= 1; // bug#29309 simulation
223
// Resets error state; the visible statement clears thd->is_slave_error.
// NOTE(review): rli is not used on any visible line - the remaining
// statements of this function are not shown in this listing; confirm what
// else is reset.
static void clear_all_errors(THD *thd, Relay_log_info *rli)
225
thd->is_slave_error = 0;
232
Ignore error code specified on command line.
235
inline int ignored_error_code(int err_code)
237
return ((err_code == ER_SLAVE_IGNORED_TABLE) ||
238
(use_slave_mask && bitmap_is_set(&slave_error_mask, err_code)));
246
/*
  Produces an escaped rendering of the len bytes starting at str.

  The visible cases write a two-character backslash sequence through pos for
  newline, carriage return, backslash, backspace, tab, single quote and the
  NUL byte.

  NOTE(review): the declarations of pos and c, the loop around the switch,
  the default case and the return statement are not visible in this listing;
  presumably pos walks packet - confirm, together with the size packet must
  have (each input byte can expand to two output bytes).
*/
static char *pretty_print_str(char *packet, const char *str, int len)
248
const char *end= str + len;
254
switch ((c=*str++)) {
255
case '\n': *pos++= '\\'; *pos++= 'n'; break;
256
case '\r': *pos++= '\\'; *pos++= 'r'; break;
257
case '\\': *pos++= '\\'; *pos++= '\\'; break;
258
case '\b': *pos++= '\\'; *pos++= 'b'; break;
259
case '\t': *pos++= '\\'; *pos++= 't'; break;
260
case '\'': *pos++= '\\'; *pos++= '\''; break;
261
case 0 : *pos++= '\\'; *pos++= '0'; break;
/**
  Creates a temporary name for LOAD DATA INFILE.

  @param buf              Store new filename here
  @param file_id          File_id (part of file name)
  @param event_server_id  Event_id (part of file name)
  @param ext              Extension for file name

  @return
    Pointer to start of extension
*/
284
/*
  Builds the temporary file name in buf: fn_format() combines "SQL_LOAD-"
  with slave_load_tmpdir, then ::server_id, event_server_id and file_id are
  appended in decimal (int10_to_str) and ext is copied last.
  Returns the position in buf where ext was written.

  NOTE(review): the declaration of res and any separators written between
  the three numbers are not visible in this listing; no bound on buf is
  checked in the visible lines - confirm the caller's buffer size.
*/
static char *slave_load_file_stem(char *buf, uint32_t file_id,
285
int event_server_id, const char *ext)
288
fn_format(buf,"SQL_LOAD-",slave_load_tmpdir, "", MY_UNPACK_FILENAME);
291
buf= strchr(buf, '\0');
292
buf= int10_to_str(::server_id, buf, 10);
294
buf= int10_to_str(event_server_id, buf, 10);
296
res= int10_to_str(file_id, buf, 10);
297
my_stpcpy(res, ext); // Add extension last
298
return res; // Pointer to extension
303
Delete all temporary files used for SQL_LOAD.
306
/*
  Removes leftover SQL_LOAD temporary files from slave_load_tmpdir.

  The directory is listed with my_dir(); a prefix made of "SQL_LOAD-"
  followed by the decimal ::server_id is built in prefbuf; every entry whose
  name has that prefix (is_prefix) is expanded with fn_format() and removed
  with my_delete(fname, MYF(0)).

  NOTE(review): the declarations of dirp, file and i, the handling of a
  failed my_dir(), anything appended to the prefix after the server id and
  the release of dirp are not visible in this listing - confirm in the full
  file. The result of my_delete() is not checked on the visible line.
*/
static void cleanup_load_tmpdir()
311
char fname[FN_REFLEN], prefbuf[31], *p;
313
if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
317
When we are deleting temporary files, we should only remove
318
the files associated with the server id of our server.
319
We don't use event_server_id here because since we've disabled
320
direct binlogging of Create_file/Append_file/Exec_load events
321
we cannot meet Start_log event in the middle of events from one
324
p= strmake(prefbuf, STRING_WITH_LEN("SQL_LOAD-"));
325
p= int10_to_str(::server_id, p, 10);
329
for (i=0 ; i < (uint)dirp->number_off_files; i++)
331
file=dirp->dir_entry+i;
332
if (is_prefix(file->name, prefbuf))
334
fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME);
335
my_delete(fname, MYF(0));
347
static bool write_str(IO_CACHE *file, const char *str, uint32_t length)
349
unsigned char tmp[1];
350
tmp[0]= (unsigned char) length;
351
return (my_b_safe_write(file, tmp, sizeof(tmp)) ||
352
my_b_safe_write(file, (unsigned char*) str, length));
360
/*
  Reads a length-prefixed string at *buf, where the first byte is the
  length. The visible code tests whether *buf + length reaches buf_end,
  stores the length byte in *len and advances *buf by length + 1.

  NOTE(review): the bounds test itself dereferences *buf, so the caller must
  guarantee *buf < buf_end. The statement that sets *str and the return
  values are not visible in this listing - confirm in the full file.
*/
static inline int read_str(const char **buf, const char *buf_end,
361
const char **str, uint8_t *len)
363
if (*buf + ((uint) (unsigned char) **buf) >= buf_end)
365
*len= (uint8_t) **buf;
367
(*buf)+= (uint) *len+1;
373
Transforms a string into "" or its expression in 0x... form.
376
/*
  Writes a textual form of the len bytes at from into to and returns a
  pointer to the terminating 0 of to. Two alternatives are visible: a
  conversion through octet2hex(to, from, len) and my_stpcpy() of a pair of
  double quotes.

  NOTE(review): the condition choosing between the two alternatives and any
  prefix written before the hex digits are not visible in this listing;
  presumably the quotes are used for an empty string - confirm. No bound on
  to is checked in the visible lines.
*/
char *str_to_hex(char *to, const char *from, uint32_t len)
382
to= octet2hex(to, from, len);
385
to= my_stpcpy(to, "\"\"");
386
return to; // pointer to end 0 of 'to'
/**
  Append a version of the 'from' string suitable for use in a query to
  the 'to' string. To generate a correct escaping, the character set
  information in 'csinfo' is used.
*/
397
/*
  Appends an escaped form of from to to.

  Space for from->length()*2+3 extra bytes is reserved first. When
  csinfo->escape_with_backslash_is_dangerous is set the text is produced by
  str_to_hex(), otherwise by drizzle_escape_string(). Finally the length of
  to is set to the original length plus the number of bytes written.

  NOTE(review): the return type, the declarations of beg and ptr, the
  handling of a failed reserve() and any quote characters written around the
  escaped text are not visible in this listing - confirm in the full file.
*/
append_query_string(const CHARSET_INFO * const csinfo,
398
String const *from, String *to)
401
uint32_t const orig_len= to->length();
402
if (to->reserve(orig_len + from->length()*2+3))
405
beg= to->c_ptr_quick() + to->length();
407
if (csinfo->escape_with_backslash_is_dangerous)
408
ptr= str_to_hex(ptr, from->ptr(), from->length());
412
ptr+= drizzle_escape_string(ptr, from->ptr(), from->length());
415
to->length(orig_len + ptr - beg);
420
/**************************************************************************
421
Log_event methods (= the parent class of all events)
422
**************************************************************************/
426
returns the human readable name of the event's type
429
const char* Log_event::get_type_str(Log_event_type type)
432
case START_EVENT_V3: return "Start_v3";
433
case STOP_EVENT: return "Stop";
434
case QUERY_EVENT: return "Query";
435
case ROTATE_EVENT: return "Rotate";
436
case INTVAR_EVENT: return "Intvar";
437
case LOAD_EVENT: return "Load";
438
case NEW_LOAD_EVENT: return "New_load";
439
case SLAVE_EVENT: return "Slave";
440
case CREATE_FILE_EVENT: return "Create_file";
441
case APPEND_BLOCK_EVENT: return "Append_block";
442
case DELETE_FILE_EVENT: return "Delete_file";
443
case EXEC_LOAD_EVENT: return "Exec_load";
444
case RAND_EVENT: return "RAND";
445
case XID_EVENT: return "Xid";
446
case USER_VAR_EVENT: return "User var";
447
case FORMAT_DESCRIPTION_EVENT: return "Format_desc";
448
case TABLE_MAP_EVENT: return "Table_map";
449
case PRE_GA_WRITE_ROWS_EVENT: return "Write_rows_event_old";
450
case PRE_GA_UPDATE_ROWS_EVENT: return "Update_rows_event_old";
451
case PRE_GA_DELETE_ROWS_EVENT: return "Delete_rows_event_old";
452
case WRITE_ROWS_EVENT: return "Write_rows";
453
case UPDATE_ROWS_EVENT: return "Update_rows";
454
case DELETE_ROWS_EVENT: return "Delete_rows";
455
case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
456
case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
457
case INCIDENT_EVENT: return "Incident";
458
default: return "Unknown"; /* impossible */
462
const char* Log_event::get_type_str()
464
return get_type_str(get_type_code());
469
Log_event::Log_event()
472
Log_event::Log_event(THD* thd_arg, uint16_t flags_arg, bool using_trans)
473
:log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), thd(thd_arg)
475
server_id= thd->server_id;
476
when= thd->start_time;
477
cache_stmt= using_trans;
482
This minimal constructor is for when you are not even sure that there
483
is a valid THD. For example in the server when we are shutting down or
484
flushing logs after receiving a SIGHUP (then we must write a Rotate to
485
the binlog but we have no THD, so we need this minimal constructor).
488
// Minimal constructor for use without a THD. The visible lines
// zero-initialise temp_buf, exec_time, flags and cache_stmt and take
// server_id from the global ::server_id.
// NOTE(review): the initializer list ends in a comma and the rest of the
// body is cut off in this listing - confirm how thd, when and log_pos are
// initialised in the full file.
Log_event::Log_event()
489
:temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
492
server_id= ::server_id;
494
We can't call my_time() here as this would cause a call before
503
Log_event::Log_event()
506
/*
  Constructor that decodes the common event header from a raw buffer.

  Reads, in this order: `when` (4 bytes at buf), server_id (SERVER_ID_OFFSET),
  data_written (EVENT_LEN_OFFSET), log_pos (LOG_POS_OFFSET) and flags
  (FLAGS_OFFSET). For binlog_version 3 and an event type below
  FORMAT_DESCRIPTION_EVENT with a non-zero log_pos, data_written is added to
  log_pos. Format description and Rotate events get a separate branch.

  NOTE(review): the bodies of the binlog_version==1 branch and of the
  Format_desc/Rotate branch are not visible in this listing. buf is read at
  fixed offsets with no length parameter, so the caller must supply a
  complete header. buf[EVENT_TYPE_OFFSET] is a plain char compared with enum
  values - confirm behaviour for bytes >= 0x80 where char is signed.
*/
Log_event::Log_event(const char* buf,
507
const Format_description_log_event* description_event)
508
:temp_buf(0), cache_stmt(0)
511
when= uint4korr(buf);
512
server_id= uint4korr(buf + SERVER_ID_OFFSET);
513
data_written= uint4korr(buf + EVENT_LEN_OFFSET);
514
if (description_event->binlog_version==1)
521
log_pos= uint4korr(buf + LOG_POS_OFFSET);
523
If the log is 4.0 (so here it can only be a 4.0 relay log read by
524
the SQL thread or a 4.0 master binlog read by the I/O thread),
525
log_pos is the beginning of the event: we transform it into the end
526
of the event, which is more useful.
527
But how do you know that the log is 4.0: you know it if
528
description_event is version 3 *and* you are not reading a
529
Format_desc (remember that mysqlbinlog starts by assuming that 5.0
530
logs are in 4.0 format, until it finds a Format_desc).
532
if (description_event->binlog_version==3 &&
533
buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
536
If log_pos=0, don't change it. log_pos==0 is a marker to mean
537
"don't change rli->group_master_log_pos" (see
538
inc_group_relay_log_pos()). As it is unreal log_pos, adding the
539
event len's is nonsense. For example, a fake Rotate event should
540
not have its log_pos (which is 0) changed or it will modify
541
Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense
542
value of (a non-zero offset which does not exist in the master's
543
binlog, so which will cause problems if the user uses this value
546
log_pos+= data_written; /* purecov: inspected */
549
flags= uint2korr(buf + FLAGS_OFFSET);
550
if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
551
(buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
554
These events always have a header which stops here (i.e. their
558
Initialization to zero of all other Log_event members as they're
559
not specified. Currently there are no such members; in the future
560
there will be an event UID (but Format_description and Rotate
561
don't need this UID, as they are not propagated through
562
--log-slave-updates (remember the UID is used to not play a query
563
twice when you have two masters which are slaves of a 3rd master).
568
/* otherwise, go on with reading the header from buf (nothing now) */
572
/*
  Advances the replication position after this event by calling
  rli->stmt_done(log_pos, timestamp). Always returns 0.

  The timestamp argument is 0 when the event is artificial and
  debug_not_change_ts_if_art_event is greater than 0, otherwise `when`.
  Around that call the bug#29309 simulation flag is stepped: 1 -> 0 on an
  artificial event before the call, and 0 -> 2 after the call.

  NOTE(review): the enclosing null check on rli that the text below refers
  to, and any conditional compilation around the simulation code, are not
  visible in this listing - confirm in the full file.
*/
int Log_event::do_update_pos(Relay_log_info *rli)
575
rli is null when (as far as I (Guilhem) know) the caller is
576
Load_log_event::do_apply_event *and* that one is called from
577
Execute_load_log_event::do_apply_event. In this case, we don't
578
do anything here ; Execute_load_log_event::do_apply_event will
579
call Log_event::do_apply_event again later with the proper rli.
580
Strictly speaking, if we were sure that rli is null only in the
581
case discussed above, 'if (rli)' is useless here. But as we are
582
not 100% sure, keep it for now.
584
Matz: I don't think we will need this check with this refactoring.
589
bug#29309 simulation: resetting the flag to force
590
wrong behaviour of artificial event to update
591
rli->last_master_timestamp for only one time -
592
the first FLUSH LOGS in the test.
594
if (debug_not_change_ts_if_art_event == 1
595
&& is_artificial_event())
596
debug_not_change_ts_if_art_event= 0;
597
rli->stmt_done(log_pos,
598
is_artificial_event() &&
599
debug_not_change_ts_if_art_event > 0 ? 0 : when);
600
if (debug_not_change_ts_if_art_event == 0)
601
debug_not_change_ts_if_art_event= 2;
603
return 0; // Cannot fail currently
607
Log_event::enum_skip_reason
608
Log_event::do_shall_skip(Relay_log_info *rli)
610
if ((server_id == ::server_id && !rli->replicate_same_server_id) || (rli->slave_skip_counter == 1 && rli->is_in_group()))
611
return EVENT_SKIP_IGNORE;
612
else if (rli->slave_skip_counter > 0)
613
return EVENT_SKIP_COUNT;
615
return EVENT_SKIP_NOT;
620
Log_event::pack_info()
623
void Log_event::pack_info(Protocol *protocol)
625
protocol->store("", &my_charset_bin);
630
init_show_field_list() prepares the column names and types for the
631
output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
635
/*
  Appends the result-set columns to field_list, in this order:
  Log_name, Pos, Event_type, Server_id, End_log_pos, Info.
  Pos and End_log_pos are integer items of MY_INT32_NUM_DECIMAL_DIGITS
  digits typed DRIZZLE_TYPE_LONGLONG; the string columns are 20 wide.

  NOTE(review): the type argument of the Server_id item is on a line that is
  not visible in this listing. The results of `new` and push_back() are not
  checked on the visible lines.
*/
void Log_event::init_show_field_list(List<Item>* field_list)
637
field_list->push_back(new Item_empty_string("Log_name", 20));
638
field_list->push_back(new Item_return_int("Pos", MY_INT32_NUM_DECIMAL_DIGITS,
639
DRIZZLE_TYPE_LONGLONG));
640
field_list->push_back(new Item_empty_string("Event_type", 20));
641
field_list->push_back(new Item_return_int("Server_id", 10,
643
field_list->push_back(new Item_return_int("End_log_pos",
644
MY_INT32_NUM_DECIMAL_DIGITS,
645
DRIZZLE_TYPE_LONGLONG));
646
field_list->push_back(new Item_empty_string("Info", 20));
653
/*
  Writes the common event header to file.

  Sets data_written to event_data_length plus the header size, computes
  log_pos as my_b_safe_tell(file) + data_written on the visible path, then
  fills the header with: timestamp, type code (EVENT_TYPE_OFFSET), server id,
  event length, log_pos and flags, and writes it with my_b_safe_write().

  Returns true when my_b_safe_write() reports a failure (non-zero).

  NOTE(review): the declaration of `now`, the branch taken for artificial
  events and the condition guarding the log_pos computation are not visible
  in this listing - confirm in the full file.
*/
bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
655
unsigned char header[LOG_EVENT_HEADER_LEN];
658
/* Store number of bytes that will be written by this event */
659
data_written= event_data_length + sizeof(header);
662
log_pos != 0 if this is relay-log event. In this case we should not
666
if (is_artificial_event())
669
We should not do any cleanup on slave when reading this. We
670
mark this by setting log_pos to 0. Start_log_event_v3() will
671
detect this on reading and set artificial_event=1 for the event.
678
Calculate position of end of event
680
Note that with a SEQ_READ_APPEND cache, my_b_tell() does not
681
work well. So this will give slightly wrong positions for the
682
Format_desc/Rotate/Stop events which the slave writes to its
683
relay log. For example, the initial Format_desc will have
684
end_log_pos=91 instead of 95. Because after writing the first 4
685
bytes of the relay log, my_b_tell() still reports 0. Because
686
my_b_append() does not update the counter which my_b_tell()
687
later uses (one should probably use my_b_append_tell() to work
688
around this). To get right positions even when writing to the
689
relay log, we use the (new) my_b_safe_tell().
691
Note that this raises a question on the correctness of all these
692
assert(my_b_tell()=rli->event_relay_log_pos).
694
If in a transaction, the log_pos which we calculate below is not
695
very good (because then my_b_safe_tell() returns start position
696
of the BEGIN, so it's like the statement was at the BEGIN's
697
place), but it's not a very serious problem (as the slave, when
698
it is in a transaction, does not take those end_log_pos into
699
account (as it calls inc_event_relay_log_pos()). To be fixed
700
later, so that it looks less strange. But not bug.
703
log_pos= my_b_safe_tell(file)+data_written;
706
now= (ulong) get_time(); // Query start time
709
Header will be of size LOG_EVENT_HEADER_LEN for all events, except for
710
FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT, where it will be
711
LOG_EVENT_MINIMAL_HEADER_LEN (remember these 2 have a frozen header,
712
because we read them before knowing the format).
715
int4store(header, now); // timestamp
716
header[EVENT_TYPE_OFFSET]= get_type_code();
717
int4store(header+ SERVER_ID_OFFSET, server_id);
718
int4store(header+ EVENT_LEN_OFFSET, data_written);
719
int4store(header+ LOG_POS_OFFSET, log_pos);
720
int2store(header+ FLAGS_OFFSET, flags);
722
return(my_b_safe_write(file, header, sizeof(header)) != 0);
727
This needn't be format-tolerant, because we only read
728
LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length).
731
/*
  Reads one raw event from file and appends it to packet.

  The minimal header is read first and the event length is taken from
  EVENT_LEN_OFFSET. A length below LOG_EVENT_MINIMAL_HEADER_LEN or above the
  current thread's max_allowed_packet is rejected. The header and then the
  remaining bytes are appended to packet. Result codes assigned on the
  visible lines: LOG_READ_EOF, LOG_READ_TRUNC, LOG_READ_IO, LOG_READ_BOGUS
  and LOG_READ_MEM.

  NOTE(review): log_lock is locked and unlocked unconditionally on the
  visible lines, and current_thd is dereferenced without a null check -
  confirm callers never pass a null lock or run without a THD. The
  declarations of data_len and result, the jumps to the end label and the
  return statement are not visible in this listing.
*/
int Log_event::read_log_event(IO_CACHE* file, String* packet,
732
pthread_mutex_t* log_lock)
736
char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
739
pthread_mutex_lock(log_lock);
740
if (my_b_read(file, (unsigned char*) buf, sizeof(buf)))
743
If the read hits eof, we must report it as eof so the caller
744
will know it can go into cond_wait to be woken up on the next
748
result= LOG_READ_EOF;
750
result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
753
data_len= uint4korr(buf + EVENT_LEN_OFFSET);
754
if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
755
data_len > current_thd->variables.max_allowed_packet)
757
result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
762
/* Append the log event header to packet */
763
if (packet->append(buf, sizeof(buf)))
765
/* Failed to allocate packet */
766
result= LOG_READ_MEM;
769
data_len-= LOG_EVENT_MINIMAL_HEADER_LEN;
772
/* Append rest of event, read directly from file into packet */
773
if (packet->append(file, data_len))
776
Fatal error occured when appending rest of the event
777
to packet, possible failures:
778
1. EOF occured when reading from file, it's really an error
779
as data_len is >=0 there's supposed to be more bytes available.
780
file->error will have been set to number of bytes left to read
781
2. Read was interrupted, file->error would normally be set to -1
782
3. Failed to allocate memory for packet, my_errno
783
will be ENOMEM(file->error shuold be 0, but since the
784
memory allocation occurs before the call to read it might
787
result= (my_errno == ENOMEM ? LOG_READ_MEM :
788
(file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
789
/* Implicit goto end; */
795
pthread_mutex_unlock(log_lock);
799
// Lock/unlock helpers that act only when a variable named log_lock, which
// must be in scope at the point of use, is non-null. Each expansion is a
// complete `if` statement including its trailing semicolon.
// NOTE(review): because the expansion is a bare `if`, using either macro
// directly before an `else` would attach that `else` to the macro's `if`;
// the call sites are not visible here, so check them before reshaping these
// into do { ... } while (0).
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
800
#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
804
Allocates memory; The caller is responsible for clean-up.
806
/*
  Reads one event from file and returns it as a newly created Log_event.

  At most LOG_EVENT_MINIMAL_HEADER_LEN header bytes are read (fewer when
  description_event->common_header_len is smaller); the event length is taken
  from EVENT_LEN_OFFSET and checked against max_allowed_packet and against
  header_size. A buffer of data_len+1 bytes is allocated with my_malloc(),
  the rest of the event is read into it and it is decoded by the buffer-based
  read_log_event() overload; on success the buffer is handed to the event
  with register_temp_buf(). Failures are described by one of the strings
  "Event too big", "Event too small", "Out of memory" or "read error" and
  logged through sql_print_error().

  NOTE(review): when there is no current THD the limit is ~(ulong)0 stored in
  a uint32_t, so data_len+1 can wrap to 0 for a length of 0xFFFFFFFF -
  confirm whether that path is reachable. data_len is unsigned but printed
  with %d. The locking calls, the error label, the release of buf on failure
  and the return statement are not visible in this listing.
*/
Log_event* Log_event::read_log_event(IO_CACHE* file,
807
pthread_mutex_t* log_lock,
808
const Format_description_log_event
811
assert(description_event != 0);
812
char head[LOG_EVENT_MINIMAL_HEADER_LEN];
814
First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
815
check the event for sanity and to know its length; no need to really parse
816
it. We say "at most" because this could be a 3.23 master, which has header
817
of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's
818
"minimal" over the set {MySQL >=4.0}).
820
uint32_t header_size= cmin(description_event->common_header_len,
821
LOG_EVENT_MINIMAL_HEADER_LEN);
824
if (my_b_read(file, (unsigned char *) head, header_size))
828
No error here; it could be that we are at the file's end. However
829
if the next my_b_read() fails (below), it will be an error as we
830
were able to read the first bytes.
834
uint32_t data_len = uint4korr(head + EVENT_LEN_OFFSET);
836
const char *error= 0;
838
#ifndef max_allowed_packet
839
THD *thd=current_thd;
840
uint32_t max_allowed_packet= thd ? thd->variables.max_allowed_packet : ~(ulong)0;
843
if (data_len > max_allowed_packet)
845
error = "Event too big";
849
if (data_len < header_size)
851
error = "Event too small";
855
// some events use the extra byte to null-terminate strings
856
if (!(buf = (char*) my_malloc(data_len+1, MYF(MY_WME))))
858
error = "Out of memory";
862
memcpy(buf, head, header_size);
863
if (my_b_read(file, (unsigned char*) buf + header_size, data_len - header_size))
865
error = "read error";
868
if ((res= read_log_event(buf, data_len, &error, description_event)))
869
res->register_temp_buf(buf);
876
sql_print_error(_("Error in Log_event::read_log_event(): "
877
"'%s', data_len: %d, event_type: %d"),
878
error,data_len,head[EVENT_TYPE_OFFSET]);
881
The SQL slave thread will check if file->error<0 to know
882
if there was an I/O error. Even if there is no "low-level" I/O errors
883
with 'file', any of the high-level above errors is worrying
884
enough to stop the SQL thread now ; as we are skipping the current event,
885
going on with reading and successfully executing other events can
886
only corrupt the slave's databases. So stop.
895
Binlog format tolerance is in (buf, event_len, description_event)
899
/*
  Decodes the event stored in buf (event_len bytes) into a newly created
  Log_event subclass instance.

  After an integrity check ("Sanity check failed" and a NULL return on
  failure), the type byte at EVENT_TYPE_OFFSET is validated against
  description_event->number_of_event_types, remapped through
  description_event->event_type_permutation when that array is set, and used
  to select the event class to construct. An event that could not be created
  or whose is_valid() fails is reported as
  "Found invalid event in binary log".

  NOTE(review): the integrity check only requires
  event_len >= EVENT_LEN_OFFSET, yet it then reads four bytes at
  buf+EVENT_LEN_OFFSET - confirm callers always supply a full header.
  buf[EVENT_TYPE_OFFSET] is a plain char compared with ENUM_END_EVENT;
  confirm behaviour for bytes >= 0x80 where char is signed. Several case
  labels, the break statements, the default case, the error parameter in the
  signature and the final return are not visible in this listing.
*/
Log_event* Log_event::read_log_event(const char* buf, uint32_t event_len,
901
const Format_description_log_event *description_event)
904
assert(description_event != 0);
906
/* Check the integrity */
907
if (event_len < EVENT_LEN_OFFSET ||
908
buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
909
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
911
*error="Sanity check failed"; // Needed to free buffer
912
return(NULL); // general sanity check - will fail on a partial read
915
uint32_t event_type= buf[EVENT_TYPE_OFFSET];
916
if (event_type > description_event->number_of_event_types &&
917
event_type != FORMAT_DESCRIPTION_EVENT)
920
It is unsafe to use the description_event if its post_header_len
921
array does not include the event type.
928
In some previuos versions (see comment in
929
Format_description_log_event::Format_description_log_event(char*,...)),
930
event types were assigned different id numbers than in the
931
present version. In order to replicate from such versions to the
932
present version, we must map those event type id's to our event
933
type id's. The mapping is done with the event_type_permutation
934
array, which was set up when the Format_description_log_event
937
if (description_event->event_type_permutation)
938
event_type= description_event->event_type_permutation[event_type];
942
ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
945
ev = new Load_log_event(buf, event_len, description_event);
948
ev = new Load_log_event(buf, event_len, description_event);
951
ev = new Rotate_log_event(buf, event_len, description_event);
953
case CREATE_FILE_EVENT:
954
ev = new Create_file_log_event(buf, event_len, description_event);
956
case APPEND_BLOCK_EVENT:
957
ev = new Append_block_log_event(buf, event_len, description_event);
959
case DELETE_FILE_EVENT:
960
ev = new Delete_file_log_event(buf, event_len, description_event);
962
case EXEC_LOAD_EVENT:
963
ev = new Execute_load_log_event(buf, event_len, description_event);
965
case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
966
ev = new Start_log_event_v3(buf, description_event);
969
ev = new Stop_log_event(buf, description_event);
972
ev = new Intvar_log_event(buf, description_event);
975
ev = new Xid_log_event(buf, description_event);
978
ev = new Rand_log_event(buf, description_event);
981
ev = new User_var_log_event(buf, description_event);
983
case FORMAT_DESCRIPTION_EVENT:
984
ev = new Format_description_log_event(buf, event_len, description_event);
986
case WRITE_ROWS_EVENT:
987
ev = new Write_rows_log_event(buf, event_len, description_event);
989
case UPDATE_ROWS_EVENT:
990
ev = new Update_rows_log_event(buf, event_len, description_event);
992
case DELETE_ROWS_EVENT:
993
ev = new Delete_rows_log_event(buf, event_len, description_event);
995
case TABLE_MAP_EVENT:
996
ev = new Table_map_log_event(buf, event_len, description_event);
998
case BEGIN_LOAD_QUERY_EVENT:
999
ev = new Begin_load_query_log_event(buf, event_len, description_event);
1001
case EXECUTE_LOAD_QUERY_EVENT:
1002
ev= new Execute_load_query_log_event(buf, event_len, description_event);
1004
case INCIDENT_EVENT:
1005
ev = new Incident_log_event(buf, event_len, description_event);
1014
is_valid() are small event-specific sanity tests which are
1015
important; for example there are some my_malloc() in constructors
1016
(e.g. Query_log_event::Query_log_event(char*...)); when these
1017
my_malloc() fail we can't return an error out of the constructor
1018
(because constructor is "void") ; so instead we leave the pointer we
1019
wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
1020
Same for Format_description_log_event, member 'post_header_len'.
1022
if (!ev || !ev->is_valid())
1025
*error= "Found invalid event in binary log";
1031
inline Log_event::enum_skip_reason
1032
Log_event::continue_group(Relay_log_info *rli)
1034
if (rli->slave_skip_counter == 1)
1035
return Log_event::EVENT_SKIP_IGNORE;
1036
return Log_event::do_shall_skip(rli);
1039
/**************************************************************************
1040
Query_log_event methods
1041
**************************************************************************/
1044
This (which is used only for SHOW BINLOG EVENTS) could be updated to
1045
print SET @@session_var=. But this is not urgent, as SHOW BINLOG EVENTS is
1046
only an information, it does not produce suitable queries to replay (for
1047
example it does not print LOAD DATA INFILE).
1052
/*
  Builds the info text for this query event and stores it in protocol.

  A buffer of 9 + db_len + q_len bytes is allocated. When
  LOG_EVENT_SUPPRESS_USE_F is not set in flags (the condition continues on a
  line that is not visible here) the text starts with a "use" statement
  naming db between backquotes; the query text follows and the result is
  stored with the binary character set.

  NOTE(review): the declarations of buf and pos, the handling of a failed
  allocation, the advance of pos past the query and the release of buf are
  not visible in this listing - confirm in the full file.
*/
void Query_log_event::pack_info(Protocol *protocol)
1054
// TODO: show the catalog ??
1056
if (!(buf= (char*) my_malloc(9 + db_len + q_len, MYF(MY_WME))))
1059
if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
1062
pos= my_stpcpy(buf, "use `");
1063
memcpy(pos, db, db_len);
1064
pos= my_stpcpy(pos+db_len, "`; ");
1068
memcpy(pos, query, q_len);
1071
protocol->store(buf, pos-buf, &my_charset_bin);
1077
Utility function for the next method (Query_log_event::write()) .
1079
/*
  Serialises a short string at *dst. The visible lines store len as a single
  byte (advancing *dst by one) and then copy len bytes of src to *dst.

  NOTE(review): len is truncated to one byte, so values above 255 cannot be
  represented. The use of the code parameter and the final advance of *dst
  past the copied bytes are not visible in this listing - confirm in the
  full file.
*/
static void write_str_with_code_and_len(char **dst, const char *src,
1080
int len, uint32_t code)
1084
*((*dst)++)= (unsigned char) len;
1085
memcpy(*dst, src, len);
1091
Query_log_event::write().
1094
In this event we have to modify the header to have the correct
1095
EVENT_LEN_OFFSET as we don't yet know how many status variables we
1099
bool Query_log_event::write(IO_CACHE* file)
1102
@todo if catalog can be of length FN_REFLEN==512, then we are not
1103
replicating it correctly, since the length is stored in a byte
1106
unsigned char buf[QUERY_HEADER_LEN+
1107
1+4+ // code of flags2 and flags2
1108
1+8+ // code of sql_mode and sql_mode
1109
1+1+FN_REFLEN+ // code of catalog and catalog length and catalog
1110
1+4+ // code of autoinc and the 2 autoinc variables
1111
1+6+ // code of charset and charset
1112
1+1+MAX_TIME_ZONE_NAME_LENGTH+ // code of tz and tz length and tz name
1113
1+2+ // code of lc_time_names and lc_time_names_number
1114
1+2 // code of charset_database and charset_database_number
1115
], *start, *start_of_status;
1119
return 1; // Something wrong with event
1122
We want to store the thread id:
1123
(- as information for the user who reads the binlog)
1124
- if the query uses a temporary table: for the slave SQL thread to know to
1125
which master connection the temp table belongs.
1126
Now imagine we (write()) are called by the slave SQL thread (we are
1127
logging a query executed by this thread; the slave runs with
1128
--log-slave-updates). Then this query will be logged with
1129
thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of
1130
the same name were created simultaneously on the master (in the master
1132
CREATE TEMPORARY TABLE t; (thread 1)
1133
CREATE TEMPORARY TABLE t; (thread 2)
1135
then in the slave's binlog there will be
1136
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1137
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1138
which is bad (same thread id!).
1140
To avoid this, we log the thread's thread id EXCEPT for the SQL
1141
slave thread for which we log the original (master's) thread id.
1142
Now this moves the bug: what happens if the thread id on the
1143
master was 10 and when the slave replicates the query, a
1144
connection number 10 is opened by a normal client on the slave,
1145
and updates a temp table of the same name? We get a problem
1146
again. To avoid this, in the handling of temp tables (sql_base.cc)
1147
we use thread_id AND server_id. TODO when this is merged into
1148
4.1: in 4.1, slave_proxy_id has been renamed to pseudo_thread_id
1149
and is a session variable: that's to make mysqlbinlog work with
1150
temp tables. We probably need to introduce
1152
SET PSEUDO_SERVER_ID
1153
for mysqlbinlog in 4.1. mysqlbinlog would print:
1154
SET PSEUDO_SERVER_ID=
1155
SET PSEUDO_THREAD_ID=
1156
for each query using temp tables.
1158
int4store(buf + Q_THREAD_ID_OFFSET, slave_proxy_id);
1159
int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
1160
buf[Q_DB_LEN_OFFSET] = (char) db_len;
1161
int2store(buf + Q_ERR_CODE_OFFSET, error_code);
1164
You MUST always write status vars in increasing order of code. This
1165
guarantees that a slightly older slave will be able to parse those he
1168
start_of_status= start= buf+QUERY_HEADER_LEN;
1171
*start++= Q_FLAGS2_CODE;
1172
int4store(start, flags2);
1175
if (sql_mode_inited)
1177
*start++= Q_SQL_MODE_CODE;
1178
int8store(start, (uint64_t)sql_mode);
1181
if (catalog_len) // i.e. this var is inited (false for 4.0 events)
1183
write_str_with_code_and_len((char **)(&start),
1184
catalog, catalog_len, Q_CATALOG_NZ_CODE);
1186
In 5.0.x where x<4 masters we used to store the end zero here. This was
1187
a waste of one byte so we don't do it in x>=4 masters. We change code to
1188
Q_CATALOG_NZ_CODE, because re-using the old code would make x<4 slaves
1189
of this x>=4 master segfault (expecting a zero when there is
1190
none). Remaining compatibility problems are: the older slave will not
1191
find the catalog; but it will not crash, and it's not an issue
1192
that it does not find the catalog as catalogs were not used in these
1193
older MySQL versions (we store it in binlog and read it from relay log
1194
but do nothing useful with it). What is an issue is that the older slave
1195
will stop processing the Q_* blocks (and jumps to the db/query) as soon
1196
as it sees unknown Q_CATALOG_NZ_CODE; so it will not be able to read
1197
Q_AUTO_INCREMENT*, Q_CHARSET and so replication will fail silently in
1198
various ways. Documented that you should not mix alpha/beta versions if
1199
they are not exactly the same version, with example of 5.0.3->5.0.2 and
1200
5.0.4->5.0.3. If replication is from older to new, the new will
1201
recognize Q_CATALOG_CODE and have no problem.
1204
if (auto_increment_increment != 1 || auto_increment_offset != 1)
1206
*start++= Q_AUTO_INCREMENT;
1207
int2store(start, auto_increment_increment);
1208
int2store(start+2, auto_increment_offset);
1213
*start++= Q_CHARSET_CODE;
1214
memcpy(start, charset, 6);
1219
/* In the TZ sys table, column Name is of length 64 so this should be ok */
1220
assert(time_zone_len <= MAX_TIME_ZONE_NAME_LENGTH);
1221
*start++= Q_TIME_ZONE_CODE;
1222
*start++= time_zone_len;
1223
memcpy(start, time_zone_str, time_zone_len);
1224
start+= time_zone_len;
1226
if (lc_time_names_number)
1228
assert(lc_time_names_number <= 0xFFFF);
1229
*start++= Q_LC_TIME_NAMES_CODE;
1230
int2store(start, lc_time_names_number);
1233
if (charset_database_number)
1235
assert(charset_database_number <= 0xFFFF);
1236
*start++= Q_CHARSET_DATABASE_CODE;
1237
int2store(start, charset_database_number);
1241
Here there could be code like
1242
if (command-line-option-which-says-"log_this_variable" && inited)
1244
*start++= Q_THIS_VARIABLE_CODE;
1245
int4store(start, this_variable);
1250
/* Store length of status variables */
1251
status_vars_len= (uint) (start-start_of_status);
1252
assert(status_vars_len <= MAX_SIZE_LOG_EVENT_STATUS);
1253
int2store(buf + Q_STATUS_VARS_LEN_OFFSET, status_vars_len);
1256
Calculate length of whole event
1257
The "1" below is the \0 in the db's length
1259
event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
1261
return (write_header(file, event_length) ||
1262
my_b_safe_write(file, (unsigned char*) buf, QUERY_HEADER_LEN) ||
1263
write_post_header_for_derived(file) ||
1264
my_b_safe_write(file, (unsigned char*) start_of_status,
1265
(uint) (start-start_of_status)) ||
1266
my_b_safe_write(file, (db) ? (unsigned char*) db : (unsigned char*)"", db_len + 1) ||
1267
my_b_safe_write(file, (unsigned char*) query, q_len)) ? 1 : 0;
1271
The simplest constructor that could possibly work. This is used for
1272
creating static objects that have a special meaning and are invisible
1275
Query_log_event::Query_log_event()
1276
:Log_event(), data_buf(0)
1283
Query_log_event::Query_log_event()
1284
thd_arg - thread handle
1285
query_arg - array of char representing the query
1286
query_length - size of the `query_arg' array
1287
using_trans - there is a modified transactional table
1288
suppress_use - suppress the generation of 'USE' statements
1289
killed_status_arg - optional; defaults to THD::KILLED_NO_VALUE.
1290
if the value is left at that default, the arg
1291
is set to the current thd->killed value.
1292
A caller might need to masquerade thd->killed with
1295
Creates an event for binlogging
1296
The value for local `killed_status' can be supplied by caller.
1298
Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
1299
ulong query_length, bool using_trans,
1301
THD::killed_state killed_status_arg)
1303
(thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
1305
(suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1307
data_buf(0), query(query_arg), catalog(thd_arg->catalog),
1308
db(thd_arg->db), q_len((uint32_t) query_length),
1309
thread_id(thd_arg->thread_id),
1310
/* save the original thread id; we already know the server id */
1311
slave_proxy_id(thd_arg->variables.pseudo_thread_id),
1312
flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1314
auto_increment_increment(thd_arg->variables.auto_increment_increment),
1315
auto_increment_offset(thd_arg->variables.auto_increment_offset),
1316
lc_time_names_number(thd_arg->variables.lc_time_names->number),
1317
charset_database_number(0)
1321
if (killed_status_arg == THD::KILLED_NO_VALUE)
1322
killed_status_arg= thd_arg->killed;
1325
(killed_status_arg == THD::NOT_KILLED) ?
1326
(thd_arg->is_error() ? thd_arg->main_da.sql_errno() : 0) :
1327
(thd_arg->killed_errno());
1330
exec_time = (ulong) (end_time - thd_arg->start_time);
1332
@todo this means that if we have no catalog, then it is replicated
1333
as an existing catalog of length zero. is that safe? /sven
1335
catalog_len = (catalog) ? (uint32_t) strlen(catalog) : 0;
1336
/* status_vars_len is set just before writing the event */
1337
db_len = (db) ? (uint32_t) strlen(db) : 0;
1338
if (thd_arg->variables.collation_database != thd_arg->db_charset)
1339
charset_database_number= thd_arg->variables.collation_database->number;
1342
If we don't use flags2 for anything else than options contained in
1343
thd_arg->options, it would be more efficient to flags2=thd_arg->options
1344
(OPTIONS_WRITTEN_TO_BIN_LOG would be used only at reading time).
1345
But it's likely that we don't want to use 32 bits for 3 bits; in the future
1346
we will probably want to reclaim the 29 bits. So we need the &.
1348
flags2= (uint32_t) (thd_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
1349
assert(thd_arg->variables.character_set_client->number < 256*256);
1350
assert(thd_arg->variables.collation_connection->number < 256*256);
1351
assert(thd_arg->variables.collation_server->number < 256*256);
1352
assert(thd_arg->variables.character_set_client->mbminlen == 1);
1353
int2store(charset, thd_arg->variables.character_set_client->number);
1354
int2store(charset+2, thd_arg->variables.collation_connection->number);
1355
int2store(charset+4, thd_arg->variables.collation_server->number);
1356
if (thd_arg->time_zone_used)
1359
Note that our event becomes dependent on the Time_zone object
1360
representing the time zone. Fortunately such objects are never deleted
1361
or changed during mysqld's lifetime.
1363
time_zone_len= thd_arg->variables.time_zone->get_name()->length();
1364
time_zone_str= thd_arg->variables.time_zone->get_name()->ptr();
1371
/* 2 utility functions for the next method */
1374
Read a string with length from memory.
1376
This function reads the string-with-length stored at
1377
<code>src</code> and extracts the length into <code>*len</code> and
1378
a pointer to the start of the string into <code>*dst</code>. The
1379
string can then be copied using <code>memcpy()</code> with the
1380
number of bytes given in <code>*len</code>.
1382
@param src Pointer to variable holding a pointer to the memory to
1383
read the string from.
1384
@param dst Pointer to variable holding a pointer where the actual
1385
string starts. Starting from this position, the string
1386
can be copied using @c memcpy().
1387
@param len Pointer to variable where the length will be stored.
1388
@param end One-past-the-end of the memory where the string is
1391
@return Zero if the entire string can be copied successfully,
1392
@c UINT_MAX if the length could not be read from memory
1393
(that is, if <code>*src >= end</code>), otherwise the
1394
number of bytes that are missing to read the full
1395
string, which happens when <code>*dst + *len >= end</code>.
1398
get_str_len_and_pointer(const Log_event::Byte **src,
1401
const Log_event::Byte *end)
1404
return -1; // Will be UINT_MAX in two-complement arithmetics
1405
uint32_t length= **src;
1408
if (*src + length >= end)
1409
return *src + length - end + 1; // Number of bytes missing
1410
*dst= (char *)*src + 1; // Will be copied later
1417
static void copy_str_and_move(const char **src,
1418
Log_event::Byte **dst,
1421
memcpy(*dst, *src, len);
1422
*src= (const char *)*dst;
1429
Macro to check that there is enough space to read from memory.
1431
@param PTR Pointer to memory
1432
@param END End of memory
1433
@param CNT Number of bytes that should be read.
1435
#define CHECK_SPACE(PTR,END,CNT) \
1437
assert((PTR) + (CNT) <= (END)); \
1438
if ((PTR) + (CNT) > (END)) { \
1446
This is used by the SQL slave thread to prepare the event before execution.
1448
Query_log_event::Query_log_event(const char* buf, uint32_t event_len,
1449
const Format_description_log_event
1451
Log_event_type event_type)
1452
:Log_event(buf, description_event), data_buf(0), query(NULL),
1453
db(NULL), catalog_len(0), status_vars_len(0),
1454
flags2_inited(0), sql_mode_inited(0), charset_inited(0),
1455
auto_increment_increment(1), auto_increment_offset(1),
1456
time_zone_len(0), lc_time_names_number(0), charset_database_number(0)
1460
uint8_t common_header_len, post_header_len;
1461
Log_event::Byte *start;
1462
const Log_event::Byte *end;
1465
common_header_len= description_event->common_header_len;
1466
post_header_len= description_event->post_header_len[event_type-1];
1469
We test if the event's length is sensible, and if so we compute data_len.
1470
We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
1471
We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
1473
if (event_len < (uint)(common_header_len + post_header_len))
1475
data_len = event_len - (common_header_len + post_header_len);
1476
buf+= common_header_len;
1478
slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
1479
exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
1480
db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
1481
error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
1484
5.0 format starts here.
1485
Depending on the format, we may or may not have affected/warnings etc.
1486
The remaining post-header to be parsed has length:
1488
tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
1491
status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
1493
Check if status variable length is corrupt and will lead to very
1494
wrong data. We could be even more strict and require data_len to
1495
be even bigger, but this will suffice to catch most corruption
1496
errors that can lead to a crash.
1498
if (status_vars_len > cmin(data_len, (uint32_t)MAX_SIZE_LOG_EVENT_STATUS))
1503
data_len-= status_vars_len;
1507
We have parsed everything we know in the post header for QUERY_EVENT,
1508
the rest of the post header either comes from an older MySQL version or is
1509
dedicated to derived events (e.g. Execute_load_query...)
1512
/* variable-part: the status vars; only in MySQL 5.0 */
1514
start= (Log_event::Byte*) (buf+post_header_len);
1515
end= (const Log_event::Byte*) (start+status_vars_len);
1516
for (const Log_event::Byte* pos= start; pos < end;)
1520
CHECK_SPACE(pos, end, 4);
1522
flags2= uint4korr(pos);
1525
case Q_SQL_MODE_CODE:
1527
CHECK_SPACE(pos, end, 8);
1529
sql_mode= (ulong) uint8korr(pos); // QQ: Fix when sql_mode is uint64_t
1533
case Q_CATALOG_NZ_CODE:
1534
if (get_str_len_and_pointer(&pos, &catalog, &catalog_len, end))
1540
case Q_AUTO_INCREMENT:
1541
CHECK_SPACE(pos, end, 4);
1542
auto_increment_increment= uint2korr(pos);
1543
auto_increment_offset= uint2korr(pos+2);
1546
case Q_TIME_ZONE_CODE:
1548
if (get_str_len_and_pointer(&pos, &time_zone_str, &time_zone_len, end))
1555
case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
1556
CHECK_SPACE(pos, end, 1);
1557
if ((catalog_len= *pos))
1558
catalog= (char*) pos+1; // Will be copied later
1559
CHECK_SPACE(pos, end, catalog_len + 2);
1560
pos+= catalog_len+2; // leap over end 0
1561
catalog_nz= 0; // catalog has end 0 in event
1563
case Q_LC_TIME_NAMES_CODE:
1564
CHECK_SPACE(pos, end, 2);
1565
lc_time_names_number= uint2korr(pos);
1568
case Q_CHARSET_DATABASE_CODE:
1569
CHECK_SPACE(pos, end, 2);
1570
charset_database_number= uint2korr(pos);
1574
/* That's why you must write status vars in growing order of code */
1575
pos= (const unsigned char*) end; // Break loop
1579
if (!(start= data_buf = (Log_event::Byte*) my_malloc(catalog_len + 1 +
1584
if (catalog_len) // If catalog is given
1587
@todo we should clean up and do only copy_str_and_move; it
1588
works for both cases. Then we can remove the catalog_nz
1591
if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
1592
copy_str_and_move(&catalog, &start, catalog_len);
1595
memcpy(start, catalog, catalog_len+1); // copy end 0
1596
catalog= (const char *)start;
1597
start+= catalog_len+1;
1601
copy_str_and_move(&time_zone_str, &start, time_zone_len);
1604
if time_zone_len or catalog_len are 0, then time_zone and catalog
1605
are uninitialized at this point. shouldn't they point to the
1606
zero-length null-terminated strings we allocated space for in the
1607
my_malloc call above? /sven
1610
/* A 2nd variable part; this is common to all versions */
1611
memcpy(start, end, data_len); // Copy db and query
1612
start[data_len]= '\0'; // End query with \0 (For safetly)
1614
query= (char *)(start + db_len + 1);
1615
q_len= data_len - db_len -1;
1621
Query_log_event::do_apply_event()
1623
int Query_log_event::do_apply_event(Relay_log_info const *rli)
1625
return do_apply_event(rli, query, q_len);
1631
Compare the values of "affected rows" around here. Something
1634
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1636
sql_print_error("Slave: did not get the expected number of affected \
1637
rows running query from master - expected %d, got %d (this numbers \
1638
should have matched modulo 4294967296).", 0, ...);
1639
thd->query_error = 1;
1642
We may also want an option to tell the slave to ignore "affected"
1643
mismatch. This mismatch could be implemented with a new ER_ code, and
1644
to ignore it you would use --slave-skip-errors...
1646
int Query_log_event::do_apply_event(Relay_log_info const *rli,
1647
const char *query_arg, uint32_t q_len_arg)
1650
int expected_error,actual_error= 0;
1652
Colleagues: please never free(thd->catalog) in MySQL. This would
1653
lead to bugs as here thd->catalog is a part of an alloced block,
1654
not an entire alloced block (see
1655
Query_log_event::do_apply_event()). Same for thd->db. Thank
1658
thd->catalog= catalog_len ? (char *) catalog : (char *)"";
1659
new_db.length= db_len;
1660
new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
1661
thd->set_db(new_db.str, new_db.length); /* allocates a copy of 'db' */
1662
thd->variables.auto_increment_increment= auto_increment_increment;
1663
thd->variables.auto_increment_offset= auto_increment_offset;
1666
InnoDB internally stores the master log position it has executed so far,
1667
i.e. the position just after the COMMIT event.
1668
When InnoDB will want to store, the positions in rli won't have
1669
been updated yet, so group_master_log_* will point to old BEGIN
1670
and event_master_log* will point to the beginning of current COMMIT.
1671
But log_pos of the COMMIT Query event is what we want, i.e. the pos of the
1672
END of the current log event (COMMIT). We save it in rli so that InnoDB can
1675
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
1677
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
1678
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
1681
Note: We do not need to execute reset_one_shot_variables() if this
1683
Reason: The db stored in binlog events is the same for SET and for
1684
its companion query. If the SET is ignored because of
1685
db_ok(), the companion query will also be ignored, and if
1686
the companion query is ignored in the db_ok() test of
1687
::do_apply_event(), then the companion SET was also ignored, so
1688
we don't need to reset_one_shot_variables().
1690
if (rpl_filter->db_ok(thd->db))
1692
thd->set_time((time_t)when);
1693
thd->query_length= q_len_arg;
1694
thd->query= (char*)query_arg;
1695
pthread_mutex_lock(&LOCK_thread_count);
1696
thd->query_id = next_query_id();
1697
pthread_mutex_unlock(&LOCK_thread_count);
1698
thd->variables.pseudo_thread_id= thread_id; // for temp tables
1700
if (ignored_error_code((expected_error= error_code)) ||
1701
!check_expected_error(thd,rli,expected_error))
1705
all bits of thd->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
1706
must take their value from flags2.
1708
thd->options= flags2|(thd->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
1711
String tmp(time_zone_str, time_zone_len, &my_charset_bin);
1712
if (!(thd->variables.time_zone= my_tz_find(thd, &tmp)))
1714
my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr());
1715
thd->variables.time_zone= global_system_variables.time_zone;
1716
goto compare_errors;
1719
if (lc_time_names_number)
1721
if (!(thd->variables.lc_time_names=
1722
my_locale_by_number(lc_time_names_number)))
1724
my_printf_error(ER_UNKNOWN_ERROR,
1725
"Unknown locale: '%d'", MYF(0), lc_time_names_number);
1726
thd->variables.lc_time_names= &my_locale_en_US;
1727
goto compare_errors;
1731
thd->variables.lc_time_names= &my_locale_en_US;
1732
if (charset_database_number)
1734
const CHARSET_INFO *cs;
1735
if (!(cs= get_charset(charset_database_number, MYF(0))))
1738
int10_to_str((int) charset_database_number, buf, -10);
1739
my_error(ER_UNKNOWN_COLLATION, MYF(0), buf);
1740
goto compare_errors;
1742
thd->variables.collation_database= cs;
1745
thd->variables.collation_database= thd->db_charset;
1747
/* Execute the query (note that we bypass dispatch_command()) */
1748
const char* found_semicolon= NULL;
1749
mysql_parse(thd, thd->query, thd->query_length, &found_semicolon);
1750
log_slow_statement(thd);
1755
The query got a really bad error on the master (thread killed etc),
1756
which could be inconsistent. Parse it to test the table names: if the
1757
replicate-*-do|ignore-table rules say "this query must be ignored" then
1758
we exit gracefully; otherwise we warn about the bad error and tell DBA
1761
if (mysql_test_parse_for_slave(thd, thd->query, thd->query_length))
1762
clear_all_errors(thd, const_cast<Relay_log_info*>(rli)); /* Can ignore query */
1765
rli->report(ERROR_LEVEL, expected_error,
1766
_("Query partially completed on the master "
1767
"(error on master: %d) and was aborted. There is a "
1768
"chance that your master is inconsistent at this "
1769
"point. If you are sure that your master is ok, run "
1770
"this query manually on the slave and then restart the "
1771
"slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; "
1772
"START SLAVE; . Query: '%s'"),
1773
expected_error, thd->query);
1774
thd->is_slave_error= 1;
1782
If we expected a non-zero error code, and we don't get the same error
1783
code, and neither of them should be ignored.
1785
actual_error= thd->is_error() ? thd->main_da.sql_errno() : 0;
1786
if ((expected_error != actual_error) &&
1788
!ignored_error_code(actual_error) &&
1789
!ignored_error_code(expected_error))
1791
rli->report(ERROR_LEVEL, 0,
1792
_("Query caused differenxt errors on master and slave.\n"
1793
"Error on master: '%s' (%d), Error on slave: '%s' (%d).\n"
1794
"Default database: '%s'. Query: '%s'"),
1795
ER_SAFE(expected_error),
1797
actual_error ? thd->main_da.message() : _("no error"),
1799
print_slave_db_safe(db), query_arg);
1800
thd->is_slave_error= 1;
1803
If we get the same error code as expected, or the actual error should be ignored.
1805
else if (expected_error == actual_error ||
1806
ignored_error_code(actual_error))
1808
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
1809
thd->killed= THD::NOT_KILLED;
1812
Other cases: mostly we expected no error and got one.
1814
else if (thd->is_slave_error || thd->is_fatal_error)
1816
rli->report(ERROR_LEVEL, actual_error,
1817
_("Error '%s' on query. Default database: '%s'. Query: '%s'"),
1818
(actual_error ? thd->main_da.message() :
1819
_("unexpected success or fatal error")),
1820
print_slave_db_safe(thd->db), query_arg);
1821
thd->is_slave_error= 1;
1825
TODO: compare the values of "affected rows" around here. Something
1827
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1829
sql_print_error("Slave: did not get the expected number of affected \
1830
rows running query from master - expected %d, got %d (this numbers \
1831
should have matched modulo 4294967296).", 0, ...);
1832
thd->is_slave_error = 1;
1834
We may also want an option to tell the slave to ignore "affected"
1835
mismatch. This mismatch could be implemented with a new ER_ code, and
1836
to ignore it you would use --slave-skip-errors...
1838
To do the comparison we need to know the value of "affected" which the
1839
above mysql_parse() computed. And we need to know the value of
1840
"affected" in the master's binlog. Both will be implemented later. The
1841
important thing is that we now have the format ready to log the values
1842
of "affected" in the binlog. So we can release 5.0.0 before effectively
1843
logging "affected" and effectively comparing it.
1845
} /* End of if (db_ok(... */
1848
pthread_mutex_lock(&LOCK_thread_count);
1850
Probably we have set thd->query, thd->db, thd->catalog to point to places
1851
in the data_buf of this event. Now the event is going to be deleted
1852
probably, so data_buf will be freed, so the thd->... listed above will be
1853
pointers to freed memory.
1854
So we must set them to 0, so that those bad pointers values are not later
1855
used. Note that "cleanup" queries like automatic DROP TEMPORARY Table
1856
don't suffer from these assignments to 0 as DROP TEMPORARY
1857
Table uses the db.table syntax.
1860
thd->set_db(NULL, 0); /* will free the current database */
1861
thd->query= 0; // just to be sure
1862
thd->query_length= 0;
1863
pthread_mutex_unlock(&LOCK_thread_count);
1864
close_thread_tables(thd);
1866
As a disk space optimization, future masters will not log an event for
1867
LAST_INSERT_ID() if that function returned 0 (and thus they will be able
1868
to replace the THD::stmt_depends_on_first_successful_insert_id_in_prev_stmt
1869
variable by (THD->first_successful_insert_id_in_prev_stmt > 0) ; with the
1870
resetting below we are ready to support that.
1872
thd->first_successful_insert_id_in_prev_stmt_for_binlog= 0;
1873
thd->first_successful_insert_id_in_prev_stmt= 0;
1874
thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
1875
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
1876
return thd->is_slave_error;
1879
int Query_log_event::do_update_pos(Relay_log_info *rli)
1882
Note that we will not increment group* positions if we are just
1883
after a SET ONE_SHOT, because SET ONE_SHOT should not be separated
1884
from its following updating query.
1886
if (thd->one_shot_set)
1888
rli->inc_event_relay_log_pos();
1892
return Log_event::do_update_pos(rli);
1896
Log_event::enum_skip_reason
1897
Query_log_event::do_shall_skip(Relay_log_info *rli)
1899
assert(query && q_len > 0);
1901
if (rli->slave_skip_counter > 0)
1903
if (strcmp("BEGIN", query) == 0)
1905
thd->options|= OPTION_BEGIN;
1906
return(Log_event::continue_group(rli));
1909
if (strcmp("COMMIT", query) == 0 || strcmp("ROLLBACK", query) == 0)
1911
thd->options&= ~OPTION_BEGIN;
1912
return(Log_event::EVENT_SKIP_COUNT);
1915
return(Log_event::do_shall_skip(rli));
1919
/**************************************************************************
1920
Start_log_event_v3 methods
1921
**************************************************************************/
1923
Start_log_event_v3::Start_log_event_v3()
1924
:Log_event(), created(0), binlog_version(BINLOG_VERSION),
1925
artificial_event(0), dont_set_created(0)
1927
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
1931
Start_log_event_v3::pack_info()
1934
void Start_log_event_v3::pack_info(Protocol *protocol)
1936
char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos;
1937
pos= my_stpcpy(buf, "Server ver: ");
1938
pos= my_stpcpy(pos, server_version);
1939
pos= my_stpcpy(pos, ", Binlog ver: ");
1940
pos= int10_to_str(binlog_version, pos, 10);
1941
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
1946
Start_log_event_v3::Start_log_event_v3()
1949
Start_log_event_v3::Start_log_event_v3(const char* buf,
1950
const Format_description_log_event
1952
:Log_event(buf, description_event)
1954
buf+= description_event->common_header_len;
1955
binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET);
1956
memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
1958
// prevent overrun if log is corrupted on disk
1959
server_version[ST_SERVER_VER_LEN-1]= 0;
1960
created= uint4korr(buf+ST_CREATED_OFFSET);
1961
/* We use log_pos to mark if this was an artificial event or not */
1962
artificial_event= (log_pos == 0);
1963
dont_set_created= 1;
1968
Start_log_event_v3::write()
1971
bool Start_log_event_v3::write(IO_CACHE* file)
1973
char buff[START_V3_HEADER_LEN];
1974
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
1975
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
1976
if (!dont_set_created)
1977
created= when= get_time();
1978
int4store(buff + ST_CREATED_OFFSET,created);
1979
return (write_header(file, sizeof(buff)) ||
1980
my_b_safe_write(file, (unsigned char*) buff, sizeof(buff)));
1985
Start_log_event_v3::do_apply_event().
1989
- To handle the case where the master died without having time to write
1990
DROP TEMPORARY Table, DO RELEASE_LOCK (prepared statements' deletion is
1991
TODO), we clean up all temporary tables that we got, if we are sure we
1995
- Remove all active user locks.
1996
Guilhem 2003-06: this is true but not urgent: the worst it can cause is
1997
the use of a bit of memory for a user lock which will not be used
1998
anymore. If the user lock is later used, the old one will be released. In
1999
other words, no deadlock problem.
2002
int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
2004
switch (binlog_version)
2009
This can either be 4.x (then a Start_log_event_v3 is only at master
2010
startup so we are sure the master has restarted and cleared his temp
2011
tables; the event always has 'created'>0) or 5.0 (then we have to test
2016
close_temporary_tables(thd);
2017
cleanup_load_tmpdir();
2022
Now the older formats; in that case load_tmpdir is cleaned up by the I/O
2026
if (strncmp(rli->relay_log.description_event_for_exec->server_version,
2027
"3.23.57",7) >= 0 && created)
2030
Can distinguish, based on the value of 'created': this event was
2031
generated at master startup.
2033
close_temporary_tables(thd);
2036
Otherwise, can't distinguish a Start_log_event generated at
2037
master startup and one generated by master FLUSH LOGS, so cannot
2038
be sure temp tables have to be dropped. So do nothing.
2042
/* this case is impossible */
2048
/***************************************************************************
2049
Format_description_log_event methods
2050
****************************************************************************/
2053
Format_description_log_event 1st ctor.
2055
Ctor. Can be used to create the event to write to the binary log (when the
2056
server starts or when FLUSH LOGS), or to create artificial events to parse
2057
binlogs from MySQL 3.23 or 4.x.
2058
When in a client, only the 2nd use is possible.
2060
@param binlog_version the binlog version for which we want to build
2061
an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x
2062
x>=2 and 4.1) or 4 (MySQL 5.0). Note that the
2063
old 4.0 (binlog version 2) is not supported;
2064
it should not be used for replication with
2068
/**
  Builds a format description for a given binlog version.

  - binlog_ver 4: copies the global ::server_version, sets common_header_len
    to LOG_EVENT_HEADER_LEN and number_of_event_types to LOG_EVENT_TYPES, then
    allocates post_header_len and fills in one entry per known event type
    (index is the event type minus 1).
  - older versions (the "case 3" label is visible): server_version is taken
    from server_ver when it is non-NULL, otherwise "3.23" or "4.0";
    common_header_len is OLD_HEADER_LEN for binlog_ver 1 and
    LOG_EVENT_MINIMAL_HEADER_LEN otherwise; number_of_event_types is
    FORMAT_DESCRIPTION_EVENT - 1.
  - any other value: post_header_len is set to 0.

  The allocation result is only tested before the table is filled; the
  existing comments state that a failed allocation is caught by is_valid().
  calc_server_version_split() is called at the end.

  @param binlog_ver  Binlog format version to describe.
  @param server_ver  Optional server version string, used by the
                     older-version branch only.
*/
Format_description_log_event::
2069
Format_description_log_event(uint8_t binlog_ver, const char* server_ver)
2070
:Start_log_event_v3(), event_type_permutation(0)
2072
binlog_version= binlog_ver;
2073
switch (binlog_ver) {
2074
case 4: /* MySQL 5.0 */
/* '::' selects the global server_version as the copy source */
2075
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
2076
common_header_len= LOG_EVENT_HEADER_LEN;
2077
number_of_event_types= LOG_EVENT_TYPES;
2078
/* we'll catch my_malloc() error in is_valid() */
2079
post_header_len=(uint8_t*) my_malloc(number_of_event_types*sizeof(uint8_t),
2082
This long list of assignments is not beautiful, but I see no way to
2083
make it nicer, as the right members are #defines, not array members, so
2084
it's impossible to write a loop.
2086
if (post_header_len)
2088
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2089
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN;
2090
post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
2091
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2092
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2093
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2094
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2095
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2096
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
2097
post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
2098
post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN;
2099
post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2100
post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2101
post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2102
post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1];
2103
post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
2104
post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
2105
post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
2110
case 3: /* 4.0.x x>=2 */
2112
We build an artificial (i.e. not sent by the master) event, which
2113
describes what those old master versions send.
2116
my_stpcpy(server_version, server_ver ? server_ver : "3.23");
2118
my_stpcpy(server_version, server_ver ? server_ver : "4.0");
2119
common_header_len= binlog_ver==1 ? OLD_HEADER_LEN :
2120
LOG_EVENT_MINIMAL_HEADER_LEN;
2122
The first new event in binlog version 4 is Format_desc. So any event type
2123
after that does not exist in older versions. We use the events known by
2124
version 3, even if version 1 had only a subset of them (this is not a
2125
problem: it uses a few bytes for nothing but unifies code; it does not
2126
make the slave detect less corruptions).
2128
number_of_event_types= FORMAT_DESCRIPTION_EVENT - 1;
2129
post_header_len=(uint8_t*) my_malloc(number_of_event_types*sizeof(uint8_t),
2131
if (post_header_len)
2133
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2134
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_MINIMAL_LEN;
2135
post_header_len[STOP_EVENT-1]= 0;
2136
post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN;
2137
post_header_len[INTVAR_EVENT-1]= 0;
2138
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2139
post_header_len[SLAVE_EVENT-1]= 0;
2140
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2141
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2142
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2143
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2144
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
2145
post_header_len[RAND_EVENT-1]= 0;
2146
post_header_len[USER_VAR_EVENT-1]= 0;
2149
default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
2150
post_header_len= 0; /* will make is_valid() fail */
2153
calc_server_version_split();
2158
The problem with this constructor is that the fixed header may have a
2159
length different from this version, but we don't know this length as we
2160
have not read the Format_description_log_event which says it, yet. This
2161
length is in the post-header of the event, but we don't know where the
2164
So this type of event HAS to:
2165
- either have the header's length at the beginning (in the header, at a
2166
fixed position which will never be changed), not in the post-header. That
2167
would make the header be "shifted" compared to other events.
2168
- or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future
2169
versions, so that we know for sure.
2171
I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because
2172
it is sent before Format_description_log_event).
2175
/**
  Builds a format description from a serialized event buffer.

  Steps visible in the code:
  - skips LOG_EVENT_MINIMAL_HEADER_LEN bytes of buf;
  - reads common_header_len from the byte at ST_COMMON_HEADER_LEN_OFFSET and
    returns early when it is smaller than OLD_HEADER_LEN;
  - derives number_of_event_types from event_len and duplicates that many
    bytes, starting right after the common-header-length byte, into
    post_header_len via my_memdup();
  - calls calc_server_version_split();
  - when post_header_len is non-NULL and server_version matches the pre-GA
    patterns tested in the 'if' (5.1.[1-5]-a_drop5*, 5.1.4-a_drop6*,
    5.2.[0-2]-a_drop6*): requires number_of_event_types == 22 (otherwise
    post_header_len is freed and set to NULL), installs perm as
    event_type_permutation and permutes post_header_len the same way through
    a temporary array.

  @param buf  Start of the serialized event.

  NOTE(review): the remaining parameter declarations and some entries of
  perm[] are not visible in this view; event_len and description_event are
  used by the visible code.
*/
Format_description_log_event::
2176
Format_description_log_event(const char* buf,
2179
Format_description_log_event*
2181
:Start_log_event_v3(buf, description_event), event_type_permutation(0)
2183
buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
2184
if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
2185
return; /* sanity check */
/* every byte after the common-header-length byte is one post-header length */
2186
number_of_event_types=
2187
event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1);
2188
/* If alloc fails, we'll detect it in is_valid() */
2189
post_header_len= (uint8_t*) my_memdup((unsigned char*)buf+ST_COMMON_HEADER_LEN_OFFSET+1,
2190
number_of_event_types*
2191
sizeof(*post_header_len), MYF(0));
2192
calc_server_version_split();
2195
In some previous versions, the events were given other event type
2196
id numbers than in the present version. When replicating from such
2197
a version, we therefore set up an array that maps those id numbers
2198
to the id numbers of the present server.
2200
If post_header_len is null, it means malloc failed, and is_valid
2201
will fail, so there is no need to do anything.
2203
The trees in which events have wrong id's are:
2205
mysql-5.1-wl1012.old mysql-5.1-wl2325-5.0-drop6p13-alpha
2206
mysql-5.1-wl2325-5.0-drop6 mysql-5.1-wl2325-5.0
2207
mysql-5.1-wl2325-no-dd
2209
(this was found by grepping for two lines in sequence where the
2210
first matches "FORMAT_DESCRIPTION_EVENT," and the second matches
2211
"TABLE_MAP_EVENT," in log_event.h in all trees)
2213
In these trees, the following server_versions existed since
2214
TABLE_MAP_EVENT was introduced:
2216
5.1.1-a_drop5p3 5.1.1-a_drop5p4 5.1.1-alpha
2217
5.1.2-a_drop5p10 5.1.2-a_drop5p11 5.1.2-a_drop5p12
2218
5.1.2-a_drop5p13 5.1.2-a_drop5p14 5.1.2-a_drop5p15
2219
5.1.2-a_drop5p16 5.1.2-a_drop5p16b 5.1.2-a_drop5p16c
2220
5.1.2-a_drop5p17 5.1.2-a_drop5p4 5.1.2-a_drop5p5
2221
5.1.2-a_drop5p6 5.1.2-a_drop5p7 5.1.2-a_drop5p8
2222
5.1.2-a_drop5p9 5.1.3-a_drop5p17 5.1.3-a_drop5p17b
2223
5.1.3-a_drop5p17c 5.1.4-a_drop5p18 5.1.4-a_drop5p19
2224
5.1.4-a_drop5p20 5.1.4-a_drop6p0 5.1.4-a_drop6p1
2225
5.1.4-a_drop6p2 5.1.5-a_drop5p20 5.2.0-a_drop6p3
2226
5.2.0-a_drop6p4 5.2.0-a_drop6p5 5.2.0-a_drop6p6
2227
5.2.1-a_drop6p10 5.2.1-a_drop6p11 5.2.1-a_drop6p12
2228
5.2.1-a_drop6p6 5.2.1-a_drop6p7 5.2.1-a_drop6p8
2229
5.2.2-a_drop6p13 5.2.2-a_drop6p13-alpha 5.2.2-a_drop6p13b
2232
(this was found by grepping for "mysql," in all historical
2233
versions of configure.in in the trees listed above).
2235
There are 5.1.1-alpha versions that use the new event id's, so we
2236
do not test that version string. So replication from 5.1.1-alpha
2237
with the other event id's to a new version does not work.
2238
Moreover, we can safely ignore the part after drop[56]. This
2239
allows us to simplify the big list above to the following regexes:
2241
5\.1\.[1-5]-a_drop5.*
2243
5\.2\.[0-2]-a_drop6.*
2245
This is what we test for in the 'if' below.
2247
if (post_header_len &&
2248
server_version[0] == '5' && server_version[1] == '.' &&
2249
server_version[3] == '.' &&
2250
strncmp(server_version + 5, "-a_drop", 7) == 0 &&
2251
((server_version[2] == '1' &&
2252
server_version[4] >= '1' && server_version[4] <= '5' &&
2253
server_version[12] == '5') ||
2254
(server_version[2] == '1' &&
2255
server_version[4] == '4' &&
2256
server_version[12] == '6') ||
2257
(server_version[2] == '2' &&
2258
server_version[4] >= '0' && server_version[4] <= '2' &&
2259
server_version[12] == '6')))
2261
if (number_of_event_types != 22)
2263
/* this makes is_valid() return false. */
2264
free(post_header_len);
2265
post_header_len= NULL;
2268
static const uint8_t perm[23]=
2270
UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
2271
INTVAR_EVENT, LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
2272
APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
2274
RAND_EVENT, USER_VAR_EVENT,
2275
FORMAT_DESCRIPTION_EVENT,
2277
PRE_GA_WRITE_ROWS_EVENT,
2278
PRE_GA_UPDATE_ROWS_EVENT,
2279
PRE_GA_DELETE_ROWS_EVENT,
2281
BEGIN_LOAD_QUERY_EVENT,
2282
EXECUTE_LOAD_QUERY_EVENT,
2284
event_type_permutation= perm;
2286
Since we use (permuted) event id's to index the post_header_len
2287
array, we need to permute the post_header_len array too.
2289
uint8_t post_header_len_temp[23];
2290
for (int i= 1; i < 23; i++)
2291
post_header_len_temp[perm[i] - 1]= post_header_len[i - 1];
2292
for (int i= 0; i < 22; i++)
2293
post_header_len[i] = post_header_len_temp[i];
2298
/**
  Serializes this event into file.

  Fills a FORMAT_DESCRIPTION_HEADER_LEN-byte buffer with binlog_version
  (2 bytes), server_version (ST_SERVER_VER_LEN bytes), the 'created'
  timestamp (4 bytes), LOG_EVENT_HEADER_LEN as the common header length and
  the post_header_len table, then writes the common header followed by this
  buffer. Unless dont_set_created is set, 'created' and 'when' are refreshed
  from get_time() first.

  @param file  Destination IO_CACHE.
  @return true when write_header() or my_b_safe_write() returns non-zero.

  NOTE(review): post_header_len is passed to memcpy() without a NULL check
  in the visible code; confirm callers only write events for which
  is_valid() holds.
*/
bool Format_description_log_event::write(IO_CACHE* file)
2301
We don't call Start_log_event_v3::write() because this would make 2
2304
unsigned char buff[FORMAT_DESCRIPTION_HEADER_LEN];
2305
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
2306
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
2307
if (!dont_set_created)
2308
created= when= get_time();
2309
int4store(buff + ST_CREATED_OFFSET,created);
2310
buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
2311
memcpy(buff+ST_COMMON_HEADER_LEN_OFFSET+1, post_header_len,
2313
return (write_header(file, sizeof(buff)) ||
2314
my_b_safe_write(file, buff, sizeof(buff)));
2318
/**
  Slave-side apply step for a format description event.

  When the event is not artificial, 'created' is non-zero and thd still has
  engines registered in thd->transaction.all.ha_list, an INFORMATION_LEVEL
  message is reported through rli and rli->cleanup_context(thd, 1) is called
  (rli is const here, hence the const_cast).

  When server_id differs from the global ::server_id, the work is delegated
  to Start_log_event_v3::do_apply_event(rli) and its result is returned.

  @param rli  Relay log info used for reporting and context cleanup.

  NOTE(review): the return value for an event carrying our own server id is
  not visible in this view.
*/
int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
2321
As a transaction NEVER spans on 2 or more binlogs:
2322
if we have an active transaction at this point, the master died
2323
while writing the transaction to the binary log, i.e. while
2324
flushing the binlog cache to the binlog. XA guarantees that master has
2325
rolled back. So we roll back.
2326
Note: this event could be sent by the master to inform us of the
2327
format of its binlog; in other words maybe it is not at its
2328
original place when it comes to us; we'll know this by checking
2329
log_pos ("artificial" events have log_pos == 0).
2331
if (!artificial_event && created && thd->transaction.all.ha_list)
2333
/* This is not an error (XA is safe), just an information */
2334
rli->report(INFORMATION_LEVEL, 0,
2335
_("Rolling back unfinished transaction (no COMMIT "
2336
"or ROLLBACK in relay log). A probable cause is that "
2337
"the master died while writing the transaction to "
2338
"its binary log, thus rolled back too."));
2339
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 1);
2342
If this event comes from ourselves, there is no cleaning task to
2343
perform, we don't call Start_log_event_v3::do_apply_event()
2344
(this was just to update the log's description event).
2346
if (server_id != (uint32_t) ::server_id)
2349
If the event was not requested by the slave i.e. the master sent
2350
it while the slave asked for a position >4, the event will make
2351
rli->group_master_log_pos advance. Say that the slave asked for
2352
position 1000, and the Format_desc event's end is 96. Then in
2353
the beginning of replication rli->group_master_log_pos will be
2354
0, then 96, then jump to first really asked event (which is
2355
>96). So this is ok.
2357
return(Start_log_event_v3::do_apply_event(rli));
2362
/**
  Installs this event as the description event used for execution and
  advances the relay log position.

  The previous rli->relay_log.description_event_for_exec is deleted and
  replaced by 'this', so rli takes over the pointer to this event. When
  server_id equals the global ::server_id, only
  rli->inc_event_relay_log_pos() is called; the other visible path returns
  Log_event::do_update_pos(rli).

  @param rli  Relay log info to update.

  NOTE(review): the return statement of the same-server-id path is not
  visible in this view.
*/
int Format_description_log_event::do_update_pos(Relay_log_info *rli)
2364
/* save the information describing this binlog */
2365
delete rli->relay_log.description_event_for_exec;
2366
rli->relay_log.description_event_for_exec= this;
2368
if (server_id == (uint32_t) ::server_id)
2371
We only increase the relay log position if we are skipping
2372
events and do not touch any group_* variables, nor flush the
2373
relay log info. If there is a crash, we will have to re-skip
2374
the events again, but that is a minor issue.
2376
If we do not skip stepping the group log position (and the
2377
server id was changed when restarting the server), it might well
2378
be that we start executing at a position that is invalid, e.g.,
2379
at a Rows_log_event or a Query_log_event preceeded by a
2380
Intvar_log_event instead of starting at a Table_map_log_event or
2381
the Intvar_log_event respectively.
2383
rli->inc_event_relay_log_pos();
2388
return Log_event::do_update_pos(rli);
2392
/**
  Skip decision for a format description event.

  @param rli  Unused.
  @return Always Log_event::EVENT_SKIP_NOT.
*/
Log_event::enum_skip_reason
2393
Format_description_log_event::do_shall_skip(Relay_log_info *rli __attribute__((unused)))
2395
return Log_event::EVENT_SKIP_NOT;
2400
Splits the event's 'server_version' string into three numeric pieces stored
2401
into 'server_version_split':
2402
X.Y.Zabc (X,Y,Z numbers, a not a digit) -> {X,Y,Z}
2405
'server_version_split' is then used for lookups to find if the server which
2406
created this event has some known bug.
2408
/**
  Parses the leading numeric components of server_version into
  server_version_split[0..2].

  Each of the three iterations converts a decimal number with strtoul() and
  stores it narrowed to unsigned char. The asserts document the
  expectations that every component is below 256 and that the first
  component is followed by a dot.

  NOTE(review): the declaration of 'number' and the statements that advance
  'p' between components are not fully visible in this view.
*/
void Format_description_log_event::calc_server_version_split()
2410
char *p= server_version, *r;
2412
for (uint32_t i= 0; i<=2; i++)
2414
/* strtoul() leaves r pointing at the first character it did not consume */
number= strtoul(p, &r, 10);
2415
server_version_split[i]= (unsigned char)number;
2416
assert(number < 256); // fit in unsigned char
2418
assert(!((i == 0) && (*r != '.'))); // should be true in practice
2420
p++; // skip the dot
2425
/**************************************************************************
2426
Load_log_event methods
2427
General note about Load_log_event: the binlogging of LOAD DATA INFILE is
2428
going to be changed in 5.0 (or maybe in 5.1; not decided yet).
2429
However, the 5.0 slave could still have to read such events (from a 4.x
2430
master), convert them (which just means maybe expand the header, when 5.0
2431
servers have a UID in events) (remember that whatever is after the header
2432
will be like in 4.x, as this event's format is not modified in 5.0 as we
2433
will use new types of events to log the new LOAD DATA INFILE features).
2434
To be able to read/convert, we just need to not assume that the common
2435
header is of length LOG_EVENT_HEADER_LEN (we must use the description
2437
Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE
2438
between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the
2439
positions displayed in SHOW SLAVE STATUS then are fine too).
2440
**************************************************************************/
2443
Load_log_event::pack_info()
2446
/**
  Computes the size of the buffer that callers allocate before passing it to
  print_query().

  The value is the sum of a fixed allowance for each literal fragment of the
  statement (annotated on each line) plus the variable parts: db_len,
  fname_len, table_name_len*2, four times each sql_ex delimiter length, and
  the field list.

  NOTE(review): the factor 4 on the delimiter lengths presumably covers
  worst-case expansion inside pretty_print_str(); confirm against that
  helper. num_fields == 0 makes (num_fields-1)*2 wrap if num_fields is
  unsigned; confirm how the result type absorbs that.
*/
uint32_t Load_log_event::get_query_buffer_length()
2449
5 + db_len + 3 + // "use DB; "
2450
18 + fname_len + 2 + // "LOAD DATA INFILE 'file''"
2452
9 + // " REPLACE or IGNORE "
2453
13 + table_name_len*2 + // "INTO Table `table`"
2454
21 + sql_ex.field_term_len*4 + 2 + // " FIELDS TERMINATED BY 'str'"
2455
23 + sql_ex.enclosed_len*4 + 2 + // " OPTIONALLY ENCLOSED BY 'str'"
2456
12 + sql_ex.escaped_len*4 + 2 + // " ESCAPED BY 'str'"
2457
21 + sql_ex.line_term_len*4 + 2 + // " LINES TERMINATED BY 'str'"
2458
19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'"
2459
15 + 22 + // " IGNORE xxx LINES"
2460
3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)"
2464
/**
  Renders this event as the text of a LOAD DATA statement into buf.

  Fragments are appended in this order: an optional "use `db`; " prefix,
  "LOAD DATA ", "LOCAL " when check_fname_outside_temp_buf() is true,
  "INFILE '<fname>' ", " REPLACE " or " IGNORE " according to
  sql_ex.opt_flags, "INTO", " Table `<table_name>`", the FIELDS / ENCLOSED /
  ESCAPED / LINES clauses (delimiters printed through pretty_print_str()),
  an optional " IGNORE <n> LINES " clause when skip_lines is positive, and
  the field list.

  @param need_db  When true, and db is non-NULL with db_len > 0, the "use"
                  prefix is emitted.
  @param buf      Destination buffer; callers in this file size it with
                  get_query_buffer_length().
  @param end, fn_start, fn_end
                  Output pointers. NOTE(review): the statements that assign
                  them are not visible in this view.
*/
void Load_log_event::print_query(bool need_db, char *buf,
2465
char **end, char **fn_start, char **fn_end)
2469
if (need_db && db && db_len)
2471
pos= my_stpcpy(pos, "use `");
/* db is copied by explicit length, not as a NUL-terminated string */
2472
memcpy(pos, db, db_len);
2473
pos= my_stpcpy(pos+db_len, "`; ");
2476
pos= my_stpcpy(pos, "LOAD DATA ");
2481
if (check_fname_outside_temp_buf())
2482
pos= my_stpcpy(pos, "LOCAL ");
2483
pos= my_stpcpy(pos, "INFILE '");
2484
memcpy(pos, fname, fname_len);
2485
pos= my_stpcpy(pos+fname_len, "' ");
2487
if (sql_ex.opt_flags & REPLACE_FLAG)
2488
pos= my_stpcpy(pos, " REPLACE ");
2489
else if (sql_ex.opt_flags & IGNORE_FLAG)
2490
pos= my_stpcpy(pos, " IGNORE ");
2492
pos= my_stpcpy(pos ,"INTO");
2497
pos= my_stpcpy(pos ," Table `");
2498
memcpy(pos, table_name, table_name_len);
2499
pos+= table_name_len;
2501
/* We have to create all optional fields as the default is not empty */
2502
pos= my_stpcpy(pos, "` FIELDS TERMINATED BY ");
2503
pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len);
2504
if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
2505
pos= my_stpcpy(pos, " OPTIONALLY ");
2506
pos= my_stpcpy(pos, " ENCLOSED BY ");
2507
pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len);
2509
pos= my_stpcpy(pos, " ESCAPED BY ");
2510
pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
2512
pos= my_stpcpy(pos, " LINES TERMINATED BY ");
2513
pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
2514
if (sql_ex.line_start_len)
2516
pos= my_stpcpy(pos, " STARTING BY ");
2517
pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
2520
if ((long) skip_lines > 0)
2522
pos= my_stpcpy(pos, " IGNORE ");
2523
pos= int64_t10_to_str((int64_t) skip_lines, pos, 10);
2524
pos= my_stpcpy(pos," LINES ");
2530
const char *field= fields;
2531
pos= my_stpcpy(pos, " (");
2532
for (i = 0; i < num_fields; i++)
2539
memcpy(pos, field, field_lens[i]);
2540
pos+= field_lens[i];
2541
field+= field_lens[i] + 1;
2550
/**
  Stores a textual description of this event into protocol.

  Allocates get_query_buffer_length() bytes with my_malloc(), renders the
  statement with print_query(true, ...) (so the "use" prefix is requested)
  and stores the bytes between buf and end using my_charset_bin.

  @param protocol  Receives the rendered text.

  NOTE(review): the handling of a failed allocation and the release of buf
  are not visible in this view.
*/
void Load_log_event::pack_info(Protocol *protocol)
2554
if (!(buf= (char*) my_malloc(get_query_buffer_length(), MYF(MY_WME))))
2556
print_query(true, buf, &end, 0, 0);
2557
protocol->store(buf, end-buf, &my_charset_bin);
2563
Load_log_event::write_data_header()
2566
/**
  Writes the fixed-size post-header of a load event.

  Layout, as stored into the LOAD_HEADER_LEN-byte buffer: slave_proxy_id,
  exec_time and skip_lines (4 bytes each, via int4store), table_name_len and
  db_len (one byte each), then num_fields (4 bytes).

  @param file  Destination IO_CACHE.
  @return true when my_b_safe_write() returns non-zero.
*/
bool Load_log_event::write_data_header(IO_CACHE* file)
2568
char buf[LOAD_HEADER_LEN];
2569
int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
2570
int4store(buf + L_EXEC_TIME_OFFSET, exec_time);
2571
int4store(buf + L_SKIP_LINES_OFFSET, skip_lines);
/* both lengths are narrowed to a single byte here */
2572
buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
2573
buf[L_DB_LEN_OFFSET] = (char)db_len;
2574
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
2575
return my_b_safe_write(file, (unsigned char*)buf, LOAD_HEADER_LEN) != 0;
2580
Load_log_event::write_data_body()
2583
/**
  Writes the variable-size body of a load event.

  Order of the visible writes: sql_ex (through sql_ex.write_data()); then,
  when num_fields, fields and field_lens are all non-zero, the field_lens
  array (num_fields bytes) followed by the field name block
  (field_block_len bytes); finally table_name and db, each written with one
  extra byte (len + 1), and fname written with exactly fname_len bytes.

  @param file  Destination IO_CACHE.
  @return The final return is true when any of the last three
          my_b_safe_write() calls returns non-zero.

  NOTE(review): the return statements taken when sql_ex.write_data() or the
  field writes fail are not visible in this view.
*/
bool Load_log_event::write_data_body(IO_CACHE* file)
2585
if (sql_ex.write_data(file))
2587
if (num_fields && fields && field_lens)
2589
if (my_b_safe_write(file, (unsigned char*)field_lens, num_fields) ||
2590
my_b_safe_write(file, (unsigned char*)fields, field_block_len))
2593
return (my_b_safe_write(file, (unsigned char*)table_name, table_name_len + 1) ||
2594
my_b_safe_write(file, (unsigned char*)db, db_len + 1) ||
2595
my_b_safe_write(file, (unsigned char*)fname, fname_len));
2600
Load_log_event::Load_log_event()
2603
/**
  Builds a load event from the state of an executing LOAD DATA statement.

  Visible effects:
  - thread_id and slave_proxy_id come from thd_arg; table_name falls back to
    "" when table_name_arg is NULL; db and fname are taken from db_arg and
    ex->file_name without copying;
  - exec_time is end_time minus thd_arg->start_time;
  - the sql_ex delimiter pointers alias the String buffers owned by ex, and
    each delimiter length is narrowed to uint8_t;
  - sql_ex.opt_flags collects DUMPFILE_FLAG, OPT_ENCLOSED_FLAG, REPLACE_FLAG
    and IGNORE_FLAG; sql_ex.empty_flags records which delimiters are empty;
  - each item name of fields_arg is appended, including its terminating NUL,
    to fields_buf and its length byte to field_lens_buf; fields and
    field_lens then point into those buffers.

  @param thd_arg         Thread the statement runs in.
  @param ex              Parsed LOAD DATA options (file name, delimiters).
  @param db_arg          Database name.
  @param table_name_arg  Table name, may be NULL.
  @param fields_arg      Column list of the statement.
  @param handle_dup      Duplicate handling mode, mapped onto opt_flags.
  @param ignore, using_trans
                         NOTE(review): their use is on lines not visible in
                         this view.

  NOTE(review): db is passed to strlen() without a NULL guard (the existing
  comment says it cannot be NULL), unlike table_name_arg. Field name lengths
  are narrowed to unsigned char, so a name longer than 255 bytes would be
  recorded with a truncated length while field_block_len uses that same
  truncated value; confirm names are bounded upstream.
*/
Load_log_event::Load_log_event(THD *thd_arg, sql_exchange *ex,
2604
const char *db_arg, const char *table_name_arg,
2605
List<Item> &fields_arg,
2606
enum enum_duplicates handle_dup,
2607
bool ignore, bool using_trans)
2609
thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
2611
thread_id(thd_arg->thread_id),
2612
slave_proxy_id(thd_arg->variables.pseudo_thread_id),
2613
num_fields(0),fields(0),
2614
field_lens(0),field_block_len(0),
2615
table_name(table_name_arg ? table_name_arg : ""),
2616
db(db_arg), fname(ex->file_name), local_fname(false)
2620
exec_time = (ulong) (end_time - thd_arg->start_time);
2621
/* db can never be a zero pointer in 4.0 */
2622
db_len = (uint32_t) strlen(db);
2623
table_name_len = (uint32_t) strlen(table_name);
2624
fname_len = (fname) ? (uint) strlen(fname) : 0;
2625
sql_ex.field_term = (char*) ex->field_term->ptr();
2626
sql_ex.field_term_len = (uint8_t) ex->field_term->length();
2627
sql_ex.enclosed = (char*) ex->enclosed->ptr();
2628
sql_ex.enclosed_len = (uint8_t) ex->enclosed->length();
2629
sql_ex.line_term = (char*) ex->line_term->ptr();
2630
sql_ex.line_term_len = (uint8_t) ex->line_term->length();
2631
sql_ex.line_start = (char*) ex->line_start->ptr();
2632
sql_ex.line_start_len = (uint8_t) ex->line_start->length();
2633
sql_ex.escaped = (char*) ex->escaped->ptr();
2634
sql_ex.escaped_len = (uint8_t) ex->escaped->length();
2635
sql_ex.opt_flags = 0;
2636
sql_ex.cached_new_format = -1;
2639
sql_ex.opt_flags|= DUMPFILE_FLAG;
2640
if (ex->opt_enclosed)
2641
sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
2643
sql_ex.empty_flags= 0;
2645
switch (handle_dup) {
2647
sql_ex.opt_flags|= REPLACE_FLAG;
2649
case DUP_UPDATE: // Impossible here
2654
sql_ex.opt_flags|= IGNORE_FLAG;
2656
if (!ex->field_term->length())
2657
sql_ex.empty_flags |= FIELD_TERM_EMPTY;
2658
if (!ex->enclosed->length())
2659
sql_ex.empty_flags |= ENCLOSED_EMPTY;
2660
if (!ex->line_term->length())
2661
sql_ex.empty_flags |= LINE_TERM_EMPTY;
2662
if (!ex->line_start->length())
2663
sql_ex.empty_flags |= LINE_START_EMPTY;
2664
if (!ex->escaped->length())
2665
sql_ex.empty_flags |= ESCAPED_EMPTY;
2667
skip_lines = ex->skip_lines;
2669
List_iterator<Item> li(fields_arg);
2670
field_lens_buf.length(0);
2671
fields_buf.length(0);
2673
while ((item = li++))
2676
unsigned char len = (unsigned char) strlen(item->name);
2677
field_block_len += len + 1;
2678
fields_buf.append(item->name, len + 1);
2679
field_lens_buf.append((char*)&len, 1);
2682
field_lens = (const unsigned char*)field_lens_buf.ptr();
2683
fields = fields_buf.ptr();
2689
The caller must do buf[event_len] = 0 before he starts using the
2692
/**
  Builds a load event from a serialized buffer.

  All pointer and length members start out as 0. The visible code then
  calls copy_log_event() with a body offset that depends on whether the
  type byte at EVENT_TYPE_OFFSET is LOAD_EVENT; per the trailing comment,
  derived classes are expected to call copy_log_event() themselves.

  @param buf                Start of the serialized event.
  @param event_len          Length of the event in bytes.
  @param description_event  Format description used to locate the headers.

  NOTE(review): the condition guarding the copy_log_event() call and part
  of the offset expression are not visible in this view.
*/
Load_log_event::Load_log_event(const char *buf, uint32_t event_len,
2693
const Format_description_log_event *description_event)
2694
:Log_event(buf, description_event), num_fields(0), fields(0),
2695
field_lens(0),field_block_len(0),
2696
table_name(0), db(0), fname(0), local_fname(false)
2699
I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0,
2700
4.0->5.0 and 5.0->5.0 and it works.
2703
copy_log_event(buf, event_len,
2704
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
2706
description_event->common_header_len :
2707
LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
2709
/* otherwise it's a derived class, will call copy_log_event() itself */
2715
Load_log_event::copy_log_event()
2718
/**
  Decodes a serialized load event in place.

  Reads the post-header that starts description_event->common_header_len
  bytes into buf (thread id, exec time, skip_lines, table and db name
  lengths, number of fields), lets sql_ex.init() parse the delimiter block
  at body_offset, sums the field lengths into field_block_len and then sets
  fields, table_name, db and fname to point INTO buf. Nothing is copied, so
  buf must outlive this object. fname_len is taken with strlen(), which
  relies on the caller having NUL-terminated the buffer (see the comment on
  the last line).

  @param buf                Start of the serialized event.
  @param event_len          Length of the event in bytes.
  @param description_event  Supplies common_header_len.

  NOTE(review): body_offset is used below but its declaration, and the
  return statements of the failure branches, are not visible in this view.
  Apart from the num_fields > data_len test, no visible check confirms that
  the derived fields / table_name / db / fname pointers stay below buf_end;
  confirm whether corrupt events are rejected elsewhere.
*/
int Load_log_event::copy_log_event(const char *buf, ulong event_len,
2720
const Format_description_log_event *description_event)
2723
char* buf_end = (char*)buf + event_len;
2724
/* this is the beginning of the post-header */
2725
const char* data_head = buf + description_event->common_header_len;
2726
slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
2727
exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
2728
skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
/*
  NOTE(review): data_head is a plain char pointer, so where char is signed a
  length byte >= 0x80 is sign-extended by the (uint) casts below. Confirm
  the lengths are bounded below 128 or read them through unsigned char.
*/
2729
table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
2730
db_len = (uint)data_head[L_DB_LEN_OFFSET];
2731
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
2733
if ((int) event_len < body_offset)
2736
Sql_ex.init() on success returns the pointer to the first byte after
2737
the sql_ex structure, which is the start of field lengths array.
2739
if (!(field_lens= (unsigned char*)sql_ex.init((char*)buf + body_offset,
2741
buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
2744
data_len = event_len - body_offset;
2745
if (num_fields > data_len) // simple sanity check against corruption
2747
for (uint32_t i = 0; i < num_fields; i++)
2748
field_block_len += (uint)field_lens[i] + 1;
2750
fields = (char*)field_lens + num_fields;
2751
table_name = fields + field_block_len;
2752
db = table_name + table_name_len + 1;
2753
fname = db + db_len + 1;
2754
fname_len = strlen(fname);
2755
// null termination is accomplished by the caller doing buf[event_len]=0
2762
Load_log_event::set_fields()
2765
This function cannot use the member variable
2766
for the database, since LOAD DATA INFILE on the slave
2767
can be for a different database than the current one.
2768
This is the reason for the affected_db argument to this method.
2771
/**
  Appends one Item_field per stored field name to field_list.

  Walks the packed 'fields' block: entry i is field_lens[i] bytes long and
  is followed by one separator byte, hence the "+ 1" when advancing. Each
  Item_field is created with the given context, affected_db and this
  event's table_name.

  @param affected_db  Database name to bind the fields to.
  @param field_list   List that receives the new items.
  @param context      Name resolution context passed to every Item_field.

  NOTE(review): the result of 'new Item_field' is pushed without a visible
  NULL check; confirm how allocation failure is reported for Items.
*/
void Load_log_event::set_fields(const char* affected_db,
2772
List<Item> &field_list,
2773
Name_resolution_context *context)
2776
const char* field = fields;
2777
for (i= 0; i < num_fields; i++)
2779
field_list.push_back(new Item_field(context,
2780
affected_db, table_name, field));
2781
field+= field_lens[i] + 1;
2787
Does the data loading job when executing a LOAD DATA on the slave.
2791
@param use_rli_only_for_errors If set to 1, rli is provided to
2792
Load_log_event::exec_event only for this
2793
function to have RPL_LOG_NAME and
2794
rli->last_slave_error, both being used by
2795
error reports. rli's position advancing
2796
is skipped (done by the caller which is
2797
Execute_load_log_event::exec_event).
2798
If set to 0, rli is provided for full use,
2799
i.e. for error reports and position
2803
fix this; this can be done by testing rules in
2804
Create_file_log_event::exec_event() and then discarding Append_block and
2807
this is a bug - this needs to be moved to the I/O thread
2815
/**
  Executes a LOAD DATA event on the slave.

  Outline of the visible code:
  - selects on thd the database returned by rpl_filter->get_rewrite_db() for
    this event's db, clears previous errors and resets thd for a new
    command;
  - unless use_rli_only_for_errors is set, records log_pos in
    rli->future_group_master_log_pos;
  - when rpl_filter->db_ok(thd->db) holds, assigns a new query id, builds a
    TableList for table_name with TL_WRITE and either skips the transfer
    (skip_load_data_infile) when table filters reject it, or forges the
    statement text with print_query(), maps sql_ex.opt_flags to handle_dup,
    rebuilds a sql_exchange from the stored delimiters and calls
    mysql_load(); a non-zero result sets thd->is_slave_error, and cut
    fields produce a warning in the error log;
  - otherwise the master is asked to skip sending the file;
  - afterwards resets thd->db and query_length under LOCK_thread_count,
    closes the thread tables, and reports through rli either the statement
    error or, when thd->is_fatal_error is set, ER_SLAVE_FATAL_ERROR.

  @param net                      Connection used when the file is read from
                                  the master; 'net != 0' is passed to
                                  mysql_load().
  @param rli                      Relay log info, used for reporting and
                                  position bookkeeping.
  @param use_rli_only_for_errors  When true, position advancing is left to
                                  the caller and 0 is returned on the final
                                  path.
  @return On the final visible path: 0 when use_rli_only_for_errors is set,
          otherwise the result of Log_event::do_apply_event(rli).

  NOTE(review): several branch heads, declarations and early returns are
  not visible in this view; the outline covers only what is shown.
*/
int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
2816
bool use_rli_only_for_errors)
2819
new_db.length= db_len;
2820
new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
2821
thd->set_db(new_db.str, new_db.length);
2822
assert(thd->query == 0);
2823
thd->query_length= 0; // Should not be needed
2824
thd->is_slave_error= 0;
2825
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
2827
/* see Query_log_event::do_apply_event() and BUG#13360 */
2828
assert(!rli->m_table_map.count());
2830
Usually lex_start() is called by mysql_parse(), but we need it here
2831
as the present method does not call mysql_parse().
2834
mysql_reset_thd_for_next_command(thd);
2836
if (!use_rli_only_for_errors)
2839
Saved for InnoDB, see comment in
2840
Query_log_event::do_apply_event()
2842
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
2846
We test replicate_*_db rules. Note that we have already prepared
2847
the file to load, even if we are going to ignore and delete it
2848
now. So it is possible that we did a lot of disk writes for
2849
nothing. In other words, a big LOAD DATA INFILE on the master will
2850
still consume a lot of space on the slave (space in the relay log
2851
+ space of temp files: twice the space of the file to load...)
2852
even if it will finally be ignored. TODO: fix this; this can be
2853
done by testing rules in Create_file_log_event::do_apply_event()
2854
and then discarding Append_block and al. Another way is do the
2855
filtering in the I/O thread (more efficient: no disk writes at
2859
Note: We do not need to execute reset_one_shot_variables() if this
2861
Reason: The db stored in binlog events is the same for SET and for
2862
its companion query. If the SET is ignored because of
2863
db_ok(), the companion query will also be ignored, and if
2864
the companion query is ignored in the db_ok() test of
2865
::do_apply_event(), then the companion SET also have so
2866
we don't need to reset_one_shot_variables().
2868
if (rpl_filter->db_ok(thd->db))
2870
thd->set_time((time_t)when);
2871
pthread_mutex_lock(&LOCK_thread_count);
2872
thd->query_id = next_query_id();
2873
pthread_mutex_unlock(&LOCK_thread_count);
2875
Initing thd->row_count is not necessary in theory as this variable has no
2876
influence in the case of the slave SQL thread (it is used to generate a
2877
"data truncated" warning but which is absorbed and never gets to the
2878
error log); still we init it to avoid a Valgrind message.
2880
drizzle_reset_errors(thd, 0);
2883
memset(&tables, 0, sizeof(tables));
2884
tables.db= thd->strmake(thd->db, thd->db_length);
2885
tables.alias = tables.table_name = (char*) table_name;
2886
tables.lock_type = TL_WRITE;
2889
// the table will be opened in mysql_load
2890
if (rpl_filter->is_on() && !rpl_filter->tables_ok(thd->db, &tables))
2892
// TODO: this is a bug - this needs to be moved to the I/O thread
2894
skip_load_data_infile(net);
2900
enum enum_duplicates handle_dup;
2902
char *load_data_query;
2905
Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST
2906
and written to slave's binlog if binlogging is on.
2908
if (!(load_data_query= (char *)thd->alloc(get_query_buffer_length() + 1)))
2911
This will set thd->fatal_error in case of OOM. So we surely will notice
2912
that something is wrong.
2917
print_query(false, load_data_query, &end, (char **)&thd->lex->fname_start,
2918
(char **)&thd->lex->fname_end);
2920
thd->query_length= end - load_data_query;
2921
thd->query= load_data_query;
2923
if (sql_ex.opt_flags & REPLACE_FLAG)
2925
handle_dup= DUP_REPLACE;
2927
else if (sql_ex.opt_flags & IGNORE_FLAG)
2930
handle_dup= DUP_ERROR;
2935
When replication is running fine, if it was DUP_ERROR on the
2936
master then we could choose IGNORE here, because if DUP_ERROR
2937
suceeded on master, and data is identical on the master and slave,
2938
then there should be no uniqueness errors on slave, so IGNORE is
2939
the same as DUP_ERROR. But in the unlikely case of uniqueness errors
2940
(because the data on the master and slave happen to be different
2941
(user error or bug), we want LOAD DATA to print an error message on
2942
the slave to discover the problem.
2944
If reading from net (a 3.23 master), mysql_load() will change this
2947
handle_dup= DUP_ERROR;
2950
We need to set thd->lex->sql_command and thd->lex->duplicates
2951
since InnoDB tests these variables to decide if this is a LOAD
2952
DATA ... REPLACE INTO ... statement even though mysql_parse()
2953
is not called. This is not needed in 5.0 since there the LOAD
2954
DATA ... statement is replicated using mysql_parse(), which
2955
sets the thd->lex fields correctly.
2957
thd->lex->sql_command= SQLCOM_LOAD;
2958
thd->lex->duplicates= handle_dup;
2960
sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
2961
String field_term(sql_ex.field_term,sql_ex.field_term_len,log_cs);
2962
String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,log_cs);
2963
String line_term(sql_ex.line_term,sql_ex.line_term_len,log_cs);
2964
String line_start(sql_ex.line_start,sql_ex.line_start_len,log_cs);
2965
String escaped(sql_ex.escaped,sql_ex.escaped_len, log_cs);
2966
ex.field_term= &field_term;
2967
ex.enclosed= &enclosed;
2968
ex.line_term= &line_term;
2969
ex.line_start= &line_start;
2970
ex.escaped= &escaped;
2972
ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
2973
if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
2974
ex.field_term->length(0);
2976
ex.skip_lines = skip_lines;
2977
List<Item> field_list;
2978
thd->lex->select_lex.context.resolve_in_table_list_only(&tables);
2979
set_fields(tables.db, field_list, &thd->lex->select_lex.context);
2980
thd->variables.pseudo_thread_id= thread_id;
2983
// mysql_load will use thd->net to read the file
2984
thd->net.vio = net->vio;
2986
Make sure the client does not get confused about the packet sequence
2988
thd->net.pkt_nr = net->pkt_nr;
2991
It is safe to use tmp_list twice because we are not going to
2992
update it inside mysql_load().
2994
List<Item> tmp_list;
2995
if (mysql_load(thd, &ex, &tables, field_list, tmp_list, tmp_list,
2996
handle_dup, ignore, net != 0))
2997
thd->is_slave_error= 1;
2998
if (thd->cuted_fields)
3000
/* log_pos is the position of the LOAD event in the master log */
3001
sql_print_warning(_("Slave: load data infile on table '%s' at "
3002
"log position %s in log '%s' produced %ld "
3003
"warning(s). Default database: '%s'"),
3005
llstr(log_pos,llbuff), RPL_LOG_NAME,
3006
(ulong) thd->cuted_fields,
3007
print_slave_db_safe(thd->db));
3010
net->pkt_nr= thd->net.pkt_nr;
3016
We will just ask the master to send us /dev/null if we do not
3017
want to load the data.
3018
TODO: this a bug - needs to be done in I/O thread
3021
skip_load_data_infile(net);
3026
const char *remember_db= thd->db;
3027
pthread_mutex_lock(&LOCK_thread_count);
3029
thd->set_db(NULL, 0); /* will free the current database */
3031
thd->query_length= 0;
3032
pthread_mutex_unlock(&LOCK_thread_count);
3033
close_thread_tables(thd);
3035
if (thd->is_slave_error)
3037
/* this err/sql_errno code is copy-paste from net_send_error() */
3040
if (thd->is_error())
3042
err= thd->main_da.message();
3043
sql_errno= thd->main_da.sql_errno();
3047
sql_errno=ER_UNKNOWN_ERROR;
3050
rli->report(ERROR_LEVEL, sql_errno,
3051
_("Error '%s' running LOAD DATA INFILE on table '%s'. "
3052
"Default database: '%s'"),
3053
err, (char*)table_name, print_slave_db_safe(remember_db));
3054
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
3057
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
3059
if (thd->is_fatal_error)
3062
snprintf(buf, sizeof(buf),
3063
_("Running LOAD DATA INFILE on table '%-.64s'."
3064
" Default database: '%-.64s'"),
3066
print_slave_db_safe(remember_db));
3068
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
3069
ER(ER_SLAVE_FATAL_ERROR), buf);
3073
return ( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) );
3077
/**************************************************************************
3078
Rotate_log_event methods
3079
**************************************************************************/
3082
Rotate_log_event::pack_info()
3085
void Rotate_log_event::pack_info(Protocol *protocol)
3087
char buf1[256], buf[22];
3088
String tmp(buf1, sizeof(buf1), log_cs);
3090
tmp.append(new_log_ident, ident_len);
3091
tmp.append(STRING_WITH_LEN(";pos="));
3092
tmp.append(llstr(pos,buf));
3093
protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
3098
Rotate_log_event::Rotate_log_event() (2 constructors)
3102
Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
3103
uint32_t ident_len_arg, uint64_t pos_arg,
3105
:Log_event(), new_log_ident(new_log_ident_arg),
3106
pos(pos_arg),ident_len(ident_len_arg ? ident_len_arg :
3107
(uint) strlen(new_log_ident_arg)), flags(flags_arg)
3109
if (flags & DUP_NAME)
3110
new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME));
3115
Rotate_log_event::Rotate_log_event(const char* buf, uint32_t event_len,
3116
const Format_description_log_event* description_event)
3117
:Log_event(buf, description_event) ,new_log_ident(0), flags(DUP_NAME)
3119
// The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
3120
uint8_t header_size= description_event->common_header_len;
3121
uint8_t post_header_len= description_event->post_header_len[ROTATE_EVENT-1];
3122
uint32_t ident_offset;
3123
if (event_len < header_size)
3126
pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
3127
ident_len = (uint)(event_len -
3128
(header_size+post_header_len));
3129
ident_offset = post_header_len;
3130
set_if_smaller(ident_len,FN_REFLEN-1);
3131
new_log_ident= my_strndup(buf + ident_offset, (uint) ident_len, MYF(MY_WME));
3137
Rotate_log_event::write()
3140
/**
  Serialize this rotate event into the given cache.

  Emits the common header, then a ROTATE_HEADER_LEN-byte post-header that
  carries `pos` at R_POS_OFFSET, then `ident_len` bytes of `new_log_ident`.

  @param file  cache the event is written to
  @return true if any of the three write calls reports non-zero,
          false otherwise
*/
bool Rotate_log_event::write(IO_CACHE* file)
{
  char post_header[ROTATE_HEADER_LEN];
  int8store(post_header + R_POS_OFFSET, pos);

  if (write_header(file, ROTATE_HEADER_LEN + ident_len))
    return true;
  if (my_b_safe_write(file, (unsigned char*) post_header, ROTATE_HEADER_LEN))
    return true;
  return my_b_safe_write(file, (unsigned char*) new_log_ident,
                         (uint) ident_len) != 0;
}
3151
Got a rotate log event from the master.
3153
This is mainly used so that we can later figure out the logname and
3154
position for the master.
3156
We can't rotate the slave's BINlog as this will cause infinite rotations
3157
in a A -> B -> A setup.
3158
The NOTES below is a wrong comment which will disappear when 4.1 is merged.
3163
int Rotate_log_event::do_update_pos(Relay_log_info *rli)
3165
pthread_mutex_lock(&rli->data_lock);
3166
rli->event_relay_log_pos= my_b_tell(rli->cur_log);
3168
If we are in a transaction or in a group: the only normal case is
3169
when the I/O thread was copying a big transaction, then it was
3170
stopped and restarted: we have this in the relay log:
3178
In that case, we don't want to touch the coordinates which
3179
correspond to the beginning of the transaction. Starting from
3180
5.0.0, there also are some rotates from the slave itself, in the
3181
relay log, which shall not change the group positions.
3183
if ((server_id != ::server_id || rli->replicate_same_server_id) &&
3184
!rli->is_in_group())
3186
rli->group_master_log_name.assign(new_log_ident, ident_len+1);
3187
rli->notify_group_master_log_name_update();
3188
rli->group_master_log_pos= pos;
3189
rli->group_relay_log_name.assign(rli->event_relay_log_name);
3190
rli->notify_group_relay_log_name_update();
3191
rli->group_relay_log_pos= rli->event_relay_log_pos;
3193
Reset thd->options and sql_mode etc, because this could be the signal of
3194
a master's downgrade from 5.0 to 4.0.
3195
However, no need to reset description_event_for_exec: indeed, if the next
3196
master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
3197
master is 4.0 then the events are in the slave's format (conversion).
3199
set_slave_thread_options(thd);
3200
thd->variables.auto_increment_increment=
3201
thd->variables.auto_increment_offset= 1;
3203
pthread_mutex_unlock(&rli->data_lock);
3204
pthread_cond_broadcast(&rli->data_cond);
3205
flush_relay_log_info(rli);
3211
Log_event::enum_skip_reason
3212
Rotate_log_event::do_shall_skip(Relay_log_info *rli)
3214
enum_skip_reason reason= Log_event::do_shall_skip(rli);
3217
case Log_event::EVENT_SKIP_NOT:
3218
case Log_event::EVENT_SKIP_COUNT:
3219
return Log_event::EVENT_SKIP_NOT;
3221
case Log_event::EVENT_SKIP_IGNORE:
3222
return Log_event::EVENT_SKIP_IGNORE;
3225
return Log_event::EVENT_SKIP_NOT; // To keep compiler happy
3229
/**************************************************************************
3230
Intvar_log_event methods
3231
**************************************************************************/
3234
Intvar_log_event::pack_info()
3237
void Intvar_log_event::pack_info(Protocol *protocol)
3239
char buf[256], *pos;
3240
pos= strmake(buf, get_var_type_name(), sizeof(buf)-23);
3242
pos= int64_t10_to_str(val, pos, -10);
3243
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3248
Intvar_log_event::Intvar_log_event()
3251
/**
  Build an Intvar event from its serialized image.

  @param buf                event bytes, beginning with the common header
  @param description_event  format description; its common_header_len tells
                            where this event's own fields start
*/
Intvar_log_event::Intvar_log_event(const char* buf,
                                   const Format_description_log_event* description_event)
  :Log_event(buf, description_event)
{
  /* Step over the common header to reach the type byte and the value. */
  const char *body= buf + description_event->common_header_len;
  type= body[I_TYPE_OFFSET];
  val= uint8korr(body + I_VAL_OFFSET);
}
3262
Intvar_log_event::get_var_type_name()
3265
const char* Intvar_log_event::get_var_type_name()
3268
case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
3269
case INSERT_ID_EVENT: return "INSERT_ID";
3270
default: /* impossible */ return "UNKNOWN";
3276
Intvar_log_event::write()
3279
/**
  Serialize this Intvar event into the given cache.

  The 9-byte body holds the variable type at I_TYPE_OFFSET and the value at
  I_VAL_OFFSET.

  @param file  cache the event is written to
  @return true if either write call reports non-zero, false otherwise
*/
bool Intvar_log_event::write(IO_CACHE* file)
{
  unsigned char image[9];
  image[I_TYPE_OFFSET]= (unsigned char) type;
  int8store(image + I_VAL_OFFSET, val);

  if (write_header(file, sizeof(image)))
    return true;
  return my_b_safe_write(file, image, sizeof(image)) != 0;
}
3290
Intvar_log_event::print()
3294
Intvar_log_event::do_apply_event()
3297
int Intvar_log_event::do_apply_event(Relay_log_info const *rli)
3300
We are now in a statement until the associated query log event has
3303
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3306
case LAST_INSERT_ID_EVENT:
3307
thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1;
3308
thd->first_successful_insert_id_in_prev_stmt= val;
3310
case INSERT_ID_EVENT:
3311
thd->force_one_auto_inc_interval(val);
3317
int Intvar_log_event::do_update_pos(Relay_log_info *rli)
3319
rli->inc_event_relay_log_pos();
3324
Log_event::enum_skip_reason
Intvar_log_event::do_shall_skip(Relay_log_info *rli)
{
  /*
    A common mistake is to set the slave skip counter to 1 instead of 2
    when recovering from an insert that used an auto increment, rand, or
    user var. Therefore, if the slave skip counter is 1, this event is
    skipped by ignoring it: the counter is left unchanged, since the
    following insert event will decrease it.
  */
  const Log_event::enum_skip_reason verdict= continue_group(rli);
  return verdict;
}
3339
/**************************************************************************
3340
Rand_log_event methods
3341
**************************************************************************/
3343
/**
  Describe this event as "rand_seed1=<n>,rand_seed2=<n>" and hand the text
  to the protocol.

  @param protocol  receives the formatted text via store()
*/
void Rand_log_event::pack_info(Protocol *protocol)
{
  char text[256];
  char *end= my_stpcpy(text, "rand_seed1=");
  end= int10_to_str((long) seed1, end, 10);
  end= my_stpcpy(end, ",rand_seed2=");
  end= int10_to_str((long) seed2, end, 10);
  protocol->store(text, (uint) (end - text), &my_charset_bin);
}
3354
/**
  Build a Rand event from its serialized image.

  @param buf                event bytes, beginning with the common header
  @param description_event  format description; its common_header_len tells
                            where the two seeds start
*/
Rand_log_event::Rand_log_event(const char* buf,
                               const Format_description_log_event* description_event)
  :Log_event(buf, description_event)
{
  /* Step over the common header to reach the seed fields. */
  const char *body= buf + description_event->common_header_len;
  seed1= uint8korr(body + RAND_SEED1_OFFSET);
  seed2= uint8korr(body + RAND_SEED2_OFFSET);
}
3364
/**
  Serialize this Rand event into the given cache.

  The 16-byte body holds seed1 at RAND_SEED1_OFFSET and seed2 at
  RAND_SEED2_OFFSET.

  @param file  cache the event is written to
  @return true if either write call reports non-zero, false otherwise
*/
bool Rand_log_event::write(IO_CACHE* file)
{
  unsigned char seeds[16];
  int8store(seeds + RAND_SEED1_OFFSET, seed1);
  int8store(seeds + RAND_SEED2_OFFSET, seed2);

  if (write_header(file, sizeof(seeds)))
    return true;
  return my_b_safe_write(file, seeds, sizeof(seeds)) != 0;
}
3374
int Rand_log_event::do_apply_event(Relay_log_info const *rli)
3377
We are now in a statement until the associated query log event has
3380
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3382
thd->rand.seed1= (ulong) seed1;
3383
thd->rand.seed2= (ulong) seed2;
3387
int Rand_log_event::do_update_pos(Relay_log_info *rli)
3389
rli->inc_event_relay_log_pos();
3394
Log_event::enum_skip_reason
Rand_log_event::do_shall_skip(Relay_log_info *rli)
{
  /*
    A common mistake is to set the slave skip counter to 1 instead of 2
    when recovering from an insert that used an auto increment, rand, or
    user var. Therefore, if the slave skip counter is 1, this event is
    skipped by ignoring it: the counter is left unchanged, since the
    following insert event will decrease it.
  */
  const Log_event::enum_skip_reason verdict= continue_group(rli);
  return verdict;
}
3409
/**************************************************************************
3410
Xid_log_event methods
3411
**************************************************************************/
3413
/**
  Describe this event as a COMMIT statement with the xid in a trailing
  comment, and hand the text to the protocol.

  @param protocol  receives the formatted text via store()
*/
void Xid_log_event::pack_info(Protocol *protocol)
{
  char text[128];
  char *end= my_stpcpy(text, "COMMIT /* xid=");
  end= int64_t10_to_str(xid, end, 10);
  end= my_stpcpy(end, " */");
  protocol->store(text, (uint) (end - text), &my_charset_bin);
}
3424
It's ok not to use int8store here,
3425
as long as xid_t::set(uint64_t) and
3426
xid_t::get_my_xid doesn't do it either.
3427
We don't care about actual values of xids as long as
3428
identical numbers compare identically
3432
Xid_log_event(const char* buf,
3433
const Format_description_log_event *description_event)
3434
:Log_event(buf, description_event)
3436
buf+= description_event->common_header_len;
3437
memcpy(&xid, buf, sizeof(xid));
3441
/**
  Serialize this Xid event into the given cache.

  The body is the raw in-memory bytes of `xid` (sizeof(xid) of them); no
  byte-order conversion is applied here.

  @param file  cache the event is written to
  @return true if either write call reports non-zero, false otherwise
*/
bool Xid_log_event::write(IO_CACHE* file)
{
  if (write_header(file, sizeof(xid)))
    return true;
  return my_b_safe_write(file, (unsigned char*) &xid, sizeof(xid)) != 0;
}
3448
/**
  Apply the event by ending the current transaction with COMMIT.

  @param rli  unused
  @return whatever end_trans() returns
*/
int Xid_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
{
  const int result= end_trans(thd, COMMIT);
  return result;
}
3453
/**
  Decide whether this event is skipped.

  While the slave skip counter is positive, OPTION_BEGIN is cleared from the
  thread options and the event is reported as EVENT_SKIP_COUNT; otherwise
  the decision is left to Log_event::do_shall_skip().

  @param rli  relay log info holding slave_skip_counter
*/
Log_event::enum_skip_reason
Xid_log_event::do_shall_skip(Relay_log_info *rli)
{
  if (!(rli->slave_skip_counter > 0))
    return Log_event::do_shall_skip(rli);

  thd->options&= ~OPTION_BEGIN;
  return Log_event::EVENT_SKIP_COUNT;
}
3464
/**************************************************************************
3465
User_var_log_event methods
3466
**************************************************************************/
3468
void User_var_log_event::pack_info(Protocol* protocol)
3471
uint32_t val_offset= 4 + name_len;
3472
uint32_t event_len= val_offset;
3476
if (!(buf= (char*) my_malloc(val_offset + 5, MYF(MY_WME))))
3478
my_stpcpy(buf + val_offset, "NULL");
3479
event_len= val_offset + 4;
3486
float8get(real_val, val);
3487
if (!(buf= (char*) my_malloc(val_offset + MY_GCVT_MAX_FIELD_WIDTH + 1,
3490
event_len+= my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, MY_GCVT_MAX_FIELD_WIDTH,
3491
buf + val_offset, NULL);
3494
if (!(buf= (char*) my_malloc(val_offset + 22, MYF(MY_WME))))
3496
event_len= int64_t10_to_str(uint8korr(val), buf + val_offset,-10)-buf;
3498
case DECIMAL_RESULT:
3500
if (!(buf= (char*) my_malloc(val_offset + DECIMAL_MAX_STR_LENGTH,
3503
String str(buf+val_offset, DECIMAL_MAX_STR_LENGTH, &my_charset_bin);
3505
binary2my_decimal(E_DEC_FATAL_ERROR, (unsigned char*) (val+2), &dec, val[0],
3507
my_decimal2string(E_DEC_FATAL_ERROR, &dec, 0, 0, 0, &str);
3508
event_len= str.length() + val_offset;
3512
/* 15 is for 'COLLATE' and other chars */
3513
buf= (char*) my_malloc(event_len+val_len*2+1+2*MY_CS_NAME_SIZE+15,
3515
const CHARSET_INFO *cs;
3518
if (!(cs= get_charset(charset_number, MYF(0))))
3520
my_stpcpy(buf+val_offset, "???");
3525
char *p= strxmov(buf + val_offset, "_", cs->csname, " ", NULL);
3526
p= str_to_hex(p, val, val_len);
3527
p= strxmov(p, " COLLATE ", cs->name, NULL);
3539
memcpy(buf+2, name, name_len);
3540
buf[2+name_len]= '`';
3541
buf[3+name_len]= '=';
3542
protocol->store(buf, event_len, &my_charset_bin);
3547
User_var_log_event::
3548
User_var_log_event(const char* buf,
3549
const Format_description_log_event* description_event)
3550
:Log_event(buf, description_event)
3552
buf+= description_event->common_header_len;
3553
name_len= uint4korr(buf);
3554
name= (char *) buf + UV_NAME_LEN_SIZE;
3555
buf+= UV_NAME_LEN_SIZE + name_len;
3556
is_null= (bool) *buf;
3559
type= STRING_RESULT;
3560
charset_number= my_charset_bin.number;
3566
type= (Item_result) buf[UV_VAL_IS_NULL];
3567
charset_number= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE);
3568
val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3569
UV_CHARSET_NUMBER_SIZE);
3570
val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3571
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE);
3576
bool User_var_log_event::write(IO_CACHE* file)
3578
char buf[UV_NAME_LEN_SIZE];
3579
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3580
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
3581
unsigned char buf2[(8 > DECIMAL_MAX_FIELD_SIZE + 2) ? 8 : DECIMAL_MAX_FIELD_SIZE +2], *pos= buf2;
3582
uint32_t buf1_length;
3585
int4store(buf, name_len);
3587
if ((buf1[0]= is_null))
3590
val_len= 0; // Length of 'pos'
3595
int4store(buf1 + 2, charset_number);
3599
float8store(buf2, *(double*) val);
3602
int8store(buf2, *(int64_t*) val);
3604
case DECIMAL_RESULT:
3606
my_decimal *dec= (my_decimal *)val;
3607
dec->fix_buffer_pointer();
3608
buf2[0]= (char)(dec->intg + dec->frac);
3609
buf2[1]= (char)dec->frac;
3610
decimal2bin((decimal_t*)val, buf2+2, buf2[0], buf2[1]);
3611
val_len= decimal_bin_size(buf2[0], buf2[1]) + 2;
3615
pos= (unsigned char*) val;
3622
int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
3626
/* Length of the whole event */
3627
event_length= sizeof(buf)+ name_len + buf1_length + val_len;
3629
return (write_header(file, event_length) ||
3630
my_b_safe_write(file, (unsigned char*) buf, sizeof(buf)) ||
3631
my_b_safe_write(file, (unsigned char*) name, name_len) ||
3632
my_b_safe_write(file, (unsigned char*) buf1, buf1_length) ||
3633
my_b_safe_write(file, pos, val_len));
3639
User_var_log_event::do_apply_event()
3642
int User_var_log_event::do_apply_event(Relay_log_info const *rli)
3645
const CHARSET_INFO *charset;
3646
if (!(charset= get_charset(charset_number, MYF(MY_WME))))
3648
LEX_STRING user_var_name;
3649
user_var_name.str= name;
3650
user_var_name.length= name_len;
3655
We are now in a statement until the associated query log event has
3658
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3662
it= new Item_null();
3668
float8get(real_val, val);
3669
it= new Item_float(real_val, 0);
3670
val= (char*) &real_val; // Pointer to value in native format
3674
int_val= (int64_t) uint8korr(val);
3675
it= new Item_int(int_val);
3676
val= (char*) &int_val; // Pointer to value in native format
3679
case DECIMAL_RESULT:
3681
Item_decimal *dec= new Item_decimal((unsigned char*) val+2, val[0], val[1]);
3683
val= (char *)dec->val_decimal(NULL);
3684
val_len= sizeof(my_decimal);
3688
it= new Item_string(val, val_len, charset);
3696
Item_func_set_user_var e(user_var_name, it);
3698
Item_func_set_user_var can't substitute something else on its place =>
3699
0 can be passed as last argument (reference on item)
3701
e.fix_fields(thd, 0);
3703
A variable can just be considered as a table with
3704
a single record and with a single column. Thus, like
3705
a column value, it could always have IMPLICIT derivation.
3707
e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0);
3708
free_root(thd->mem_root,0);
3713
int User_var_log_event::do_update_pos(Relay_log_info *rli)
3715
rli->inc_event_relay_log_pos();
3719
Log_event::enum_skip_reason
User_var_log_event::do_shall_skip(Relay_log_info *rli)
{
  /*
    A common mistake is to set the slave skip counter to 1 instead of 2
    when recovering from an insert that used an auto increment, rand, or
    user var. Therefore, if the slave skip counter is 1, this event is
    skipped by ignoring it: the counter is left unchanged, since the
    following insert event will decrease it.
  */
  const Log_event::enum_skip_reason verdict= continue_group(rli);
  return verdict;
}
3734
/**************************************************************************
3735
Slave_log_event methods
3736
**************************************************************************/
3738
/**
  Describe this event as "host=...,port=...,log=...,pos=..." and hand the
  text to the protocol.

  The host name is copied with a HOSTNAME_LENGTH bound. The master log name
  is copied with a bound computed from the space still free in the buffer,
  so an unexpectedly long name is truncated instead of overrunning `buf`.
  Names that fit produce exactly the same text as before.

  @param protocol  receives the formatted text via store()
*/
void Slave_log_event::pack_info(Protocol *protocol)
{
  /* Room kept back for ",pos=", a 64-bit decimal number and a final NUL. */
  const size_t pos_reserve= sizeof(",pos=") + 21;
  char buf[256+HOSTNAME_LENGTH], *pos;
  pos= my_stpcpy(buf, "host=");
  pos= my_stpncpy(pos, master_host.c_str(), HOSTNAME_LENGTH);
  pos= my_stpcpy(pos, ",port=");
  pos= int10_to_str((long) master_port, pos, 10);
  pos= my_stpcpy(pos, ",log=");
  /*
    What has been written so far takes at most HOSTNAME_LENGTH plus a few
    dozen bytes of the 256+HOSTNAME_LENGTH buffer, so this stays positive.
  */
  const size_t log_room= (size_t) (buf + sizeof(buf) - pos) - pos_reserve;
  pos= my_stpncpy(pos, master_log.c_str(), log_room);
  pos= my_stpcpy(pos, ",pos=");
  pos= int64_t10_to_str(master_pos, pos, 10);
  protocol->store(buf, pos-buf, &my_charset_bin);
}
3755
re-write this better without holding both locks at the same time
3757
Slave_log_event::Slave_log_event(THD* thd_arg,
3758
Relay_log_info* rli)
3759
:Log_event(thd_arg, 0, 0) , mem_pool(0), master_host(0)
3761
if (!rli->inited) // QQ When can this happen ?
3764
Master_info* mi = rli->mi;
3765
// TODO: re-write this better without holding both locks at the same time
3766
pthread_mutex_lock(&mi->data_lock);
3767
pthread_mutex_lock(&rli->data_lock);
3768
// on OOM, just do not initialize the structure and print the error
3769
if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
3772
master_host.assign(mi->getHostname());
3773
master_log.assign(rli->group_master_log_name);
3774
master_port = mi->getPort();
3775
master_pos = rli->group_master_log_pos;
3778
sql_print_error(_("Out of memory while recording slave event"));
3779
pthread_mutex_unlock(&rli->data_lock);
3780
pthread_mutex_unlock(&mi->data_lock);
3785
Slave_log_event::~Slave_log_event()
3791
int Slave_log_event::get_data_size()
3793
return master_host.length() + master_log.length() + 1 + SL_MASTER_HOST_OFFSET;
3797
/**
  Serialize this Slave event into the given cache.

  Stores master_pos and master_port into mem_pool at their offsets, then
  writes the common header followed by get_data_size() bytes of mem_pool.

  @param file  cache the event is written to
  @return true if either write call reports non-zero, false otherwise
*/
bool Slave_log_event::write(IO_CACHE* file)
{
  const ulong total_len= get_data_size();
  int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
  int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
  /* The log and host names are expected to be in mem_pool already. */

  if (write_header(file, total_len))
    return true;
  return my_b_safe_write(file, (unsigned char*) mem_pool, total_len) != 0;
}
3809
void Slave_log_event::init_from_mem_pool()
3811
master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
3812
master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
3814
/* Assign these correctly */
3815
master_host.assign(mem_pool + SL_MASTER_HOST_OFFSET);
3816
master_log.assign();
3821
int Slave_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
3823
if (mysql_bin_log.is_open())
3824
mysql_bin_log.write(this);
3829
/**************************************************************************
3830
Stop_log_event methods
3831
**************************************************************************/
3834
The master stopped. We used to clean up all temporary tables but
3835
this is useless as, as the master has shut down properly, it has
3836
written all DROP TEMPORARY Table (prepared statements' deletion is
3837
TODO only when we binlog prep stmts). We used to clean up
3838
slave_load_tmpdir, but this is useless as it has been cleared at the
3839
end of LOAD DATA INFILE. So we have nothing to do here. The place
3840
where we must do this cleaning is in
3841
Start_log_event_v3::do_apply_event(), not here. Because if we come
3842
here, the master was sane.
3844
int Stop_log_event::do_update_pos(Relay_log_info *rli)
3847
We do not want to update master_log pos because we get a rotate event
3848
before stop, so by now group_master_log_name is set to the next log.
3849
If we updated it, we will have incorrect master coordinates and this
3850
could give false triggers in MASTER_POS_WAIT() that we have reached
3851
the target position when in fact we have not.
3853
if (thd->options & OPTION_BEGIN)
3854
rli->inc_event_relay_log_pos();
3857
rli->inc_group_relay_log_pos(0);
3858
flush_relay_log_info(rli);
3864
/**************************************************************************
3865
Create_file_log_event methods
3866
**************************************************************************/
3869
Create_file_log_event ctor
3872
Create_file_log_event::
3873
Create_file_log_event(THD* thd_arg, sql_exchange* ex,
3874
const char* db_arg, const char* table_name_arg,
3875
List<Item>& fields_arg, enum enum_duplicates handle_dup,
3877
unsigned char* block_arg, uint32_t block_len_arg, bool using_trans)
3878
:Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
3880
fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg),
3881
file_id(thd_arg->file_id = mysql_bin_log.next_file_id())
3883
sql_ex.force_new_format();
3889
Create_file_log_event::write_data_body()
3892
bool Create_file_log_event::write_data_body(IO_CACHE* file)
3895
if ((res= Load_log_event::write_data_body(file)) || fake_base)
3897
return (my_b_safe_write(file, (unsigned char*) "", 1) ||
3898
my_b_safe_write(file, (unsigned char*) block, block_len));
3903
Create_file_log_event::write_data_header()
3906
bool Create_file_log_event::write_data_header(IO_CACHE* file)
3909
unsigned char buf[CREATE_FILE_HEADER_LEN];
3910
if ((res= Load_log_event::write_data_header(file)) || fake_base)
3912
int4store(buf + CF_FILE_ID_OFFSET, file_id);
3913
return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0;
3918
Create_file_log_event::write_base()
3921
bool Create_file_log_event::write_base(IO_CACHE* file)
3924
fake_base= 1; // pretend we are Load event
3931
Create_file_log_event ctor
3934
Create_file_log_event::Create_file_log_event(const char* buf, uint32_t len,
3935
const Format_description_log_event* description_event)
3936
:Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0)
3938
uint32_t block_offset;
3939
uint32_t header_len= description_event->common_header_len;
3940
uint8_t load_header_len= description_event->post_header_len[LOAD_EVENT-1];
3941
uint8_t create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
3942
if (!(event_buf= (char*) my_memdup(buf, len, MYF(MY_WME))) ||
3943
copy_log_event(event_buf,len,
3944
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
3945
load_header_len + header_len :
3946
(fake_base ? (header_len+load_header_len) :
3947
(header_len+load_header_len) +
3948
create_file_header_len)),
3951
if (description_event->binlog_version!=1)
3953
file_id= uint4korr(buf +
3955
load_header_len + CF_FILE_ID_OFFSET);
3957
Note that it's ok to use get_data_size() below, because it is computed
3958
with values we have already read from this event (because we called
3959
copy_log_event()); we are not using slave's format info to decode
3960
master's format, we are really using master's format info.
3961
Anyway, both formats should be identical (except the common_header_len)
3962
as these Load events are not changed between 4.0 and 5.0 (as logging of
3963
LOAD DATA INFILE does not use Load_log_event in 5.0).
3965
The + 1 is for \0 terminating fname
3967
block_offset= (description_event->common_header_len +
3968
Load_log_event::get_data_size() +
3969
create_file_header_len + 1);
3970
if (len < block_offset)
3972
block = (unsigned char*)buf + block_offset;
3973
block_len = len - block_offset;
3977
sql_ex.force_new_format();
3978
inited_from_old = 1;
3985
Create_file_log_event::pack_info()
3988
/**
  Describe this event as
  "db=<db>;table=<table>;file_id=<n>;block_len=<n>" and hand the text to
  the protocol.

  @param protocol  receives the formatted text via store()
*/
void Create_file_log_event::pack_info(Protocol *protocol)
{
  char text[NAME_LEN*2 + 30 + 21*2];
  char *end= my_stpcpy(text, "db=");

  /* db and table_name are copied by length, not as C strings. */
  memcpy(end, db, db_len);
  end+= db_len;
  end= my_stpcpy(end, ";table=");
  memcpy(end, table_name, table_name_len);
  end+= table_name_len;

  end= my_stpcpy(end, ";file_id=");
  end= int10_to_str((long) file_id, end, 10);
  end= my_stpcpy(end, ";block_len=");
  end= int10_to_str((long) block_len, end, 10);
  protocol->store(text, (uint) (end - text), &my_charset_bin);
}
4004
Create_file_log_event::do_apply_event()
4007
int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
4009
char proc_info[17+FN_REFLEN+10], *fname_buf;
4015
memset(&file, 0, sizeof(file));
4016
fname_buf= my_stpcpy(proc_info, "Making temp file ");
4017
ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
4018
thd->set_proc_info(proc_info);
4019
my_delete(fname_buf, MYF(0)); // old copy may exist already
4020
if ((fd= my_create(fname_buf, CREATE_MODE,
4022
MYF(MY_WME))) < 0 ||
4023
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
4024
MYF(MY_WME|MY_NABP)))
4026
rli->report(ERROR_LEVEL, my_errno,
4027
_("Error in Create_file event: could not open file '%s'"),
4032
// a trick to avoid allocating another buffer
4034
fname_len= (uint) (my_stpcpy(ext, ".data") - fname);
4035
if (write_base(&file))
4037
my_stpcpy(ext, ".info"); // to have it right in the error message
4038
rli->report(ERROR_LEVEL, my_errno,
4039
_("Error in Create_file event: could not write to file '%s'"),
4043
end_io_cache(&file);
4044
my_close(fd, MYF(0));
4046
// fname_buf now already has .data, not .info, because we did our trick
4047
my_delete(fname_buf, MYF(0)); // old copy may exist already
4048
if ((fd= my_create(fname_buf, CREATE_MODE,
4052
rli->report(ERROR_LEVEL, my_errno,
4053
_("Error in Create_file event: could not open file '%s'"),
4057
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
4059
rli->report(ERROR_LEVEL, my_errno,
4060
_("Error in Create_file event: write to '%s' failed"),
4064
error=0; // Everything is ok
4068
end_io_cache(&file);
4070
my_close(fd, MYF(0));
4071
thd->set_proc_info(0);
4076
/**************************************************************************
4077
Append_block_log_event methods
4078
**************************************************************************/
4081
Append_block_log_event ctor
4084
Append_block_log_event::Append_block_log_event(THD *thd_arg,
4086
unsigned char *block_arg,
4087
uint32_t block_len_arg,
4089
:Log_event(thd_arg,0, using_trans), block(block_arg),
4090
block_len(block_len_arg), file_id(thd_arg->file_id), db(db_arg)
4096
Append_block_log_event ctor
4099
Append_block_log_event::Append_block_log_event(const char* buf, uint32_t len,
4100
const Format_description_log_event* description_event)
4101
:Log_event(buf, description_event),block(0)
4103
uint8_t common_header_len= description_event->common_header_len;
4104
uint8_t append_block_header_len=
4105
description_event->post_header_len[APPEND_BLOCK_EVENT-1];
4106
uint32_t total_header_len= common_header_len+append_block_header_len;
4107
if (len < total_header_len)
4109
file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET);
4110
block= (unsigned char*)buf + total_header_len;
4111
block_len= len - total_header_len;
4117
Append_block_log_event::write()
4120
/**
  Serialize this Append_block event into the given cache.

  Emits the common header, then an APPEND_BLOCK_HEADER_LEN-byte post-header
  carrying file_id at AB_FILE_ID_OFFSET, then block_len bytes of `block`.

  @param file  cache the event is written to
  @return true if any of the three write calls reports non-zero,
          false otherwise
*/
bool Append_block_log_event::write(IO_CACHE* file)
{
  unsigned char post_header[APPEND_BLOCK_HEADER_LEN];
  int4store(post_header + AB_FILE_ID_OFFSET, file_id);

  if (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len))
    return true;
  if (my_b_safe_write(file, post_header, APPEND_BLOCK_HEADER_LEN))
    return true;
  return my_b_safe_write(file, (unsigned char*) block, block_len) != 0;
}
4131
Append_block_log_event::pack_info()
4134
void Append_block_log_event::pack_info(Protocol *protocol)
4138
length= (uint) sprintf(buf, ";file_id=%u;block_len=%u", file_id,
4140
protocol->store(buf, length, &my_charset_bin);
4145
Append_block_log_event::get_create_or_append()
4148
int Append_block_log_event::get_create_or_append() const
4150
return 0; /* append to the file, fail if not exists */
4154
Append_block_log_event::do_apply_event()
4157
int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
4159
char proc_info[17+FN_REFLEN+10], *fname= proc_info+17;
4163
fname= my_stpcpy(proc_info, "Making temp file ");
4164
slave_load_file_stem(fname, file_id, server_id, ".data");
4165
thd->set_proc_info(proc_info);
4166
if (get_create_or_append())
4168
my_delete(fname, MYF(0)); // old copy may exist already
4169
if ((fd= my_create(fname, CREATE_MODE,
4173
rli->report(ERROR_LEVEL, my_errno,
4174
_("Error in %s event: could not create file '%s'"),
4175
get_type_str(), fname);
4179
else if ((fd = my_open(fname, O_WRONLY | O_APPEND,
4182
rli->report(ERROR_LEVEL, my_errno,
4183
_("Error in %s event: could not open file '%s'"),
4184
get_type_str(), fname);
4187
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
4189
rli->report(ERROR_LEVEL, my_errno,
4190
_("Error in %s event: write to '%s' failed"),
4191
get_type_str(), fname);
4198
my_close(fd, MYF(0));
4199
thd->set_proc_info(0);
4204
/**************************************************************************
4205
Delete_file_log_event methods
4206
**************************************************************************/
4209
Delete_file_log_event ctor
4212
Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg,
4214
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
4219
Delete_file_log_event ctor
4222
Delete_file_log_event::Delete_file_log_event(const char* buf, uint32_t len,
4223
const Format_description_log_event* description_event)
4224
:Log_event(buf, description_event),file_id(0)
4226
uint8_t common_header_len= description_event->common_header_len;
4227
uint8_t delete_file_header_len= description_event->post_header_len[DELETE_FILE_EVENT-1];
4228
if (len < (uint)(common_header_len + delete_file_header_len))
4230
file_id= uint4korr(buf + common_header_len + DF_FILE_ID_OFFSET);
4235
Delete_file_log_event::write()
4238
/**
  Serialize this Delete_file event into the given cache.

  The DELETE_FILE_HEADER_LEN-byte body carries file_id at DF_FILE_ID_OFFSET.

  @param file  cache the event is written to
  @return true if either write call reports non-zero, false otherwise
*/
bool Delete_file_log_event::write(IO_CACHE* file)
{
  unsigned char post_header[DELETE_FILE_HEADER_LEN];
  int4store(post_header + DF_FILE_ID_OFFSET, file_id);

  if (write_header(file, sizeof(post_header)))
    return true;
  return my_b_safe_write(file, post_header, sizeof(post_header)) != 0;
}
4248
Delete_file_log_event::pack_info()
4251
void Delete_file_log_event::pack_info(Protocol *protocol)
4255
length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
4256
protocol->store(buf, (int32_t) length, &my_charset_bin);
4260
Delete_file_log_event::do_apply_event()
4263
int Delete_file_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
4265
char fname[FN_REFLEN+10];
4266
char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
4267
(void) my_delete(fname, MYF(MY_WME));
4268
my_stpcpy(ext, ".info");
4269
(void) my_delete(fname, MYF(MY_WME));
4274
/**************************************************************************
4275
Execute_load_log_event methods
4276
**************************************************************************/
4279
Execute_load_log_event ctor
4282
Execute_load_log_event::Execute_load_log_event(THD *thd_arg,
4285
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
4291
Execute_load_log_event ctor
4294
Execute_load_log_event::Execute_load_log_event(const char* buf, uint32_t len,
4295
const Format_description_log_event* description_event)
4296
:Log_event(buf, description_event), file_id(0)
4298
uint8_t common_header_len= description_event->common_header_len;
4299
uint8_t exec_load_header_len= description_event->post_header_len[EXEC_LOAD_EVENT-1];
4300
if (len < (uint)(common_header_len+exec_load_header_len))
4302
file_id= uint4korr(buf + common_header_len + EL_FILE_ID_OFFSET);
4307
Execute_load_log_event::write()
4310
/**
  Write this event into the given cache.

  The header is emitted through write_header() with a payload size of
  EXEC_LOAD_HEADER_LEN, followed by the payload itself, which holds
  file_id at EL_FILE_ID_OFFSET.

  @param file  IO_CACHE the event is written to.

  @return true if write_header() or my_b_safe_write() returned non-zero,
          false otherwise.
*/
bool Execute_load_log_event::write(IO_CACHE* file)
{
  unsigned char post_header[EXEC_LOAD_HEADER_LEN];

  int4store(post_header + EL_FILE_ID_OFFSET, file_id);

  /* Stop at the first failing step; the payload is skipped in that case. */
  if (write_header(file, sizeof(post_header)))
    return true;

  return my_b_safe_write(file, post_header, sizeof(post_header)) != 0;
}
4320
Execute_load_log_event::pack_info()
4323
void Execute_load_log_event::pack_info(Protocol *protocol)
4327
length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
4328
protocol->store(buf, (int32_t) length, &my_charset_bin);
4333
Execute_load_log_event::do_apply_event()
4336
int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
4338
char fname[FN_REFLEN+10];
4343
Load_log_event *lev= 0;
4345
ext= slave_load_file_stem(fname, file_id, server_id, ".info");
4346
if ((fd = my_open(fname, O_RDONLY,
4347
MYF(MY_WME))) < 0 ||
4348
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
4349
MYF(MY_WME|MY_NABP)))
4351
rli->report(ERROR_LEVEL, my_errno,
4352
_("Error in Exec_load event: could not open file '%s'"),
4356
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
4357
(pthread_mutex_t*)0,
4358
rli->relay_log.description_event_for_exec)) ||
4359
lev->get_type_code() != NEW_LOAD_EVENT)
4361
rli->report(ERROR_LEVEL, 0,
4362
_("Error in Exec_load event: "
4363
"file '%s' appears corrupted"),
4370
lev->do_apply_event should use rli only for errors i.e. should
4371
not advance rli's position.
4373
lev->do_apply_event is the place where the table is loaded (it
4374
calls mysql_load()).
4377
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
4378
if (lev->do_apply_event(0,rli,1))
4381
We want to indicate the name of the file that could not be loaded
4383
But as we are here we are sure the error is in rli->last_slave_error and
4384
rli->last_slave_errno (example of error: duplicate entry for key), so we
4385
don't want to overwrite it with the filename.
4386
What we want instead is add the filename to the current error message.
4388
char *tmp= my_strdup(rli->last_error().message, MYF(MY_WME));
4391
rli->report(ERROR_LEVEL, rli->last_error().number,
4392
_("%s. Failed executing load from '%s'"),
4399
We have an open file descriptor to the .info file; we need to close it
4400
or Windows will refuse to delete the file in my_delete().
4404
my_close(fd, MYF(0));
4405
end_io_cache(&file);
4408
(void) my_delete(fname, MYF(MY_WME));
4409
memcpy(ext, ".data", 6);
4410
(void) my_delete(fname, MYF(MY_WME));
4417
my_close(fd, MYF(0));
4418
end_io_cache(&file);
4424
/**************************************************************************
4425
Begin_load_query_log_event methods
4426
**************************************************************************/
4428
Begin_load_query_log_event::
4429
Begin_load_query_log_event(THD* thd_arg, const char* db_arg, unsigned char* block_arg,
4430
uint32_t block_len_arg, bool using_trans)
4431
:Append_block_log_event(thd_arg, db_arg, block_arg, block_len_arg,
4434
file_id= thd_arg->file_id= mysql_bin_log.next_file_id();
4438
Begin_load_query_log_event::
4439
Begin_load_query_log_event(const char* buf, uint32_t len,
4440
const Format_description_log_event* desc_event)
4441
:Append_block_log_event(buf, len, desc_event)
4446
int Begin_load_query_log_event::get_create_or_append() const
4448
return 1; /* create the file */
4452
/**
  Decide whether this event should be skipped.

  The decision is delegated entirely to continue_group(); the intent
  noted in the original code is that a slave skip counter of 1 must not
  let execution start in the middle of the group this event opens.

  @param rli  Relay log info handed on to continue_group().

  @return whatever continue_group() reports.
*/
Log_event::enum_skip_reason
Begin_load_query_log_event::do_shall_skip(Relay_log_info *rli)
{
  Log_event::enum_skip_reason const reason= continue_group(rli);
  return reason;
}
4463
/**************************************************************************
4464
Execute_load_query_log_event methods
4465
**************************************************************************/
4468
Execute_load_query_log_event::
4469
Execute_load_query_log_event(THD *thd_arg, const char* query_arg,
4470
ulong query_length_arg, uint32_t fn_pos_start_arg,
4471
uint32_t fn_pos_end_arg,
4472
enum_load_dup_handling dup_handling_arg,
4473
bool using_trans, bool suppress_use,
4474
THD::killed_state killed_err_arg):
4475
Query_log_event(thd_arg, query_arg, query_length_arg, using_trans,
4476
suppress_use, killed_err_arg),
4477
file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg),
4478
fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
4483
Execute_load_query_log_event::
4484
Execute_load_query_log_event(const char* buf, uint32_t event_len,
4485
const Format_description_log_event* desc_event):
4486
Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT),
4487
file_id(0), fn_pos_start(0), fn_pos_end(0)
4489
if (!Query_log_event::is_valid())
4492
buf+= desc_event->common_header_len;
4494
fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET);
4495
fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET);
4496
dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET));
4498
if (fn_pos_start > q_len || fn_pos_end > q_len ||
4499
dup_handling > LOAD_DUP_REPLACE)
4502
file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET);
4506
/**
  Report the size of the extra post-header bytes added by this event type.

  @return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN.
*/
ulong Execute_load_query_log_event::get_post_header_size_for_derived()
{
  ulong const extra_header_len= EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN;
  return extra_header_len;
}
4513
Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
4515
unsigned char buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
4516
int4store(buf, file_id);
4517
int4store(buf + 4, fn_pos_start);
4518
int4store(buf + 4 + 4, fn_pos_end);
4519
*(buf + 4 + 4 + 4)= (unsigned char) dup_handling;
4520
return my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
4524
void Execute_load_query_log_event::pack_info(Protocol *protocol)
4527
if (!(buf= (char*) my_malloc(9 + db_len + q_len + 10 + 21, MYF(MY_WME))))
4532
pos= my_stpcpy(buf, "use `");
4533
memcpy(pos, db, db_len);
4534
pos= my_stpcpy(pos+db_len, "`; ");
4538
memcpy(pos, query, q_len);
4541
pos= my_stpcpy(pos, " ;file_id=");
4542
pos= int10_to_str((long) file_id, pos, 10);
4543
protocol->store(buf, pos-buf, &my_charset_bin);
4549
Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli)
4557
buf= (char*) my_malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
4558
(FN_REFLEN + 10) + 10 + 8 + 5, MYF(MY_WME));
4560
/* Replace filename and LOCAL keyword in query before executing it */
4563
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
4564
ER(ER_SLAVE_FATAL_ERROR),
4565
_("Not enough memory"));
4570
memcpy(p, query, fn_pos_start);
4572
fname= (p= strmake(p, STRING_WITH_LEN(" INFILE \'")));
4573
p= slave_load_file_stem(p, file_id, server_id, ".data");
4574
fname_end= p= strchr(p, '\0'); // Safer than p=p+5
4576
switch (dup_handling) {
4577
case LOAD_DUP_IGNORE:
4578
p= strmake(p, STRING_WITH_LEN(" IGNORE"));
4580
case LOAD_DUP_REPLACE:
4581
p= strmake(p, STRING_WITH_LEN(" REPLACE"));
4584
/* Ordinary load data */
4587
p= strmake(p, STRING_WITH_LEN(" INTO"));
4588
p= strmake(p, query+fn_pos_end, q_len-fn_pos_end);
4590
error= Query_log_event::do_apply_event(rli, buf, p-buf);
4592
/* Forging file name for deletion in same buffer */
4596
If there was an error the slave is going to stop, leave the
4597
file so that we can re-execute this event at START SLAVE.
4600
(void) my_delete(fname, MYF(MY_WME));
4607
/**************************************************************************
4609
**************************************************************************/
4612
sql_ex_info::write_data()
4615
bool sql_ex_info::write_data(IO_CACHE* file)
4619
return (write_str(file, field_term, (uint) field_term_len) ||
4620
write_str(file, enclosed, (uint) enclosed_len) ||
4621
write_str(file, line_term, (uint) line_term_len) ||
4622
write_str(file, line_start, (uint) line_start_len) ||
4623
write_str(file, escaped, (uint) escaped_len) ||
4624
my_b_safe_write(file,(unsigned char*) &opt_flags,1));
4629
@todo This is sensitive to field padding. We should write a
4630
char[7], not an old_sql_ex. /sven
4633
old_ex.field_term= *field_term;
4634
old_ex.enclosed= *enclosed;
4635
old_ex.line_term= *line_term;
4636
old_ex.line_start= *line_start;
4637
old_ex.escaped= *escaped;
4638
old_ex.opt_flags= opt_flags;
4639
old_ex.empty_flags=empty_flags;
4640
return my_b_safe_write(file, (unsigned char*) &old_ex, sizeof(old_ex)) != 0;
4649
const char *sql_ex_info::init(const char *buf, const char *buf_end,
4650
bool use_new_format)
4652
cached_new_format = use_new_format;
4657
The code below assumes that buf will not disappear from
4658
under our feet during the lifetime of the event. This assumption
4659
holds true in the slave thread if the log is in new format, but is not
4660
the case when we have old format because we will be reusing net buffer
4661
to read the actual file before we write out the Create_file event.
4663
if (read_str(&buf, buf_end, &field_term, &field_term_len) ||
4664
read_str(&buf, buf_end, &enclosed, &enclosed_len) ||
4665
read_str(&buf, buf_end, &line_term, &line_term_len) ||
4666
read_str(&buf, buf_end, &line_start, &line_start_len) ||
4667
read_str(&buf, buf_end, &escaped, &escaped_len))
4673
field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
4674
field_term = buf++; // Use first byte in string
4680
empty_flags= *buf++;
4681
if (empty_flags & FIELD_TERM_EMPTY)
4683
if (empty_flags & ENCLOSED_EMPTY)
4685
if (empty_flags & LINE_TERM_EMPTY)
4687
if (empty_flags & LINE_START_EMPTY)
4689
if (empty_flags & ESCAPED_EMPTY)
4696
/**************************************************************************
4697
Rows_log_event member functions
4698
**************************************************************************/
4700
Rows_log_event::Rows_log_event(THD *thd_arg, Table *tbl_arg, ulong tid,
4701
MY_BITMAP const *cols, bool is_transactional)
4702
: Log_event(thd_arg, 0, is_transactional),
4706
m_width(tbl_arg ? tbl_arg->s->fields : 1),
4707
m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
4708
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4711
We allow a special form of dummy event when the table, and cols
4712
are null and the table id is UINT32_MAX. This is a temporary
4713
solution, to be able to terminate a started statement in the
4714
binary log: the extraneous events will be removed in the future.
4716
assert((tbl_arg && tbl_arg->s && tid != UINT32_MAX) || (!tbl_arg && !cols && tid == UINT32_MAX));
4718
if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
4719
set_flags(NO_FOREIGN_KEY_CHECKS_F);
4720
if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
4721
set_flags(RELAXED_UNIQUE_CHECKS_F);
4722
/* if bitmap_init fails, caught in is_valid() */
4723
if (likely(!bitmap_init(&m_cols,
4724
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4728
/* Cols can be zero if this is a dummy binrows event */
4729
if (likely(cols != NULL))
4731
memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
4732
create_last_word_mask(&m_cols);
4737
// Needed because bitmap_init() does not set it to null on failure
4743
Rows_log_event::Rows_log_event(const char *buf, uint32_t event_len,
4744
Log_event_type event_type,
4745
const Format_description_log_event
4747
: Log_event(buf, description_event),
4750
m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
4751
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4753
uint8_t const common_header_len= description_event->common_header_len;
4754
uint8_t const post_header_len= description_event->post_header_len[event_type-1];
4756
const char *post_start= buf + common_header_len;
4757
post_start+= RW_MAPID_OFFSET;
4758
if (post_header_len == 6)
4760
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
4761
m_table_id= uint4korr(post_start);
4766
m_table_id= (ulong) uint6korr(post_start);
4767
post_start+= RW_FLAGS_OFFSET;
4770
m_flags= uint2korr(post_start);
4772
unsigned char const *const var_start=
4773
(const unsigned char *)buf + common_header_len + post_header_len;
4774
unsigned char const *const ptr_width= var_start;
4775
unsigned char *ptr_after_width= (unsigned char*) ptr_width;
4776
m_width = net_field_length(&ptr_after_width);
4777
/* if bitmap_init fails, caught in is_valid() */
4778
if (likely(!bitmap_init(&m_cols,
4779
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4783
memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
4784
create_last_word_mask(&m_cols);
4785
ptr_after_width+= (m_width + 7) / 8;
4789
// Needed because bitmap_init() does not set it to null on failure
4790
m_cols.bitmap= NULL;
4794
m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
4796
if (event_type == UPDATE_ROWS_EVENT)
4798
/* if bitmap_init fails, caught in is_valid() */
4799
if (likely(!bitmap_init(&m_cols_ai,
4800
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
4804
memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
4805
create_last_word_mask(&m_cols_ai);
4806
ptr_after_width+= (m_width + 7) / 8;
4810
// Needed because bitmap_init() does not set it to null on failure
4811
m_cols_ai.bitmap= 0;
4816
const unsigned char* const ptr_rows_data= (const unsigned char*) ptr_after_width;
4818
size_t const data_size= event_len - (ptr_rows_data - (const unsigned char *) buf);
4820
m_rows_buf= (unsigned char*) my_malloc(data_size, MYF(MY_WME));
4821
if (likely((bool)m_rows_buf))
4823
m_curr_row= m_rows_buf;
4824
m_rows_end= m_rows_buf + data_size;
4825
m_rows_cur= m_rows_end;
4826
memcpy(m_rows_buf, ptr_rows_data, data_size);
4829
m_cols.bitmap= 0; // to not free it
4834
/**
  Release the column bitmap and the row buffer owned by the event.
*/
Rows_log_event::~Rows_log_event()
{
  /*
    If the bitmap points at the embedded m_bitbuf array, no my_malloc
    happened for it; detach it so that bitmap_free() has nothing to free.
  */
  bool const uses_embedded_buffer= (m_cols.bitmap == m_bitbuf);
  if (uses_embedded_buffer)
  {
    m_cols.bitmap= 0;
  }

  /* Pairs with the bitmap_init() done in the constructors. */
  bitmap_free(&m_cols);

  free((unsigned char*)m_rows_buf);
}
4842
int Rows_log_event::get_data_size()
4844
int const type_code= get_type_code();
4846
unsigned char buf[sizeof(m_width)+1];
4847
unsigned char *end= net_store_length(buf, (m_width + 7) / 8);
4849
int data_size= ROWS_HEADER_LEN;
4850
data_size+= no_bytes_in_map(&m_cols);
4851
data_size+= end - buf;
4853
if (type_code == UPDATE_ROWS_EVENT)
4854
data_size+= no_bytes_in_map(&m_cols_ai);
4856
data_size+= (m_rows_cur - m_rows_buf);
4861
int Rows_log_event::do_add_row_data(unsigned char *row_data, size_t length)
4864
When the table has a primary key, we would probably want, by default, to
4865
log only the primary key value instead of the entire "before image". This
4866
would save binlog space. TODO
4870
If length is zero, there is nothing to write, so we just
4871
return. Note that this is not an optimization, since calling
4872
realloc() with size 0 means free().
4880
assert(m_rows_buf <= m_rows_cur);
4881
assert(!m_rows_buf || (m_rows_end && m_rows_buf <= m_rows_end));
4882
assert(m_rows_cur <= m_rows_end);
4884
/* The cast will always work since m_rows_cur <= m_rows_end */
4885
if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
4887
size_t const block_size= 1024;
4888
my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
4889
my_ptrdiff_t const new_alloc=
4890
block_size * ((cur_size + length + block_size - 1) / block_size);
4892
unsigned char* const new_buf= (unsigned char*)my_realloc((unsigned char*)m_rows_buf, (uint) new_alloc,
4893
MYF(MY_ALLOW_ZERO_PTR|MY_WME));
4894
if (unlikely(!new_buf))
4895
return(HA_ERR_OUT_OF_MEM);
4897
/* If the memory moved, we need to move the pointers */
4898
if (new_buf != m_rows_buf)
4900
m_rows_buf= new_buf;
4901
m_rows_cur= m_rows_buf + cur_size;
4905
The end pointer should always be changed to point to the end of
4906
the allocated memory.
4908
m_rows_end= m_rows_buf + new_alloc;
4911
assert(m_rows_cur + length <= m_rows_end);
4912
memcpy(m_rows_cur, row_data, length);
4913
m_rows_cur+= length;
4918
int Rows_log_event::do_apply_event(Relay_log_info const *rli)
4922
If m_table_id == UINT32_MAX, then we have a dummy event that does not
4923
contain any data. In that case, we just remove all tables in the
4924
tables_to_lock list, close the thread tables, and return with
4927
if (m_table_id == UINT32_MAX)
4930
This one is supposed to be set: just an extra check so that
4931
nothing strange has happened.
4933
assert(get_flags(STMT_END_F));
4935
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4936
close_thread_tables(thd);
4942
'thd' has been set by exec_relay_log_event(), just before calling
4943
do_apply_event(). We still check here to prevent future coding
4946
assert(rli->sql_thd == thd);
4949
If there are no locks taken, this is the first binrow event seen
4950
after the table map events. We should then lock all the tables
4951
used in the transaction and proceed with execution of the actual
4956
bool need_reopen= 1; /* To execute the first lap of the loop below */
4959
lock_tables() reads the contents of thd->lex, so they must be
4960
initialized. Contrary to in
4961
Table_map_log_event::do_apply_event() we don't call
4962
mysql_init_query() as that may reset the binlog format.
4967
There are a few flags that are replicated with each row event.
4968
Make sure to set/clear them before executing the main body of
4971
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
4972
thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
4974
thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
4976
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
4977
thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
4979
thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
4980
/* A small test to verify that objects have consistent types */
4981
assert(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
4984
while ((error= lock_tables(thd, rli->tables_to_lock,
4985
rli->tables_to_lock_count, &need_reopen)))
4989
if (thd->is_slave_error || thd->is_fatal_error)
4992
Error reporting borrowed from Query_log_event with many excessive
4993
simplifications (we don't honour --slave-skip-errors)
4995
uint32_t actual_error= thd->main_da.sql_errno();
4996
rli->report(ERROR_LEVEL, actual_error,
4997
_("Error '%s' in %s event: when locking tables"),
4999
? thd->main_da.message()
5000
: _("unexpected success or fatal error")),
5002
thd->is_fatal_error= 1;
5006
rli->report(ERROR_LEVEL, error,
5007
_("Error in %s event: when locking tables"),
5010
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5015
So we need to reopen the tables.
5017
We need to flush the pending RBR event, since it keeps a
5018
pointer to an open table.
5020
ALTERNATIVE SOLUTION (not implemented): Extract a pointer to
5021
the pending RBR event and reset the table pointer after the
5022
tables have been reopened.
5024
NOTE: For this new scheme there should be no pending event:
5025
need to add code to assert that is the case.
5027
thd->binlog_flush_pending_rows_event(false);
5028
TableList *tables= rli->tables_to_lock;
5029
close_tables_for_reopen(thd, &tables);
5031
uint32_t tables_count= rli->tables_to_lock_count;
5032
if ((error= open_tables(thd, &tables, &tables_count, 0)))
5034
if (thd->is_slave_error || thd->is_fatal_error)
5037
Error reporting borrowed from Query_log_event with many excessive
5038
simplifications (we don't honour --slave-skip-errors)
5040
uint32_t actual_error= thd->main_da.sql_errno();
5041
rli->report(ERROR_LEVEL, actual_error,
5042
_("Error '%s' on reopening tables"),
5044
? thd->main_da.message()
5045
: _("unexpected success or fatal error")));
5046
thd->is_slave_error= 1;
5048
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5054
When the open and locking succeeded, we check all tables to
5055
ensure that they still have the correct type.
5057
We can use a down cast here since we know that every table added
5058
to the tables_to_lock is a RPL_TableList.
5062
RPL_TableList *ptr= rli->tables_to_lock;
5063
for ( ; ptr ; ptr= static_cast<RPL_TableList*>(ptr->next_global))
5065
if (ptr->m_tabledef.compatible_with(rli, ptr->table))
5067
mysql_unlock_tables(thd, thd->lock);
5069
thd->is_slave_error= 1;
5070
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5071
return(ERR_BAD_TABLE_DEF);
5077
... and then we add all the tables to the table map and remove
5078
them from tables to lock.
5080
We also invalidate the query cache for all the tables, since
5081
they will now be changed.
5083
TODO [/Matz]: Maybe the query cache should not be invalidated
5084
here? It might be that a table is not changed, even though it
5085
was locked for the statement. We do know that each
5086
Rows_log_event contains at least one row, so after processing one
5087
Rows_log_event, we can invalidate the query cache for the
5090
for (TableList *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
5092
const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
5098
m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
5103
table == NULL means that this table should not be replicated
5104
(this was set up by Table_map_log_event::do_apply_event()
5105
which tested replicate-* rules).
5109
It's not needed to set_time() but
5110
1) it continues the property that "Time" in SHOW PROCESSLIST shows how
5111
much slave is behind
5112
2) it will be needed when we allow replication from a table with no
5113
TIMESTAMP column to a table with one.
5114
So we call set_time(), like in SBR. Presently it changes nothing.
5116
thd->set_time((time_t)when);
5118
There are a few flags that are replicated with each row event.
5119
Make sure to set/clear them before executing the main body of
5122
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
5123
thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
5125
thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
5127
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
5128
thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
5130
thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
5132
if (slave_allow_batching)
5133
thd->options|= OPTION_ALLOW_BATCH;
5135
thd->options&= ~OPTION_ALLOW_BATCH;
5137
/* A small test to verify that objects have consistent types */
5138
assert(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
5141
Now we are in a statement and will stay in a statement until we
5144
We set this flag here, before actually applying any rows, in
5145
case the SQL thread is stopped and we need to detect that we're
5146
inside a statement and halting abruptly might cause problems
5149
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
5151
if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
5152
set_flags(COMPLETE_ROWS_F);
5155
Set tables write and read sets.
5157
Read_set contains all slave columns (in case we are going to fetch
5158
a complete record from slave)
5160
Write_set equals the m_cols bitmap sent from master but it can be
5161
longer if slave has extra columns.
5164
bitmap_set_all(table->read_set);
5165
bitmap_set_all(table->write_set);
5166
if (!get_flags(COMPLETE_ROWS_F))
5167
bitmap_intersect(table->write_set,&m_cols);
5169
this->slave_exec_mode= slave_exec_mode_options; // fix the mode
5171
// Do event specific preparations
5172
error= do_before_row_operations(rli);
5174
// row processing loop
5176
while (error == 0 && m_curr_row < m_rows_end)
5178
/* in_use can have been set to NULL in close_tables_for_reopen */
5179
THD* old_thd= table->in_use;
5183
error= do_exec_row(rli);
5185
table->in_use = old_thd;
5191
The following list of "idempotent" errors
5192
means that an error from the list might happen
5193
because of idempotent (more than once)
5194
applying of a binlog file.
5195
Notice, that binlog has a ddl operation its
5196
second applying may cause
5198
case HA_ERR_TABLE_DEF_CHANGED:
5199
case HA_ERR_CANNOT_ADD_FOREIGN:
5201
which are not included in the list.
5203
case HA_ERR_RECORD_CHANGED:
5204
case HA_ERR_RECORD_DELETED:
5205
case HA_ERR_KEY_NOT_FOUND:
5206
case HA_ERR_END_OF_FILE:
5207
case HA_ERR_FOUND_DUPP_KEY:
5208
case HA_ERR_FOUND_DUPP_UNIQUE:
5209
case HA_ERR_FOREIGN_DUPLICATE_KEY:
5210
case HA_ERR_NO_REFERENCED_ROW:
5211
case HA_ERR_ROW_IS_REFERENCED:
5212
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5214
if (global_system_variables.log_warnings)
5215
slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
5217
RPL_LOG_NAME, (ulong) log_pos);
5223
thd->is_slave_error= 1;
5228
If m_curr_row_end was not set during event execution (e.g., because
5229
of errors) we can't proceed to the next row. If the error is transient
5230
(i.e., error==0 at this point) we must call unpack_current_row() to set
5233
if (!m_curr_row_end && !error)
5234
unpack_current_row(rli, &m_cols);
5236
// at this moment m_curr_row_end should be set
5237
assert(error || m_curr_row_end != NULL);
5238
assert(error || m_curr_row < m_curr_row_end);
5239
assert(error || m_curr_row_end <= m_rows_end);
5241
m_curr_row= m_curr_row_end;
5243
} // row processing loop
5245
error= do_after_row_operations(rli, error);
5248
thd->options|= OPTION_KEEP_LOG;
5253
We need to delay this clear until here because unpack_current_row() uses
5254
master-side table definitions stored in rli.
5256
if (rli->tables_to_lock && get_flags(STMT_END_F))
5257
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5258
/* reset OPTION_ALLOW_BATCH as not affect later events */
5259
thd->options&= ~OPTION_ALLOW_BATCH;
5262
{ /* error has occurred during the transaction */
5263
slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
5264
get_type_str(), RPL_LOG_NAME, (ulong) log_pos);
5269
If one day we honour --skip-slave-errors in row-based replication, and
5270
the error should be skipped, then we would clear mappings, rollback,
5271
close tables, but the slave SQL thread would not stop and then may
5272
assume the mapping is still available, the tables are still open...
5273
So then we should clear mappings/rollback/close here only if this is a
5275
For now we code, knowing that error is not skippable and so slave SQL
5276
thread is certainly going to stop.
5277
rollback at the caller along with sbr.
5279
thd->reset_current_stmt_binlog_row_based();
5280
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
5281
thd->is_slave_error= 1;
5286
This code would ideally be placed in do_update_pos() instead, but
5287
since we have no access to table there, we do the setting of
5288
last_event_start_time here instead.
5290
if (table && (table->s->primary_key == MAX_KEY) &&
5291
!cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS)
5294
------------ Temporary fix until WL#2975 is implemented ---------
5296
This event is not the last one (no STMT_END_F). If we stop now
5297
(in case of terminate_slave_thread()), how will we restart? We
5298
have to restart from Table_map_log_event, but as this table is
5299
not transactional, the rows already inserted will still be
5300
present, and idempotency is not guaranteed (no PK) so we risk
5301
that repeating leads to double insert. So we desperately try to
5302
continue, hope we'll eventually leave this buggy situation (by
5303
executing the final Rows_log_event). If we are in a hopeless
5304
wait (reached end of last relay log and nothing gets appended
5305
there), we timeout after one minute, and notify DBA about the
5306
problem. When WL#2975 is implemented, just remove the member
5307
Relay_log_info::last_event_start_time and all its occurrences.
5309
const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
5315
/**
  Decide whether this rows event should be skipped.

  An event that does not end a statement must not be the one that brings
  the slave skip counter down from 1: execution would then start on the
  next event, in the middle of a statement. Such an event is reported as
  EVENT_SKIP_IGNORE. Every other case is left to Log_event::do_shall_skip().

  @param rli  Relay log info whose slave_skip_counter is inspected.

  @return the skip reason.
*/
Log_event::enum_skip_reason
Rows_log_event::do_shall_skip(Relay_log_info *rli)
{
  /* Guard clause: anything but "counter is 1 and statement not ended". */
  if (rli->slave_skip_counter != 1 || get_flags(STMT_END_F))
  {
    return Log_event::do_shall_skip(rli);
  }

  return Log_event::EVENT_SKIP_IGNORE;
}
5330
Rows_log_event::do_update_pos(Relay_log_info *rli)
5334
if (get_flags(STMT_END_F))
5337
This is the end of a statement or transaction, so close (and
5338
unlock) the tables we opened when processing the
5339
Table_map_log_event starting the statement.
5341
OBSERVER. This will clear *all* mappings, not only those that
5342
are open for the table. There is no good handle for on-close
5345
NOTE. Even if we have no table ('table' == 0) we still need to be
5346
here, so that we increase the group relay log position. If we didn't, we
5347
could have a group relay log position which lags behind "forever"
5348
(assume the last master's transaction is ignored by the slave because of
5349
replicate-ignore rules).
5351
thd->binlog_flush_pending_rows_event(true);
5354
If this event is not in a transaction, the call below will, if some
5355
transactional storage engines are involved, commit the statement into
5356
them and flush the pending event to binlog.
5357
If this event is in a transaction, the call will do nothing, but a
5358
Xid_log_event will come next which will, if some transactional engines
5359
are involved, commit the transaction and flush the pending event to the
5362
error= ha_autocommit_or_rollback(thd, 0);
5365
Now what if this is not a transactional engine? we still need to
5366
flush the pending event to the binlog; we did it with
5367
thd->binlog_flush_pending_rows_event(). Note that we imitate
5368
what is done for real queries: a call to
5369
ha_autocommit_or_rollback() (sometimes only if involves a
5370
transactional engine), and a call to be sure to have the pending
5374
thd->reset_current_stmt_binlog_row_based();
5376
rli->cleanup_context(thd, 0);
5380
Indicate that a statement is finished.
5381
Step the group log position if we are not in a transaction,
5382
otherwise increase the event log position.
5384
rli->stmt_done(log_pos, when);
5387
Clear any errors pushed in thd->net.last_err* if for example "no key
5388
found" (as this is allowed). This is a safety measure; apparently
5389
those errors (e.g. when executing a Delete_rows_log_event of a
5390
non-existing row, like in rpl_row_mystery22.test,
5391
thd->net.last_error = "Can't find record in 't1'" and last_errno=1032)
5392
do not become visible. We still prefer to wipe them out.
5397
rli->report(ERROR_LEVEL, error,
5398
_("Error in %s event: commit of row events failed, "
5400
get_type_str(), m_table->s->db.str,
5401
m_table->s->table_name.str);
5405
rli->inc_event_relay_log_pos();
5411
/**
  Write the rows-event post-header: the table id and the event flags.

  @param file  IO_CACHE the header is written to.

  @return the result of my_b_safe_write() converted to bool.
*/
bool Rows_log_event::write_data_header(IO_CACHE *file)
{
  /* The dummy table id UINT32_MAX must never be written out. */
  assert(m_table_id != UINT32_MAX);

  /*
    Left uninitialized on purpose; the two stores below are expected to
    fill it (NOTE(review): confirm ROWS_HEADER_LEN covers exactly these
    two fields).
  */
  unsigned char header[ROWS_HEADER_LEN];

  int6store(header + RW_MAPID_OFFSET, (uint64_t)m_table_id);
  int2store(header + RW_FLAGS_OFFSET, m_flags);

  return my_b_safe_write(file, header, sizeof(header));
}
5420
/**
  Write the variable part of the rows event.

  In order, this emits the length-encoded column count, the m_cols
  bitmap, the m_cols_ai bitmap (UPDATE_ROWS_EVENT only) and the row data
  collected so far. Once one write has reported failure the remaining
  writes are skipped.

  @param file  IO_CACHE the body is written to.

  @return false if every write succeeded, true after the first write
          that returned non-zero.
*/
bool Rows_log_event::write_data_body(IO_CACHE*file)
{
  /*
    Note that the value stored here is the number of *bits* (columns),
    not the number of bytes of the bitmap.

    net_store_length() may emit a one-byte prefix followed by the value
    bytes, so the scratch buffer needs one byte more than m_width
    itself; get_data_size() sizes its buffer the same way.
  */
  unsigned char sbuf[sizeof(m_width) + 1];
  my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
  bool res= false;
  unsigned char *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
  assert(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));

  res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));

  res= res || my_b_safe_write(file, (unsigned char*) m_cols.bitmap,
                              no_bytes_in_map(&m_cols));
  /*
    TODO[refactor write]: Remove the "down cast" here (and elsewhere).
  */
  if (get_type_code() == UPDATE_ROWS_EVENT)
  {
    res= res || my_b_safe_write(file, (unsigned char*) m_cols_ai.bitmap,
                                no_bytes_in_map(&m_cols_ai));
  }
  res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);

  return res;
}
5451
void Rows_log_event::pack_info(Protocol *protocol)
5454
char const *const flagstr=
5455
get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
5456
size_t bytes= snprintf(buf, sizeof(buf),
5457
"table_id: %lu%s", m_table_id, flagstr);
5458
protocol->store(buf, bytes, &my_charset_bin);
5462
/**************************************************************************
5463
Table_map_log_event member functions and support functions
5464
**************************************************************************/
5467
@page How replication of field metadata works.
5469
When a table map is created, the master first calls
5470
Table_map_log_event::save_field_metadata() which calculates how many
5471
values will be in the field metadata. Only those fields that require the
5472
extra data are added. The method also loops through all of the fields in
5473
the table calling the method Field::save_field_metadata() which returns the
5474
values for the field that will be saved in the metadata and replicated to
5475
the slave. Once all fields have been processed, the table map is written to
5476
the binlog adding the size of the field metadata and the field metadata to
5477
the end of the body of the table map.
5479
When a table map is read on the slave, the field metadata is read from the
5480
table map and passed to the table_def class constructor which saves the
5481
field metadata from the table map into an array based on the type of the
5482
field. Field metadata values not present (those fields that do not use extra
5483
data) in the table map are initialized as zero (0). The array size is the
5484
same as the columns for the table on the slave.
5486
Additionally, values saved for field metadata on the master are saved as a
5487
string of bytes (unsigned char) in the binlog. A field may require 1 or more bytes
5488
to store the information. In cases where values require multiple bytes
5489
(e.g. values > 255), the endian-safe methods are used to properly encode
5490
the values on the master and decode them on the slave. When the field
5491
metadata values are captured on the slave, they are stored in an array of
5492
type uint16_t. This allows the least number of casts to prevent casting bugs
5493
when the field metadata is used in comparisons of field attributes. When
5494
the field metadata is used for calculating addresses in pointer math, the
5495
type used is uint32_t.
5499
Save the field metadata based on the real_type of the field.
5500
The metadata saved depends on the type of the field. Some fields
5501
store a single byte for pack_length() while others store two bytes
5502
for field_length (max length).
5507
We may want to consider changing the encoding of the information.
5508
Currently, the code attempts to minimize the number of bytes written to
5509
the tablemap. There are at least two other alternatives; 1) using
5510
net_store_length() to store the data allowing it to choose the number of
5511
bytes that are appropriate thereby making the code much easier to
5512
maintain (only 1 place to change the encoding), or 2) use a fixed number
5513
of bytes for each field. The problem with option 1 is that net_store_length()
5514
will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
5515
for fields like CHAR which can be no larger than 255 characters, the method
5516
will use 3 bytes when the value is > 250. Further, every value that is
5517
encoded using 2 parts (e.g., pack_length, field_length) will be numerically
5518
> 250 therefore will use 3 bytes for each value. Option 2
5519
is less wasteful for space but does waste 1 byte for every field that does
5522
/*
  Ask every field of m_table to store its metadata into m_field_metadata.

  Each Field::save_field_metadata() call writes at the current offset and
  its return value advances that offset.

  NOTE(review): the declaration of index and the return statement are not
  visible in this excerpt; the write-side constructor assigns the result
  of this function to m_field_metadata_size.
*/
int Table_map_log_event::save_field_metadata()
5525
for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
5526
index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
5531
Constructor used to build an event for writing to the binary log.
5532
Mats says tbl->s lives longer than this event so it's ok to copy pointers
5533
(tbl->s->db etc) and not pointer content.
5535
/*
  Build a table map event for table tbl, ready to be written to the binary
  log.

  The database and table names are taken by pointer from tbl->s (they are
  not copied). The constructor then
  - allocates m_memory with one byte per column and fills m_coltype from
    Field::type();
  - allocates m_null_bits and m_field_metadata (two bytes per column) in a
    single my_multi_malloc() block, m_meta_memory;
  - fills the metadata through save_field_metadata();
  - sets one bit in m_null_bits for every field whose maybe_null() is true;
  - accumulates the serialized size of the event in m_data_size.

  NOTE(review): the result of my_multi_malloc() is not checked in the lines
  visible here before m_field_metadata and m_null_bits are written.
  NOTE(review): m_data_size counts 1 or 2 bytes for the metadata size
  (threshold 255), while write_data_body() stores that size with
  net_store_length(), which a comment in this file describes as using
  1 byte below 251 and 3 bytes above 250. Confirm the accounting.
  NOTE(review): some member initializers, an else keyword and the braces
  are missing from this excerpt.
*/
Table_map_log_event::Table_map_log_event(THD *thd, Table *tbl, ulong tid,
5536
bool is_transactional __attribute__((unused)),
5538
: Log_event(thd, 0, true),
5540
m_dbnam(tbl->s->db.str),
5541
m_dblen(m_dbnam ? tbl->s->db.length : 0),
5542
m_tblnam(tbl->s->table_name.str),
5543
m_tbllen(tbl->s->table_name.length),
5544
m_colcnt(tbl->s->fields),
5549
m_field_metadata(0),
5550
m_field_metadata_size(0),
5554
assert(m_table_id != UINT32_MAX);
5556
In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
5557
table.cc / alloc_table_share():
5558
Use the fact the key is db/0/table_name/0
5559
As we rely on this let's assert it.
5561
assert((tbl->s->db.str == 0) ||
5562
(tbl->s->db.str[tbl->s->db.length] == 0));
5563
assert(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
5566
m_data_size= TABLE_MAP_HEADER_LEN;
5567
m_data_size+= m_dblen + 2; // Include length and terminating \0
5568
m_data_size+= m_tbllen + 2; // Include length and terminating \0
5569
m_data_size+= 1 + m_colcnt; // COLCNT and column types
5571
/* If malloc fails, caught in is_valid() */
5572
if ((m_memory= (unsigned char*) my_malloc(m_colcnt, MYF(MY_WME))))
5574
m_coltype= reinterpret_cast<unsigned char*>(m_memory);
5575
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
5576
m_coltype[i]= m_table->field[i]->type();
5580
Calculate a bitmap for the results of maybe_null() for all columns.
5581
The bitmap is used to determine when there is a column from the master
5582
that is not on the slave and is null and thus not in the row data during
5585
uint32_t num_null_bytes= (m_table->s->fields + 7) / 8;
5586
m_data_size+= num_null_bytes;
5587
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
5588
&m_null_bits, num_null_bytes,
5589
&m_field_metadata, (m_colcnt * 2),
5592
memset(m_field_metadata, 0, (m_colcnt * 2));
5595
Create an array for the field metadata and store it.
5597
m_field_metadata_size= save_field_metadata();
5598
assert(m_field_metadata_size <= (m_colcnt * 2));
5601
Now set the size of the data to the size of the field metadata array
5602
plus one or two bytes for number of elements in the field metadata array.
5604
if (m_field_metadata_size > 255)
5605
m_data_size+= m_field_metadata_size + 2;
5607
m_data_size+= m_field_metadata_size + 1;
5609
memset(m_null_bits, 0, num_null_bytes);
5610
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
5611
if (m_table->field[i]->maybe_null())
5612
m_null_bits[(i / 8)]+= 1 << (i % 8);
5618
Constructor used by slave to read the event from the binary log.
5620
/*
  Decode a table map event from the raw event buffer buf.

  The table id is read from the post-header: 4 bytes when the post-header
  is 6 bytes long, 6 bytes in the TABLE_MAP_HEADER_LEN case. The flags
  follow. From the variable part the constructor reads the database name,
  the table name, the packed column count and one type byte per column,
  copying them into a single my_multi_malloc() block (m_memory). When
  bytes remain before event_len, the field metadata and the null-bits
  bitmap are copied into a second block, m_meta_memory.

  NOTE(review): the lengths read from buf (m_dblen, m_tbllen, m_colcnt,
  m_field_metadata_size) drive pointer arithmetic and copies; apart from
  one assert and the bytes_read < event_len test, no comparison against
  event_len is visible here.
  NOTE(review): m_table_id is initialized to ULONG_MAX but asserted
  against UINT32_MAX.
  NOTE(review): the allocation results are not checked in the visible
  lines before the copies; several short lines are missing from this
  excerpt.
*/
Table_map_log_event::Table_map_log_event(const char *buf, uint32_t event_len,
5621
const Format_description_log_event
5624
: Log_event(buf, description_event),
5626
m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
5627
m_colcnt(0), m_coltype(0),
5628
m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
5629
m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
5630
m_null_bits(0), m_meta_memory(NULL)
5632
unsigned int bytes_read= 0;
5634
uint8_t common_header_len= description_event->common_header_len;
5635
uint8_t post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
5637
/* Read the post-header */
5638
const char *post_start= buf + common_header_len;
5640
post_start+= TM_MAPID_OFFSET;
5641
if (post_header_len == 6)
5643
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
5644
m_table_id= uint4korr(post_start);
5649
assert(post_header_len == TABLE_MAP_HEADER_LEN);
5650
m_table_id= (ulong) uint6korr(post_start);
5651
post_start+= TM_FLAGS_OFFSET;
5654
assert(m_table_id != UINT32_MAX);
5656
m_flags= uint2korr(post_start);
5658
/* Read the variable part of the event */
5659
const char *const vpart= buf + common_header_len + post_header_len;
5661
/* Extract the length of the various parts from the buffer */
5662
unsigned char const *const ptr_dblen= (unsigned char const*)vpart + 0;
5663
m_dblen= *(unsigned char*) ptr_dblen;
5665
/* Length of database name + counter + terminating null */
5666
unsigned char const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
5667
m_tbllen= *(unsigned char*) ptr_tbllen;
5669
/* Length of table name + counter + terminating null */
5670
unsigned char const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
5671
unsigned char *ptr_after_colcnt= (unsigned char*) ptr_colcnt;
5672
m_colcnt= net_field_length(&ptr_after_colcnt);
5674
/* Allocate mem for all fields in one go. If fails, caught in is_valid() */
5675
m_memory= (unsigned char*) my_multi_malloc(MYF(MY_WME),
5676
&m_dbnam, (uint) m_dblen + 1,
5677
&m_tblnam, (uint) m_tbllen + 1,
5678
&m_coltype, (uint) m_colcnt,
5683
/* Copy the different parts into their memory */
5684
strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
5685
strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
5686
memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
5688
ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
5689
bytes_read= ptr_after_colcnt - (unsigned char *)buf;
5690
if (bytes_read < event_len)
5692
m_field_metadata_size= net_field_length(&ptr_after_colcnt);
5693
assert(m_field_metadata_size <= (m_colcnt * 2));
5694
uint32_t num_null_bytes= (m_colcnt + 7) / 8;
5695
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
5696
&m_null_bits, num_null_bytes,
5697
&m_field_metadata, m_field_metadata_size,
5699
memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
5700
ptr_after_colcnt= (unsigned char*)ptr_after_colcnt + m_field_metadata_size;
5701
memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
5708
/*
  Release the memory owned by the event.

  NOTE(review): only the release of m_meta_memory is visible in this
  excerpt; confirm that m_memory is released as well.
*/
Table_map_log_event::~Table_map_log_event()
5710
free(m_meta_memory);
5715
Return value is an error code, one of:
5717
-1 Failure to open table [from open_tables()]
5719
1 No room for more tables [from set_table()]
5720
2 Out of memory [from set_table()]
5721
3 Wrong table definition
5722
4 Daisy-chaining RBR with SBR not possible
5725
/*
  Apply a table map event on the slave: open the mapped table and queue it
  on rli->tables_to_lock.

  Steps visible here:
  - take a new query id under LOCK_thread_count;
  - allocate an RPL_TableList together with buffers for the database and
    table names (NAME_LEN + 1 bytes each) and fill it in, passing the
    database name through rpl_filter->get_rewrite_db();
  - consult the replication filters (db_ok / tables_ok);
  - switch the statement to row-based binlogging when it is not already
    row based, the binary log is open and OPTION_BIN_LOG is set;
  - open the table with open_tables() and report a failure through
    rli->report();
  - placement-construct table_list->m_tabledef from the column types,
    field metadata and null bits carried by the event;
  - push table_list on the front of rli->tables_to_lock and bump
    tables_to_lock_count.

  NOTE(review): both names are copied with my_stpcpy() into NAME_LEN + 1
  byte buffers with no visible length check.
  NOTE(review): several short lines (local declarations, returns, braces)
  are missing from this excerpt, so the exact branch structure cannot be
  read off here.
*/
int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
5727
RPL_TableList *table_list;
5728
char *db_mem, *tname_mem;
5731
assert(rli->sql_thd == thd);
5733
/* Step the query id to mark what columns that are actually used. */
5734
pthread_mutex_lock(&LOCK_thread_count);
5735
thd->query_id= next_query_id();
5736
pthread_mutex_unlock(&LOCK_thread_count);
5738
if (!(memory= my_multi_malloc(MYF(MY_WME),
5739
&table_list, (uint) sizeof(RPL_TableList),
5740
&db_mem, (uint) NAME_LEN + 1,
5741
&tname_mem, (uint) NAME_LEN + 1,
5743
return(HA_ERR_OUT_OF_MEM);
5745
memset(table_list, 0, sizeof(*table_list));
5746
table_list->db = db_mem;
5747
table_list->alias= table_list->table_name = tname_mem;
5748
table_list->lock_type= TL_WRITE;
5749
table_list->next_global= table_list->next_local= 0;
5750
table_list->table_id= m_table_id;
5751
table_list->updating= 1;
5752
my_stpcpy(table_list->db, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
5753
my_stpcpy(table_list->table_name, m_tblnam);
5757
if (!rpl_filter->db_ok(table_list->db) ||
5758
(rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list)))
5765
open_tables() reads the contents of thd->lex, so they must be
5766
initialized, so we should call lex_start(); to be even safer, we
5767
call mysql_init_query() which does a more complete set of inits.
5770
mysql_reset_thd_for_next_command(thd);
5772
Check if the slave is set to use SBR. If so, it should switch
5773
to using RBR until the end of the "statement", i.e., next
5774
STMT_END_F or next error.
5776
if (!thd->current_stmt_binlog_row_based &&
5777
mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG))
5779
thd->set_current_stmt_binlog_row_based();
5783
Open the table if it is not already open and add the table to
5784
table map. Note that for any table that should not be
5785
replicated, a filter is needed.
5787
The creation of a new TableList is used to up-cast the
5788
table_list consisting of RPL_TableList items. This will work
5789
since the only case where the argument to open_tables() is
5790
changed, is when thd->lex->query_tables == table_list, i.e.,
5791
when the statement requires prelocking. Since this is not
5792
executed when a statement is executed, this case will not occur.
5793
As a precaution, an assertion is added to ensure that the bad
5796
Either way, the memory in the list is *never* released
5797
internally in the open_tables() function, hence we take a copy
5798
of the pointer to make sure that it's not lost.
5801
assert(thd->lex->query_tables != table_list);
5802
TableList *tmp_table_list= table_list;
5803
if ((error= open_tables(thd, &tmp_table_list, &count, 0)))
5805
if (thd->is_slave_error || thd->is_fatal_error)
5808
Error reporting borrowed from Query_log_event with many excessive
5809
simplifications (we don't honour --slave-skip-errors)
5811
uint32_t actual_error= thd->main_da.sql_errno();
5812
rli->report(ERROR_LEVEL, actual_error,
5813
_("Error '%s' on opening table `%s`.`%s`"),
5815
? thd->main_da.message()
5816
: _("unexpected success or fatal error")),
5817
table_list->db, table_list->table_name);
5818
thd->is_slave_error= 1;
5823
m_table= table_list->table;
5826
This will fail later otherwise, the 'in_use' field should be
5827
set to the current thread.
5829
assert(m_table->in_use);
5832
Use placement new to construct the table_def instance in the
5833
memory allocated for it inside table_list.
5835
The memory allocated by the table_def structure (i.e., not the
5836
memory allocated *for* the table_def structure) is released
5837
inside Relay_log_info::clear_tables_to_lock() by calling the
5838
table_def destructor explicitly.
5840
new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt,
5841
m_field_metadata, m_field_metadata_size, m_null_bits);
5842
table_list->m_tabledef_valid= true;
5845
We record in the slave's information that the table should be
5846
locked by linking the table into the list of tables to lock.
5848
table_list->next_global= table_list->next_local= rli->tables_to_lock;
5849
const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
5850
const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
5851
/* 'memory' is freed in clear_tables_to_lock */
5861
/*
  Decide whether this event is skipped by delegating to
  continue_group(rli); the function adds no logic of its own.
*/
Log_event::enum_skip_reason
5862
Table_map_log_event::do_shall_skip(Relay_log_info *rli)
5865
If the slave skip counter is 1, then we should not start executing
5868
return continue_group(rli);
5871
/*
  Advance the relay log position past this event by calling
  rli->inc_event_relay_log_pos().

  NOTE(review): the return statement is not visible in this excerpt.
*/
int Table_map_log_event::do_update_pos(Relay_log_info *rli)
5873
rli->inc_event_relay_log_pos();
5878
/**
  Write the fixed-size post-header of a table map event.

  The header carries the table id, stored with int6store() at
  TM_MAPID_OFFSET, and the event flags, stored with int2store() at
  TM_FLAGS_OFFSET.

  @param file  IO cache the header is appended to.

  @return The result of my_b_safe_write() converted to bool.
*/
bool Table_map_log_event::write_data_header(IO_CACHE *file)
{
  assert(m_table_id != UINT32_MAX);

  unsigned char post_header[TABLE_MAP_HEADER_LEN];
  int6store(post_header + TM_MAPID_OFFSET, (uint64_t)m_table_id);
  int2store(post_header + TM_FLAGS_OFFSET, m_flags);

  const bool write_failed=
    my_b_safe_write(file, post_header, TABLE_MAP_HEADER_LEN);
  return write_failed;
}
5887
/**
  Write the variable-length body of a table map event.

  Layout, in write order:
  - one byte holding the database name length, then the database name
    including its terminating NUL;
  - one byte holding the table name length, then the table name including
    its terminating NUL;
  - the column count packed with net_store_length();
  - one type byte per column (m_coltype);
  - the size of the field metadata packed with net_store_length();
  - the field metadata itself;
  - the null-bits bitmap, (m_colcnt + 7) / 8 bytes.

  All writes are chained with ||, so the first failing write stops the
  sequence and its failure is what the caller sees. The metadata write was
  previously followed by a comma operator, which discarded the result of
  every write before the null-bits write.

  @param file  IO cache the body is appended to.

  @return false if every part was written, true if any write failed.
*/
bool Table_map_log_event::write_data_body(IO_CACHE *file)
{
  assert(m_dbnam != NULL);
  assert(m_tblnam != NULL);
  /* We use only one byte per length for storage in event: */
  assert(m_dblen < 128);
  assert(m_tbllen < 128);

  unsigned char const dbuf[]= { (unsigned char) m_dblen };
  unsigned char const tbuf[]= { (unsigned char) m_tbllen };

  unsigned char cbuf[sizeof(m_colcnt)];
  unsigned char *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
  assert(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));

  /*
    Store the size of the field metadata.
  */
  unsigned char mbuf[sizeof(m_field_metadata_size)];
  unsigned char *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
  /* Same bound check as for cbuf above. */
  assert(static_cast<size_t>(mbuf_end - mbuf) <= sizeof(mbuf));

  return (my_b_safe_write(file, dbuf,      sizeof(dbuf)) ||
          my_b_safe_write(file, (const unsigned char*)m_dbnam,   m_dblen+1) ||
          my_b_safe_write(file, tbuf,      sizeof(tbuf)) ||
          my_b_safe_write(file, (const unsigned char*)m_tblnam,  m_tbllen+1) ||
          my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
          my_b_safe_write(file, m_coltype, m_colcnt) ||
          my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
          my_b_safe_write(file, m_field_metadata, m_field_metadata_size) ||
          my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
}
5921
Print some useful information for the SHOW BINARY LOG information
5925
/*
  Format "table_id: N (db.table)" from m_table_id, m_dbnam and m_tblnam
  and hand the text to protocol.

  NOTE(review): the declaration of buf is not visible in this excerpt.
  NOTE(review): snprintf() returns the length the text would have had
  without truncation; if the names ever exceeded buf, bytes would be
  larger than the number of characters actually stored.
*/
void Table_map_log_event::pack_info(Protocol *protocol)
5928
size_t bytes= snprintf(buf, sizeof(buf),
5929
"table_id: %lu (%s.%s)",
5930
m_table_id, m_dbnam, m_tblnam);
5931
protocol->store(buf, bytes, &my_charset_bin);
5935
/**************************************************************************
5936
Write_rows_log_event member functions
5937
**************************************************************************/
5940
Constructor used to build an event for writing to the binary log.
5942
/*
  Write-side constructor: forwards to Rows_log_event, passing
  tbl_arg->write_set as the column bitmap.

  NOTE(review): one parameter line (the one declaring tid_arg) and the
  body are missing from this excerpt.
*/
Write_rows_log_event::Write_rows_log_event(THD *thd_arg, Table *tbl_arg,
5944
bool is_transactional)
5945
: Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
5950
Constructor used by slave to read the event from the binary log.
5952
/*
  Read-side constructor: forwards the raw buffer to Rows_log_event with
  the event type fixed to WRITE_ROWS_EVENT.

  NOTE(review): the end of the parameter list and the body are missing
  from this excerpt.
*/
Write_rows_log_event::Write_rows_log_event(const char *buf, uint32_t event_len,
5953
const Format_description_log_event
5955
: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
5960
/*
  Prepare m_table and the thread for applying the rows of a write event.

  When SLAVE_EXEC_MODE_IDEMPOTENT is set in slave_exec_mode, the statement
  is made to look like a REPLACE (lex->duplicates and lex->sql_command)
  and the handler is told HA_EXTRA_IGNORE_DUP_KEY. The function also
  starts a bulk insert and sets timestamp_field_type to
  TIMESTAMP_NO_AUTO_SET.

  NOTE(review): the return type, the return statement and the braces that
  delimit the idempotent branch are missing from this excerpt, so which
  of the statements below belong to that branch cannot be read off here.
*/
Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
5965
todo: to introduce a property for the event (handler?) which forces
5966
applying the event in the replace (idempotent) fashion.
5968
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5971
We are using REPLACE semantics and not INSERT IGNORE semantics
5972
when writing rows, that is: new rows replace old rows. We need to
5973
inform the storage engine that it should use this behaviour.
5976
/* Tell the storage engine that we are using REPLACE semantics. */
5977
thd->lex->duplicates= DUP_REPLACE;
5980
Pretend we're executing a REPLACE command: this is needed for
5981
InnoDB since it is not (properly) checking the
5982
lex->duplicates flag.
5984
thd->lex->sql_command= SQLCOM_REPLACE;
5986
Do not raise the error flag in case of hitting to an unique attribute
5988
m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
5991
m_table->file->ha_start_bulk_insert(0);
5993
We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
5994
any TIMESTAMP column with data from the row but instead will use
5995
the event's current time.
5996
As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
5997
columns, we know that all TIMESTAMP columns on slave will receive explicit
5998
data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
5999
When we allow a table without TIMESTAMP to be replicated to a table having
6000
more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
6001
column to be replicated into a BIGINT column and the slave's table has a
6002
TIMESTAMP column, then the slave's TIMESTAMP column will take its value
6003
from set_time() which we called earlier (consistent with SBR). And then in
6004
some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
6005
analyze if explicit data is provided for slave's TIMESTAMP columns).
6007
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
6013
/*
  Undo the handler settings made before the rows were applied and finish
  the bulk insert.

  In idempotent mode the handler receives HA_EXTRA_NO_IGNORE_DUP_KEY and
  HA_EXTRA_WRITE_CANNOT_REPLACE. A failure of ha_end_bulk_insert() is
  reported through print_error(). The incoming error takes precedence
  over that local error in the returned value.

  NOTE(review): the return type, the second parameter (presumably the
  error variable used in the return expression) and the declaration of
  local_error are missing from this excerpt.
*/
Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6017
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
6019
m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
6020
m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
6022
resetting the extra with
6023
table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
6025
explanation: file->reset() performs this duty
6026
ultimately. Still todo: fix
6029
if ((local_error= m_table->file->ha_end_bulk_insert()))
6031
m_table->file->print_error(local_error, MYF(0));
6033
return error? error : local_error;
6038
Check if there are more UNIQUE keys after the given key.
6041
/*
  Scan the keys that follow keyno in table->key_info, testing each for
  the HA_NOSAME flag.

  NOTE(review): the return type and both return statements are missing
  from this excerpt; the caller in write_row() treats a true result as
  "keyno is the last unique key".
*/
last_uniq_key(Table *table, uint32_t keyno)
6043
while (++keyno < table->s->keys)
6044
if (table->key_info[keyno].flags & HA_NOSAME)
6050
Check if an error is a duplicate key error.
6052
This function is used to check if an error code is one of the
6053
duplicate key errors, i.e., an error code for which it is sensible
6054
to do a <code>get_dup_key()</code> to retrieve the duplicate key.
6056
@param errcode The error code to check.
6058
@return <code>true</code> if the error code is such that
6059
<code>get_dup_key()</code> will return true, <code>false</code>
6063
/*
  Classify errcode: HA_ERR_FOUND_DUPP_KEY and HA_ERR_FOUND_DUPP_UNIQUE are
  the two codes listed as duplicate key errors.

  NOTE(review): the return type, the switch statement and the return
  statements are missing from this excerpt.
*/
is_duplicate_key_error(int errcode)
6067
case HA_ERR_FOUND_DUPP_KEY:
6068
case HA_ERR_FOUND_DUPP_UNIQUE:
6075
Write the current row into event's table.
6077
The row is located in the row buffer, pointed by @c m_curr_row member.
6078
Number of columns of the row is stored in @c m_width member (it can be
6079
different from the number of columns in the table to which we insert).
6080
Bitmap @c m_cols indicates which columns are present in the row. It is assumed
6081
that event's table is already open and pointed by @c m_table.
6083
If the same record already exists in the table it can be either overwritten
6084
or an error is reported depending on the value of @c overwrite flag
6085
(error reporting not yet implemented). Note that the matching record can be
6086
different from the row we insert if we use primary keys to identify records in
6089
The row to be inserted can contain values only for selected columns. The
6090
missing columns are filled with default values using @c prepare_record()
6091
function. If a matching record is found in the table and @c overwrite is
6092
true, the missing columns are taken from it.
6094
@param rli Relay log info (needed for row unpacking).
6096
Shall we overwrite if the row already exists or signal
6097
error (currently ignored).
6099
@returns Error code on failure, 0 on success.
6101
This method, if successful, sets @c m_curr_row_end pointer to point at the
6102
next row in the rows buffer. This is done when unpacking the row to be
6105
@note If a matching record is found, it is either updated using
6106
@c ha_update_row() or first deleted and then new record written.
6110
/*
  Insert the current row of the event into m_table, resolving duplicate
  key conflicts by updating or deleting the conflicting row.

  Outline of the visible logic:
  - fill record[0] with defaults (prepare_record) and unpack the row into
    it, then copy m_cols into the table write_set;
  - call ha_write_row() in a loop; on failure, give up for lock errors or
    when get_dup_key() cannot name the duplicate key;
  - otherwise fetch the conflicting row into record[1], either by
    position (engines with HA_DUPLICATE_POS, using dup_ref) or through
    index_read_idx_map() on the duplicated key;
  - if the event lacks COMPLETE_ROWS_F, start from the found record and
    unpack the row over it;
  - when the duplicate key is the last unique key and the table is not
    referenced by a foreign key, call ha_update_row(); otherwise delete
    the conflicting row with ha_delete_row() and retry the write.

  NOTE(review): many short lines (return type, local declarations such as
  error and keynum, returns, else/switch keywords, braces and the use of
  the overwrite parameter) are missing from this excerpt; the outline
  above covers only what the remaining lines show.
*/
Rows_log_event::write_row(const Relay_log_info *const rli,
6111
const bool overwrite)
6113
assert(m_table != NULL && thd != NULL);
6115
Table *table= m_table; // pointer to event's table
6118
auto_afree_ptr<char> key(NULL);
6120
/* fill table->record[0] with default values */
6123
We only check if the columns have default values for non-NDB
6124
engines, for NDB we ignore the check since updates are sent as
6125
writes, causing errors when trying to prepare the record.
6127
TODO[ndb]: Elimiate this hard-coded dependency on NDB. Ideally,
6128
the engine should be able to set a flag that it want the default
6129
values filled in and one flag to handle the case that the default
6130
values should be checked. Maybe these two flags can be combined.
6132
if ((error= prepare_record(table, &m_cols, m_width, true)))
6135
/* unpack row into table->record[0] */
6136
error= unpack_current_row(rli, &m_cols);
6138
// Temporary fix to find out why it fails [/Matz]
6139
memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
6142
Try to write record. If a corresponding record already exists in the table,
6143
we try to change it using ha_update_row() if possible. Otherwise we delete
6144
it and repeat the whole process again.
6146
TODO: Add safety measures against infinite looping.
6149
while ((error= table->file->ha_write_row(table->record[0])))
6151
if (error == HA_ERR_LOCK_DEADLOCK ||
6152
error == HA_ERR_LOCK_WAIT_TIMEOUT ||
6153
(keynum= table->file->get_dup_key(error)) < 0 ||
6157
Deadlock, waiting for lock or just an error from the handler
6158
such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
6159
Retrieval of the duplicate key number may fail
6160
- either because the error was not "duplicate key" error
6161
- or because the information which key is not available
6163
table->file->print_error(error, MYF(0));
6167
We need to retrieve the old row into record[1] to be able to
6168
either update or delete the offending record. We either:
6170
- use rnd_pos() with a row-id (available as dupp_row) to the
6171
offending row, if that is possible (MyISAM and Blackhole), or else
6173
- use index_read_idx() with the key that is duplicated, to
6174
retrieve the offending row.
6176
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
6178
if (table->file->inited && (error= table->file->ha_index_end()))
6180
if ((error= table->file->ha_rnd_init(false)))
6183
error= table->file->rnd_pos(table->record[1], table->file->dup_ref);
6184
table->file->ha_rnd_end();
6187
table->file->print_error(error, MYF(0));
6193
if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
6198
if (key.get() == NULL)
6200
key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
6201
if (key.get() == NULL)
6207
key_copy((unsigned char*)key.get(), table->record[0], table->key_info + keynum,
6209
error= table->file->index_read_idx_map(table->record[1], keynum,
6210
(const unsigned char*)key.get(),
6215
table->file->print_error(error, MYF(0));
6221
Now, record[1] should contain the offending row. That
6222
will enable us to update it or, alternatively, delete it (so
6223
that we can insert the new row afterwards).
6227
If row is incomplete we will use the record found to fill
6230
if (!get_flags(COMPLETE_ROWS_F))
6232
restore_record(table,record[1]);
6233
error= unpack_current_row(rli, &m_cols);
6237
REPLACE is defined as either INSERT or DELETE + INSERT. If
6238
possible, we can replace it with an UPDATE, but that will not
6239
work on InnoDB if FOREIGN KEY checks are necessary.
6241
I (Matz) am not sure of the reason for the last_uniq_key()
6242
check as, but I'm guessing that it's something along the
6245
Suppose that we got the duplicate key to be a key that is not
6246
the last unique key for the table and we perform an update:
6247
then there might be another key for which the unique check will
6248
fail, so we're better off just deleting the row and inserting
6251
if (last_uniq_key(table, keynum) &&
6252
!table->file->referenced_by_foreign_key())
6254
error=table->file->ha_update_row(table->record[1],
6258
case HA_ERR_RECORD_IS_THE_SAME:
6265
table->file->print_error(error, MYF(0));
6272
if ((error= table->file->ha_delete_row(table->record[1])))
6274
table->file->print_error(error, MYF(0));
6277
/* Will retry ha_write_row() with the offending row removed. */
6286
/*
  Apply one row of a write event by calling write_row(); the overwrite
  argument is true when SLAVE_EXEC_MODE_IDEMPOTENT is set. If write_row()
  failed without setting an error on the thread, ER_UNKNOWN_ERROR is
  raised so that the failure is not silent.

  NOTE(review): the return type, the assignment of the write_row() result
  to error and the return statement are missing from this excerpt.
*/
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6288
assert(m_table != NULL);
6290
write_row(rli, /* if 1 then overwrite */
6291
bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
6293
if (error && !thd->is_error())
6296
my_error(ER_UNKNOWN_ERROR, MYF(0));
6303
/**************************************************************************
6304
Delete_rows_log_event member functions
6305
**************************************************************************/
6308
Compares table->record[0] and table->record[1]
6310
Returns TRUE if different.
6312
/*
  Compare table->record[0] with table->record[1].

  Before comparing, the first null byte of both records gets its lowest
  bit set and the last null byte gets the bits from last_null_bit_pos
  upwards set; the original bytes are saved and restored at
  record_compare_exit. Tables without blob or varchar fields are compared
  with cmp_record(). Otherwise the null flags are compared with memcmp()
  and then every field is compared with cmp_binary_offset(), using
  rec_buff_length as the offset to the second record.

  NOTE(review): the declaration of result, the assignment made when a
  field differs and the final return statement are missing from this
  excerpt.
*/
static bool record_compare(Table *table)
6315
Need to set the X bit and the filler bits in both records since
6316
there are engines that do not set it correctly.
6318
In addition, since MyISAM checks that one hasn't tampered with the
6319
record, it is necessary to restore the old bytes into the record
6320
after doing the comparison.
6322
TODO[record format ndb]: Remove it once NDB returns correct
6323
records. Check that the other engines also return correct records.
6326
unsigned char saved_x[2], saved_filler[2];
6328
if (table->s->null_bytes > 0)
6330
for (int i = 0 ; i < 2 ; ++i)
6332
saved_x[i]= table->record[i][0];
6333
saved_filler[i]= table->record[i][table->s->null_bytes - 1];
6334
table->record[i][0]|= 1U;
6335
table->record[i][table->s->null_bytes - 1]|=
6336
256U - (1U << table->s->last_null_bit_pos);
6340
if (table->s->blob_fields + table->s->varchar_fields == 0)
6342
result= cmp_record(table,record[1]);
6343
goto record_compare_exit;
6346
/* Compare null bits */
6347
if (memcmp(table->null_flags,
6348
table->null_flags+table->s->rec_buff_length,
6349
table->s->null_bytes))
6351
result= true; // Diff in NULL value
6352
goto record_compare_exit;
6355
/* Compare updated fields */
6356
for (Field **ptr=table->field ; *ptr ; ptr++)
6358
if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
6361
goto record_compare_exit;
6365
record_compare_exit:
6367
Restore the saved bytes.
6369
TODO[record format ndb]: Remove this code once NDB returns the
6370
correct record format.
6372
if (table->s->null_bytes > 0)
6374
for (int i = 0 ; i < 2 ; ++i)
6376
table->record[i][0]= saved_x[i];
6377
table->record[i][table->s->null_bytes - 1]= saved_filler[i];
6385
Locate the current row in event's table.
6387
The current row is pointed by @c m_curr_row. Member @c m_width tells how many
6388
columns are there in the row (this can be different from the number of columns
6389
in the table). It is assumed that event's table is already open and pointed
6392
If a corresponding record is found in the table it is stored in
6393
@c m_table->record[0]. Note that when record is located based on a primary
6394
key, it is possible that the record found differs from the row being located.
6396
If no key is specified or table does not have keys, a table scan is used to
6397
find the row. In that case the row should be complete and contain values for
6398
all columns. However, it can still be shorter than the table, i.e. the table
6399
can contain extra columns not present in the row. It is also possible that
6400
the table has fewer columns than the row being located.
6402
@returns Error code on failure, 0 on success.
6404
@post In case of success @c m_table->record[0] contains the record found.
6405
Also, the internal "cursor" of the table is positioned at the record found.
6407
@note If the engine allows random access of the records, a combination of
6408
@c position() and @c rnd_pos() will be used.
6411
int Rows_log_event::find_row(const Relay_log_info *rli)
6413
assert(m_table && m_table->in_use != NULL);
6415
Table *table= m_table;
6418
/* unpack row - missing fields get default values */
6419
prepare_record(table, &m_cols, m_width, false/* don't check errors */);
6420
error= unpack_current_row(rli, &m_cols);
6422
// Temporary fix to find out why it fails [/Matz]
6423
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
6425
if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
6426
table->s->primary_key < MAX_KEY)
6429
Use a more efficient method to fetch the record given by
6430
table->record[0] if the engine allows it. We first compute a
6431
row reference using the position() member function (it will be
6432
stored in table->file->ref) and then use rnd_pos() to position
6433
the "cursor" (i.e., record[0] in this case) at the correct row.
6435
TODO: Add a check that the correct record has been fetched by
6436
comparing with the original record. Take into account that the
6437
record on the master and slave can be of different
6438
length. Something along these lines should work:
6440
ADD>>> store_record(table,record[1]);
6441
int error= table->file->rnd_pos(table->record[0], table->file->ref);
6442
ADD>>> assert(memcmp(table->record[1], table->record[0],
6443
table->s->reclength) == 0);
6446
int error= table->file->rnd_pos_by_record(table->record[0]);
6447
table->file->ha_rnd_end();
6450
table->file->print_error(error, MYF(0));
6455
// We can't use position() - try other methods.
6458
Save copy of the record in table->record[1]. It might be needed
6459
later if linear search is used to find exact match.
6461
store_record(table,record[1]);
6463
if (table->s->keys > 0)
6465
/* We have a key: search the table using the index */
6466
if (!table->file->inited && (error= table->file->ha_index_init(0, false)))
6468
table->file->print_error(error, MYF(0));
6472
/* Fill key data for the row */
6475
key_copy(m_key, table->record[0], table->key_info, 0);
6478
We need to set the null bytes to ensure that the filler bit are
6479
all set when returning. There are storage engines that just set
6480
the necessary bits on the bytes and don't set the filler bits
6483
my_ptrdiff_t const pos=
6484
table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
6485
table->record[0][pos]= 0xFF;
6487
if ((error= table->file->index_read_map(table->record[0], m_key,
6489
HA_READ_KEY_EXACT)))
6491
table->file->print_error(error, MYF(0));
6492
table->file->ha_index_end();
6497
Below is a minor "optimization". If the key (i.e., key number
6498
0) has the HA_NOSAME flag set, we know that we have found the
6499
correct record (since there can be no duplicates); otherwise, we
6500
have to compare the record with the one found to see if it is
6503
CAVEAT! This behaviour is essential for the replication of,
6504
e.g., the mysql.proc table since the correct record *shall* be
6505
found using the primary key *only*. There shall be no
6506
comparison of non-PK columns to decide if the correct record is
6507
found. I can see no scenario where it would be incorrect to
6508
chose the row to change only using a PK or an UNNI.
6510
if (table->key_info->flags & HA_NOSAME)
6512
table->file->ha_index_end();
6517
In case key is not unique, we still have to iterate over records found
6518
and find the one which is identical to the row given. A copy of the
6519
record we are looking for is stored in record[1].
6521
while (record_compare(table))
6524
We need to set the null bytes to ensure that the filler bit
6525
are all set when returning. There are storage engines that
6526
just set the necessary bits on the bytes and don't set the
6527
filler bits correctly.
6529
TODO[record format ndb]: Remove this code once NDB returns the
6530
correct record format.
6532
if (table->s->null_bytes > 0)
6534
table->record[0][table->s->null_bytes - 1]|=
6535
256U - (1U << table->s->last_null_bit_pos);
6538
if ((error= table->file->index_next(table->record[0])))
6540
table->file->print_error(error, MYF(0));
6541
table->file->ha_index_end();
6547
Have to restart the scan to be able to fetch the next row.
6549
table->file->ha_index_end();
6553
int restart_count= 0; // Number of times scanning has restarted from top
6555
/* We don't have a key: search the table using rnd_next() */
6556
if ((error= table->file->ha_rnd_init(1)))
6558
table->file->print_error(error, MYF(0));
6562
/* Continue until we find the right record or have made a full loop */
6565
error= table->file->rnd_next(table->record[0]);
6570
case HA_ERR_RECORD_DELETED:
6573
case HA_ERR_END_OF_FILE:
6574
if (++restart_count < 2)
6575
table->file->ha_rnd_init(1);
6579
table->file->print_error(error, MYF(0));
6580
table->file->ha_rnd_end();
6584
while (restart_count < 2 && record_compare(table));
6587
Note: above record_compare will take into accout all record fields
6588
which might be incorrect in case a partial row was given in the event
6590
table->file->ha_rnd_end();
6592
assert(error == HA_ERR_END_OF_FILE || error == HA_ERR_RECORD_DELETED || error == 0);
6596
table->default_column_bitmaps();
6599
table->default_column_bitmaps();
6605
Constructor used to build an event for writing to the binary log.
6608
/*
  Writing-side constructor: the arguments are handed on to the
  Rows_log_event base constructor together with tbl_arg->read_set.
*/
Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, Table *tbl_arg,
6610
bool is_transactional)
6611
: Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
6616
Constructor used by slave to read the event from the binary log.
6618
/*
  Reading-side constructor: passes buf, event_len and description_event to
  the Rows_log_event base constructor along with DELETE_ROWS_EVENT.
*/
Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint32_t event_len,
6619
const Format_description_log_event
6621
: Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
6627
/*
  Prepare for locating rows before the delete is applied.

  The first test matches the engine/primary-key condition under which
  find_row() uses rnd_pos_by_record(); per the note below, m_key is not
  needed in that case.  If the table has at least one key, m_key is
  allocated with my_malloc() using m_table->key_info->key_length bytes.

  HA_ERR_OUT_OF_MEM is returned on the failure path shown below
  (presumably when my_malloc() yields NULL; TODO confirm, the guarding
  test is not visible here).
*/
Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6629
if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
6630
m_table->s->primary_key < MAX_KEY)
6633
We don't need to allocate any memory for m_key since it is not used.
6638
if (m_table->s->keys > 0)
6640
// Allocate buffer for key searches
6641
m_key= (unsigned char*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
6643
return HA_ERR_OUT_OF_MEM;
6650
/*
  Clean-up after the rows of this event have been processed: closes any
  open index or table scan through ha_index_or_rnd_end().  The value
  returned by that call is not captured (see the ToDo note below).
*/
Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6653
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
6654
m_table->file->ha_index_or_rnd_end();
6661
/*
  Apply one row of the event: locate it with find_row() and, when that
  returns 0, delete the row held in record[0] through ha_delete_row().
  error carries the result of whichever of the two calls ran last.
*/
int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6664
assert(m_table != NULL);
6666
if (!(error= find_row(rli)))
6669
Delete the record found, located in record[0]
6671
error= m_table->file->ha_delete_row(m_table->record[0]);
6677
/**************************************************************************
6678
Update_rows_log_event member functions
6679
**************************************************************************/
6682
Constructor used to build an event for writing to the binary log.
6684
/*
  Writing-side constructor: forwards its arguments and tbl_arg->read_set to
  the Rows_log_event base constructor, then calls init() with
  tbl_arg->write_set.
*/
Update_rows_log_event::Update_rows_log_event(THD *thd_arg, Table *tbl_arg,
6686
bool is_transactional)
6687
: Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
6689
init(tbl_arg->write_set);
6692
/*
  Set up the m_cols_ai bitmap from cols.

  bitmap_init() is given the in-object buffer m_bitbuf_ai when m_width bits
  fit into it, and NULL otherwise.  When bitmap_init() returns 0 and cols is
  not NULL, the bytes of cols->bitmap are copied into m_cols_ai.bitmap and
  create_last_word_mask() is applied to m_cols_ai.
*/
void Update_rows_log_event::init(MY_BITMAP const *cols)
6694
/* if bitmap_init fails, caught in is_valid() */
6695
if (likely(!bitmap_init(&m_cols_ai,
6696
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
6700
/* Cols can be zero if this is a dummy binrows event */
6701
if (likely(cols != NULL))
6703
memcpy(m_cols_ai.bitmap, cols->bitmap, no_bytes_in_map(cols));
6704
create_last_word_mask(&m_cols_ai);
6710
/*
  Release m_cols_ai.  When the bitmap still points at the in-object buffer
  m_bitbuf_ai, the pointer is cleared first so that bitmap_free() has
  nothing to free.
*/
Update_rows_log_event::~Update_rows_log_event()
6712
if (m_cols_ai.bitmap == m_bitbuf_ai) // no my_malloc happened
6713
m_cols_ai.bitmap= 0; // so no free in bitmap_free
6714
bitmap_free(&m_cols_ai); // To pair with bitmap_init().
6719
Constructor used by slave to read the event from the binary log.
6721
/*
  Reading-side constructor: passes buf, event_len and description_event to
  the Rows_log_event base constructor along with UPDATE_ROWS_EVENT.
*/
Update_rows_log_event::Update_rows_log_event(const char *buf, uint32_t event_len,
6723
Format_description_log_event
6725
: Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
6731
/*
  Prepare for locating rows before the update is applied: when the table
  has at least one key, m_key is allocated with my_malloc() using
  m_table->key_info->key_length bytes; m_table->timestamp_field_type is set
  to TIMESTAMP_NO_AUTO_SET.

  HA_ERR_OUT_OF_MEM is returned on the failure path shown below
  (presumably when my_malloc() yields NULL; TODO confirm, the guarding
  test is not visible here).
*/
Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6733
if (m_table->s->keys > 0)
6735
// Allocate buffer for key searches
6736
m_key= (unsigned char*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
6738
return HA_ERR_OUT_OF_MEM;
6741
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
6747
/*
  Clean-up after the rows of this event have been processed: closes any
  open index or table scan through ha_index_or_rnd_end() and releases
  m_key.

  NOTE(review): m_key is obtained from my_malloc() in
  do_before_row_operations() but released with plain free(); confirm the
  two are interchangeable in this code base.
*/
Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6750
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
6751
m_table->file->ha_index_or_rnd_end();
6752
free(m_key); // Free for multi_malloc
6759
/*
  Apply one before-image/after-image pair of the event.

  find_row() locates the before image.  The row found is copied to
  record[1], the after image is unpacked using m_cols_ai, the read_set and
  write_set bitmaps are overwritten with m_cols and m_cols_ai, and
  ha_update_row() is called with record[1] as the old row and record[0] as
  the new row.  The result HA_ERR_RECORD_IS_THE_SAME is tested for
  separately after the update.
*/
Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6761
assert(m_table != NULL);
6763
int error= find_row(rli);
6767
We need to read the second image in the event of error to be
6768
able to skip to the next pair of updates
6770
m_curr_row= m_curr_row_end;
6771
unpack_current_row(rli, &m_cols_ai);
6776
This is the situation after locating BI:
6778
===|=== before image ====|=== after image ===|===
6780
m_curr_row m_curr_row_end
6782
BI found in the table is stored in record[0]. We copy it to record[1]
6783
and unpack AI to record[0].
6786
store_record(m_table,record[1]);
6788
m_curr_row= m_curr_row_end;
6789
error= unpack_current_row(rli, &m_cols_ai); // this also updates m_curr_row_end
6792
Now we have the right row to update. The old row (the one we're
6793
looking for) is in record[1] and the new row is in record[0].
6796
// Temporary fix to find out why it fails [/Matz]
6797
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
6798
memcpy(m_table->write_set->bitmap, m_cols_ai.bitmap, (m_table->write_set->n_bits + 7) / 8);
6800
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
6801
if (error == HA_ERR_RECORD_IS_THE_SAME)
6808
/*
  Construct an incident event from the serialized bytes in buf.

  The incident number is the 2-byte value found right after the common
  header (read with uint2korr).  The message is then fetched with
  read_str() from the bytes between the end of the post header and
  buf + event_len, and stored in m_message.

  NOTE(review): the result of read_str() is not checked; if it fails,
  m_message presumably keeps the NULL/0 initial values of str and len -
  confirm against read_str().
  NOTE(review): len is a uint8_t, so m_message.length never exceeds 255.
*/
Incident_log_event::Incident_log_event(const char *buf, uint32_t event_len,
6809
const Format_description_log_event *descr_event)
6810
: Log_event(buf, descr_event)
6812
uint8_t const common_header_len=
6813
descr_event->common_header_len;
6814
uint8_t const post_header_len=
6815
descr_event->post_header_len[INCIDENT_EVENT-1];
6817
m_incident= static_cast<Incident>(uint2korr(buf + common_header_len));
6818
char const *ptr= buf + common_header_len + post_header_len;
6819
char const *const str_end= buf + event_len;
6820
uint8_t len= 0; // Assignment to keep compiler happy
6821
const char *str= NULL; // Assignment to keep compiler happy
6822
read_str(&ptr, str_end, &str, &len);
6823
m_message.str= const_cast<char*>(str);
6824
m_message.length= len;
6829
/* Destructor of Incident_log_event. */
Incident_log_event::~Incident_log_event()
6835
Incident_log_event::description() const
6837
static const char *const description[]= {
6838
"NOTHING", // Not used
6842
assert(0 <= m_incident);
6843
assert((size_t) m_incident <= sizeof(description)/sizeof(*description));
6845
return description[m_incident];
6849
/**
  Format a short textual summary of the incident and hand it to protocol.

  The summary has the form "#<number> (<name>)", followed by ": <message>"
  when the event carries a non-empty message.

  @param protocol  Receiver of the formatted text; the text is stored with
                   my_charset_bin.
*/
void Incident_log_event::pack_info(Protocol *protocol)
{
  char buf[256];
  int written;
  size_t bytes;

  /*
    The message is appended only when there is one.  The branches used to
    be the other way round, which dropped every real message and passed
    m_message.str to %s exactly when the message was empty.
  */
  if (m_message.length > 0)
    written= snprintf(buf, sizeof(buf), "#%d (%s): %s",
                      m_incident, description(), m_message.str);
  else
    written= snprintf(buf, sizeof(buf), "#%d (%s)",
                      m_incident, description());

  /*
    snprintf() returns the length the full text would have had (or a
    negative value on error), which can exceed what was actually written
    to buf.  Clamp it so that store() never reads past the buffer.
  */
  if (written < 0)
    bytes= 0;
  else if ((size_t) written >= sizeof(buf))
    bytes= sizeof(buf) - 1;
  else
    bytes= (size_t) written;

  protocol->store(buf, bytes, &my_charset_bin);
}
6864
/*
  Report the incident through rli->report() at ERROR_LEVEL with error code
  ER_SLAVE_INCIDENT and the matching ER() format text.  The message text is
  passed when m_message.length > 0; otherwise the literal <none> is passed
  in its place.
*/
Incident_log_event::do_apply_event(Relay_log_info const *rli)
6866
rli->report(ERROR_LEVEL, ER_SLAVE_INCIDENT,
6867
ER(ER_SLAVE_INCIDENT),
6869
m_message.length > 0 ? m_message.str : "<none>");
6875
/*
  Write the event-specific header: m_incident is narrowed to int16_t,
  serialized with int2store() into a buffer of sizeof(int16_t) bytes and
  written to file with my_b_safe_write(), whose result is returned.
*/
Incident_log_event::write_data_header(IO_CACHE *file)
6877
unsigned char buf[sizeof(int16_t)];
6878
int2store(buf, (int16_t) m_incident);
6879
return(my_b_safe_write(file, buf, sizeof(buf)));
6883
/*
  Write the event body: the incident message (m_message.str,
  m_message.length) is emitted through write_str(), whose result is
  returned.
*/
Incident_log_event::write_data_body(IO_CACHE *file)
6885
return(write_str(file, m_message.str, m_message.length));
6888
Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint32_t event_len,
6889
const Format_description_log_event* description_event)
6890
:Log_event(buf, description_event)
6892
uint8_t header_size= description_event->common_header_len;
6893
ident_len = event_len - header_size;
6894
set_if_smaller(ident_len,FN_REFLEN-1);
6895
log_ident= buf + header_size;